Недавно сдавал одно ДЗ по системному программированию. Задание-то сдал, но с багами разобраться до сих пор не могу. Язык - си (не C++). Смысл задания в том, чтобы реализовать синхонизированный, но допускающий максимально возможную параллелизацию, циклический связный список, являющийся промежуточным буфером между производителями и потребителями. Над списком можно производить две основные операции: fetch (вынуть элемент из списка-буфера) и deposit (поместить элемент в список-буфер). Соответственно используются два указателя на начало (первый занятый элемент буфера) и конец (первый пустой элемент) буфера. При этом fetch блокирует поток до лучших времён, если буфер пуст, а deposit блокирует поток до лучших времён, если буфер забит до отказа. Всё это дело выглядит примерно следующим образом: Имеется три вида потоков: читающие входной файл, обрабатывающие данные и выводящие результаты. Между ними используется ровно два буфера, реализованных вышеописанным образом: входной и выходной. Все читающие читают данные из файла и пишут их во входной буфер. Все обрабатывающие берут данные из входного буфера, выполняют над ними некую ресурсоёмкую задачу (в предопределённой ф-ии search_and_insert) и кладут результаты работы в выходной буфер (всё ещё в рамках ф-ии search_and_insert); сама ф-ия search_and_insert нас не интересует. Все выводящие берут данные из выходного буфера и выводят их на экран. Для того, чтобы последующие потоки знали, когда завершаться, в буфер кладётся dummy-элемент, являющийся четырьмя нулевыми байтами (в задании предопределено, что входной файл не может содержать нулевых символов). Каждого из видов потоков создаётся одинаковое количество, задаваемое в параметрах запуска программы. Приведу только основные фрагменты кода: buffer.h: Код (Text): #include <pthread.h> /** Maximum number of elements in buffer */ #define RINGBUFFER_SIZE 20 /* Buffer element structure */ typedef struct _ITEM { struct _ITEM *next; char *string; } ITEM; typedef struct { pthread_mutex_t mutex; pthread_cond_t cond; _Bool locked; } WAIT; /** * Represents a synchronized ringbuffer-objekt with fixed size */ typedef struct { WAIT empty_wait; WAIT full_wait; pthread_mutex_t fetchLock; pthread_mutex_t depositLock; ITEM *start; ITEM *end; } ringbuffer_t; buffer.c: Код (Text): #include "buffer.h" #include <stdlib.h> #include <stdbool.h> #include <stdio.h> int init_buffer(ringbuffer_t* buf) { if (!(buf->start = buf->end = (ITEM *)calloc(1,sizeof(ITEM)))) return -1; ITEM *curItem = buf->start; for (int par = 1; par < RINGBUFFER_SIZE; curItem = curItem->next, par++) if(!(curItem->next = (ITEM *)calloc(1,sizeof(ITEM)))) { destroy_buffer(buf); return -1; } curItem->next = buf->start; pthread_cond_init(&buf->full_wait.cond,NULL);pthread_cond_init(&buf->empty_wait.cond,NULL); pthread_mutex_init(&buf->depositLock,NULL);pthread_mutex_init(&buf->fetchLock,NULL); pthread_mutex_init(&buf->full_wait.mutex,NULL);pthread_mutex_init(&buf->empty_wait.mutex,NULL); buf->full_wait.locked = false; buf->empty_wait.locked = true; return 0; } void deposit(ringbuffer_t* buf, char* str) { pthread_mutex_lock(&buf->depositLock); pthread_mutex_lock(&buf->full_wait.mutex); while (buf->full_wait.locked) pthread_cond_wait(&buf->full_wait.cond, &buf->full_wait.mutex); buf->end->string = str; buf->end = buf->end->next; if (buf->end->string) buf->full_wait.locked = true; pthread_mutex_unlock(&buf->full_wait.mutex); pthread_mutex_lock(&buf->empty_wait.mutex); if (buf->empty_wait.locked && buf->start->string) { buf->empty_wait.locked = false; pthread_cond_signal(&buf->empty_wait.cond); } pthread_mutex_unlock(&buf->empty_wait.mutex); pthread_mutex_unlock(&buf->depositLock); } char* fetch(ringbuffer_t* buf) { char *stringBuf; pthread_mutex_lock(&buf->fetchLock); pthread_mutex_lock(&buf->empty_wait.mutex); while (buf->empty_wait.locked) pthread_cond_wait(&buf->empty_wait.cond, &buf->empty_wait.mutex); stringBuf = buf->start->string; buf->start->string = NULL; buf->start = buf->start->next; if (!buf->start->string) buf->empty_wait.locked = true; pthread_mutex_unlock(&buf->empty_wait.mutex); pthread_mutex_lock(&buf->full_wait.mutex); if (buf->full_wait.locked && !buf->end->string) { buf->full_wait.locked = false; pthread_cond_signal(&buf->full_wait.cond); } pthread_mutex_unlock(&buf->full_wait.mutex); pthread_mutex_unlock(&buf->fetchLock); return stringBuf; } void destroy_buffer(ringbuffer_t* buf) { ITEM *nextItem = buf->start, *curItem; while (nextItem) { curItem = nextItem; nextItem = nextItem->next; free(curItem); if (nextItem == buf->start) break; } } main.c Код (Text): void* reader_thread(void* value) { char *chunk; size_t bytesRead; while (!feof(file)) { if (!(chunk = calloc(1,CHUNK_SIZE))) { printf("Could not allocate memory %s:\n", strerror(errno)); return NULL; } bytesRead = fread(chunk, CHUNK_SIZE, 1, file); deposit(&inBuffer, chunk); } chunk = calloc(1,4); deposit(&inBuffer, chunk); return NULL; } void* analyzer_thread(void* value) { char *chunk; for(;;) { chunk = fetch(&inBuffer); if (chunk[0]) { search_and_insert(chunk, strlen(chunk), &outBuffer); free(chunk); } else { deposit(&outBuffer, chunk); break; } } return NULL; } void* printer_thread(void* value) { char *chunk; for(;;) { chunk = fetch(&outBuffer); if (chunk[0]) { printf("%s\n", chunk); free(chunk); } else { free(chunk); break; } } return NULL; } Основной баг состоит в том, что после полной отработки на входном файле, один или несколько потоков reader_thread часто блокируются при попытке депозита dummy-элемента в заполненный буфер и никогда не разблокируются. При этом общее число задепозиченых dummy-элементов каким-то волшебным образом превышает число reader_thread-потоков на несколько десятков. Баг проявляется только на общем количестве всех потоков, превышающем размер буфера. Т.е. при размере буфера в 20 элементов баг проявляется на числе потоков, начиная с 21*3. Полные исходники+makefile в приложенном файле. Компилируются под unix или cygwin. Файл для тестов: http://www.megaupload.com/?d=3CC26BQ3 make run (идентичен запуску через: yeti.exe yeti.10M.txt 25) скомпилирует всё это дело и запустит тест на вышеуказанном тестовом файле при 25*3 потоках. make stat скомпилирует всё это дело и выполнит тесты на тестовых файлах для 1*3,2*3, ... 64*3 потоков.
Код (Text): void* reader_thread(void* value) .... } chunk = [b]calloc(1,4);[/b] deposit(&inBuffer, chunk); return NULL; А вы уверены что эти 4 байта - нули? Проверьте - не выделяется ли в chunk мусор - может туда нули надо записать?
AndreyMust19 Уверен. Если Вы сомневаетесь, что в unix/cygwin не так, как в MS CRT, см. спецификацию posix.
Ошибка оказалась гораздо более тупой и очевидной, чем мне казалось, и не связана с синхронизацией. Когда очередной поток типа reader_thread пытается выполнить fread после достижения конца входного файла, в массив chunk очевидно ничего не записывается и он остаётся инициализированным нулями. Т.о. он будет распознан последующими потоками, как dummy-элемент. Соответственно следующая попытка запихать реальный dummy-элемент в буфер приведёт к тому, что он будет лишним в буфере и, упрощённо говоря, не будет никем вытянут. Что при количестве reader_thread-потоков превышающем размер буфера приводит к тому, что буфер переполняется и очередной поток при попытке выполнить deposit блокируется навсегда. Тем не менее в коде есть ещё одна ошибка, т.к. в 30-35% запусков одна из выходных строк после обработки приложенного тестового входного файла (причём всё время одна и та же) появляется в выходном буфере дважды.