Асинхронная работа и многопоточность на примере пайпов

Тема в разделе "WASM.WIN32", создана пользователем Ezrah, 27 сен 2011.

  1. Ezrah

    Ezrah Member

    Публикаций:
    0
    Регистрация:
    22 мар 2011
    Сообщения:
    411
    Пытаюсь вкурить исходник из сэмплов пакета Microsoft Detours:
    Код (Text):
    1. //////////////////////////////////////////////////////////////////////////////
    2. //
    3. //  Detours Test Program (syelogd.cpp of syelogd.exe)
    4. //
    5. //  Microsoft Research Detours Package, Version 3.0.
    6. //
    7. //  Copyright (c) Microsoft Corporation.  All rights reserved.
    8. //
    9. #include <windows.h>
    10. #include <stdio.h>
    11. #include <stdlib.h>
    12. #include <stddef.h>
    13. #include <string.h>
    14. #include "syelog.h"
    15.  
    16. #if (_MSC_VER < 1299)
    17. typedef ULONG * PULONG_PTR;
    18. typedef ULONG ULONG_PTR;
    19. typedef LONG * PLONG_PTR;
    20. typedef LONG LONG_PTR;
    21. #endif
    22.  
    23. enum {
    24.     CLIENT_AWAITING_PIPE_ACCEPT = 0x21,
    25.     CLIENT_AWAITING_PIPE_DATA   = 0x22,
    26. };
    27.  
    28. typedef struct _CLIENT : OVERLAPPED
    29. {
    30.     HANDLE          hPipe;
    31.     BOOL            fAwaitingAccept;
    32.     PVOID           Zero;
    33.     SYELOG_MESSAGE  Message;
    34. } CLIENT, *PCLIENT;
    35.  
    36. //////////////////////////////////////////////////////////////////////////////
    37. //
    38. BOOL        s_fLogToScreen  = TRUE;     // Log output to screen.
    39. BOOL        s_fExitAfterOne = FALSE;
    40. BOOL        s_fDeltaTime    = FALSE;
    41. HANDLE      s_hOutFile      = INVALID_HANDLE_VALUE;
    42.  
    43. LONG        s_nActiveClients = 0;
    44. LONGLONG    s_llStartTime = 0;
    45. LONGLONG    s_llLastTime = 0;
    46.  
    47. BOOL LogMessageV(BYTE nSeverity, PCHAR pszMsg, ...);
    48.  
    49. //////////////////////////////////////////////////////////////////////////////
    50. //
    51. VOID MyErrExit(PCSTR pszMsg)
    52. {
    53.     DWORD error = GetLastError();
    54.  
    55.     LogMessageV(SYELOG_SEVERITY_FATAL, "Error %d in %s.", error, pszMsg);
    56.     fprintf(stderr, "SYELOGD: Error %d in %s.\n", error, pszMsg);
    57.     fflush(stderr);
    58.     exit(1);
    59. }
    60.  
    61. //////////////////////////////////////////////////////////////////////////////
    62. //
    63. static PCSTR FileTimeToString(PCHAR pszBuffer, DWORD cbBuffer, FILETIME ftTime)
    64. {
    65.     (void)cbBuffer;
    66.  
    67.     static BOOL bGotTzi = FALSE;
    68.     static DWORD dwTzi = TIME_ZONE_ID_UNKNOWN;
    69.     static TIME_ZONE_INFORMATION tzi;
    70.     if (!bGotTzi) {
    71.         dwTzi = GetTimeZoneInformation(&tzi);
    72.         if (dwTzi == TIME_ZONE_ID_UNKNOWN) {
    73.             ZeroMemory(&tzi, sizeof(tzi));
    74.         }
    75.         bGotTzi = TRUE;
    76.     }
    77.     SYSTEMTIME stUtc;
    78.     SYSTEMTIME stLocal;
    79.  
    80.     pszBuffer[0] = '\0';
    81.  
    82.     if (s_fDeltaTime) {
    83.         if (s_llLastTime == 0) {
    84.             s_llLastTime = s_llStartTime;
    85.         }
    86.  
    87.         ULARGE_INTEGER ul;
    88.         ul.LowPart = ftTime.dwLowDateTime;
    89.         ul.HighPart = ftTime.dwHighDateTime;
    90.  
    91.         ULONG64 delta = ul.QuadPart - s_llLastTime;
    92.         s_llLastTime = ul.QuadPart;
    93.         delta /= 10000;
    94.  
    95. #ifdef _CRT_INSECURE_DEPRECATE
    96.             sprintf_s(pszBuffer, cbBuffer, "%7I64d", delta);
    97. #else
    98.             sprintf(pszBuffer, "%7I64d", delta);
    99. #endif
    100.     }
    101.     else {
    102.         if (!FileTimeToSystemTime(&ftTime, &stUtc)) {
    103. #ifdef _CRT_INSECURE_DEPRECATE
    104.             sprintf_s(pszBuffer, cbBuffer, "ft:%16I64d", *(LONGLONG *)&ftTime);
    105. #else
    106.             sprintf(pszBuffer, "ft:%16I64d", *(LONGLONG *)&ftTime);
    107. #endif
    108.             return pszBuffer;
    109.         }
    110.         else if (!SystemTimeToTzSpecificLocalTime(&tzi, &stUtc, &stLocal)) {
    111.             CopyMemory(&stLocal, &stUtc, sizeof(stLocal));
    112.         }
    113.  
    114. #ifdef _CRT_INSECURE_DEPRECATE
    115.         sprintf_s(pszBuffer, cbBuffer, "%4d%02d%02d%02d%02d%02d%03d",
    116.                   stLocal.wYear,
    117.                   stLocal.wMonth,
    118.                   stLocal.wDay,
    119.                   stLocal.wHour,
    120.                   stLocal.wMinute,
    121.                   stLocal.wSecond,
    122.                   stLocal.wMilliseconds);
    123. #else
    124.         sprintf(pszBuffer, "%4d%02d%02d%02d%02d%02d%03d",
    125.                 stLocal.wYear,
    126.                 stLocal.wMonth,
    127.                 stLocal.wDay,
    128.                 stLocal.wHour,
    129.                 stLocal.wMinute,
    130.                 stLocal.wSecond,
    131.                 stLocal.wMilliseconds);
    132. #endif
    133.     }
    134.     return pszBuffer;
    135. }
    136.  
    137. BOOL CloseConnection(PCLIENT pClient)
    138. {
    139.     LogMessageV(SYELOG_SEVERITY_INFORMATION, "Client closed pipe.");
    140.  
    141.     InterlockedDecrement(&s_nActiveClients);
    142.     if (pClient != NULL) {
    143.         if (pClient->hPipe != INVALID_HANDLE_VALUE) {
    144.             FlushFileBuffers(pClient->hPipe);
    145.             if (!DisconnectNamedPipe(pClient->hPipe)) {
    146.                 MyErrExit("DisconnectNamedPipe");
    147.             }
    148.             CloseHandle(pClient->hPipe);
    149.             pClient->hPipe = INVALID_HANDLE_VALUE;
    150.         }
    151.         GlobalFree(pClient);
    152.         pClient = NULL;
    153.     }
    154.  
    155.     if (s_fExitAfterOne) {
    156.         ExitProcess(0);
    157.     }
    158.     return TRUE;
    159. }
    160.  
    161. // Creates a pipe instance and initiate an accept request.
    162. //
    163. PCLIENT CreatePipeConnection(HANDLE hCompletionPort)
    164. {
    165.     HANDLE hPipe = CreateNamedPipe(SYELOG_PIPE_NAME,           // pipe name
    166.                                    PIPE_ACCESS_INBOUND |       // read-only access
    167.                                    FILE_FLAG_OVERLAPPED,       // overlapped mode
    168.                                    PIPE_TYPE_MESSAGE |         // message-type pipe
    169.                                    PIPE_READMODE_MESSAGE |     // message read mode
    170.                                    PIPE_WAIT,                   // blocking mode
    171.                                    PIPE_UNLIMITED_INSTANCES,   // unlimited instances
    172.                                    0,                          // output buffer size
    173.                                    0,                          // input buffer size
    174.                                    20000,                      // client time-out
    175.                                    NULL);                      // no security attributes
    176.     if (hPipe == INVALID_HANDLE_VALUE) {
    177.         MyErrExit("CreatePipe");
    178.     }
    179.  
    180.     // Allocate the client data structure.
    181.     //
    182.     PCLIENT pClient = (PCLIENT) GlobalAlloc(GPTR, sizeof(CLIENT));
    183.     if (pClient == NULL) {
    184.         MyErrExit("GlobalAlloc pClient");
    185.     }
    186.  
    187.     ZeroMemory(pClient, sizeof(*pClient));
    188.     pClient->hPipe = hPipe;
    189.     pClient->fAwaitingAccept = TRUE;
    190.  
    191.     // Associate file with our complietion port.
    192.     //
    193.     if (!CreateIoCompletionPort(pClient->hPipe, hCompletionPort, (ULONG_PTR)pClient, 0)) {
    194.         MyErrExit("CreateIoComplietionPort pClient");
    195.     }
    196.  
    197.     if (!ConnectNamedPipe(hPipe, pClient)) {
    198.         if (GetLastError() != ERROR_IO_PENDING &&
    199.             GetLastError() != ERROR_PIPE_LISTENING) {
    200.             MyErrExit("ConnectNamedPipe");
    201.         }
    202.     }
    203.     else {
    204.         LogMessageV(SYELOG_SEVERITY_INFORMATION,
    205.                     "ConnectNamedPipe accepted immediately.");
    206.     }
    207.     return pClient;
    208. }
    209.  
    210. BOOL LogMessageV(BYTE nSeverity, PCHAR pszMsg, ...)
    211. {
    212.     FILETIME ftOccurance;
    213.     CHAR szTime[64];
    214.     GetSystemTimeAsFileTime(&ftOccurance);
    215.     FileTimeToString(szTime, sizeof(szTime), ftOccurance);
    216.  
    217.     if (s_fLogToScreen) {
    218.         printf(s_fDeltaTime
    219.                ? "%-7.7s ---- --.%02x: "
    220.                : "%-17.17s ---- --.%02x: "
    221.                , szTime, nSeverity);
    222.         va_list args;
    223.         va_start(args, pszMsg);
    224.         vprintf(pszMsg, args);
    225.         va_end(args);
    226.         printf("\n");
    227.     }
    228.     if (s_hOutFile != INVALID_HANDLE_VALUE) {
    229.         DWORD cbWritten = 0;
    230.         CHAR szBuf[4096];
    231.         INT cbToWrite;
    232.  
    233. #ifdef _CRT_INSECURE_DEPRECATE
    234.         cbToWrite = _snprintf_s(szBuf, sizeof(szBuf), sizeof(szBuf)-1,
    235.                                 s_fDeltaTime
    236.                                 ? "%-7.7s ---- --.%02x: "
    237.                                 : "%-17.17s ---- --.%02x: "
    238.                                 , szTime, nSeverity);
    239. #else
    240.         cbToWrite = _snprintf(szBuf, sizeof(szBuf),
    241.                               s_fDeltaTime
    242.                               ? "%-7.7s ---- --.%02x: "
    243.                               : "%-17.17s ---- --.%02x: "
    244.                               , szTime, nSeverity);
    245. #endif
    246.  
    247.         va_list args;
    248.         va_start(args, pszMsg);
    249. #ifdef _CRT_INSECURE_DEPRECATE
    250.         cbToWrite += _vsnprintf_s(szBuf + cbToWrite,
    251.                                   sizeof(szBuf) - cbToWrite,
    252.                                   sizeof(szBuf) - cbToWrite - 1,
    253.                                   pszMsg, args);
    254. #else
    255.         cbToWrite += _vsnprintf(szBuf + cbToWrite, sizeof(szBuf) - cbToWrite, pszMsg, args);
    256. #endif
    257.         va_end(args);
    258.  
    259. #ifdef _CRT_INSECURE_DEPRECATE
    260.         cbToWrite += _snprintf_s(szBuf + cbToWrite,
    261.                                  sizeof(szBuf) - cbToWrite,
    262.                                  sizeof(szBuf) - cbToWrite - 1,
    263.                                  "\n");
    264. #else
    265.         cbToWrite += _snprintf(szBuf + cbToWrite, sizeof(szBuf) - cbToWrite, "\n");
    266. #endif
    267.  
    268.         if (cbToWrite < 0) {
    269.             szBuf[sizeof(szBuf)-2] = '\n';
    270.             szBuf[sizeof(szBuf)-1] = '\0';
    271.             cbToWrite = sizeof(szBuf);
    272.         }
    273.         WriteFile(s_hOutFile, szBuf, cbToWrite, &cbWritten, NULL);
    274.     }
    275.     return TRUE;
    276. }
    277.  
    278. BOOL LogMessage(PSYELOG_MESSAGE pMessage, DWORD nBytes)
    279. {
    280.     // Sanity check the size of the message.
    281.     //
    282.     if (nBytes > pMessage->nBytes) {
    283.         nBytes = pMessage->nBytes;
    284.     }
    285.     if (nBytes >= sizeof(*pMessage)) {
    286.         nBytes = sizeof(*pMessage) - 1;
    287.     }
    288.  
    289.     // Don't log message if there isn't and message text.
    290.     //
    291.     if (nBytes <= offsetof(SYELOG_MESSAGE, szMessage)) {
    292.         return FALSE;
    293.     }
    294.  
    295.     CHAR szTime[64];
    296.     FileTimeToString(szTime, sizeof(szTime), pMessage->ftOccurance);
    297.  
    298.     PCHAR pszMsg = pMessage->szMessage;
    299.     while (*pszMsg) {
    300.         pszMsg++;
    301.     }
    302.     while (pszMsg > pMessage->szMessage && isspace(pszMsg[-1])) {
    303.         *--pszMsg = '\0';
    304.     }
    305.  
    306.     if (s_fLogToScreen) {
    307.         printf(s_fDeltaTime
    308.                ? "%-7.7s %4d %02x.%02x: %s\n"
    309.                : "%-17.17s %4d %02x.%02x: %s\n",
    310.                szTime,
    311.                pMessage->nProcessId,
    312.                pMessage->nFacility,
    313.                pMessage->nSeverity,
    314.                pMessage->szMessage);
    315.     }
    316.     if (s_hOutFile != INVALID_HANDLE_VALUE) {
    317.         DWORD cbWritten = 0;
    318.         CHAR szBuf[4096];
    319.         INT cbToWrite;
    320.  
    321. #ifdef _CRT_INSECURE_DEPRECATE
    322.         cbToWrite = _snprintf_s(szBuf, sizeof(szBuf), sizeof(szBuf) - 1,
    323.                                 s_fDeltaTime
    324.                                 ? "%-7.7s %4d %02x.%02x: %s\n"
    325.                                 : "%-17.17s %4d %02x.%02x: %s\n",
    326.                                 szTime,
    327.                                 pMessage->nProcessId,
    328.                                 pMessage->nFacility,
    329.                                 pMessage->nSeverity,
    330.                                 pMessage->szMessage);
    331. #else
    332.         cbToWrite = _snprintf(szBuf, sizeof(szBuf),
    333.                               s_fDeltaTime
    334.                               ? "%-7.7s %4d %02x.%02x: %s\n"
    335.                               : "%-17.17s %4d %02x.%02x: %s\n",
    336.                               szTime,
    337.                               pMessage->nProcessId,
    338.                               pMessage->nFacility,
    339.                               pMessage->nSeverity,
    340.                               pMessage->szMessage);
    341. #endif
    342.  
    343.         if (cbToWrite < 0) {
    344.             szBuf[sizeof(szBuf)-2] = '\n';
    345.             szBuf[sizeof(szBuf)-1] = '\0';
    346.             cbToWrite = sizeof(szBuf);
    347.         }
    348.         WriteFile(s_hOutFile, szBuf, cbToWrite, &cbWritten, NULL);
    349.     }
    350.     return TRUE;
    351. }
    352.  
    353. DWORD WINAPI WorkerThread(LPVOID pvVoid)
    354. {
    355.     PCLIENT pClient;
    356.     BOOL b;
    357.     LPOVERLAPPED lpo;
    358.     DWORD nBytes;
    359.     HANDLE hCompletionPort = (HANDLE)pvVoid;
    360.  
    361.     for (BOOL fKeepLooping = TRUE; fKeepLooping;) {
    362.         pClient = NULL;
    363.         lpo = NULL;
    364.         nBytes = 0;
    365.         b = GetQueuedCompletionStatus(hCompletionPort,
    366.                                       &nBytes, (PULONG_PTR)&pClient, &lpo, INFINITE);
    367.  
    368.         if (!b || lpo == NULL) {
    369.             fKeepLooping = FALSE;
    370.             MyErrExit("GetQueuedCompletionState");
    371.             break;
    372.         }
    373.         else if (!b) {
    374.             if (pClient) {
    375.                 if (GetLastError() == ERROR_BROKEN_PIPE) {
    376.                     LogMessageV(SYELOG_SEVERITY_INFORMATION, "Client closed pipe.");
    377.                 }
    378.                 else {
    379.                     LogMessageV(SYELOG_SEVERITY_ERROR,
    380.                                 "GetQueuedCompletionStatus failed %d [%p]",
    381.                                 GetLastError(), pClient);
    382.                 }
    383.                 CloseConnection(pClient);
    384.             }
    385.             continue;
    386.         }
    387.  
    388.         if (pClient->fAwaitingAccept) {
    389.             InterlockedIncrement(&s_nActiveClients);
    390.             pClient->fAwaitingAccept = FALSE;
    391.             b = ReadFile(pClient->hPipe,
    392.                          &pClient->Message,
    393.                          sizeof(pClient->Message),
    394.                          &nBytes,
    395.                          pClient);
    396.             if (!b) {
    397.                 if (GetLastError() != ERROR_IO_PENDING) {
    398.                     LogMessageV(SYELOG_SEVERITY_ERROR,
    399.                                 "ReadFile failed %d.", GetLastError());
    400.                     continue;
    401.                 }
    402.             }
    403.  
    404.             CreatePipeConnection(hCompletionPort);
    405.         }
    406.         else {
    407.             if (nBytes < offsetof(SYELOG_MESSAGE, szMessage)) {
    408.                 CloseConnection(pClient);
    409.             }
    410.  
    411.             if (pClient->Message.fTerminate) {
    412.                 LogMessageV(SYELOG_SEVERITY_NOTICE,
    413.                             "Client requested terminate on next connection close.");
    414.                 s_fExitAfterOne = TRUE;
    415.             }
    416.  
    417.             LogMessage(&pClient->Message, nBytes);
    418.  
    419.             b = ReadFile(pClient->hPipe,
    420.                          &pClient->Message,
    421.                          sizeof(pClient->Message),
    422.                          &nBytes,
    423.                          pClient);
    424.             if (!b && GetLastError() == ERROR_BROKEN_PIPE) {
    425.                 CloseConnection(pClient);
    426.             }
    427.         }
    428.     }
    429.     return 0;
    430. }
    431.  
    432. BOOL CreateWorkers(HANDLE hCompletionPort)
    433. {
    434.     DWORD dwThread;
    435.     HANDLE hThread;
    436.     DWORD i;
    437.     SYSTEM_INFO SystemInfo;
    438.  
    439.     GetSystemInfo(&SystemInfo);
    440.  
    441.     for (i = 0; i < 2 * SystemInfo.dwNumberOfProcessors; i++) {
    442.         hThread = CreateThread(NULL, 0, WorkerThread, hCompletionPort, 0, &dwThread);
    443.         if (!hThread) {
    444.             MyErrExit("CreateThread WorkerThread");
    445.             // Unreachable: return FALSE;
    446.         }
    447.         CloseHandle(hThread);
    448.     }
    449.     return TRUE;
    450. }
    451.  
    452. //////////////////////////////////////////////////////////////////////////////
    453. //
    454. BOOL WINAPI ControlHandler(DWORD dwCtrlType)
    455. {
    456.     switch (dwCtrlType) {
    457.       case CTRL_C_EVENT:
    458.       case CTRL_BREAK_EVENT:
    459.       case CTRL_CLOSE_EVENT:
    460.       case CTRL_LOGOFF_EVENT:
    461.       case CTRL_SHUTDOWN_EVENT:
    462.         LogMessageV(SYELOG_SEVERITY_INFORMATION, "User requested stop.");
    463.         printf("\nSYELOGD: Closing connections.\n");
    464.         if (s_hOutFile != INVALID_HANDLE_VALUE) {
    465.             printf("Closing file.\n");
    466.             FlushFileBuffers(s_hOutFile);
    467.             CloseHandle(s_hOutFile);
    468.             s_hOutFile = INVALID_HANDLE_VALUE;
    469.         }
    470.         ExitProcess(0);
    471.     }
    472.     return FALSE;
    473. }
    474.  
    475. DWORD main(int argc, char **argv)
    476. {
    477.     HANDLE hCompletionPort;
    478.     BOOL fNeedHelp = FALSE;
    479.  
    480.     GetSystemTimeAsFileTime((FILETIME *)&s_llStartTime);
    481.     SetConsoleCtrlHandler(ControlHandler, TRUE);
    482.  
    483.     int arg = 1;
    484.     for (; arg < argc; arg++) {
    485.         if (argv[arg][0] == '-' || argv[arg][0] == '/') {
    486.             CHAR *argn = argv[arg] + 1;
    487.             CHAR *argp = argn;
    488.             while (*argp && *argp != ':') {
    489.                 argp++;
    490.             }
    491.             if (*argp == ':') {
    492.                 *argp++ = '\0';
    493.             }
    494.  
    495.             switch (argn[0]) {
    496.  
    497.               case 'd':                                 // Delta time.
    498.               case 'D':
    499.                 s_fDeltaTime = TRUE;
    500.                 break;
    501.  
    502.               case 'o':                                 // Only one.
    503.               case 'O':
    504.                 s_fExitAfterOne = TRUE;
    505.                 break;
    506.  
    507.               case 'q':                                 // Quiet.
    508.               case 'Q':
    509.                 s_fLogToScreen = FALSE;
    510.                 break;
    511.  
    512.               case '?':                                 // Help.
    513.                 fNeedHelp = TRUE;
    514.                 break;
    515.  
    516.               default:
    517.                 fNeedHelp = TRUE;
    518.                 printf("SYELOGD: Bad argument: %s:%s\n", argn, argp);
    519.                 break;
    520.             }
    521.         }
    522.         else {
    523.             if (s_hOutFile != INVALID_HANDLE_VALUE) {
    524.                 printf("SYELOGD: Error, more than one output file specified.\n\n");
    525.                 fNeedHelp = TRUE;
    526.                 break;
    527.             }
    528.  
    529.             s_hOutFile = CreateFile(argv[arg],
    530.                                     GENERIC_WRITE,
    531.                                     FILE_SHARE_READ,
    532.                                     NULL,
    533.                                     CREATE_ALWAYS,
    534.                                     FILE_ATTRIBUTE_NORMAL |
    535.                                     FILE_FLAG_SEQUENTIAL_SCAN,
    536.                                     NULL);
    537.             if (s_hOutFile == INVALID_HANDLE_VALUE) {
    538.                 printf("SYELOGD: Error opening output file: %s: %d\n\n",
    539.                        argv[arg], GetLastError());
    540.                 fNeedHelp = TRUE;
    541.                 break;
    542.             }
    543.             else {
    544.                 printf("SYELOGD: Logging to %s.\n", argv[arg]);
    545.             }
    546.         }
    547.     }
    548.     if (fNeedHelp) {
    549.         printf("Usage:\n"
    550.                "    syelogd [options] {output_file}\n"
    551.                "Options:\n"
    552.                "    /d         List delta time in ms from previous event (not absolute time).\n"
    553.                "    /o         Exit after one client disconnects.\n"
    554.                "    /q         Disable event logging to screen (quiet mode).\n"
    555.                "    /?         Display this help message.\n"
    556.                "Summary:\n"
    557.                "    If given, all events will be logged to the output file.\n"
    558.                "\n");
    559.         exit(1);
    560.     }
    561.  
    562.  
    563.     // Create the completion port.
    564.     hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
    565.     if (hCompletionPort == NULL) {
    566.         MyErrExit("CreateIoCompletionPort");
    567.     }
    568.  
    569.     // Create completion port worker threads.
    570.     //
    571.     CreateWorkers(hCompletionPort);
    572.     CreatePipeConnection(hCompletionPort);
    573.  
    574.     printf("SYELOGD: Ready for clients.  Press Ctrl-C to stop.\n");
    575.     while (argc) {
    576.         Sleep(10000);
    577.     }
    578.  
    579.     SetConsoleCtrlHandler(ControlHandler, FALSE);
    580.  
    581.     if (s_hOutFile != INVALID_HANDLE_VALUE) {
    582.         FlushFileBuffers(s_hOutFile);
    583.         CloseHandle(s_hOutFile);
    584.         s_hOutFile = INVALID_HANDLE_VALUE;
    585.     }
    586.  
    587.     return 0;
    588. }
    589. //
    590. //////////////////////////////////////////////////////////////////////////////
    Конкретно не понятен следующий фрагмент в функции WorkerThread:
    Код (Text):
    1.         if (pClient->fAwaitingAccept) {
    2.             InterlockedIncrement(&s_nActiveClients);
    3.             pClient->fAwaitingAccept = FALSE;
    4.             b = ReadFile(pClient->hPipe,
    5.                          &pClient->Message,
    6.                          sizeof(pClient->Message),
    7.                          &nBytes,
    8.                          pClient);
    9.             if (!b) {
    10.                 if (GetLastError() != ERROR_IO_PENDING) {
    11.                     LogMessageV(SYELOG_SEVERITY_ERROR,
    12.                                 "ReadFile failed %d.", GetLastError());
    13.                     continue;
    14.                 }
    15.             }
    16.  
    17.             CreatePipeConnection(hCompletionPort);
    18.         }
    19.         else {
    20.             if (nBytes < offsetof(SYELOG_MESSAGE, szMessage)) {
    21.                 CloseConnection(pClient);
    22.             }
    23.  
    24.             if (pClient->Message.fTerminate) {
    25.                 LogMessageV(SYELOG_SEVERITY_NOTICE,
    26.                             "Client requested terminate on next connection close.");
    27.                 s_fExitAfterOne = TRUE;
    28.             }
    29.  
    30.             LogMessage(&pClient->Message, nBytes);
    31.  
    32.             b = ReadFile(pClient->hPipe,
    33.                          &pClient->Message,
    34.                          sizeof(pClient->Message),
    35.                          &nBytes,
    36.                          pClient);
    37.             if (!b && GetLastError() == ERROR_BROKEN_PIPE) {
    38.                 CloseConnection(pClient);
    39.             }
    40.         }
    Вкратце объясню что происходит.
    Создаётся несколько рабочих потоков, принимающих из пайпа сообщения. Пайп ещё не создан, все потоки висят на GetQueuedCompletionStatus.
    Создаётся пайп, присоединяется первый клиент. При соденинении с клиентом создаётся ещё один пайп, который начинает ждать соединения со следующим клиентом и т.д., пока со всеми желающими клиентами не будут установлены соединения в виде пайпов.
    Так вот, когда один из рабочих потоков читает первое сообщение от клиента, в структуре CLIENT сбрасывается флаг fAwaitingAccept. Если этот флаг установлен в TRUE, то от клиента ранее сообщения не получались. После того, как первое сообщение считано в буфер и сброшен флаг, выполнение опять попадает на GetQueuedCompletionStatus, где ждёт второго сообщения. Судя по коду, первое сообщение будет обработано (с помощью LogMessage) только после того, как придёт новое сообщение. Т.е. алгоритм таков: при приходе сообщения считать его в буфер. При приходе следующего сообщения вывести предыдущее и считать новое в буфер. Но это неправильно, ведь если клиент не будет отсылать новые сообщения, последнее отправленное не будет выведено. Я прав или ошибаюсь?

    Не удивлюсь, если ошибка в коде -- их там полно, но мне важно понять чья ошибка. Вот например из той же функции WorkerThread:
    Код (Text):
    1.         if (!b || lpo == NULL) {
    2.             fKeepLooping = FALSE;
    3.             MyErrExit("GetQueuedCompletionState");
    4.             break;
    5.         }
    6.         else if (!b) {
    7. //////////////////////////////// сюда управление никогда не попадёт ////////////////////////////////
    8.             if (pClient) {
    9.                 if (GetLastError() == ERROR_BROKEN_PIPE) {
    10.                     LogMessageV(SYELOG_SEVERITY_INFORMATION, "Client closed pipe.");
    11.                 }
    12.                 else {
    13.                     LogMessageV(SYELOG_SEVERITY_ERROR,
    14.                                 "GetQueuedCompletionStatus failed %d [%p]",
    15.                                 GetLastError(), pClient);
    16.                 }
    17.                 CloseConnection(pClient);
    18.             }
    19.             continue;
    20.         }