Синхронизация потоков

Тема в разделе "WASM.A&O", создана пользователем JingoBo, 21 фев 2012.

  1. JingoBo

    JingoBo New Member

    Публикаций:
    0
    Регистрация:
    21 фев 2012
    Сообщения:
    5
    Привет всем. Пишу на Delphi, но этот сайт просто посоветовали.

    Необходимо синхронизировать блоки кода с разными типами доступа, а именно :
    1. Read - Несколько(сколько угодно) потоков могут входить с таким доступом, при этом потоки на Write должны ожидать пока все потоки с Read "выйдут" из блока.
    2. Write - только 1 поток может войти в блок. При этом остальные потоки, хоть Write, хоть Read потоки должны ожидать выхода захватившего потока из блока. Захвативший поток далее может входить как Read.

    Есть такой класс в Delphi - TMultiReadExclusiveWriteSynchronizer. Но прямо скажем он гумно, слишком медленный, причем несколько не подходит к задаче.

    Подскажите пожалуйста статьи, примеры по данному поводу или алгоритмы, с использованием только событий и критических секций. Необходима максимальная скорость выполнения входа и выхода из блока.

    Спасибо!
     
  2. Apocalypse

    Apocalypse New Member

    Публикаций:
    0
    Регистрация:
    25 май 2011
    Сообщения:
    16
  3. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
  4. JingoBo

    JingoBo New Member

    Публикаций:
    0
    Регистрация:
    21 фев 2012
    Сообщения:
    5
    Apocalypse
    Да ну? Вот ну ни разу не читал.
    l_inc
    Спасибо, но там минимум Windows Vista. У меня минимум для дедиков с Windows 2000.
     
  5. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    JingoBo
    Можно и для систем по-младше придумать:
    Код (Text):
    1. var counter:integer;
    2.  
    3. while (counter <> 0) or (InterlockedCompareExchange(@counter,-1,0) <> 0) do ;
    4.  
    5. {writer}
    6.  
    7. InterlockedIncrement(@counter);
    8.  
    9.  
    10.  
    11. while True do begin
    12.     if counter < 0 then Continue;
    13.     tmp := InterlockedExchange(@counter,-1);
    14.     if tmp < 0 then Continue;
    15.     Inc(tmp);
    16.     InterlockedExchange(@counter,tmp);
    17.     Break;
    18. end;
    19.  
    20. {reader}
    21.  
    22. InterlockedDecrement(@counter);
    На паскале я сто лет назад писал. А на делфи, можно сказать, вообще никогда не писал. Надеюсь, из паскаля ещё правильно помню. И гугл говорит, что нужна ещё директива {$B-}.

    P.S. Нет. Тут я явно прогнал. Нужно защищать counter лучше. Ну значит возьмите реализацию из Таненбаума (Операционные системы. Разработка и реализация. В 3-ем издании точно есть). Только вместо мьютексов используйте спинлоки на основе InterlockedExchange.
     
  6. JingoBo

    JingoBo New Member

    Публикаций:
    0
    Регистрация:
    21 фев 2012
    Сообщения:
    5
    l_inc
    Ок, Спасибо, почитаю. Если что отпишу результаты.
     
  7. SilentSnowfall

    SilentSnowfall New Member

    Публикаций:
    0
    Регистрация:
    8 фев 2011
    Сообщения:
    27
    Гугли "read-write lock". Когда поймешь матчасть, то без проблем напишешь свою реализацию. Можешь посмотреть, как это реализовано у других, готового кода очень много.
     
  8. Pavia

    Pavia Well-Known Member

    Публикаций:
    0
    Регистрация:
    17 июн 2003
    Сообщения:
    2.409
    Адрес:
    Fryazino
    JingoBo
    Водишь версионность. Делается просто, переменная для каждого потока который read.
    0)ставим блокировку на чтение
    1) Из блока для чтения читаешь в эту переменную
    2) далее снимаешь блокировку
    3) затем работа идёт с этой переменной.

    А запись обычная с блокировкой. Будет работать быстро.
    В зависимости от задачи от блокировок можно отказаться, построив на атомарных инструкциях. Можно раскидать потоки по разным участкам блока.
    ИХМО пока свои шишки не набьёшь параллельной обработке не научишься.

    Второй способ уйти от потоков, сделать кооперативную многозадачность, тогда блокировки вообще не понадобятся.
     
  9. JingoBo

    JingoBo New Member

    Публикаций:
    0
    Регистрация:
    21 фев 2012
    Сообщения:
    5
    На синхронизации собаку съел еще в яве. Как реализовать своими путями Read/Write Lock знал(на событиях), но что то мне подсказывало, что это далеко не самый лучший вариант, т.к. работаю в одном процессе(и не в Ring-0 хехе) и цеплять объекты ядра не очень хорошо, т.к. они медленнее чем критические секции.

    Спасибо, я про это почитаю.

    Вчера скачал эту книжку, ОЧЕНЬ понравилась. Реализация Read/Write Lock(стр. 116) там до банальности проста и с ресурсами не емка(есстесна мутексы на КС поменять), там на одном мутексе и парочке атомарных спинлоков, буду использовать ее.

    Спасибо всем!
     
  10. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    JingoBo
    Критические секции тоже легко могут уйти в ядро, отправив поток ожидать следующий квант, если секция занята. Поэтому я и предлагал (если действительно нужна максимальная скорость) все объекты синхронизации ограничить только спинлоками:
    Код (Text):
    1. {acquire}
    2. while True do begin
    3.     if lock <> 0 then Continue;
    4.     tmp := InterlockedExchange(@lock,1);
    5.     if tmp <> 0 then Continue;
    6.     Break;
    7. end;
    8.  
    9.  
    10. {release}
    11. InterlockedExchange(@lock,0);
    P.S. Реализацию можно, естесственно, улучшить, если сделать asm-вставку (см. третий том учебника, раздел "8.10.6.1 Use the PAUSE instruction in Spin-Wait loops").
     
  11. leo

    leo Active Member

    Публикаций:
    0
    Регистрация:
    4 авг 2004
    Сообщения:
    2.542
    Адрес:
    Russia
    l_inc
    Более разумное решение - это некоторое ограниченное кол-во спинлоков + sleep(0) или крит.секция в случае неудачи, иначе может быть бестолковый разогрев проца в ожидании у моря погоды :)
     
  12. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    leo
    Так как раз это EnterCriticalSection и делает.
    А упомянутая pause разве именно это и не предотвращает (причём более эффективно, чем отдача своего кванта)? Ну по-крайней мере на современных процессорах.
     
  13. 100gold

    100gold New Member

    Публикаций:
    0
    Регистрация:
    26 фев 2010
    Сообщения:
    165
    крит. секция - это и есть несколько спинлоков, а потом WaitForSingleObject. Там как бы есть много нюансов внутри EnterCriticalSection, но суть именно такая.
     
  14. leo

    leo Active Member

    Публикаций:
    0
    Регистрация:
    4 авг 2004
    Сообщения:
    2.542
    Адрес:
    Russia
    l_inc
    "Бесконечные" спинлоки имеют смысл при выполнении двух условий: 1) время захвата ресурса (т.е. чтения\записи) сравнительно мало, и 2) все конкурирующие потоки с высокой вероятностью исполняются одновременно. Например, какой смысл тратить весь квант потока на бестолковые спинлоки, если поток, захвативший ресурс, в это время банально усыплен шедулером - не лучше ли не тратить бесполезно время, а передать управление этому потоку?
     
  15. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    leo
    Для максимальной скорости входа/выхода из критического участка, естесственно, важно иметь число потоков, работающих с этим участком, не превышающее числа процессоров/ядер, и при этом каждый из этих потоков должен быть назначен на свой процессор/ядро. Хотя в зависимости от частоты попадания потоков в критический участок можно слегка послабить это условие.

    По поводу первого условия неплохо бы уточнить, "сравнительно" относительно чего. Например, чтение-запись в зависимости от других условий могут быть и сравнимы с длительностью кванта (хотя от двух квантов уже явно не в пользу простых спинлоков), т.к. другой поток может чуть-чуть постоять на спинлоке, пока система поймёт, что больше ей ставить на ядро кроме прерванного в критическом участке потока некого (это значительно меньше длительности кванта).

    Другое "сравнительно" может относиться к длительности исполнения прочего кода потока. Если потоки вообще кроме этого критического участка ничего не исполняют, то здесь мы наталкиваемся на второе условие: потоки практически не могут исполняться параллельно. Тогда даже критические секции будут менее удачной идеей, чем мьютексы, т.к. критические секции не придерживаются FIFO, в результате чего неизбежно голодание других потоков.
     
  16. leo

    leo Active Member

    Публикаций:
    0
    Регистрация:
    4 авг 2004
    Сообщения:
    2.542
    Адрес:
    Russia
    l_inc
    Спасибо за ликбез, но адресовать его лучше не мне, а автору, у которого "Несколько(сколько угодно) потоков" - вот ему и объсни, будет ли предлагаемое тобой решение наилучшим при любых длительностях блокировки и при любом числе потоков ;)
     
  17. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    leo
    :) Я не сомневаюсь, что Вам ликбез не нужен. Но было бы странно отвечать автору на Ваше возражение. Будем считать, что мы разыграли сценку в лицах для ТС, если Вы не против. :)
     
  18. JingoBo

    JingoBo New Member

    Публикаций:
    0
    Регистрация:
    21 фев 2012
    Сообщения:
    5
    Спасибо всем! С проблемой разобрался.
     
  19. Andrzej

    Andrzej New Member

    Публикаций:
    0
    Регистрация:
    2 ноя 2008
    Сообщения:
    11
    Простейший read-write lock без spin-wait циклов, без проверки валидности структуры, порядка вызовов и переполнения счётчиков:

    Код (Text):
    1. HANDLE hEvent;
    2. DWORD lockState; // LOCK_STATE record readCounter :31, isLocked :1
    3. DWORD waitCounter;
    4.  
    5. // init:
    6. NtCreateEvent(&hEvent, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
    7. lockState = 0;
    8. waitCounter = 0;
    9.  
    10. // readLock:
    11. if (lockState & 1 == 1) {
    12. wait:   InterlockedAdd(&waitCounter, 1);
    13.     while (lockState & 1 == 1)
    14.         NtWaitForSingleObject(hEvent, FALSE, NULL);
    15.     InterlockedSub(&waitCounter, 1);
    16. }
    17. InterlockedAdd(&lockState, 2);
    18. if (lockState & 1 == 1) {
    19.     InterlockedSub(&lockState, 2);
    20.     goto wait;
    21. }
    22.  
    23. // readUnlock:
    24. InterlockedSub(&lockState, 2);
    25. if (ZF)
    26.     if (waitCounter)
    27.         NtSetEvent(hEvent, NULL);
    28.  
    29. // writeLock:
    30. if (InterlockedCompareExchange(&lockState, 1, 0) != 0) {
    31.     InterlockedAdd(&waitCounter, 1);
    32.     while (InterlockedCompareExchange(&lockState, 1, 0) != 0)
    33.         NtWaitForSingleObject(hEvent, FALSE, NULL);
    34.     InterlockedSub(&waitCounter, 1);
    35. }
    36.  
    37. // writeUnlock:
    38. lockState = 0;
    39. if (waitCounter)
    40.     NtSetEvent(hEvent, NULL);
    41.  
    42. //destroy:
    43. NtClose(hEvent);
    Соответственно, на однопроцессорных системах, в блокировке шины нет нужды, а вместо ожидания событий необходимо использовать NtYieldExecution() (информацию о количстве процессоров можно получить с помощью NtQuerySystemInformation()). Для обеспечения безопасности необходимо задействовать TLS, проверки на переполнение счётчиков и валидности структуры, и как следствие таких проверок, заменить непостредственное сложение/вычитание расшареных переменных (посколько они приводят к невалидному состоянию), на чтение в локальную переменную, проверку, сложение/вычитание и обмен с сравнением (хз как одним словом называется), ну и аварийный выход из ожидания по таймауту. Структуру необходимо размещать на одной кэш-линии для минимизации эффекта false sharing.

    _________________
    upd: Незнаю как на других языках, но на асме вышеописанный алгоритм очень просто пишется. В большинстве случаев, на входы в блоки read приходятся по одной блокировке шины. Вероятность фэйла вторых проверок в двойных проверках крайне мала, что обеспечит высокую скорость.

    _________________
    upd: Просто не получится все-таки:
    Код (Text):
    1. if (lockState & 1 == 1) {
    2. ...
    3. }
    4. InterlockedAdd(&lockState, 2);
    5. if (lockState & 1 == 1) {
    6.     InterlockedSub(&lockState, 2);
    7.     goto wait;
    8. }
    необходимо заменить на
    Код (Text):
    1. do {
    2.     DWORD temp = lockState;
    3.     if (temp & 1) goto wait;
    4. } while (InterlockedCompareExchange(&lockState, temp + 2, temp) != temp + 2)
    Но и в таком случае — всего одна блокировка шины в большинстве случаев.