I have a boost asio project that I am at wits end with. I have defined a TCPConnect class which is inherited by both a TCPSession and a TCPClient class. The reason for this is because I would like both the server and client side of the connection to be directly used for sending and receiving messages over full duplex TCP/IP. Only the TCPSession class listens for new connections, and only the TCPClient class makes an outgoing connection.
When the client connects to the server it sends a handshake message and blocks for the response. The server receives the message and sends a handshake ack back. On sending the ack, the server considers the connection completed. On receiving the ack, the client considers the connection completed.
The problem that I am having is that only the client side (the TCPClient object) can call its inherited TCPConnect::Send() and have it be received by the remote TCPSession object. If the server side (the TCPSession object) calls TCPConnect::Send(), it puts the message on the line without a problem, but the message is never received by the remote TCPClient object.
I must say I am a total beginner at working with boost. I have looked into this issue, trying to word and reword my search query, but have not found a solution. On both sides I have the TCPSession and TCPClient objects sitting on an async_read(), but on the client side the callback for the async_read() does not get called when the server side sends a message. I am assuming this has to do with how the io_service is set up on the TCPSession object's side, and possibly with threading.
So, a TCPServer is started inside a TCPWorker::run() function, this function being run in its own thread. The TCPWorker class owns the io_service object. The threaded run() function instantiates a TCPServer object and then calls io_service::run(). The TCPServicer object is responsible for creating TCPSession objects for new connections. For each new connection, the TCPServer creates a new TCPSession object with the io_service pointer and calls async_accept on its acceptor. Once a new connection is accepted, the TCPSession's socket is set for an async_read(). So it is known that io_service and the multiple sockets that can be created to use it are in one thread.
On the TCPClient side, when Connect() is called, if an io_service doesn't exist and/or a thread does not yet exist for the io_service, they are created using the following lines:
if (!m_pClientServiceThread) { if (!m_pclient_io_service) // if no io_service has yet been created m_pclient_io_service = new boost::asio::io_service; m_pClientServiceThread = new boost::thread(boost::bind(&boost::asio::io_service::run, m_pclient_io_service)); }
So the service is being run in a thread. Then a new resolver and a new boost::asio::ip::tcp::socket are created given the io_service pointer, and boost::asio::connect() is called given the socket and a valid resolved endpoint. So the client seems to have its io_service running in a thread. Having successfully made the connection, I then send a handshake message using boost::asio::read(), and sit with boost::asio::read() waiting on the handshare response. One receiving a valid response, I pass the socket to an async_read() to wait for incoming messages.
I have looked at this for so long now without figuring out why the client side async_read() does no receive a message sent from the server side, even though the server side receives the message that is sent from the client side.
Please help me to figure this out. I am quite sure there is something simple I am not seeing, but as I have said, I am not a boost expert, so I am not sure what it is.
Added code:
TcpConnect class:
class TcpConnect : public boost::enable_shared_from_this<TcpConnect>
{
    TcpConnect(boost::asio::io_service* pio_service, ConnType connType, std::string sHostIp, int iHostPort)
        : m_pio_service(pio_service)
        , eConnType(connType)
        , m_strHostIp(sHostIp)
        , m_iHostPort(iHostPort)
    {
    }
    virtual ~TcpConnect() { /* does what is needed - this works */ }
    bool SendBlocked(CmdPacket& msg, CmdPacket& rsp)
    {
        // Function used for handling connection handshake response
        
        std::size_t write_length = boost::asio::write( *m_pSocket, boost::asio::buffer(msg.Serialize(), (std::size_t)msg.SerializedLength()));
        if (msg.SerializedLength() != write_length)
        {
            return false;
        }
        boost::asio::streambuf sbuff;
        boost::system::error_code error;
        size_t reply_length(0);
        // read header for message body length
        byte* buffer = rsp.CreateBuffer(MSG_HEADER_LENGTH);
        reply_length = boost::asio::read(*m_pSocket, boost::asio::buffer(buffer, MSG_HEADER_LENGTH), boost::asio::transfer_exactly(MSG_HEADER_LENGTH), error); 
        if (error || !rsp.ReadMessageLength())
        {
            /* error handled here */
            return false;
        }
        // read message body
        int expectedlen = rsp.BodyLength();
        buffer = rsp.CreateBuffer(expectedlen);
        reply_length = boost::asio::read(*m_pSocket, boost::asio::buffer(buffer, expectedlen), boost::asio::transfer_exactly(expectedlen), error);
        if (error)
        {
            /* error handled here */
            return false;
        }
        if (!rsp.Deserialize() || reply_length != rsp.BodyLength())
        {
            /* error handled here */
            return false;
        }
        return true;
    }
    bool Send(CmdPacket& msg)
    {
        bool bStatus = true;
        size_t write_length = 0;
        try
        {
            write_length = boost::asio::write( *m_pSocket, boost::asio::buffer(msg.Serialize(), (std::size_t)msg.SerializedLength()) );
        }
        catch (...)
        {
            /* error handled here */
            return false;
        }
        if (write_length != msg.SerializedLength())
        {
            /* error handled here */
            return false;
        }
        return true;
    }
    
    void StartAsyncRead()
    {
        m_pInMsg = new CmdPacket();
        boost::asio::async_read(*m_pSocket, boost::asio::buffer(m_pInMsg->CreateBuffer(MSG_HEADER_LENGTH), MSG_HEADER_LENGTH),
            boost::bind(&TcpConnect::handle_read_header, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }
    void handle_read_header(const boost::system::error_code& error, size_t bytes_transferred)
    {
        if (!error && bytes_transferred == MSG_HEADER_LENGTH && m_pInMsg->ReadMessageLength())
        {
            boost::asio::async_read(*m_pSocket,
                boost::asio::buffer(m_pInMsg->CreateBuffer(m_pInMsg->SerializedLength()), m_pInMsg->SerializedLength()),
                boost::bind(&TcpConnect::handle_read_body, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        }
        else if (error)
        {
            /* errors handled */
        }
    }
    void handle_read_body(const boost::system::error_code& error, size_t bytes_transferred)
    {
        bool deserialized = false;
        if (!error && (deserialized = m_pInMsg->Deserialize()))
        {
            // if not yet connected, expecting handshake message, in which case acknowledge, otherwise error
            if (m_pInMsg->IsHandshake())
            {
                m_pInMsg->SetAcknowledge(true);
                std::size_t write_length = boost::asio::write(
                    *m_pSocket, boost::asio::buffer(m_pInMsg->Serialize(), (std::size_t)m_pInMsg->SerializedLength()));
                if (write_length == m_pInMsg->SerializedLength())
                {
                    /* we sent the acknowledgement, so we consider we're connected */
                }
                else
                {
                    /* handling error here */
                    return;
                }
                delete m_pInMsg;
                m_pInMsg = NULL;
            }
            // if graceful disconnect, notify the connection manager of new status, which will remove the connection from the map
            else if (m_pInMsg->IsDisconnect())
            {
                /* disconnect request handled here */
                return;
            }
            else
            {
                /* message received, passing it to the local process here */
            }
            // set up to handle the next read
            m_pInMsg = new CmdPacket;
            boost::asio::async_read(*m_pSocket, boost::asio::buffer(m_pInMsg->CreateBuffer(MSG_HEADER_LENGTH), MSG_HEADER_LENGTH),
                boost::bind(&TcpConnect::handle_read_header, shared_from_this(),
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
        }
        else if (error)
        {
            /* handle case where boost error */
        }
        else if (!deserialized)
        {
            /* handle case where message not correctly deserialized */
        }
    }
protected:
    ConnType eConnType;
    boost::asio::ip::tcp::socket* m_pSocket{ 0 };
    boost::asio::io_service* m_pio_service;
    boost::asio::ip::tcp::resolver::iterator m_itEndpoint;
    CmdPacket* m_pInMsg{ 0 };
    std::string m_strHostIp;
    int m_iHostPort;
};
typedef boost::shared_ptr<TcpConnect> ConnectionPtr;
TcpClient class:
class TcpClient : public TcpConnect
{
public:
    TcpClient(boost::asio::io_service& io_service, ConnType connType, const std::string& sIP, int iPort)
        : TcpConnect(&io_service, connType, sIP, iPort)
    {
    }
    ~TcpClient() { /* does what is needed - this works */ }
    //virtual ObjType Type() { return OT_CLIENT; }
    //virtual int sessionId() { return -1; } // client end does not have a session id
    //Use the following to initialize and to to reestablish the connection.
    bool Connect()
    {
        bool bStatus = true;
        //Convert the port to a string
        std::stringstream ss;
        ss << m_iHostPort;
        std::string strPort = ss.str();
        
        //Establish the connection
        try
        {
            boost::system::error_code ec;
            // create TCP resolver and query and resolve the endpoint
            boost::asio::ip::tcp::resolver resolver(*m_pio_service);
            boost::asio::ip::tcp::resolver::query query(m_strHostIp.c_str(), strPort.c_str());
            boost::asio::ip::tcp::resolver::iterator m_iterEndpoint = resolver.resolve(query, ec);
            if (ec)
            {
                /* error handled here */
                bStatus = false;
            }
            else
            {
                // close an old socket (shouldn't ever be the case)
                if (m_pSocket != NULL) CloseSocket();  /* NOTE: this is defined in TcpConnect, but not shown here */
                // create the socket on the io_service object and connect to the endpoint
                m_pSocket = new boost::asio::ip::tcp::socket(*m_pio_service);
                boost::asio::connect(*m_pSocket, m_iterEndpoint, ec);
                if (ec)
                {
                    /* error handled here */
                    bStatus = false;
                }
            }
        } //end try
        catch(...)
        {
            /* error handled here */
            bStatus = false;
        }
        
        return bStatus;
    }
};
typedef boost::shared_ptr<TcpClient> TcpClientPtr;
TcpServer class (run by TcpWorker and creates TcpSession objects):
class TcpServer;
class TcpSession : public TcpConnect
{
public:
    TcpSession(boost::asio::io_service& io_service)
        : TcpConnect(&io_service)
        , m_session_id(next_session_id())
        , m_pSocket(new tcp::socket(io_service))
    {
    }
    virtual ~TcpSession() { /* NOTE: m_pSocket is owned and deleted by TcpConnect */ }
private:
    int next_session_id()
    {
        static int id = 0;
        return (++id > 0) ? id : 1;
    }
private:
    int m_session_id;
};
typedef boost::shared_ptr<TcpSession> TcpSessionPtr;
class TcpServer
{
public:
    TcpServer(boost::asio::io_service& io_service, short port)
        : m_pio_service(&io_service)
        , m_acceptor(io_service, tcp::endpoint(tcp::v4(), port))
    {
        m_acceptor.listen();
    }
    ~TcpServer()
    {
        boost::system::error_code errorcode;
        m_acceptor.close(errorcode);
    }
    void start_accept()
    {
        TcpSessionPtr new_session(new TcpSession(*m_pio_service));
        // start listening for this session
        m_acceptor.async_accept(new_session->socket(),
            boost::bind(&TcpServer::handle_accept, this, new_session,
            boost::asio::placeholders::error));
        
        new_session.reset();
    }
private:
    void handle_accept(TcpSessionPtr new_session, const boost::system::error_code& error)
    {
        if (!error)
        {
            new_session->StartAsyncRead(); /* NOTE: there is code for aggregating session objects */
            /* NOTE: The result of an async_read() will be handled in TcpConnect::handle_read_header() */
        }
        else
        {
            /* error handled here */
        }
        // listen for the next connection
        start_accept();
    }
private:
    boost::asio::io_service* m_pio_service;
    tcp::acceptor m_acceptor;
};
class TcpWorker
{
public:
    TcpWorker(unsigned int port)
        : m_port(port)
    {}
    ~TcpWorker() {}
    
    void StopWorker()
    {
        if (!m_io_service.stopped())
        {
            m_io_service.stop();
            while (!m_io_service.stopped()) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); }
        }
    }
    void operator()()  // threaded run function started from Communicator::Listen()
    {
        TcpServer server(m_io_service, (short)m_port);
        server.start_accept(); // set up async_accept() for listening
        std::size_t inumhandlers = m_io_service.run();  // blocks here until StopWorker() is called
    }
private:
    unsigned int m_port;
    boost::asio::io_service m_io_service;
    bool m_running;
};
Communicator class:
class Communicator {
public:
    Communicator() = default;
    ~Communicator() { /* does what is needed - this works */ }
    
    bool Listen()
    {
        if (!m_pServerThread || !m_pServerWorker)
        {
            m_pServerWorker = new TcpWorker(m_myPort);
            m_pServerThread = new boost::thread(&TcpWorker::operator(), m_pServerWorker);
            return true;
        }
        return false;
    }
    bool Connect(int srcId, int destId, std::string ipaddr, int port)
    {
        bool ret = false;
        if (connected(destId))
        {
            ret = true;
        }
        else
        {
            // if io_service is not running, start it (happens if never started, or if no remaining client sockets running)
            if (!ClientThreadRunning())
            {
                if (m_pClientThread) // since going to create a new thread, make sure this one is deleted if exists
                    delete m_pClientThread;
                if (!m_pclient_io_service) // if no io_service has yet been created
                    m_pclient_io_service = new boost::asio::io_service;
                m_pClientServiceThread = new boost::thread(boost::bind(&boost::asio::io_service::run, m_pclient_io_service));
            }
            // create the connection. Wait for Ack before returning.
            TcpClientPtr client(new TcpClient(*m_pclient_io_service, destId, ip, port));
            // connect to the client and do a handshake
            if (client->Connect())
            {
                // if an initial handshake works, we're connected
                CmdPacket msg(CMD_NONE, srcId, destId, port, ipaddr), rsp;
                msg.SetHandshake(); // this starts the handshake protocol, which is completed on receiving the necessary response.
                if (!client->SendBlocked(msg, rsp) || rsp != msg)
                {
                    client.reset();
                    ret = false;
                }
                else
                {
                    // Connected, now set up for asynchronous reading
                    client->StartAsyncRead(m_pclient_io_service);
                    // save it in the class
                    connection = client;
                    ret = true;
                }
            }
            // decrement reference count, if not added to shared pointer map, this will set client object for deletion
            client.reset();
        }
        return ret;
    }
    bool sendMessage(CmdPacket& msg)
    {
        bool bret = false;
        if (connection != nullptr)
        {
            iter->second->Send(msg);
        }
        return bret;
    }
private:
    TcpConnect *connection{ 0 };
    TcpWorker* m_pServerWorker{ 0 };
    boost::thread *m_pServerThread{ 0 };
    boost::thread* m_pClientThread{ 0 };
    boost::asio::io_service* m_pclient_io_service{ 0 };
    ConnectionPtr connection{ 0 };
};

