Пишу свой брокер сообщений

Тема в разделе "WASM.BEGINNERS", создана пользователем Aoizora, 12 янв 2024.

Метки:
  1. Aoizora

    Aoizora Active Member

    Публикаций:
    0
    Регистрация:
    29 янв 2017
    Сообщения:
    365
    Я хочу написать свой легковесный простой брокер сообщений типа Kafka. Написал такой код.

    Класс сообщения message.h

    Код (C++):
    1. #pragma once
    2.  
    3. #include <memory>
    4.  
    5. namespace nano
    6. {
    7.     template<typename T>
    8.     struct message
    9.     {
    10.         T body;
    11.  
    12.         static message<T> DEFAULT;
    13.     };
    14.  
    15.     // Forward declare the connection
    16.     template <typename T>
    17.     class connection;
    18.  
    19.     template<typename T>
    20.     struct owned_message
    21.     {
    22.         std::shared_ptr<connection<T>> remote = nullptr;
    23.         message<T> msg;
    24.  
    25.         static owned_message<T> DEFAULT;
    26.     };
    27. }
    В message.cpp определены поля DEFAULT, нужные для возврата сообщения "по умолчанию":


    Код (C++):
    1. #include "message.h"
    2.  
    3. namespace nano
    4. {
    5.     template<typename T>
    6.     message<T> message<T>::DEFAULT = {};
    7.  
    8.     template<typename T>
    9.     owned_message<T> owned_message<T>::DEFAULT = {};
    10. }
    Класс соединения connection.h:

    Код (C++):
    1. #pragma once
    2.  
    3. #include <iostream>
    4. #include <memory>
    5. #include <string>
    6. #include <boost/asio.hpp>
    7.  
    8. #include "message_queue.h"
    9. #include "message.h"
    10.  
    11. namespace nano
    12. {
    13.     template<typename T>
    14.     class connection : public std::enable_shared_from_this<connection<T>>
    15.     {
    16.     public:
    17.         enum class owner
    18.         {
    19.             SERVER,
    20.             CLIENT
    21.         };
    22.  
    23.         connection(owner parent, boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue)
    24.             : io(io), socket(std::move(socket)), income(queue)
    25.         {
    26.             owner_type = parent;
    27.         }
    28.  
    29.         void disconnect()
    30.         {
    31.             if (is_connected())
    32.             {
    33.                 boost::asio::post(io, [this]() {
    34.                     socket.close();
    35.                 });
    36.             }
    37.         }
    38.  
    39.         bool is_connected() const
    40.         {
    41.             return socket.is_open();
    42.         }
    43.  
    44.         void send(const message<T>& msg)
    45.         {
    46.             boost::asio::post(io, [this, msg]() {
    47.                 outcome.push(msg);
    48.                 send();
    49.             });
    50.         }
    51.  
    52.         void connect()
    53.         {
    54.             if (owner_type == owner::SERVER)
    55.             {
    56.                 if (socket.is_open())
    57.                 {
    58.                     read();
    59.                 }
    60.             }
    61.         }
    62.  
    63.     private:
    64.         void send()
    65.         {
    66.             auto msg = outcome.top();
    67.             boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()),
    68.                 [this](std::error_code ec, std::size_t length) {
    69.                 // TODO
    70.             });
    71.         }
    72.  
    73.         void read()
    74.         {
    75.             boost::asio::streambuf buffer;
    76.             boost::asio::async_read(socket, buffer,
    77.                 [this, &buffer](std::error_code ec, std::size_t length) {
    78.                 if (!ec)
    79.                 {
    80.                     std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
    81.                 }
    82.             });
    83.         }
    84.  
    85.     private:
    86.         // Контекст Boost ASIO
    87.         boost::asio::io_context& io;
    88.  
    89.         // Каждое соединение имеет уникальный сокет для удаленной части соединения
    90.         boost::asio::ip::tcp::socket socket;
    91.  
    92.         // Ссылка на очередь входящих сообщений родительского объекта
    93.         message_queue<owned_message<T>>& income;
    94.  
    95.         // Очередь хранит сообщения, которые будут отправлены удаленной части соединения
    96.         message_queue<message<T>> outcome;
    97.  
    98.         owner owner_type = owner::SERVER;
    99.     };
    100. }
    Интерфейс сервера server_interface.h:


    Код (C++):
    1. #pragma once
    2.  
    3. #include <iostream>
    4. #include <cstdint>
    5. #include <thread>
    6. #include <memory>
    7. #include <list>
    8. #include <string>
    9. #include <boost/asio.hpp>
    10.  
    11. #include "connection.h"
    12. #include "message_queue.h"
    13. #include "message.h"
    14.  
    15. namespace nano
    16. {
    17.     template<typename T>
    18.     class server_interface
    19.     {
    20.     public:
    21.         server_interface(std::uint16_t port)
    22.             : acceptor(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
    23.         {
    24.  
    25.         }
    26.  
    27.         virtual ~server_interface()
    28.         {
    29.             stop();
    30.         }
    31.  
    32.         bool start()
    33.         {
    34.             try
    35.             {
    36.                 listen();
    37.                 thread = std::thread([this]() {
    38.                     io.run();
    39.                 });
    40.             }
    41.             catch (std::exception& e)
    42.             {
    43.                 std::cerr << "[Server] stopped." << std::endl;
    44.                 return false;
    45.             }
    46.  
    47.             std::cout << "[Server] started." << std::endl;
    48.             return true;
    49.         }
    50.  
    51.         void stop()
    52.         {
    53.             io.stop();
    54.             if (thread.joinable())
    55.                 thread.join();
    56.             std::cout << "[Server] stopped." << std::endl;
    57.         }
    58.  
    59.         void listen()
    60.         {
    61.             acceptor.async_accept([this](std::error_code ec, boost::asio::ip::tcp::socket socket) {
    62.                 if (!ec)
    63.                 {
    64.                     std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl;
    65.  
    66.                     std::shared_ptr<connection<T>> conn =
    67.                         std::make_shared<connection<T>>(connection<T>::owner::SERVER,
    68.                             io, std::move(socket), messages);
    69.  
    70.                     // Даем возможность отклонить соединение
    71.                     if (on_connect(conn))
    72.                     {
    73.                         connections.push_back(std::move(conn));
    74.                         connections.back()->connect();
    75.                     }
    76.                 }
    77.  
    78.                 listen();
    79.             });
    80.         }
    81.  
    82.         void message_client(std::shared_ptr<connection<T>> client, const message<T>& msg)
    83.         {
    84.             if (client && client->is_connected())
    85.             {
    86.                 client->send(msg);
    87.             }
    88.             else
    89.             {
    90.                 // Remove dead clients
    91.                 connections.erase(std::remove(connections.begin(), connections.end(), nullptr),
    92.                     connections.end());
    93.             }
    94.         }
    95.  
    96.         void update()
    97.         {
    98.             messages.wait();
    99.  
    100.             while (!messages.empty())
    101.             {
    102.                 auto msg = messages.top();
    103.                 on_message(msg.remote, msg);
    104.                 messages.pop();
    105.             }
    106.         }
    107.  
    108.     protected:
    109.         virtual bool on_connect(std::shared_ptr<connection<T>> conn)
    110.         {
    111.             return false;
    112.         }
    113.  
    114.         virtual void on_disconnect(std::shared_ptr<connection<T>> conn)
    115.         {
    116.  
    117.         }
    118.  
    119.         virtual void on_message(std::shared_ptr<connection<T>> conn, const owned_message<T>& msg)
    120.         {
    121.  
    122.         }
    123.  
    124.     protected:
    125.         boost::asio::io_context io;
    126.         boost::asio::ip::tcp::acceptor acceptor;
    127.         std::thread thread;
    128.         std::list<std::shared_ptr<connection<T>>> connections;
    129.         message_queue<owned_message<T>> messages;
    130.     };
    131. }
    132.  
    В очереди сообщений нет ничего особенного:


    Код (C++):
    1. #pragma once
    2.  
    3. #include <list>
    4. #include <mutex>
    5. #include <memory>
    6. #include <condition_variable>
    7. #include <stdexcept>
    8.  
    9. template<typename Message>
    10. class message_queue
    11. {
    12. public:
    13.     void push(const Message& msg)
    14.     {
    15.         {
    16.             std::unique_lock<std::mutex> lock(mutex);
    17.  
    18.             messages.emplace_front(msg);
    19.         }
    20.         condition.notify_one();
    21.     }
    22.  
    23.     Message& top(int timeout = 0)
    24.     {
    25.         std::unique_lock<std::mutex> lock(mutex);
    26.  
    27.         if (timeout <= 0)
    28.         {
    29.             condition.wait(lock, [this] {
    30.                 return !messages.empty();
    31.             });
    32.         }
    33.         else
    34.         {
    35.             auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
    36.                 return !messages.empty();
    37.             });
    38.  
    39.             if (timeoutOccured)
    40.                 throw std::runtime_error("Timeout");
    41.         }
    42.  
    43.         return messages.front();
    44.     }
    45.  
    46.     void pop()
    47.     {
    48.         std::unique_lock<std::mutex> lock(mutex);
    49.  
    50.         if (!messages.empty())
    51.         {
    52.             messages.pop_front();
    53.         }
    54.     }
    55.  
    56.     bool empty()
    57.     {
    58.         std::unique_lock<std::mutex> lock(mutex);
    59.  
    60.         return messages.empty();
    61.     }
    62.  
    63.     void wait()
    64.     {
    65.         while (messages.empty())
    66.         {
    67.             std::unique_lock<std::mutex> lock(mutex);
    68.             condition.wait(lock);
    69.         }
    70.     }
    71.  
    72. private:
    73.     std::list<Message> messages;
    74.     std::mutex mutex;
    75.     std::condition_variable condition;
    76. };
    Реализация сервера получается простая с удобными обработчиками сообщений и соединений:


    Код (C++):
    1. #include "server_interface.h"
    2. #include "message.h"
    3.  
    4. class message_server : public nano::server_interface<std::string>
    5. {
    6.     using connection_t = std::shared_ptr<nano::connection<std::string>>;
    7.     using message_t = nano::owned_message<std::string>;
    8.  
    9. public:
    10.     message_server(std::uint16_t port)
    11.         : server_interface(port)
    12.     {
    13.  
    14.     }
    15.  
    16. protected:
    17.     bool on_connect(connection_t conn) override
    18.     {
    19.         return true;
    20.     }
    21.  
    22.     void on_disconnect(connection_t conn) override
    23.     {
    24.         std::cout << "Removing client" << std::endl;
    25.     }
    26.  
    27.     void on_message(connection_t conn, const message_t& msg) override
    28.     {
    29.         conn->send(msg.msg);
    30.     }
    31. };
    32.  
    33. int main()
    34. {
    35.     message_server server(60000);
    36.     server.start();
    37.  
    38.     while (true)
    39.     {
    40.         server.update();
    41.     }
    42. }
    43.  
    Проблема в том, что при соединении по телнету с сервером (telnet localhost 60000) сообщения от телнет-клиента не обрабатываются. В коде происходит такой сценарий:
    1) Подключается телнет-клиент
    2) В сервере создается соединение, сохраняется в списке клиентов и сервер начинает читать данные от клиента:

    Код (C++):
    1. // Даем возможность отклонить соединение
    2.                     if (on_connect(conn))
    3.                     {
    4.                         connections.push_back(std::move(conn));
    5.                         connections.back()->connect();
    6.                     }
    Соединяемся с клиентом:


    Код (C++):
    1. void connect()
    2.         {
    3.             if (owner_type == owner::SERVER)
    4.             {
    5.                 if (socket.is_open())
    6.                 {
    7.                     read();
    8.                 }
    9.             }
    10.         }
    Вот тут возникает проблема:


    Код (C++):
    1. void read()
    2.         {
    3.             boost::asio::streambuf buffer;
    4.             boost::asio::async_read(socket, buffer,
    5.                 [this, &buffer](std::error_code ec, std::size_t length) {
    6.                 if (!ec)
    7.                 {
    8.                     std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl;
    9.                 }
    10.             });
    11.         }
    Вызывается метод async_read, но он не получает никакие данные и из него происходит выход. В дальнейшем этот метод больше не вызывается никак. По идее этот метод должен зависнуть и ждать, пока телнет-клиент пришлет данные, но вместо этого происходит выход из этого метода. Почему так?

    Как правильно использоватб этот метод, чтобы дождаться данные от телнет-клиента?
     
  2. HoShiMin

    HoShiMin Well-Known Member

    Публикаций:
    5
    Регистрация:
    17 дек 2016
    Сообщения:
    1.486
    Адрес:
    Россия, Нижний Новгород
    Но ведь ты делаешь асинхронное чтение из сокета. async_read сразу возвращает управление, продолжая читать где-то в фоне. Когда чтение завершится или произойдёт ошибка - вызовется заданный каллбэк.
    https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/async_read/overload1.html
    В сам каллбэк управление не попадает вообще или не срабатывает условие if (!ec)?
     
  3. Aoizora

    Aoizora Active Member

    Публикаций:
    0
    Регистрация:
    29 янв 2017
    Сообщения:
    365
    Вроде я разобрался. Функция async_read сразу после вызова возвращает управление, но вроде бы он ставит коллбэк в очередь и он выполнится при поступлении данных. Чтобы колбэк вызывался постоянно, надо его вызвать из самого себя прямо или через другую функцию, например read_header

    Код (Text):
    1. namespace nano
    2. {
    3.     template<typename T>
    4.     class connection : public std::enable_shared_from_this<connection<T>>
    5.     {
    6.     public:
    7.  
    8.         connection(boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue)
    9.             : io(io), socket(std::move(socket)), income(queue)
    10.         {
    11.  
    12.         }
    13.  
    14.         void disconnect()
    15.         {
    16.             if (is_connected())
    17.             {
    18.                 boost::asio::post(io, [this]() {
    19.                     socket.close();
    20.                 });
    21.             }
    22.         }
    23.  
    24.         bool is_connected() const
    25.         {
    26.             return socket.is_open();
    27.         }
    28.  
    29.         void send(const message<T>& msg)
    30.         {
    31.             boost::asio::post(io, [this, msg]() {
    32.                 outcome.push(msg);
    33.                 send();
    34.             });
    35.         }
    36.  
    37.         void connect()
    38.         {
    39.             if (socket.is_open())
    40.             {
    41.                 read_header();
    42.             }
    43.         }
    44.  
    45.     private:
    46.         void send()
    47.         {
    48.             auto msg = outcome.top();
    49.             boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()),
    50.                 [this](const boost::system::error_code& ec, std::size_t length) {
    51.                 // TODO
    52.             });
    53.         }
    54.  
    55.         void read_header()
    56.         {
    57.             socket.async_read_some(boost::asio::buffer(&msg.header, sizeof(msg.header)),
    58.                 [this](const boost::system::error_code& ec, std::size_t length) {
    59.                 if (!ec)
    60.                 {
    61.                     if (msg.header.size > 0)
    62.                     {
    63.                         msg.body.resize(msg.header.size);
    64.                         read_body();
    65.                     }
    66.                     else
    67.                     {
    68.                         addToIncomingMessageQueue();
    69.                     }
    70.                 }
    71.             });
    72.         }
    73.  
    74.         void read_body()
    75.         {
    76.             socket.async_read_some(boost::asio::buffer(msg.body.data(), msg.body.size()),
    77.                 [this](const boost::system::error_code& ec, std::size_t length) {
    78.                 if (!ec)
    79.                 {
    80.                     addToIncomingMessageQueue();
    81.                 }
    82.             });
    83.         }
    84.  
    85.         void addToIncomingMessageQueue()
    86.         {
    87.             std::cout << msg.body.data() << std::endl;
    88.             read_header();
    89.         }
    90.  
    91.     private:
    92.         // Контекст Boost ASIO
    93.         boost::asio::io_context& io;
    94.  
    95.         // Каждое соединение имеет уникальный сокет для удаленной части соединения
    96.         boost::asio::ip::tcp::socket socket;
    97.  
    98.         // Входящие сообщения создаются асинхронно, поэтому мы сохраним частично созданное сообщение
    99.         // здесь, пока оно не будет готово
    100.         message<T> msg;
    101.  
    102.         // Ссылка на очередь входящих сообщений родительского объекта
    103.         message_queue<owned_message<T>>& income;
    104.  
    105.         // Очередь хранит сообщения, которые будут отправлены удаленной части соединения
    106.         message_queue<message<T>> outcome;
    107.     };
    108. }
    109.