Parallel.For, хочу посоветоваться

Тема в разделе "WASM.WIN32", создана пользователем Jin X, 13 авг 2018.

  1. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Там ещё (в предыдущем коде) я по запаре вместо esi использовал ebx в паре мест (в адресации).

    И всё-таки реализация накладывает свои коррективы на теорию.
    Не будет Remain'а, я ж от него отказался в прошлый раз как раз из-за того, что нужно менять сразу 2 переменные в основном цикле.
    Итак, имеем структуру ThreadData, которая содержит 4 поля dword для каждого потока: Handle, Lock (флаг запрета обработки итераций), Current (номер текущей итерации), Last (номер последней итерации).
    Обозначу смещения до них как @Handle, @Lock, @Current и @Last (0, 4, 8 и 12).

    Работающий поток:
    Код работающего потока:
    Код (ASM):
    1. ; esi = номер текущего потока, начиная с 0.
    2.   mov ebp,[ForProc]  ; ebp = адрес основной процедуры цикла
    3.   lea edi,[esi+esi]
    4.   lea [edi*8+ThreadData]  ; edi = ThreadData + esi*16
    5.   mov ebx,[edi+@Current]
    6.   cmp ebx,[edi+@Last]  ; Current <= Last ?
    7.   jle .start  ; да, стартуем
    8.   jmp .finish  ; нет, это пустой цикл, сразу выходим
    9.  
    10. ; Spin-цикл ожидания разблокировки (пока кто-то пытается отобрать у нас работу)
    11. .pause:
    12.   mov eax,16
    13. @@:
    14.   pause
    15.   cmp dword [edi+@Lock],0
    16.   je .start  ; выходим из цикла
    17.   dec eax
    18.   jnz @B
    19.   invoke SwitchToThread  ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе
    20.     ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре
    21.   jmp .pause
    22.  
    23. .loop:
    24.   mov [edi+@Current],ebx  ; mov атомарен, lock не нужен
    25.   cmp dword [edi+@Lock],0
    26.   jne .pause  ; у нас пытаются отобрать работу?
    27. .start:
    28.   stdcall ebp, ebx, esi  ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока)
    29.   inc ebx  ; Current++
    30.   cmp ebx,[edi+@Last]  ; Current+1 <= Last ?
    31.   jle .loop  ; да, продолжаем цикл
    32.  
    33. .findjob:
    34. ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
    35. ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
    Перехватчик (отбирающий поток) делает так:
    Код (ASM):
    1. ; edx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1
    2. ;   завершаем работу, не доходя до этого места в коде)
    3. ; eax = Last этого потока
    4.   lea ecx,[edx+edx]
    5.   lea ecx,[ecx*8+ThreadData]  ; ecx = ThreadData + edx*16
    6.   mov edx,1
    7.   xchg [ecx+@Lock],edx  ; блокируем работающий поток (даже если мы попали не туда, и он
    8.     ; прокрутит ещё одну итерацию, это не страшно, дальше заблокируется, у нас есть запас)
    9.   test edx,edx
    10.   jnz .findjob  ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас - идём искать новую работу
    11.   mov edx,eax
    12.   sub edx,[ecx+@Current]  ; кол-во оставшихся итераций
    13.   cmp edx,1  ; Last-Current <= 1 ?
    14.   jle .toolate  ; да, осталось слишком мало итераций
    15.   shr edx,1  ; кол-во отбираемых итераций
    16.   mov ebx,eax
    17.   sub ebx,edx  ; новый @Last
    18.   cmpxchg [ecx+@Last],ebx  ; if (eax = Last) Last = ebx
    19.   mov [ecx+@Lock],0  ; освобождаем поток
    20.   jnz .findjob  ; оказывается, другой поток успел отхватить работу аж до xchg!
    21.  
    22.   inc ebx  ; наш новый Current = Last+1 потока, у которого мы отобрали работу
    23.   mov [edi+@Last],0x80000000  ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
    24.     ; (влезая между двумя следующими mov'ами)
    25.   mov [edi+@Current],ebx
    26.   mov [edi+@Last],eax  ; наш новый Last
    27.   jmp .start
    28.  
    29. .toolate:
    30.   mov [ecx+@Lock],0  ; освобождаем поток
    31.   jmp .findjob  ; идём искать новую работу
    32.  
    33. .finish:
    34. ; завершение цикла для данного потока
    И этот вариант мне тоже не до конца нравится из-за длинной блокировки основного потока при перехвате (и SwitchToThread). Более длинной, чем при cmpxchg8b. Но вариант с cmpxchg8b может обломаться при быстрых циклах. Короче, х/з. Ещё что-то придумывать???
     
  2. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    А почему нельзя сделать так?
    Поток 1 увеличивает current каждый раз на 1 и приступает к работе, пока current <= last.
    Поток 2 хочет украсть данные. Он вычисляет delta = (current - last)/2 Потока 1 и увеличивает current Потока 1 на это значение в атомарном режиме.
    Далее поток записывает к себе значение current 1-го потока до изменения и last = current + delta.
    Всё, потоку 2 дальше можно продолжать работу.

    Условие, при котором можно красть данные у целевого потока: last - current > 1. Так что ворующему потоку можно быстро пробежаться по массиву задач потоков и выбрать целевой поток тот, у которого разница будет максимальной.
     
  3. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Проблема в том, что мы не знаем, в каком месте Поток1 будет исполняться, когда Поток2 увеличит его Current на Delta.
    Например, это может произойти между чтением и записью Current'а Потоком1.
    Даже если Поток1 делает inc [Current], он же делает это не атомарно, а эта инструкция состоит из нескольких микроопераций вроде temp=Current, temp++, Current=temp. И если наш lock сработает в момент между temp=Current и Current=temp, произойдёт вот что (допустим, Current = 10, Last = 20):
    1. Поток1: temp=Current (10)
    2. Поток1: temp++ (11)
    3. Поток2: lock Current+=Delta (15)
    4. Поток1: Current=temp (11)
    Ooops!
    И это не выдумки, я специально протестил, что будет, если 2 потока будут делать inc X параллельно по 1 млрд раз, но один из них будет с lock'ом, а второй – без. Результат порядка 1.9 млрд. Это, конечно, несравнимо больше, чем когда они оба делают это без lock'а (там вообще результат ≈ 1.001 ... 1.01 млрд), но до 2 млрд всё же не дотягивает (как и ожидалось, в принципе).

    В общем, у меня появилась одна идейка.
    Надо просто в Lock записывать не 1 и 0, а изначально записать 0x7FFFFFFF, а потом заносить новый Last, но уменьшенный на 1. А работающий в цикле поток будет останавливаться не при любом ненулевом значении, а только когда Current дойдёт до Lock (ну или превысит его). Т.е. он почти никогда не будет подвисать.
    Надо только это реализовать сначала, а то каждый раз возникает очередная "эврика", а в ходе реализации какой-нибудь нюанс появляется. Но тут вроде всё должно получиться :)
     
  4. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Итак, новый вариант... :)
    Только в Lock мы будем записывать новый Last, не уменьшая его на 1.

    Работающий поток:
    Код (ASM):
    1. ; THREAD_UNLOCKED = 0x7FFFFFFF - значение, при котором поток разблокирован (MaxInt)
    2. ; esi = номер текущего потока, начиная с 0.
    3.   mov ebp,[ForProc]  ; ebp = адрес основной процедуры цикла
    4.   lea edi,[esi+esi]
    5.   lea [edi*8+ThreadData]  ; edi = ThreadData + esi*16
    6.   mov ebx,[edi+@Current]
    7.   cmp ebx,[edi+@Last]  ; Current <= Last ?
    8.   jle .start  ; да, стартуем
    9.   jmp .finish  ; нет, это пустой цикл, сразу выходим
    10.  
    11. .loop:
    12.   mov [edi+@Current],ebx  ; Current++ (mov атомарен, lock не нужен)
    13. .start:
    14.   stdcall ebp, ebx, esi  ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока)
    15.   inc ebx  ; Current+1
    16.   cmp ebx,[edi+@Lock]  ; Current+1 <= Lock ?
    17.   jle .continue  ; да, продолжаем
    18. ; делаем паузу, если достигли значение Lock (spin-цикл ожидания разблокировки)
    19. .pause:
    20.   mov eax,16
    21. @@:
    22.   pause
    23.   cmp dword [edi+@Lock],THREAD_UNLOCKED
    24.   je .continue  ; выходим из spin-цикла
    25.   dec eax
    26.   jnz @B
    27.   invoke SwitchToThread  ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе
    28.     ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре
    29.   jmp .pause
    30. ; продолжаем цикл
    31. .continue:
    32.   cmp ebx,[edi+@Last]  ; Current+1 <= Last ?
    33.   jle .loop  ; да, продолжаем цикл
    34.  
    35. .findjob:
    36. ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
    37. ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
    Перехватчик:
    Код (ASM):
    1. ; ecx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1
    2. ;   завершаем работу, не доходя до этого места в коде)
    3. ; ebx = Last этого потока
    4.   add ecx,ecx
    5.   lea ecx,[ecx*8+ThreadData]  ; ecx = ThreadData + ecx*16
    6.   mov eax,ebx
    7.   mov edx,ebx
    8.   sub eax,[ecx+@Current]
    9.   shr eax,1  ; (Last - Current) / 2
    10.   sub edx,eax  ; edx = Last - eax = новый Lock, он же новый Last
    11.   mov eax,THREAD_UNLOCKED
    12.   lock cmpxchg [ecx+@Lock],edx  ; if (Lock == THREAD_UNLOCKED) { Lock = edx ; zf = 1; } else { eax = Lock; zf = 0; }
    13.   jnz .findjob  ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас, идём искать новую работу
    14.   cmp [ecx+@Current],edx
    15.   jge .toolate  ; поток уже дошёл до итерации Lock (ситуация Current = Lock = новый Last нам тоже не подходит)
    16.   mov eax,ebx
    17.   cmpxchg [ecx+@Last],edx  ; if (eax == Last) { Last = edx; }
    18.   mov [ecx+@Lock],THREAD_UNLOCKED  ; освобождаем поток
    19.   jnz .findjob  ; оказывается, поток закончил работу и взял итерации у кого-то другого,
    20.     ; либо другой поток успел отжать работу у нашего подопечного ещё до lock cmpxchg!
    21.   inc edx  ; наш новый Current = новый Last потока, у которого мы отобрали работу + 1
    22.   mov [edi+@Last],0x80000000  ; наш Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
    23.     ; (влезая между двумя следующими mov'ами)
    24.   mov [edi+@Current],edx
    25.   mov [edi+@Last],eax
    26.   jmp .start
    27.  
    28. .toolate:
    29.   mov [ecx+@Lock],THREAD_UNLOCKED  ; освобождаем поток
    30.   jmp .findjob  ; идём искать новую работу
    31.  
    32. .finish:
    33. ; завершение цикла для данного потока
    Осталось теперь проверить на практике :)