Because TCP recv() can return fewer bytes than requested, we must call it in a loop until the whole message has arrived. This is TCP sockets 101 and it is kind of embarrassing that I missed it. Note: TCP send() can also write only part of the buffer, but I have not bothered handling this, even though it is trivial.
The other mistake that I made was that I always assumed that a varint32 was 4 bytes in size. This is incorrect, since it is, in fact, a VARINT!
These two mistakes necessitated changes in the C++ sending and receiving code.
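To make the second mistake concrete, here is a minimal sketch of how a varint32 is laid out on the wire: 7 payload bits per byte, least-significant group first, with the high bit set on every byte except the last. The helper names encodeVarint32 and decodeVarint32 are hypothetical and are not part of the protobuf API; they only illustrate why the delimiter can be anywhere from 1 to 5 bytes long.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not from protobuf): encode a 32-bit value as a
// base-128 varint. Each byte carries 7 bits of the value, low bits first;
// the high bit (0x80) means "another byte follows".
std::vector<uint8_t> encodeVarint32(uint32_t value) {
    std::vector<uint8_t> out;
    while (value >= 0x80) {
        out.push_back(static_cast<uint8_t>(value | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<uint8_t>(value));
    return out;
}

// Hypothetical helper: decode a varint from the start of `data`.
// Returns the number of bytes consumed, or 0 if the input is truncated
// or the varint runs past 5 bytes.
std::size_t decodeVarint32(const std::vector<uint8_t>& data, uint32_t* value) {
    uint32_t result = 0;
    for (std::size_t i = 0; i < data.size() && i < 5; ++i) {
        result |= static_cast<uint32_t>(data[i] & 0x7F) << (7 * i);
        if ((data[i] & 0x80) == 0) {
            *value = result;
            return i + 1;
        }
    }
    return 0;
}
```

A message of 8 bytes therefore gets a 1-byte delimiter, while a message of 300 bytes gets a 2-byte delimiter, so the receiver cannot assume a fixed header size.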
First, I wrote a function to parse out the delimiting varint and to fill a buffer with the bytes specified by that varint:
/* Reads a varint-delimited protocol buffers message from a TCP socket.
   Stores the message in *buffer (the caller must free() it) and returns the
   number of message bytes read (not including the delimiter). Returns -1 on
   a socket error or if the peer closes the connection before the message
   is complete. */
int recvDelimProtobuf(int sock, unsigned char **buffer){
    //read the delimiting varint byte by byte
    unsigned int length=0;
    int recv_bytes=0;
    unsigned char bite;   //unsigned, so the bit tests below are well defined
    int received=recv(sock, &bite, 1, 0);
    if(received<=0)       //0 means the peer closed the connection
        return -1;
    else
        recv_bytes += received;
    length = (bite & 0x7f);
    while(bite & 0x80){
        memset(&bite, 0, 1);
        received=recv(sock, &bite, 1, 0);
        if(received<=0)
            return -1;
        else
            recv_bytes += received;
        //each varint byte contributes 7 bits, least significant group first
        length |= (unsigned int)(bite & 0x7F) << (7*(recv_bytes-1));
    }

    //receive remainder of message, looping because recv() may return early
    recv_bytes=0;
    *buffer=(unsigned char *)malloc(sizeof(unsigned char) * length);
    while((unsigned int)recv_bytes < length){
        received=recv(sock, *buffer + (sizeof(unsigned char) * recv_bytes), length-recv_bytes, 0);
        if(received<=0){
            free(*buffer);   //do not leak the buffer on failure
            return -1;
        }
        else
            recv_bytes+=received;
    }
    return recv_bytes;
}
The rest of the code now looks like this:
//allocate packet buffer
unsigned char *buffer;
int received=recvDelimProtobuf(clientSock, &buffer);

//read varint delimited protobuf object in to buffer
google::protobuf::io::ArrayInputStream arrayIn(buffer, received);
google::protobuf::io::CodedInputStream codedIn(&arrayIn);
google::protobuf::io::CodedInputStream::Limit msgLimit = codedIn.PushLimit(received);
client.ParseFromCodedStream(&codedIn);
codedIn.PopLimit(msgLimit);

//purge buffer
free(buffer);
This fixes the receiving side. To fix the sending side, we need to change the message size to actually reflect the size of the varint + message.
int varintsize = google::protobuf::io::CodedOutputStream::VarintSize32(serverAck.ByteSize());
int ackSize=serverAck.ByteSize()+varintsize;
char* ackBuf=new char[ackSize];

//write varint delimiter to buffer
google::protobuf::io::ArrayOutputStream arrayOut(ackBuf, ackSize);
google::protobuf::io::CodedOutputStream codedOut(&arrayOut);
codedOut.WriteVarint32(serverAck.ByteSize());

//write protobuf ack to buffer
serverAck.SerializeToCodedStream(&codedOut);
send(clientSock, ackBuf, ackSize, 0);

//ackBuf was allocated with new[], so it must be released with delete[]
delete[] ackBuf;
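As noted at the top of the post, send() can also return after writing only part of the buffer, and the code above does not handle that. A minimal sketch of a send loop follows, assuming a blocking POSIX socket; sendAll is a hypothetical helper name that does not appear in the original code.

```cpp
#include <cerrno>
#include <cstddef>
#include <sys/types.h>
#include <sys/socket.h>

// Hypothetical helper: keep calling send() until all `len` bytes of `data`
// have been handed to the kernel. Returns 0 on success, -1 on error.
int sendAll(int sock, const char* data, std::size_t len) {
    std::size_t sent = 0;
    while (sent < len) {
        ssize_t n = send(sock, data + sent, len - sent, 0);
        if (n < 0) {
            if (errno == EINTR)
                continue;      // interrupted by a signal, retry
            return -1;         // real error, give up
        }
        sent += static_cast<std::size_t>(n);   // advance past what was written
    }
    return 0;
}
```

With this helper, the single send(clientSock, ackBuf, ackSize, 0) call could be swapped for sendAll(clientSock, ackBuf, ackSize), checking the return value for -1.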
Thanks to Johan Anderholm for helping me with this solution. As he mentions in a comment on my original blog entry, there is another elegant solution that uses boost::asio. I have not tried this but I am sure it works just as well!
Your Java receiving code has the same issue!
You're absolutely right... I blame my lack of experience with NIO! Thanks for pointing that out.
Hi Adam,
I followed your code to handle a TCP protobuf connection between two
processes, but I get a crash inside the protobuf library when I try to parse
the sent message. Below you can see some info about the code and the traces.
Could you help me? Thank you very much!
/**
** [0] This is the dump of the sent message:
** - first byte (0x08) should be the varint len, that is the "header"
** - the other bytes should be the payload of the proto message (len = 8 bytes)
**/
[line: 180, tag buffer3] 08 08 01 1a
[line: 180, tag buffer3] 04 08 0a 10
[line: 180, tag buffer3] 14
/**
** [1] This is the dump of the received message after removing the varint len
**/
[line: 77, tag ParseMsg(1)] 08 01 1a 04
[line: 77, tag ParseMsg(1)] 08 0a 10 14
/* [2] Below is the code that parses the message */
google::protobuf::io::ArrayInputStream arrIn(buffer, len);
google::protobuf::io::CodedInputStream input(&arrIn);
google::protobuf::io::CodedInputStream::Limit msgLimit = input.PushLimit(len);
input.PopLimit(msgLimit);
msgp->ParseFromCodedStream(&input);
/* [3] Below is the crash that happened inside ParseFromCodedStream */
Program received signal SIGSEGV, Segmentation fault.
0x00157ba2 in InlineParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:131
131 google/protobuf/message_lite.cc: No such file or directory.
in google/protobuf/message_lite.cc
Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.80.el6.i686 libgcc-4.4.6-4.el6.i686 libstdc++-4.4.6-4.el6.i686 zlib-1.2.3-27.el6.i686
(gdb) bt
#0 0x00157ba2 in InlineParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:131
#1 google::protobuf::MessageLite::ParseFromCodedStream (this=0xbffff190, input=0xbfffed08) at google/protobuf/message_lite.cc:161
#2 0x0804f05f in PceMgr::ParseMsg(unsigned char*, int, pceGenericMessage*) ()
#3 0x0804f3f9 in PceMgr::Run() ()
#4 0x0808450e in main ()
Piero,
I am far from a protobuf expert, but it looks like there's something wrong with your installation. This seems to be the root of the problem: 131 google/protobuf/message_lite.cc: No such file or directory.
I'm not sure what else would cause this. Check that you installed protobuf properly. What version of protobuf are you using? I used 2.3 to make this guide, so it's possible there have been changes to the library since then.
Oh! I'm sorry... I'm very distracted :-)... I'm going to check my installation. I'm using version 2.4.1. Thank you very much, I'll let you know :-)
Thank you! Your blog post saved me at least a couple of days...