boost asio async_write: ¿cómo no intercalar llamadas async_write?

Aquí está mi implementación:

  • El cliente A envía un mensaje para el cliente B
  • El servidor procesa el mensaje async_read la cantidad correcta de datos y esperará nuevos datos del Cliente A (para no bloquear al Cliente A)
  • Posteriormente el Servidor procesará la información (probablemente haga una consulta de MySQL) y luego envíe el mensaje al Cliente B con async_write .

El problema es que si el Cliente A envía un mensaje realmente rápido, async_writes se intercalará antes de llamar al controlador async_write anterior.

¿Hay una manera simple de evitar este problema?

EDIT 1: Si un Cliente C envía un mensaje al Cliente B justo después del Cliente A, el mismo problema debería aparecer …

EDIT 2: ¿Esto funcionaría? porque parece bloquear, no sé dónde …

  namespace structure { class User { public: User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) : m_socket(io_service, context), m_strand(io_service), is_writing(false) {} ssl_socket& getSocket() { return m_socket; } boost::asio::strand getStrand() { return m_strand; } void push(std::string str) { m_strand.post(boost::bind(&structure::User::strand_push, this, str)); } void strand_push(std::string str) { std::cout << "pushing: " << boost::this_thread::get_id() << std::endl; m_queue.push(str); if (!is_writing) { write(); std::cout << "going to write" << std::endl; } std::cout << "Already writing" << std::endl; } void write() { std::cout << "writing" << std::endl; is_writing = true; std::string str = m_queue.front(); boost::asio::async_write(m_socket, boost::asio::buffer(str.c_str(), str.size()), boost::bind(&structure::User::sent, this) ); } void sent() { std::cout << "sent" << std::endl; m_queue.pop(); if (!m_queue.empty()) { write(); return; } else is_writing = false; std::cout << "done sent" << std::endl; } private: ssl_socket m_socket; boost::asio::strand m_strand; std::queue m_queue; bool is_writing; }; } #endif 

¿Hay una manera simple de evitar este problema?

Sí, mantenga una cola saliente para cada cliente. Inspeccione el tamaño de la cola en el controlador de finalización async_write ; si no es cero, inicie otra operación async_write . Aquí hay una muestra

 #include  #include  #include  #include  #include  class Connection { public: Connection( boost::asio::io_service& io_service ) : _io_service( io_service ), _strand( _io_service ), _socket( _io_service ), _outbox() { } void write( const std::string& message ) { _strand.post( boost::bind( &Connection::writeImpl, this, message ) ); } private: void writeImpl( const std::string& message ) { _outbox.push_back( message ); if ( _outbox.size() > 1 ) { // outstanding async_write return; } this->write(); } void write() { const std::string& message = _outbox[0]; boost::asio::async_write( _socket, boost::asio::buffer( message.c_str(), message.size() ), _strand.wrap( boost::bind( &Connection::writeHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ) ); } void writeHandler( const boost::system::error_code& error, const size_t bytesTransferred ) { _outbox.pop_front(); if ( error ) { std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl; return; } if ( !_outbox.empty() ) { // more messages to send this->write(); } } private: typedef std::deque Outbox; private: boost::asio::io_service& _io_service; boost::asio::io_service::strand _strand; boost::asio::ip::tcp::socket _socket; Outbox _outbox; }; int main() { boost::asio::io_service io_service; Connection foo( io_service ); } 

algunos puntos clave

  • the boost::asio::io_service::strand protege el acceso a Connection::_outbox
  • se distribuye un controlador desde Connection::write() ya que es público

no era obvio para mí si usaba prácticas similares en el ejemplo de su pregunta, ya que todos los métodos son públicos.

Solo trato de mejorar la gran respuesta de Sam. Los puntos de mejora son:

  • async_write intenta enviar cada byte del búfer (s) antes de completarlo, lo que significa que debe suministrar todos los datos de entrada que tenga a la operación de escritura, de lo contrario, la sobrecarga de encuadre puede boost debido a que los paquetes TCP son más pequeños de lo que podrían haber estado.

  • asio::streambuf , aunque es muy conveniente de usar, no es de copia cero. El siguiente ejemplo demuestra un enfoque de copia cero : mantenga los fragmentos de datos de entrada donde están y use una sobrecarga de dispersión / recostackción de async_write que tome una secuencia de búferes de entrada (que son solo indicadores de los datos de entrada reales).

Código fuente completo:

 #include  #include  #include  #include  #include  #include  #include  #include  using namespace std::chrono_literals; using boost::asio::ip::tcp; class Server { class Connection : public std::enable_shared_from_this { friend class Server; void ProcessCommand(const std::string& cmd) { if (cmd == "stop") { server_.Stop(); return; } if (cmd == "") { Close(); return; } std::thread t([this, self = shared_from_this(), cmd] { for (int i = 0; i < 30; ++i) { Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n"); } server_.io_service_.post([this, self] { DoReadCmd(); }); }); t.detach(); } void DoReadCmd() { read_timer_.expires_from_now(server_.read_timeout_); read_timer_.async_wait([this](boost::system::error_code ec) { if (!ec) { std::cout << "Read timeout\n"; Shutdown(); } }); boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) { read_timer_.cancel(); if (!ec) { const char* p = boost::asio::buffer_cast(buf_in_.data()); std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1)); buf_in_.consume(bytes_read); ProcessCommand(cmd); } else { Close(); } }); } void DoWrite() { active_buffer_ ^= 1; // switch buffers for (const auto& data : buffers_[active_buffer_]) { buffer_seq_.push_back(boost::asio::buffer(data)); } write_timer_.expires_from_now(server_.write_timeout_); write_timer_.async_wait([this](boost::system::error_code ec) { if (!ec) { std::cout << "Write timeout\n"; Shutdown(); } }); boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) { write_timer_.cancel(); std::lock_guard lock(buffers_mtx_); buffers_[active_buffer_].clear(); buffer_seq_.clear(); if (!ec) { std::cout << "Wrote " << bytes_transferred << " bytes\n"; if (!buffers_[active_buffer_ ^ 1].empty()) // have more work DoWrite(); } else { Close(); } }); } bool Writing() const { return !buffer_seq_.empty(); } Server& server_; boost::asio::streambuf buf_in_; std::mutex buffers_mtx_; std::vector buffers_[2]; // a double buffer std::vector buffer_seq_; int active_buffer_ = 0; bool closing_ = false; bool closed_ = false; boost::asio::deadline_timer read_timer_, write_timer_; tcp::socket socket_; public: Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) { } void Start() { socket_.set_option(tcp::no_delay(true)); DoReadCmd(); } void Close() { closing_ = true; if (!Writing()) Shutdown(); } void Shutdown() { if (!closed_) { closing_ = closed_ = true; boost::system::error_code ec; socket_.shutdown(tcp::socket::shutdown_both, ec); socket_.close(); server_.active_connections_.erase(shared_from_this()); } } void Write(std::string&& data) { std::lock_guard lock(buffers_mtx_); buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer if (!Writing()) DoWrite(); } }; void DoAccept() { if (acceptor_.is_open()) { auto session = std::make_shared(*this); acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) { if (!ec) { active_connections_.insert(session); session->Start(); } DoAccept(); }); } } boost::asio::io_service io_service_; tcp::acceptor acceptor_; std::unordered_set> active_connections_; const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30); const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30); public: Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { } void Run() { std::cout << "Listening on " << acceptor_.local_endpoint() << "\n"; DoAccept(); io_service_.run(); } void Stop() { acceptor_.close(); { std::vector> sessionsToClose; copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose)); for (auto& s : sessionsToClose) s->Shutdown(); } active_connections_.clear(); io_service_.stop(); } }; int main() { try { Server srv(8888); srv.Run(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << "\n"; } }