Wednesday, January 9, 2013

Cross-platform communication using Google's Protocol Buffers Revisited

In my previous blog entry regarding cross-platform communication using Protobuf, I made some very elementary mistakes. The most glaring of which was my improper use of TCP.
Because TCP recv() allows for partial returns, we must make sure that we loop and recv() all of the message. This is TCP sockets 101 and it is kind of embarrassing that I missed it. Note: TCP send() can also have partial returns, but I have not bothered handling this, even though it is trivial.
The other mistake that I made was that I always assumed that a varint32 was 4 bytes in size. This is incorrect, since it is, in fact, a VARINT!

These two mistakes necessitated changes in the C++ sending and receiving.
First, I wrote a function to parse out the delimiting varint and to fill a buffer with the bytes specified by that varint:
/* 
   reads a varint delimited protocol buffers message from a TCP socket
   returns message in buffer, and returns number of bytes read (not including delimiter)
*/
int recvDelimProtobuf(int sock, unsigned char **buffer){
    //read the delimiting varint byte by byte
    unsigned int length=0;
    int recv_bytes=0;
    char bite;
    int received=recv(sock, &bite, 1, 0);
    if(received<0)
        return received;
    else
        recv_bytes += received;
    length = (bite & 0x7f);
    while(bite & 0x80){
        memset(&bite, 0, 1);
        received=recv(sock, &bite, 1, 0);
        if(received<0)
            return received;
        else
            recv_bytes += received;
        length|= (bite & 0x7F) << (7*(recv_bytes-1));
    }

    //receive remainder of message
    recv_bytes=0;
    *buffer=(unsigned char *)malloc(sizeof(unsigned char) * length);
    while(recv_bytes < length){
        received=recv(sock, *buffer + (sizeof(unsigned char) * recv_bytes), length-recv_bytes, 0);
        if(received<0)
            return received;
        else
            recv_bytes+=received;
    }
    return recv_bytes;
}

The rest of the code now looks like this:
    //allocate packet buffer
    unsigned char *buffer;
	int received=recvDelimProtobuf(clientSock, &buffer);

	//read varint delimited protobuf object in to buffer
	google::protobuf::io::ArrayInputStream arrayIn(buffer, received);
	google::protobuf::io::CodedInputStream codedIn(&arrayIn);
	google::protobuf::io::CodedInputStream::Limit msgLimit = codedIn.PushLimit(received);
	client.ParseFromCodedStream(&codedIn);
	codedIn.PopLimit(msgLimit);

	//purge buffer
	free(buffer);

This fixes the receiving side. To fix the sending side, we need to change the message size to actually reflect the size of the varint + message.
	int varintsize = google::protobuf::io::CodedOutputStream::VarintSize32(serverAck.ByteSize());
	int ackSize=serverAck.ByteSize()+varintsize;
	char* ackBuf=new char[ackSize];

	//write varint delimiter to buffer
	google::protobuf::io::ArrayOutputStream arrayOut(ackBuf, ackSize);
	google::protobuf::io::CodedOutputStream codedOut(&arrayOut);
	codedOut.WriteVarint32(serverAck.ByteSize());

	//write protobuf ack to buffer
	serverAck.SerializeToCodedStream(&codedOut);
	send(clientSock, ackBuf, ackSize, 0);
	delete(ackBuf);

Thanks to Johan Anderholm for helping me with this solution. As he mentions in a comment on my original blog entry, there is another elegant solution that uses boost::asio. I have not tried this but I am sure it works just as well!

6 comments:

  1. Your java receiving has the same issue!

    ReplyDelete
    Replies
    1. You're absolutely right... I blame my lack of experience with NIO! Thanks for pointing that out.

      Delete
  2. Hi Adam,
    i followed your code to handle a TCP protobuf connection between two
    processes, but i have a crash into the protobuf library when i'm going to parse
    the sent message. Above you can see some infos about code and traces.
    Could you help me??? Thank you very much!


    /**
    ** [0] This is the dump of the sent message:
    ** - first byte (0x08) should be the varint len, that is the "header"
    ** - others bytes should be the payload of the proto message (len = 8 bytes)
    **/
    [line: 180, tag buffer3] 08 08 01 1a
    [line: 180, tag buffer3] 04 08 0a 10
    [line: 180, tag buffer3] 14

    /**
    ** [1] This is the dump of the received message after removing the varint len
    **/
    [line: 77, tag ParseMsg(1)] 08 01 1a 04
    [line: 77, tag ParseMsg(1)] 08 0a 10 14

    /* [2] Above the code to parse the message */
    google::protobuf::io::ArrayInputStream arrIn(buffer, len);
    google::protobuf::io::CodedInputStream input(&arrIn);
    google::protobuf::io::CodedInputStream::Limit msgLimit = input.PushLimit(len);
    input.PopLimit(msgLimit);

    msgp->ParseFromCodedStream(&input);

    /* [3] Above is showed the crash happened into the ParseFromCodedStream */
    Program received signal SIGSEGV, Segmentation fault.
    0x00157ba2 in InlineParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:131
    131 google/protobuf/message_lite.cc: No such file or directory.
    in google/protobuf/message_lite.cc
    Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.80.el6.i686 libgcc-4.4.6-4.el6.i686 libstdc++-4.4.6-4.el6.i686 zlib-1.2.3-27.el6.i686
    (gdb) bt
    #0 0x00157ba2 in InlineParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:131
    #1 google::protobuf::MessageLite::ParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:161
    #2 0x0804f05f in PceMgr::ParseMsg(unsigned char*, int, pceGenericMessage*) ()
    #3 0x0804f3f9 in PceMgr::Run() ()
    #4 0x0808450e in main ()

    ReplyDelete
    Replies
    1. Piero,

      I am far from a protobuf expert, but it looks like there's something wrong with your installation. This seems to be the root of the problem: 131 google/protobuf/message_lite.cc: No such file or directory.

      I'm not sure what else would cause this. Check that you installed protobuf properly. What version of protobuf are you using? I used 2.3 to make this guide, so it's possible there have been changes to the library since then.

      Delete
  3. Oh! I'm sorry ...i'm very distracted :-)...I'm gonna to check my installation. I'm using version 2.4.1. Thank you very much, i'll let you know :-)

    ReplyDelete
  4. Thank You! You blog post saved at least couple of days ...

    ReplyDelete

Note: Only a member of this blog may post a comment.