Parallel.For, хочу посоветоваться

Тема в разделе "WASM.WIN32", создана пользователем Jin X, 13 авг 2018.

  1. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Всем привет!

    Решил написать небольшой include для fasm под Windows, который будет позволять выполнять параллельные вычисления для for-циклов по аналогии с TParallel.For в Delphi или Parallel.For в C#, но без таких заморочек, как там, а что-то простенькое (строк на 500 асм-кода, ну может, чуть больше). Результат потом выложу.

    Кто не в теме – схема такая. Запускается функция инициализации, которая создаёт ждущие потоки. Далее (сразу или в нужный момент) вызывается функция, которой передаётся адрес процедуры цикла (ProcAddr) диапазон цикла for (либо кол-во итераций). Эта функция будит потоки (SetEvent) и либо завершает свою работу, либо ждёт окончания цикла. Потоки, в свою очередь, инициализируют цикл по определённым правилам (см. ниже вопросы) и вызывают на каждом цикле процедуру цикла (ProcAddr), передавая ей номер цикла и порядковый номер потока (чтобы было удобнее группировать результаты вычислений без использования lock и критических секций). В конце программист вызывает функцию завершения, которая даёт потокам команду завершить работу.

    В ходе реализации возникло несколько моментов, о которых я решил посоветоваться с местными спецами :)

    Итак, вопрос №1: какое кол-во потоков оптимально?
    Тут два варианта:
    • либо affinity mask текущего процесса,
    • либо общее число логических процессоров.
    Как лучше, на ваш взгляд, и почему?
    Кроме того, хочу по умолчанию умножать это число на 2 (потому что функция цикла может грузить процессор не на все 100%, а так всё же надёжнее будет).
    Ну и давать программисту возможность ограничивать общее число потоков сверху.

    Вопрос №1А (туда же): есть ли смысл назначать affinity mask или хотя бы номер идеального процессора для каждого потока?
    Мне кажется, что нет (система сама разберётся как лучше).

    Вопрос №2: как лучше распределять итерации циклов между потоками?
    Представим, что у нас 4 потока и мы делаем цикл i=0...999.
    • Сначала я хотел просто тупо разделить весь цикл поровну, т.е. thread1:i=0,4,8,12..., thread2:i=1,5,9,13..., thread3:i=2,6,10,14..., thread4:i=3,7,11,15... Но потом прикинул, что так не годится, поскольку для некоторых i процедура цикла может выполняться быстрее, чем для других. В результате получится, что какой-то поток закончит свою работу раньше других и далее параллельно будут работать только 3 потока, затем 2 и потом 1.
    • Далее я подумал, что можно делать так: каждый освободившийся поток берёт следующую по счёту итерацию. К примеру:
      Код (Text):
      1. time thread1 thread2 thread3 thread4
      2.  0     [0]     [1]     [2]     [3]
      3.  1      :       :       :       :
      4.  2     [4]      :       :       :
      5.  3      :       :      [5]      :
      6.  4      :      [6]      :      [7]
      7.  5      :       :      [8]      :
      8.  6     [9]      :       :       :
      9.  7      :      [10]     :      [11]
      10.  8      :       :      [12]     :
      Здесь в квадратных скобках указывается номер итерации (i), которую берёт себе поток, а двоеточием - выполнение расчётов в этой итерации. При такой схеме работы, что ни один из потоков не простаивает и захватывает итерацию в тот момент, когда закончил своё вычисление.
      Вроде бы идеальный вариант, но есть один минус: такой подход требует выполнять lock-операцию для получения каждого номера следующей итерации, что для может быть довольно дорогим по времени удовольствием для быстрых процедур цикла при большом кол-ве итераций.
    • Наконец я решил скомбинировать оба подхода и сделать так: взять какое-то число, например, 256. Нацело разделить на него кол-во итераций (в нашем случае 1000/256=3; если получится 0, принять 1). Обозначим результат как BLC (block loop size). Теперь мы выделяем каждому потоку по BLC циклов, после выполнения которых потоки захватывают (по аналогии с предыдущим алгоритмом, т.е. через lock) следующий блок. Т.о., кол-во lock-операций будет примерно равно 256 или чуть больше (максимум 511, но для циклов с большим кол-вом итераций будет 256-257 "захватов"). Такой маленький BLC=3 получился из-за небольшого кол-ва итераций цикла, для 1'000'000 итераций BLC = 3906.
      Код (Text):
      1. time thread1 thread2 thread3 thread4
      2.  0     [0]     [3]     [6]     [9]
      3.  1      :       :       7       :
      4.  2      1       :       :       10
      5.  3      :       4       :       11
      6.  4      2       :       8       :
      7.  5     [12]     5       :       :
      8.  6      :       :       :      [15]
      9.  7      13      [18]    :       :
      10.  8      14      :      [21]     16
      (в квадратных скобках указывается момент захвата блока итераций, а числа без скобок - итерации внутри блока).
      Можно взять в качестве делителя не 256, а 1000, например (давайте это число тоже обсудим). И давать программисту самому задавать его. Более того, можно ограничить число BLC, скажем, значением = 1000 сверху (а реально ли это надо? и каким числом ограничивать?) и кол-вом потоков снизу (на случай, если делитель будет меньше кол-ва потоков). Либо сделать так, чтобы на последних итерациях BLC было = (BLC/(ЧислоПотоков*2)) или что-то в таком духе.
      Такой подход, с одной стороны, не сильно нагружает процессор и шину lock-операциями, с другой - не позволяет процессору, который освободился раньше, простаивать долгое время (разве что на последних итерациях).
    А может, у вас есть более интересная идея (но не чересчур замороченная)?

    Вопрос №3: достаточно ли указания кол-ва итераций (причём, знаковым значением, т.е. 0x7FFFFFFF для 32-х бит) или обязательно нужно указывать и начальное, и конечное значение счётчика?
    Вариант с указанием обоих границ имеет некоторые сложности в реализации при кол-ве итераций, приближённых к максимально возможным.
    Но всё решаемо, конечно, при необходимости. Другой вопрос: есть ли такая необходимость?

    Что думаете обо всём этом?
    Повторюсь, что это не претензия на какую-то грандиозную задумку, а довольно простой, но всё же полезный инструмент для ускорения циклических вычислений.
     
  2. Pavia

    Pavia Well-Known Member

    Публикаций:
    0
    Регистрация:
    17 июн 2003
    Сообщения:
    2.409
    Адрес:
    Fryazino
    Нето несё. Оптимально плавающее значение. Число рабочех при постройке бассеина должно ровняться колличеству логических ядер.
    Пока она разберётся. Я бы при постройки раздал бы рабочем каждому свою маску affinity mask.
    Если путить на самотёк то безмаски рабочии начнут драться за кэш.
    Нармальные пацаны употребляют double-lock с rollback'ами.

    Делим задачу на число рабочих.
    Если кто закончил первее тот отбирает половину у самого загруженного.
    Для наглядности можете посметреть как это работает во FlashGet.
    Вся работа делится на 3 стадии. Начало транзакции выполнение операций и фиксация результата. При начале входим в транзакцию как есть через обычную проверку. А при фиксации применяем XCHG если есть конфликт и кто-то выполнил первее, то делаем откат rollback. Для большенства задач это пустая процедура.

    У вас будет блокировок в среднем не более чем 2xN, где N - число рабочих.

    Единственно что требуется от программиста это соблюдать свойства транзакций. В основном изолированности данных. Для некоторых задачь потребуется копирование исходных данных и выходных. Но это мелочи по сравнению с блокировками.
     
    Последнее редактирование: 14 авг 2018
  3. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Что значит плавающее? Как оно должно определяться?
    Я хочу создавать потоки заранее, чтобы при многократном использовании for'а не тратить время на создание и уничтожение потоков.

    Получается, по кругу, начиная со следующего относительно проца основного потока? Т.е. если основной на cpu2, то делаем так: thread1=cpu3, thread2=cpu4, thread3=cpu1... (чтобы в случае ограничения юзером кол-ва потоков либо при малом кол-ве итераций + асинхронном запуске for'а процессор основного потока с большей вероятностью был свободен от нитей for'а). Так?
    А основному потоку (на время цикла) есть смысл тоже применять маску из 1 бита (чтобы ОС не меняла номер процессора)?

    Хм... Вот как я это вижу.
    Для каждого потока задано кол-во оставшихся итераций (обозначим его как IC). К примеру, если у нас цикл 0...999 и 4 потока, то на старте для всех нитей IC=250 (и при запуске итерации это число будет уменьшаться без блокировки). Допустим, thread1 закончил раньше. При этом у thread2:IC=30, thread3:IC=20, thread4:IC=10.
    1. Ищем максимальное IC. Если это значение = 0, завершаем свою работу. Иначе мы нашли нить, у которой будем отбирать работу (это thread2, у него IC=30, обозначим его как ICx).
    2. Вычисляем NewIC = ICx/2 = 15.
    3. Делаем xchg ICx,NewIC (тут срабатывает блокировка).
    4. Смотрим какое значение мы получили через этот обмен. Если оно <= NewIC, значит goto 1 (такое возможно, когда ICx=1 или около того, либо ещё какой-то поток тоже закончил работу и влез раньше нас).
    5. Устанавливаем у себя IC = ICx-NewIC, выполняем оставшиеся итерации.
    Нормальная схема?

    Именно для этого я и хочу передавать процедуре цикла номер нити. Чтобы можно было заранее выделить N блоков с результатом и туда записывать их (если это, к примеру, сумма или произведение результатов каждой итерации цикла). Очень странно, что в Embarcadero до такого не додумались.
    А ещё, думаю, нужно при запуске for'а задать процедуру обработки результата, которая будет запускаться после for'а (в нитке, выполнившей последнюю итерацию). Полезно при асинхронных запусках for'а.
    Зачем запускать for асинхронно? К примеру, для отображения % выполнения в основном потоке или для ожидания ввода юзером какого-то значения (которое понадобится позже) и пр.
     
    Последнее редактирование: 14 авг 2018
  4. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Логика немного другая.
    Число рабочих при кладке кирпича должно равняться кол-ву строительных лесов * 2 (чтобы пока один намазывает раствор, другой переносил кирпич) :)
     
  5. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Ещё такой момент интересует: т.к. итерации всё равно будут идти не совсем последовательно (скажем, 1,3,2,5,4,6,7,9,8,11,10,13,12...), может, вообще стоит делить весь цикл на равные части и отдавать потокам целиковые куски?
    Т.е. не так:
    thread1=0,4,8,12...
    thread2=1,5,9,13...
    thread3=2,6,10,14...
    thread4=3,7,11,15...
    А вот так:
    thread1=0..249
    thread2=250..499
    thread3=500..749
    thread4=750..999
    Для упрощения реализации (и ускорения).
    В итоге итерации будут происходить примерно в таком виде: (0,250,750,1,1000,251,751,1001,252,2,752,253,1002,3,753,503,1003...).
    Или не стоит, т.к. иногда этот вариант может быть более напряжным? Хотя и там, и там цикл не последовательный...

    Ну и вопрос №3 (см. первый пост) тоже интересует...
     
  6. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Ещё и начальное значение нужно хранить, чтобы "отбирающий" поток знал, с какого места ему продолжать (т.к. "отбираемый" мог тоже у кого-то отобрать) :)
     
  7. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Вернее, не начальное значение нужно хранить, а номер следующей итерации.
    Более того, тут уже попахивает cmpxchg8b, ибо между нашим xchg ICx и чтением номера следующей итерации отбираемый поток может закончить свою работу и отобрать у кого-то другого, поменяв оба значения (когда мы одно обновили/получили, а второе только собираемся читать).
     
  8. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Я тут ещё прикинул, что лучше хранить номер последней итерации и кол-во оставшихся итераций (для каждого потока).
    Рабочий поток будет на каждом цикле уменьшать кол-во оставшихся (без lock'а), а номер текущей итерации будет храниться просто в регистре (другим он особо не нужен).
    Обирающий поток будет проверять кол-во оставшихся итераций и просто читать номер последней итерации, чтобы вычислить откуда ему продолжить (это всё без lock'а), а lock cmpxchg8b здесь будет работать эффективнее, т.к. поток меняет только одно из значений, и меньше вероятности "несрабатывания" (сравнения с ложным результатом). При этом решается ещё одна проблема: если хранить номер следующей (а не последней) итерации и кол-во оставшихся, рабочему потоку нужно как-то менять эти 2 значения одновременно. А тут только одно.
     
  9. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Кстати, кто мне скажет: может ли поток, выполняющий операцию с lock'ом вклиниться между микрооперациями, скажем, инструкции dec? Т.е.:
    time thread1 (dec) thread2 (xchg)
    0 temp = memory
    1 temp-- locked swap(memory, reg)
    2 memory = temp
    ???
    Или lock всегда ждёт завершения инструкции, выполняемой другими ядрами? (Хотя, сдаётся мне, что не ждёт... блин... тогда вариант уменьшения кол-ва оставшихся итераций из рабочего потока без lock'а не канает :dash1:).
     
  10. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Вам нужны CAS-операции (Compare And Swap)? Тогда вам не xchg инструкция нужна, а lock cmpxchg. На CAS-операциях можно атомарные счётчики строить, вот пример декремента:
    Код (ASM):
    1.  
    2. mov eax, [atomic_var] ; Сохраняем в eax текущее значение счётчика
    3. repeat:
    4. lea ebx, [eax-1] ; Вычисляем новое значение как eax - 1
    5. lock cmpxchg [atomic_var], ebx ; сравниваем содержимое atomic_var c eax. Если равны, то ZF = 1 и [atomic_var] = ebx, иначе ZF=0 и eax = [atomic_var]
    6. jnz repeat
    7. ; Здесь получаем гарантированно уменьшенную на 1 atomic_var, а в ebx - значение, полученное после декремента.
    8.  
     
  11. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    SadKo, здесь можно и lock xadd'ом обойтись для вычитания единицы, гораздо эффективнее будет. Другой вопрос (принципиальный) – как сделать так, чтобы в основном потоке не было lock'ов? Чтобы lock'и были только при завершении работы какого-то потока раньше остальных и отбирании части итераций у другого.

    Типа такого...
    Рабочий поток:
    Код (ASM):
    1. ; edi - номер нашего потока (предположим, что у нас первое дв. слово в массиве ThreadCounters - счётчик оставшихся итераций (включая текущий выполняемый), второе - номер итерации, следующей за последней для данного потока)
    2.   mov eax,-1
    3.   xadd [ThreadCounters+edi*8],eax  ; делаем без lock'а!!!
    4.   jnz .loop
    Отбирающий поток (который уже завершил свою работу):
    Код (ASM):
    1. ; eax уже содержит максимальное кол-во оставшихся итераций ([ThreadCounters+edi*8]) потока номер edi, у которого мы отбираем итерации,
    2.   cmp eax,1
    3.   jbe .exit  ; если нет потоков с достаточным кол-вом итераций, завершаем цикл текущего потока
    4. ; нам надо его обновить eax, ибо с момента его получения могло пройти уже несколько десятков тактов :)
    5.   mov edx,[ThreadCounters+edi*8+4]  ; номер итерации, следующей за последней
    6.   mov eax,[ThreadCounters+edi*8]
    7. .tryagain:
    8.   cmp eax,1
    9.   jbe .findanother  ; там выполняется уже последняя итерация, либо работа завершена?
    10.   lea ebx,[eax+1]
    11.   shr ebx,1  ; новое кол-во оставшихся операций потока edi
    12.   mov esi,eax
    13.   sub esi,ebx  ; esi = кол-во отбираемых итераций, т.е. на какое число итераций нам нужно уменьшить номер итерации, следующей за последней
    14.   mov ecx,edx
    15.   sub ecx,esi  ; ecx = новый номер итерации, следующей за последней потока edi
    16.   lock cmpxchg8b [ThreadCounters+edi*8]  ; сравнение с edx:eax и обмен с ecx:ebx
    17.   jnz .tryagain  ; не совпало - попробуем ещё раз (в edx:eax уже новые значения)!
    18.   mov [ThreadCounters+ebp*8+4],edx  ; будем считать, что в ebp номер нашего (отбирающего) потока
    19.   mov [ThreadCounters+ebp*8],esi
    20. ; ecx = номер следующей итерации для нашего потока
    Как-то так...
    Но тут есть проблема: если lock cmpxchg влезет внутрь xadd'а (причём, в неудачном месте), то трындец. Мы потеряем новое значение.

    Правда, у меня тут возникла одна мысль по поводу того, как можно улучшить ситуацию. Но надо эту мысль обмозговать...
     
  12. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    А что у нас товарищ Pavia молчит, как партизан? :)
     
  13. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Никак. Либо если разница между последним значением и текущим большая, то вычитать сразу по несколько чисел (выделять пачками) и обрабатывать их. Так можно минимизировать взаимные блокировки.
    А работа с атомиками - не такая уж дорогая операция по сравнению с штатными примитивами синхронизации (семафоры и т.д.).
     
  14. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Про критические секции и иже с ними семафоры и мьютексы речи даже не идёт.
    Операция относительно недорогая, когда нет lock'ов из нескольких потоков одновременно. А когда несколько потоков делают lock'и чуть ли через 2 инструкции, разница в скорости (с lock'ом и без) будет в десятки раз. К примеру, мы решили через распараллеливание просуммировать пару гигов памяти. И вся процедура цикла - это фактически add и ret. Такая параллельная работа может оказаться дольше однопоточной. Пример, может, не самый расхожий, но почему бы нет?
    Если без lock'ов это сделать нельзя, тогда мой третий вариант из первого сообщения будет наиболее оптимальным...

    Но я уверен, что вариант есть. Надо только хорошенько подумать :)
     
    Последнее редактирование: 15 авг 2018
  15. Black_mirror

    Black_mirror Active Member

    Публикаций:
    0
    Регистрация:
    14 окт 2002
    Сообщения:
    1.035
    Jin X, есть такой вариант:
    От задачи откусываются части, размер которых увеличивается на 5-10% или на фиксированное число итераций, и загоняются в массив начиная с самой большей.
    Потоки идут по массиву и командой xchg вытаскивают себе кусочек задачки обменивая его на 0, и выполняя если им достался не 0, иначе идут дальше.
    Когда все потоки дойдут до конца массива, задача будет решена, останется только результат собрать.
     
  16. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Ну тут опять lock, хотя не на каждой итерации. По сути, это не сильно отличается от моего 3-го варианта в первом сообщении. Только у меня без массива было.
     
  17. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Значит, вы неправильно параллелите задачи. Задача должна быть распараллелена так, чтобы стоимость взятия блокировки на атомарной операции была в разы меньше стоимости всех остальных операций.
    Если вам надо просуммировать несколько гигов памяти, то делите эти гиги памяти на блоки одинаковой длины и выполняете параллельную работу над блоками, при этом у каждого потока будет своё значение суммы, а в конце аггрегигуете все полученные суммы в одну.

    Вообще, при разработке многопоточных приложений я пользуюсь одним простым правилом: если вы хотите написать высокопроизводительное многопоточное приложение, то сведите конкуренцию к минимуму.
     
  18. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Это задача программиста, использующую параллельный for. А одна из моих задач, как автора этого механизма, минимизировать lock'и. Чтобы даже если программист будет тупо считать память побайтно, всё работало максимально быстро.

    Вот я и думаю о том, как её свести к минимуму. Для этого нужно либо выделить всем разные блоки (для 4 потоков – по 1/4) и не париться, если кто-то закончил раньше. Либо выдавать кусками (и лочить после каждого куска). Либо придумать что-то ещё, чем я и занимаюсь :)


    Собственно, я уже придумал такую схему, без lock'ов в основном цикле.
    Локи будут только у перехватчиков (это для алгоритма с перехватами, описанного чуть выше [после слов "Вот как я это вижу"], тут я его немного модифицировал, но суть та же).

    Теперь у каждого потока есть счётчик текущей и последней выполняемой для данного потока итерации (Cur и Last). В массиве ThreadCounters они расположены друг за другом (Cur0,Last0, Cur1,Last1, Cur2,Last2...).
    Код работающего потока:
    Код (ASM):
    1. ; ebx - номер текущего цикла (Cur), на старте - начальный номер
    2. ; esi - номер текущего потока, начиная с 0
    3.   cmp ebx,[ThreadCounters+esi*8+4]  ; Cur <= Last ?
    4.   jle .start  ; да, начинаем
    5.   jmp .finish  ; нет, пустой цикл, сразу выходим
    6. .loop:
    7.   mov [ThreadCounters+ebx*8],ebx  ; Cur (mov атомарен, lock ненужен) :)
    8. .start:
    9.   stdcall [MainProc]  ; вызов основной функции цикла
    10.   inc ebx
    11.   cmp ebx,[ThreadCounters+ebx*8+4]  ; Cur+1 <= Last ?
    12.   jle .loop  ; да, продолжаем цикл
    13.  
    14. .findjob:
    15. ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток,
    16. ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает)
    Перехватчик (отбирающий поток) делает так:
    Код (ASM):
    1. ; edi содержит номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 0
    2. ;   [хотя меньше 0 она не должна быть], завершаем работу, не доходя до этого места в коде)
    3.   mov ebp,ATTEMPT_COUNT  ; кол-во попыток отобрать работу у какого-либо потока, пусть будет = 4
    4.   mov edx,[ThreadCounters+edi*8+4]  ; edx = Last (обновляем данные)
    5.   mov eax,[ThreadCounters+edi*8]  ; eax = Cur
    6. .tryagain:
    7.   mov ebx,edx
    8.   sub ebx,eax  ; Last - Cur
    9. ; какой результат мы можем тут получить?
    10. ; если поток завершил работу или выполняет последнюю итерацию, то Cur = Last, ebx = 0
    11. ; если поток выполняет не самую последнюю итерацию, то ebx > 0 (даже если кол-во итераций = 0x80000000, а больше
    12. ;   быть не может, т.к. при работе в 1 потоке мы будет действовать по-другому, там всё это нагромождение ни к чему)
    13.   cmp ebx,1
    14.   jle .findjob  ; если этот поток уже завершён, выполняет последнюю итерацию или у него осталась ещё лишь 1 итерация,
    15.   ;   попробуем найти какой-нибудь другой поток (хотя... может, лучше сразу на выход?); последнюю итерацию отбирать
    16.   ;   нельзя, иначе может произойти казус, если lock cmpxchg8b попадёт между cmp-jle-mov в цикле кода работающего потока
    17. ;  inc ebx  ; раскомментировать, если мы хотим делить нечётные числа в свою пользу, а не в пользу потока-владельца
    18.   shr ebx,1  ; кол-во отбираемых итераций
    19.   mov ecx,edx
    20.   sub ecx,ebx  ; ecx = новый номер последней итерации, т.е. новый Last = (Last-(Last-Cur)/2)
    21.   mov ebx,eax  ; ebx = Cur, его мы не меняем, а только проверяем (чтобы он случайно не вышел за границы нового Last'а)
    22.   lock cmpxchg [ThreadCounters+edi*8],ecx  ; if (edx:eax == Last:Cur) { Last = ecx; Cur = ebx; )
    23.   jnz .robfail  ; облом! :(
    24. ; ура, успех!
    25.   lea ebx,[ecx+1]  ; наш новый Cur = Last+1 потока, у которого мы отобрали работу
    26.   mov [ThreadCounters+esi*8+4],0x80000000  ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет
    27.   ;   (влезая между двумя следующими mov'ами)
    28.   mov [ThreadCounters+esi*8],ebx  ; Cur
    29.   mov [ThreadCounters+esi*8+4],edx  ; наш новый Last
    30.   jmp .start
    31.  
    32. .robfail:
    33.   cmp edx,[ThreadCounters+edi*8+4]
    34.   jne .findjob  ; какая-то зараза (другой поток) успел влезть и перехватить работающий поток раньше нас - идём искать новую работу! :(
    35.   dec ebp
    36.   jnz .tryagain  ; мы просто не успели, поток перешёл на следующую итерацию, пробуем ещё ebp раз...
    37. ; если кол-во попыток исчерпано, продолжать смысла нет, поток слишком быстро отрабатывает итерации
    38.  
    39. .finish:
    40. ; завершение цикла для данного потока
    Вроде всё должно быть чётко. Но прошу проследить за моей мыслью, вдруг я что-то не учёл...
     
    Последнее редактирование: 16 авг 2018
  19. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Очепятка, ёлы-палы, не lock cmpxchg [ThreadCounters+edi*8],ecx там, а lock cmpxchg8b [ThreadCounters+edi*8] конечно же!
    Сначала хотел сделать без 8b, потом всё переделал, а тут забыл поменять обратно :wacko:
     
  20. Jin X

    Jin X Active Member

    Публикаций:
    0
    Регистрация:
    15 янв 2009
    Сообщения:
    369
    Адрес:
    Кольца Сатурна
    Всё, я придумал более интересный вариант!
    В кодах выше при очень быстрых циклах перехват невозможен, т.к. cmpxchg8b будет возвращать всё время nz, и после нескольких неудачных попыток перехвата поток уйдёт грустить.

    Что делаем в новом варианте?
    Снова возвращаемся к схеме, где у нас есть кол-во оставшихся итераций (Remain, раньше я его называл IC) [так даже будет удобнее искать самый отстающий поток] и номер последней итерации (Last).

    • Работающий поток выполняет основную процедуру цикла (MainProc), затем уменьшает счётчик оставшихся итераций на 1 и продолжает цикл, если результат > 0 (классический цикл). Далее проверяет номер последней итерации Last на значение 0x80000000 (это флаг, см. ниже). Если равно, тупо ждёт, когда изменится (это нужно для того, чтобы не прокрутить цикл мимо Last, пока он обновляется перехватчиком). Даже если у нас цикл от 0x80000000 до 0x80000000 (а числа у нас знаковые, т.е. это MinInt), то Remain изначально будет = 1, и после первого dec [Remain] цикл завершится, т.е. до этого места он даже не дойдёт и не поток не зависнет.
    • Перехватчик через xchg устанавливает Last = 0x80000000 (SpinLock, фактически... вернее, тут только Lock, а Spin, как я сказал выше, будет в работающем потоке). Если там уже было это значение, то идём искать другого, у кого можно "отжать" работу (ибо кто-то уже успел перехватить итерации этого потока раньше нас). Далее спокойно делаем свои вычисления и записываем новое значение в Last работающего потока, одновременно освобождая его из заточения. ВСЁ!!! Элементарно, Ватсон :). Даже если мы попадём в неудачное место, и работающий поток прокрутит ещё 1 итерацию, это не страшно, после установки флага 0x80000000 мы проверим, чтобы у нас был запас хотя бы в 1 итерацию.

    Тут есть, правда, один нюанс: если работающий и перехватчик находятся на одном ядре, то после установки флага 0x80000000, когда мы пойдём на поиск новой работы, мы снова найдём этот же поток. Но это легко решается проверкой Last'а на значение 0x80000000 - такие потоки мы просто не берём во внимание.

    Чуть позже (как напишу) скину код.
     
    Последнее редактирование: 17 авг 2018