Как устранить гонку при использовании именованного канала?

Тема в разделе "WASM.UNIX", создана пользователем drem1lin, 2 апр 2017.

Метки:
  1. drem1lin

    drem1lin Member

    Публикаций:
    0
    Регистрация:
    17 мар 2009
    Сообщения:
    300
    Всем привет!
    Я хочу связать два процесса двунаправленной связью на основе named pipe, т.е. обе стороны могут пересылать сообщения друг другу используя один pipe. Для этого создал класс Pipe и небольшую программу его использующую.

    Код (Text):
    1. #ifndef __NamedPipeClass_H__
    2. #define __NamedPipeClass_H__
    3.  
    4. #include <thread>
    5.  
    6.  
    7. class NamedPipeClass
    8. {
    9. public:
    10.   NamedPipeClass(char* _pipeName);
    11.   NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*));
    12.  
    13.   int Connect();
    14.   int Create();
    15.   int Disconnect();
    16.   int Delete();
    17.  
    18.   int Write(unsigned char * buffer, unsigned int bufferLen);
    19.  
    20.   ~NamedPipeClass();
    21. private:
    22.   char pipeName[256];
    23.   int pipeDescriptor;
    24.  
    25.   void (*onRead)(unsigned char*, unsigned int, NamedPipeClass*);
    26.   int ReadingThread();
    27.  
    28.   bool bExit;
    29.  
    30.  
    31. };
    32.  
    33. #endif
    с реализацией
    Код (Text):
    1. #include "NamedPipeClass.h"
    2.  
    3. #include <stdio.h>
    4. #include <unistd.h>
    5. #include <sys/stat.h>
    6. #include <fcntl.h>
    7. #include <sys/wait.h>
    8. #include <errno.h>
    9. #include <string.h>
    10. #include <thread>
    11.  
    12. #define MAX_BUF 4096
    13.  
    14.  
    15. NamedPipeClass::NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*))
    16. {
    17.   memset(pipeName, 0, 256*sizeof(char));
    18.   strcpy(pipeName, _pipeName);
    19.   onRead = _onRead;
    20.   pipeDescriptor =0;
    21.   bExit = false;
    22. }
    23.  
    24. NamedPipeClass::NamedPipeClass(char* _pipeName)
    25.  : NamedPipeClass::NamedPipeClass(pipeName, NULL)
    26. {
    27. }
    28.  
    29.  
    30. NamedPipeClass::~NamedPipeClass()
    31. {
    32.   close(pipeDescriptor);
    33.   unlink(pipeName);
    34. }
    35.  
    36. int NamedPipeClass::Create()
    37. {
    38.   /* create the FIFO (named pipe) */
    39.   if(mkfifo(pipeName, S_IFIFO|S_IRWXU|S_IRWXG|S_IRWXO) != 0)
    40.     {
    41.     printf("create named pipe error = %d\n", errno); /* произошла ошибка */
    42.       return errno;
    43.     }
    44.  
    45.     return 0;
    46. }
    47. /*
    48. void* call_ReadingThread(void* Pipe) {
    49.     HLTNamedPipeClass* network = static_cast<HLTNamedPipeClass*>(Pipe);
    50.  
    51.     void* result = network->SocketHandler(somearg);
    52.  
    53.     // do something w/ result
    54.  
    55.     return nullptr;
    56. }
    57. */
    58.  
    59. int NamedPipeClass::Connect()
    60. {
    61.   //int err = 0;
    62.   pipeDescriptor = open(pipeName, O_RDWR);
    63.   if (pipeDescriptor == -1) {
    64.       printf("client open FIFO error = %d\n", errno); /* произошла ошибка */
    65.       return errno;
    66.   }
    67.  
    68.   /*err = pthread_create(&ReadThreadId, NULL, &call_ReadingThread, this);
    69.     if (err != 0)
    70.     {
    71.         printf("\ncan't create thread :[%s]", strerror(err));
    72.         return err;
    73.     }*/
    74.  
    75.   std::thread Reading(&NamedPipeClass::ReadingThread, this);
    76.   Reading.detach();
    77.  
    78.  
    79.     return 0;
    80. }
    81. int NamedPipeClass::ReadingThread()
    82. {
    83.   unsigned char buf[MAX_BUF];
    84.   ssize_t bytesReaded = 0;
    85.   while(!bExit)
    86.   {
    87.     bytesReaded = read(pipeDescriptor, buf, MAX_BUF);
    88.     if(bytesReaded>0)
    89.     {
    90.       if(onRead!=NULL)
    91.         onRead(buf, bytesReaded, this);
    92.     }
    93.   }
    94.   return 0;
    95. }
    96.  
    97. int NamedPipeClass::Disconnect()
    98. {
    99.   close(pipeDescriptor);
    100.     return 0;
    101. }
    102.  
    103. int NamedPipeClass::Delete()
    104. {
    105.   unlink(pipeName);
    106.     return 0;
    107. }
    108.  
    109. int NamedPipeClass::Write(unsigned char * buffer, unsigned int bufferLen)
    110. {
    111.   return write(pipeDescriptor, buffer, bufferLen);
    112. }
    и основной программой
    Код (Text):
    1. #include <stdio.h>
    2. #include <unistd.h>
    3. #include <sys/stat.h>
    4. #include <fcntl.h>
    5. #include <sys/wait.h>
    6. #include <errno.h>
    7. #include <string.h>
    8. #include <thread>
    9. #include <iostream>
    10.  
    11. #include "NamedPipeClass.h"
    12.  
    13. bool g_ReaderStop = false;
    14.  
    15. void OnRead1(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
    16. {
    17.   printf("OnRead1 %s\n", buf);
    18. }
    19.  
    20. void OnRead2(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
    21. {
    22.   printf("OnRead2 %s\n", buf);
    23.   if(buf[0]=='q')
    24.   {
    25.     g_ReaderStop=true;
    26.   }
    27. }
    28.  
    29. int Writer(NamedPipeClass* hltPipe)
    30. {
    31.   unsigned char c;
    32.   do
    33.   {
    34.     std::cin>>c;
    35.     hltPipe->Write(&c, 1);
    36.   } while(c!='q');
    37.   return 0;
    38. }
    39.  
    40. int main() {
    41.     char * myfifo = "//tmp//myfifo\0";
    42.     pid_t pid_1 = 0;
    43.  
    44.     switch(pid_1=fork()) {
    45.   case -1:
    46.         printf("fork error %d\n", pid_1); /* произошла ошибка */
    47.     break;
    48.   case 0:
    49.     {
    50.       NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead2);
    51.  
    52.       hltPipe->Create();
    53.       hltPipe->Connect();
    54.  
    55.       while(!g_ReaderStop)
    56.       {
    57.         sleep(1);
    58.       }
    59.       hltPipe->Disconnect();
    60.       hltPipe->Delete();
    61.     }
    62.   default:
    63.     {
    64.       NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead1);
    65.       //hltPipe->Create();
    66.       hltPipe->Connect();
    67.  
    68.       std::thread writer(Writer, hltPipe);
    69.       writer.join();
    70.  
    71.       hltPipe->Disconnect();
    72.       hltPipe->Delete();
    73.     }
    74.     return 0;
    75.     }
    76. }

    и в результате получил гонку
    Как исправить подобную ситуацию? необходимо два pipe? с радостью приму любые комментарии, в том числе и по структуре. Может у кого есть готовый пример подобной реализации?
     
  2. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    А зачем вам named pipe?
    Для двунаправленной связи обычно используют сокеты.
    Нехорошо давать двум дочерним процессам читать из одного пайпа. В этом случае обычно для каждого процесса открывают свой пайп и пишут в нужный пайп, чтобы передать данные.
     
  3. drem1lin

    drem1lin Member

    Публикаций:
    0
    Регистрация:
    17 мар 2009
    Сообщения:
    300
    большое спасибо, я пришел к тому же ответу
     
  4. SadKo

    SadKo Владимир Садовников

    Публикаций:
    8
    Регистрация:
    4 июн 2007
    Сообщения:
    1.610
    Адрес:
    г. Санкт-Петербург
    Ещё один момент при программировании пайпов: хорошим тоном считается, чтобы после fork() родительский процесс закрывал файловый дескриптор на чтение, а дочерний процесс - файловый дескриптор на запись.