Я хочу написать свой легковесный простой брокер сообщений типа Kafka. Написал такой код. Класс сообщения message.h Код (C++): #pragma once #include <memory> namespace nano { template<typename T> struct message { T body; static message<T> DEFAULT; }; // Forward declare the connection template <typename T> class connection; template<typename T> struct owned_message { std::shared_ptr<connection<T>> remote = nullptr; message<T> msg; static owned_message<T> DEFAULT; }; } В message.cpp определены поля DEFAULT, нужные для возврата сообщения "по умолчанию": Код (C++): #include "message.h" namespace nano { template<typename T> message<T> message<T>::DEFAULT = {}; template<typename T> owned_message<T> owned_message<T>::DEFAULT = {}; } Класс соединения connection.h: Код (C++): #pragma once #include <iostream> #include <memory> #include <string> #include <boost/asio.hpp> #include "message_queue.h" #include "message.h" namespace nano { template<typename T> class connection : public std::enable_shared_from_this<connection<T>> { public: enum class owner { SERVER, CLIENT }; connection(owner parent, boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue) : io(io), socket(std::move(socket)), income(queue) { owner_type = parent; } void disconnect() { if (is_connected()) { boost::asio::post(io, [this]() { socket.close(); }); } } bool is_connected() const { return socket.is_open(); } void send(const message<T>& msg) { boost::asio::post(io, [this, msg]() { outcome.push(msg); send(); }); } void connect() { if (owner_type == owner::SERVER) { if (socket.is_open()) { read(); } } } private: void send() { auto msg = outcome.top(); boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()), [this](std::error_code ec, std::size_t length) { // TODO }); } void read() { boost::asio::streambuf buffer; boost::asio::async_read(socket, buffer, [this, &buffer](std::error_code ec, std::size_t length) { if (!ec) { std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl; } }); } private: // Контекст Boost ASIO boost::asio::io_context& io; // Каждое соединение имеет уникальный сокет для удаленной части соединения boost::asio::ip::tcp::socket socket; // Ссылка на очередь входящих сообщений родительского объекта message_queue<owned_message<T>>& income; // Очередь хранит сообщения, которые будут отправлены удаленной части соединения message_queue<message<T>> outcome; owner owner_type = owner::SERVER; }; } Интерфейс сервера server_interface.h: Код (C++): #pragma once #include <iostream> #include <cstdint> #include <thread> #include <memory> #include <list> #include <string> #include <boost/asio.hpp> #include "connection.h" #include "message_queue.h" #include "message.h" namespace nano { template<typename T> class server_interface { public: server_interface(std::uint16_t port) : acceptor(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) { } virtual ~server_interface() { stop(); } bool start() { try { listen(); thread = std::thread([this]() { io.run(); }); } catch (std::exception& e) { std::cerr << "[Server] stopped." << std::endl; return false; } std::cout << "[Server] started." << std::endl; return true; } void stop() { io.stop(); if (thread.joinable()) thread.join(); std::cout << "[Server] stopped." << std::endl; } void listen() { acceptor.async_accept([this](std::error_code ec, boost::asio::ip::tcp::socket socket) { if (!ec) { std::cout << "[Server] new connection: " << socket.remote_endpoint() << std::endl; std::shared_ptr<connection<T>> conn = std::make_shared<connection<T>>(connection<T>::owner::SERVER, io, std::move(socket), messages); // Даем возможность отклонить соединение if (on_connect(conn)) { connections.push_back(std::move(conn)); connections.back()->connect(); } } listen(); }); } void message_client(std::shared_ptr<connection<T>> client, const message<T>& msg) { if (client && client->is_connected()) { client->send(msg); } else { // Remove dead clients connections.erase(std::remove(connections.begin(), connections.end(), nullptr), connections.end()); } } void update() { messages.wait(); while (!messages.empty()) { auto msg = messages.top(); on_message(msg.remote, msg); messages.pop(); } } protected: virtual bool on_connect(std::shared_ptr<connection<T>> conn) { return false; } virtual void on_disconnect(std::shared_ptr<connection<T>> conn) { } virtual void on_message(std::shared_ptr<connection<T>> conn, const owned_message<T>& msg) { } protected: boost::asio::io_context io; boost::asio::ip::tcp::acceptor acceptor; std::thread thread; std::list<std::shared_ptr<connection<T>>> connections; message_queue<owned_message<T>> messages; }; } В очереди сообщений нет ничего особенного: Код (C++): #pragma once #include <list> #include <mutex> #include <memory> #include <condition_variable> #include <stdexcept> template<typename Message> class message_queue { public: void push(const Message& msg) { { std::unique_lock<std::mutex> lock(mutex); messages.emplace_front(msg); } condition.notify_one(); } Message& top(int timeout = 0) { std::unique_lock<std::mutex> lock(mutex); if (timeout <= 0) { condition.wait(lock, [this] { return !messages.empty(); }); } else { auto timeoutOccured = !condition.wait_for(lock, std::chrono::milliseconds(timeout), [this] { return !messages.empty(); }); if (timeoutOccured) throw std::runtime_error("Timeout"); } return messages.front(); } void pop() { std::unique_lock<std::mutex> lock(mutex); if (!messages.empty()) { messages.pop_front(); } } bool empty() { std::unique_lock<std::mutex> lock(mutex); return messages.empty(); } void wait() { while (messages.empty()) { std::unique_lock<std::mutex> lock(mutex); condition.wait(lock); } } private: std::list<Message> messages; std::mutex mutex; std::condition_variable condition; }; Реализация сервера получается простая с удобными обработчиками сообщений и соединений: Код (C++): #include "server_interface.h" #include "message.h" class message_server : public nano::server_interface<std::string> { using connection_t = std::shared_ptr<nano::connection<std::string>>; using message_t = nano::owned_message<std::string>; public: message_server(std::uint16_t port) : server_interface(port) { } protected: bool on_connect(connection_t conn) override { return true; } void on_disconnect(connection_t conn) override { std::cout << "Removing client" << std::endl; } void on_message(connection_t conn, const message_t& msg) override { conn->send(msg.msg); } }; int main() { message_server server(60000); server.start(); while (true) { server.update(); } } Проблема в том, что при соединении по телнету с сервером (telnet localhost 60000) сообщения от телнет-клиента не обрабатываются. В коде происходит такой сценарий: 1) Подключается телнет-клиент 2) В сервере создается соединение, сохраняется в списке клиентов и сервер начинает читать данные от клиента: Код (C++): // Даем возможность отклонить соединение if (on_connect(conn)) { connections.push_back(std::move(conn)); connections.back()->connect(); } Соединяемся с клиентом: Код (C++): void connect() { if (owner_type == owner::SERVER) { if (socket.is_open()) { read(); } } } Вот тут возникает проблема: Код (C++): void read() { boost::asio::streambuf buffer; boost::asio::async_read(socket, buffer, [this, &buffer](std::error_code ec, std::size_t length) { if (!ec) { std::cout << std::string(std::istreambuf_iterator<char>(&buffer), {}) << std::endl; } }); } Вызывается метод async_read, но он не получает никакие данные и из него происходит выход. В дальнейшем этот метод больше не вызывается никак. По идее этот метод должен зависнуть и ждать, пока телнет-клиент пришлет данные, но вместо этого происходит выход из этого метода. Почему так? Как правильно использоватб этот метод, чтобы дождаться данные от телнет-клиента?
Но ведь ты делаешь асинхронное чтение из сокета. async_read сразу возвращает управление, продолжая читать где-то в фоне. Когда чтение завершится или произойдёт ошибка - вызовется заданный каллбэк. https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/async_read/overload1.html В сам каллбэк управление не попадает вообще или не срабатывает условие if (!ec)?
Вроде я разобрался. Функция async_read сразу после вызова возвращает управление, но вроде бы он ставит коллбэк в очередь и он выполнится при поступлении данных. Чтобы колбэк вызывался постоянно, надо его вызвать из самого себя прямо или через другую функцию, например read_header Код (Text): namespace nano { template<typename T> class connection : public std::enable_shared_from_this<connection<T>> { public: connection(boost::asio::io_service& io, boost::asio::ip::tcp::socket socket, message_queue<owned_message<T>>& queue) : io(io), socket(std::move(socket)), income(queue) { } void disconnect() { if (is_connected()) { boost::asio::post(io, [this]() { socket.close(); }); } } bool is_connected() const { return socket.is_open(); } void send(const message<T>& msg) { boost::asio::post(io, [this, msg]() { outcome.push(msg); send(); }); } void connect() { if (socket.is_open()) { read_header(); } } private: void send() { auto msg = outcome.top(); boost::asio::async_write(socket, boost::asio::buffer(msg.body, msg.body.size()), [this](const boost::system::error_code& ec, std::size_t length) { // TODO }); } void read_header() { socket.async_read_some(boost::asio::buffer(&msg.header, sizeof(msg.header)), [this](const boost::system::error_code& ec, std::size_t length) { if (!ec) { if (msg.header.size > 0) { msg.body.resize(msg.header.size); read_body(); } else { addToIncomingMessageQueue(); } } }); } void read_body() { socket.async_read_some(boost::asio::buffer(msg.body.data(), msg.body.size()), [this](const boost::system::error_code& ec, std::size_t length) { if (!ec) { addToIncomingMessageQueue(); } }); } void addToIncomingMessageQueue() { std::cout << msg.body.data() << std::endl; read_header(); } private: // Контекст Boost ASIO boost::asio::io_context& io; // Каждое соединение имеет уникальный сокет для удаленной части соединения boost::asio::ip::tcp::socket socket; // Входящие сообщения создаются асинхронно, поэтому мы сохраним частично созданное сообщение // здесь, пока оно не будет готово message<T> msg; // Ссылка на очередь входящих сообщений родительского объекта message_queue<owned_message<T>>& income; // Очередь хранит сообщения, которые будут отправлены удаленной части соединения message_queue<message<T>> outcome; }; }