Крашится многопоточное приложение при входе в код функционального объекта

Тема в разделе "WASM.BEGINNERS", создана пользователем Aoizora, 29 дек 2023.

  1. Aoizora

    Aoizora Active Member

    Публикаций:
    0
    Регистрация:
    29 янв 2017
    Сообщения:
    352
    У меня задача: написать программу для пакетной обработки комманд, так, чтобы
    1. команды обрабатывались многопоточно
    2. ввод данных управлялся внешним кодом.
    Многопоточная обработка команд При старте программы должно быть создано в дополнение к существующему основному потоку ещё три дополнительных. Дадим им условные имена: main – основной поток (в пользовательском приложении)
    1. log – поток для вывода данных в консоль
    2. file1 – первый поток для вывода в файл
    3. file2 – второй поток для вывода в файл Основная логика обработки меняется таким образом, что блок команд после своего формирования должен быть отправлен в консоль (потоком log) и сразу в файл (одним из потоков file1 или file2).
    При этом отправка блока в файл распределяется между файловыми потоками. Можно напрямую отправлять, например, чётные команды через поток file1, а нечётные – через file2. Но лучшим решение станет использование единой очереди команд, которую будут обрабатывать оба файловых потока одновременно.

    Я начал с подводящих упражнений - хочу обменяться сообщениями между разными потоками через очередь сообщений. Я написал такой код:

    Класс команды:

    Код (C):
    1. #pragma once
    2.  
    3. #include <string>
    4. #include <ctime>
    5. #include <memory>
    6.  
    7. class command
    8. {
    9. public:
    10.     command()
    11.         : timestamp(0), cmd(std::string())
    12.     {
    13.  
    14.     }
    15.  
    16.     command(std::time_t time, const std::string& body)
    17.         : timestamp(time), cmd(body)
    18.     {
    19.  
    20.     }
    21.  
    22.     std::time_t time() const
    23.     {
    24.         return timestamp;
    25.     }
    26.  
    27.     std::string body() const
    28.     {
    29.         return cmd;
    30.     }
    31.  
    32.     bool isOpenScopeCommand() const
    33.     {
    34.         return cmd == "{";
    35.     }
    36.  
    37.     bool isCloseScopeCommand() const
    38.     {
    39.         return cmd == "}";
    40.     }
    41.  
    42.     std::unique_ptr<command> move()
    43.     {
    44.         return std::unique_ptr<command>(new command(std::move(*this)));
    45.     }
    46.  
    47. private:
    48.     std::time_t timestamp;
    49.     std::string cmd;
    50. };
    Очередь сообщений:

    Код (C):
    1. #pragma once
    2.  
    3. #include <functional>
    4. #include <queue>
    5. #include <mutex>
    6. #include <memory>
    7. #include <condition_variable>
    8.  
    9. #include "command.h"
    10.  
    11. class message_queue
    12. {
    13. public:
    14.     message_queue() = default;
    15.  
    16.     ~message_queue() = default;
    17.  
    18.     void put(command& cmd)
    19.     {
    20.         {
    21.             std::unique_lock<std::mutex> lock(mutex);
    22.             q.push(cmd.move());
    23.         }
    24.  
    25.         condition.notify_one();
    26.     }
    27.     std::unique_ptr<command> get(int timeout = 0)
    28.     {
    29.         std::unique_lock<std::mutex> lock(mutex);
    30.  
    31.         if (timeout <= 0)
    32.         {
    33.             condition.wait(lock, [this] {
    34.                 return !q.empty();
    35.             });
    36.         }
    37.         else
    38.         {
    39.             auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
    40.                 return !q.empty();
    41.             });
    42.  
    43.             if (timeoutOccured)
    44.                 return nullptr;
    45.         }
    46.  
    47.         auto cmd = q.front()->move();
    48.         q.pop();
    49.         return cmd;
    50.     }
    51.  
    52.     std::unique_ptr<command> try_get()
    53.     {
    54.         std::lock_guard<std::mutex> lock(mutex);
    55.  
    56.         if (!q.empty())
    57.         {
    58.             auto msg = q.front()->move();
    59.             q.pop();
    60.             return msg;
    61.         }
    62.         else
    63.         {
    64.             return { nullptr };
    65.         }
    66.     }
    67.  
    68. private:
    69.     std::queue<std::unique_ptr<command>> q;
    70.     std::mutex mutex;
    71.     std::condition_variable condition;
    72. };
    73.  
    Читатель из консоли:

    Код (C):
    1. #pragma once
    2.  
    3. #include <vector>
    4. #include <algorithm>
    5. #include <string>
    6. #include <iostream>
    7.  
    8. #include "command.h"
    9. #include "accumulator.h"
    10.  
    11. class background_reader
    12. {
    13. public:
    14.     background_reader(accumulator& acc)
    15.         : acc(acc)
    16.     {
    17.  
    18.     }
    19.  
    20.     void operator()() const
    21.     {
    22.         std::string cmd;
    23.         while (std::getline(std::cin, cmd))
    24.         {
    25.             if (!cmd.empty())
    26.             {
    27.                 acc.put(command(std::time(nullptr), cmd));
    28.                 std::cout << "accumulated: " << cmd << std::endl;
    29.             }
    30.         }
    31.     }
    32.  
    33.  
    34. private:
    35.     accumulator& acc;
    36. };
    Логгер в консоль:

    Код (C):
    1. #pragma once
    2.  
    3. #include <iostream>
    4. #include <fstream>
    5.  
    6. #include "message_queue.h"
    7. #include "command.h"
    8.  
    9. class background_logger
    10. {
    11. public:
    12.     background_logger(message_queue& queue)
    13.         : queue(queue)
    14.     {
    15.  
    16.     }
    17.  
    18.     void operator()() const
    19.     {
    20.         while (true)
    21.         {
    22.             auto cmd = queue.try_get();
    23.             std::cout << cmd->body();
    24.         }
    25.     }
    26.  
    27. private:
    28.     message_queue& queue;
    29. };
    30.  
    Аккумулятор команд, нужный для отправки команд пакетами

    Код (C):
    1. #pragma once
    2.  
    3. #include <vector>
    4.  
    5. #include "command.h"
    6. #include "message_queue.h"
    7.  
    8. class accumulator
    9. {
    10. public:
    11.     accumulator(message_queue& queue, int bulk_size)
    12.         : queue(queue), bulk_size(bulk_size)
    13.     {
    14.         commands.reserve(bulk_size);
    15.     }
    16.  
    17.     void put(const command& cmd)
    18.     {
    19.         if (cmd.isOpenScopeCommand())
    20.         {
    21.             if (scope_level != 0)
    22.             {
    23.                 flush();
    24.             }
    25.             ++scope_level;
    26.         }
    27.         else if (cmd.isCloseScopeCommand() && (scope_level != 0))
    28.         {
    29.             --scope_level;
    30.             if (scope_level == 0)
    31.             {
    32.                 flush();
    33.             }
    34.         }
    35.         else
    36.         {
    37.             commands.push_back(cmd);
    38.             if ((scope_level == 0) && commands.size() == bulk_size)
    39.             {
    40.                 flush();
    41.             }
    42.         }
    43.     }
    44.  
    45. private:
    46.     void flush()
    47.     {
    48.         for (auto& cmd : commands)
    49.         {
    50.             queue.put(cmd);
    51.         }
    52.         commands.clear();
    53.     }
    54.  
    55. private:
    56.     message_queue& queue;
    57.     std::vector<command> commands;
    58.  
    59.     int bulk_size = 0;
    60.     int scope_level = 0;
    61. };
    Проблема состоит в том, что поток при входе в код крашится. Проблема возникает только когда в программе есть больше одного дополнительного потока. Почему так?
    --- Сообщение объединено, 29 дек 2023 ---
    Конпелирую на линуксе, может быть это важно
     
  2. mantissa

    mantissa Мембер Команда форума

    Публикаций:
    0
    Регистрация:
    9 сен 2022
    Сообщения:
    139
    Попробуй отключить оптимизации. Если плохо синхронизовать потоки между собой - случаются краши при агрессивных оптимизациях.
     
  3. Aoizora

    Aoizora Active Member

    Публикаций:
    0
    Регистрация:
    29 янв 2017
    Сообщения:
    352
    Отключил оптимизации, все равно крашится: сегфолт

    target_compile_options(async PRIVATE "-O0;")

    Код (C):
    1. cmake_minimum_required(VERSION 3.5)
    2.  
    3. set(PATCH_VERSION "1" CACHE INTERNAL "Patch version")
    4. set(PROJECT_VERSION 0.0.${PATCH_VERSION})
    5.  
    6. project(async VERSION ${PROJECT_VERSION})
    7.  
    8. set(CMAKE_CXX_STANDARD 20)
    9. set(CMAKE_CXX_STANDARD_REQUIRED ON)
    10.  
    11. add_executable (async src/async/main.cpp
    12.     src/async/command.h
    13.     src/async/console_reader.h
    14.     src/async/console_logger.h
    15.     src/async/message_queue.h
    16.     src/async/accumulator.h)
    17.  
    18. target_compile_options(async PRIVATE "-O0;")
    19.  
    20. set(CPACK_GENERATOR DEB)
    21. set(CPACK_PACKAGE_VERSION_MAJOR "${PROJECT_VERSION_MAJOR}")
    22. set(CPACK_PACKAGE_VERSION_MINOR "${PROJECT_VERSION_MINOR}")
    23. set(CPACK_PACKAGE_VERSION_PATCH "${PROJECT_VERSION_PATCH}")
    24. set(CPACK_PACKAGE_CONTACT example@example.ru)
    25.  
    26. include(CPack)
    --- Сообщение объединено, 29 дек 2023 ---
    Дополнение: у товарища на винде краша нет
    --- Сообщение объединено, 29 дек 2023 ---
    Нашли причину краша: здесь при cmd == nullptr


    Код (C):
    1.  void operator()() const
    2.     {
    3.         while (true)
    4.         {
    5.             auto cmd = queue.try_get();
    6.             std::cout << cmd->body();
    7.         }
    8.     }
     
  4. q2e74

    q2e74 Active Member

    Публикаций:
    0
    Регистрация:
    18 окт 2018
    Сообщения:
    989
    Последнее редактирование: 29 дек 2023
  5. alex_dz

    alex_dz Active Member

    Публикаций:
    0
    Регистрация:
    26 июл 2006
    Сообщения:
    352
    и почему очередь отдает мусор? (NULL cmd)
     
  6. Aoizora

    Aoizora Active Member

    Публикаций:
    0
    Регистрация:
    29 янв 2017
    Сообщения:
    352
    Я переделал очередь, теперь она будет отдавать пустое сообщение по умолчанию


    Код (Text):
    1. template<typename Message>
    2. class MessageQueue
    3. {
    4. public:
    5.     void push(Message&& msg)
    6.     {
    7.         {
    8.             std::unique_lock<std::mutex> lock(mutex);
    9.  
    10.             messages.emplace_front(msg);
    11.         }
    12.         condition.notify_one();
    13.     }
    14.  
    15.         Message& top(int timeout = 0)
    16.         {
    17.             std::unique_lock<std::mutex> lock(mutex);
    18.  
    19.             if (timeout <= 0)
    20.             {
    21.                 condition.wait(lock, [this] {
    22.                     return !messages.empty();
    23.                 });
    24.             }
    25.             else
    26.             {
    27.                 auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
    28.                     return !messages.empty();
    29.                 });
    30.  
    31.                 if (timeoutOccured)
    32.                     return Message::DEFAULT;
    33.             }
    34.  
    35.             return messages.front();
    36.         }
    37.  
    38.         void pop()
    39.         {
    40.             std::unique_lock<std::mutex> lock(mutex);
    41.  
    42.             if (!messages.empty())
    43.             {
    44.                 messages.pop_front();
    45.             }
    46.         }
    47.  
    48. private:
    49.     std::list<Message> messages;
    50.     std::mutex mutex;
    51.     std::condition_variable condition;
    52. };