C. Unix. Синхронизация производителей-потребителей.

Тема в разделе "LANGS.C", создана пользователем l_inc, 19 дек 2009.

  1. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    Недавно сдавал одно ДЗ по системному программированию. Задание-то сдал, но с багами разобраться до сих пор не могу. :)
    Язык - си (не C++). Смысл задания в том, чтобы реализовать синхонизированный, но допускающий максимально возможную параллелизацию, циклический связный список, являющийся промежуточным буфером между производителями и потребителями.
    Над списком можно производить две основные операции: fetch (вынуть элемент из списка-буфера) и deposit (поместить элемент в список-буфер). Соответственно используются два указателя на начало (первый занятый элемент буфера) и конец (первый пустой элемент) буфера. При этом fetch блокирует поток до лучших времён, если буфер пуст, а deposit блокирует поток до лучших времён, если буфер забит до отказа. Всё это дело выглядит примерно следующим образом:
    [​IMG]
    Имеется три вида потоков: читающие входной файл, обрабатывающие данные и выводящие результаты. Между ними используется ровно два буфера, реализованных вышеописанным образом: входной и выходной. Все читающие читают данные из файла и пишут их во входной буфер. Все обрабатывающие берут данные из входного буфера, выполняют над ними некую ресурсоёмкую задачу (в предопределённой ф-ии search_and_insert) и кладут результаты работы в выходной буфер (всё ещё в рамках ф-ии search_and_insert); сама ф-ия search_and_insert нас не интересует. Все выводящие берут данные из выходного буфера и выводят их на экран.
    Для того, чтобы последующие потоки знали, когда завершаться, в буфер кладётся dummy-элемент, являющийся четырьмя нулевыми байтами (в задании предопределено, что входной файл не может содержать нулевых символов). Каждого из видов потоков создаётся одинаковое количество, задаваемое в параметрах запуска программы.
    Приведу только основные фрагменты кода:
    buffer.h:
    Код (Text):
    1. #include <pthread.h>
    2.  
    3. /** Maximum number of elements in buffer */
    4. #define RINGBUFFER_SIZE 20
    5.  
    6. /* Buffer element structure */
    7. typedef struct _ITEM
    8. {
    9.     struct _ITEM *next;
    10.     char *string;
    11. } ITEM;
    12. typedef struct
    13. {
    14.     pthread_mutex_t mutex;
    15.     pthread_cond_t cond;
    16.     _Bool locked;
    17. } WAIT;
    18.  
    19. /**
    20.  * Represents a synchronized ringbuffer-objekt with fixed size
    21.  */
    22. typedef struct
    23. {
    24.     WAIT empty_wait;
    25.     WAIT full_wait;
    26.     pthread_mutex_t fetchLock;
    27.     pthread_mutex_t depositLock;
    28.     ITEM *start;
    29.     ITEM *end;
    30. } ringbuffer_t;
    buffer.c:
    Код (Text):
    1. #include "buffer.h"
    2.  
    3. #include <stdlib.h>
    4. #include <stdbool.h>
    5. #include <stdio.h>
    6.  
    7. int init_buffer(ringbuffer_t* buf)
    8. {
    9.     if (!(buf->start = buf->end = (ITEM *)calloc(1,sizeof(ITEM))))
    10.         return -1;
    11.  
    12.     ITEM *curItem = buf->start;
    13.     for (int par = 1; par < RINGBUFFER_SIZE; curItem = curItem->next, par++)
    14.         if(!(curItem->next = (ITEM *)calloc(1,sizeof(ITEM))))
    15.         {
    16.             destroy_buffer(buf);
    17.             return -1;
    18.         }
    19.     curItem->next = buf->start;
    20.  
    21.     pthread_cond_init(&buf->full_wait.cond,NULL);pthread_cond_init(&buf->empty_wait.cond,NULL);
    22.     pthread_mutex_init(&buf->depositLock,NULL);pthread_mutex_init(&buf->fetchLock,NULL);
    23.     pthread_mutex_init(&buf->full_wait.mutex,NULL);pthread_mutex_init(&buf->empty_wait.mutex,NULL);
    24.  
    25.     buf->full_wait.locked = false;
    26.     buf->empty_wait.locked = true;
    27.     return 0;
    28. }
    29.  
    30. void deposit(ringbuffer_t* buf, char* str)
    31. {
    32.     pthread_mutex_lock(&buf->depositLock);
    33.         pthread_mutex_lock(&buf->full_wait.mutex);
    34.             while (buf->full_wait.locked)
    35.                 pthread_cond_wait(&buf->full_wait.cond, &buf->full_wait.mutex);
    36.             buf->end->string = str;
    37.             buf->end = buf->end->next;
    38.             if (buf->end->string)
    39.                 buf->full_wait.locked = true;
    40.         pthread_mutex_unlock(&buf->full_wait.mutex);
    41.  
    42.         pthread_mutex_lock(&buf->empty_wait.mutex);
    43.             if (buf->empty_wait.locked && buf->start->string)
    44.             {
    45.                 buf->empty_wait.locked = false;
    46.                 pthread_cond_signal(&buf->empty_wait.cond);
    47.             }
    48.         pthread_mutex_unlock(&buf->empty_wait.mutex);
    49.     pthread_mutex_unlock(&buf->depositLock);
    50. }
    51.  
    52. char* fetch(ringbuffer_t* buf)
    53. {
    54.     char *stringBuf;
    55.     pthread_mutex_lock(&buf->fetchLock);
    56.         pthread_mutex_lock(&buf->empty_wait.mutex);
    57.             while (buf->empty_wait.locked)
    58.                 pthread_cond_wait(&buf->empty_wait.cond, &buf->empty_wait.mutex);
    59.             stringBuf = buf->start->string;
    60.             buf->start->string = NULL;
    61.             buf->start = buf->start->next;
    62.             if (!buf->start->string)
    63.                 buf->empty_wait.locked = true;
    64.         pthread_mutex_unlock(&buf->empty_wait.mutex);
    65.  
    66.         pthread_mutex_lock(&buf->full_wait.mutex);
    67.             if (buf->full_wait.locked && !buf->end->string)
    68.             {
    69.                 buf->full_wait.locked = false;
    70.                 pthread_cond_signal(&buf->full_wait.cond);
    71.             }
    72.         pthread_mutex_unlock(&buf->full_wait.mutex);
    73.     pthread_mutex_unlock(&buf->fetchLock);
    74.     return stringBuf;
    75. }
    76.  
    77. void destroy_buffer(ringbuffer_t* buf)
    78. {
    79.     ITEM *nextItem = buf->start, *curItem;
    80.     while (nextItem)
    81.     {
    82.         curItem = nextItem; nextItem = nextItem->next;
    83.         free(curItem);
    84.         if (nextItem == buf->start)
    85.             break;
    86.     }
    87. }
    main.c
    Код (Text):
    1. void* reader_thread(void* value)
    2. {
    3.     char *chunk;
    4.     size_t bytesRead;
    5.     while (!feof(file))
    6.     {
    7.         if (!(chunk = calloc(1,CHUNK_SIZE)))
    8.         {
    9.             printf("Could not allocate memory %s:\n", strerror(errno));
    10.             return NULL;
    11.         }
    12.         bytesRead = fread(chunk, CHUNK_SIZE, 1, file);
    13.         deposit(&inBuffer, chunk);
    14.     }
    15.     chunk = calloc(1,4);
    16.     deposit(&inBuffer, chunk);
    17.     return NULL;
    18. }
    19.  
    20. void* analyzer_thread(void* value)
    21. {
    22.     char *chunk;
    23.     for(;;)
    24.     {
    25.         chunk = fetch(&inBuffer);
    26.         if (chunk[0])
    27.         {
    28.             search_and_insert(chunk, strlen(chunk), &outBuffer);
    29.             free(chunk);
    30.         }
    31.         else
    32.         {
    33.             deposit(&outBuffer, chunk);
    34.             break;
    35.         }
    36.     }
    37.     return NULL;
    38. }
    39.  
    40. void* printer_thread(void* value)
    41. {
    42.     char *chunk;
    43.     for(;;)
    44.     {
    45.         chunk = fetch(&outBuffer);
    46.         if (chunk[0])
    47.         {
    48.             printf("%s\n", chunk);
    49.             free(chunk);
    50.         }
    51.         else
    52.         {
    53.             free(chunk);
    54.             break;
    55.         }
    56.     }
    57.     return NULL;
    58. }
    Основной баг состоит в том, что после полной отработки на входном файле, один или несколько потоков reader_thread часто блокируются при попытке депозита dummy-элемента в заполненный буфер и никогда не разблокируются. При этом общее число задепозиченых dummy-элементов каким-то волшебным образом превышает число reader_thread-потоков на несколько десятков. Баг проявляется только на общем количестве всех потоков, превышающем размер буфера. Т.е. при размере буфера в 20 элементов баг проявляется на числе потоков, начиная с 21*3.

    Полные исходники+makefile в приложенном файле. Компилируются под unix или cygwin.

    Файл для тестов: http://www.megaupload.com/?d=3CC26BQ3

    make run (идентичен запуску через: yeti.exe yeti.10M.txt 25) скомпилирует всё это дело и запустит тест на вышеуказанном тестовом файле при 25*3 потоках.
    make stat скомпилирует всё это дело и выполнит тесты на тестовых файлах для 1*3,2*3, ... 64*3 потоков.
     
  2. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    Собственно вопрос состоит в том, откуда берётся этот баг. :)
     
  3. AndreyMust19

    AndreyMust19 New Member

    Публикаций:
    0
    Регистрация:
    20 окт 2008
    Сообщения:
    714
    Код (Text):
    1. void* reader_thread(void* value)
    2. ....
    3. }
    4.     chunk = [b]calloc(1,4);[/b]
    5.     deposit(&inBuffer, chunk);
    6.     return NULL;
    А вы уверены что эти 4 байта - нули? Проверьте - не выделяется ли в chunk мусор - может туда нули надо записать?
     
  4. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    AndreyMust19
    Уверен. Если Вы сомневаетесь, что в unix/cygwin не так, как в MS CRT, см. спецификацию posix.
     
  5. l_inc

    l_inc New Member

    Публикаций:
    0
    Регистрация:
    29 сен 2005
    Сообщения:
    2.566
    Ошибка оказалась гораздо более тупой и очевидной, чем мне казалось, и не связана с синхронизацией.
    Когда очередной поток типа reader_thread пытается выполнить fread после достижения конца входного файла, в массив chunk очевидно ничего не записывается и он остаётся инициализированным нулями. Т.о. он будет распознан последующими потоками, как dummy-элемент. Соответственно следующая попытка запихать реальный dummy-элемент в буфер приведёт к тому, что он будет лишним в буфере и, упрощённо говоря, не будет никем вытянут. Что при количестве reader_thread-потоков превышающем размер буфера приводит к тому, что буфер переполняется и очередной поток при попытке выполнить deposit блокируется навсегда.

    Тем не менее в коде есть ещё одна ошибка, т.к. в 30-35% запусков одна из выходных строк после обработки приложенного тестового входного файла (причём всё время одна и та же) появляется в выходном буфере дважды.