diff --git a/qa/rpc-tests/test_framework/mininode.py b/qa/rpc-tests/test_framework/mininode.py --- a/qa/rpc-tests/test_framework/mininode.py +++ b/qa/rpc-tests/test_framework/mininode.py @@ -38,6 +38,16 @@ import copy from test_framework.siphash import siphash256 from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB +from test_framework.p2pbuffer import P2PBuffer + +# Derive a class for Bitcoin network message buffering +class MininodeP2PBuffer(P2PBuffer): + ''' + A buffer for receiving p2p network messages. + ''' + def __init__(self, network_magic): + super(MininodeP2PBuffer, self).__init__(network_magic, strip_magic=False) + BIP0031_VERSION = 60000 MY_VERSION = 70014 # past bip-31 for ping/pong @@ -53,6 +63,8 @@ NODE_BLOOM = (1 << 2) NODE_WITNESS = (1 << 3) +READ_BUFFER_SIZE=8192 + # Keep our own socket map for asyncore, so that we can track disconnects # ourselves (to workaround an issue with closing an asyncore socket when # using select) @@ -1622,7 +1634,6 @@ self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.sendbuf = b"" - self.recvbuf = b"" self.ver_send = 209 self.ver_recv = 209 self.last_sent = 0 @@ -1631,6 +1642,8 @@ self.cb = callback self.disconnect = False self.nServices = 0 + # create a receive buffer + self.readbuffer = MininodeP2PBuffer(network_magic=self.MAGIC_BYTES[self.network]) if send_version: # stuff version msg into sendbuf @@ -1664,8 +1677,8 @@ self.show_debug_msg("MiniNode: Closing Connection to %s:%d... " % (self.dstaddr, self.dstport)) self.state = "closed" - self.recvbuf = b"" self.sendbuf = b"" + self.readbuffer.clear() try: self.close() except: @@ -1674,12 +1687,27 @@ def handle_read(self): try: - t = self.recv(8192) - if len(t) > 0: - self.recvbuf += t + recvd = b'' + # read all we can from the buffer + while True: + t = self.recv(READ_BUFFER_SIZE) + if t != b'': + recvd += t + else: + # connection has been closed from other end + break + if len(t) < READ_BUFFER_SIZE: + break + if len(recvd) > 0: + self.readbuffer.recv(recvd) self.got_data() - except: + else: + pass + except BlockingIOError as e: + # this one seems to occur after channel re-opened pass + except: + raise def readable(self): return True @@ -1702,52 +1730,124 @@ try: sent = self.send(self.sendbuf) - except: + except Exception as e: self.handle_close() return self.sendbuf = self.sendbuf[sent:] + def is_magic(self, databuffer): + ''' + Returns True if the databuffer starts with the correct network + start bytes ("network magic"), otherwise returns False. + ''' + return databuffer[:4] == self.MAGIC_BYTES[self.network] + + def check_message(self, msg): + ''' + Returns a tuple (success, result) where result is + True if msg is a well-formed message and False otherwise. + If False, result contains a string detailing the message problem. + Well-formedness is determined by wheither network magic, length, + and (if applicable) checksum are good. + msg_obj is a deserialized message object if the msg was well- + formed, otherwise None. + ''' + deser_msg = None + if msg == b'': + return (False, "message is empty") + # Sanity checks: network magic + if not self.is_magic(msg): + return (False, "message has bad magic") + # Sanity check: not multiple messages, these must be pulled + # apart by buffering beforehand + assert(msg[1:].find(self.MAGIC_BYTES[self.network]) == -1) + try: + if self.ver_recv < 209: + # An older version, without checksum. + if len(msg) < (4 + 12 + 4): + return (False, "message shorter than minimum size") + # Extract the command + command = msg[4:4+12].split(b"\x00", 1)[0] + msglen = struct.unpack(" (4 + 12 + 4 + msglen): + return (False, "message longer than length (%d > %d)" % (len(msg), 4 + 12 + 4 + msglen)) + # Get the body for later diagnostics if command does not parse. + msg_body = msg[4+12+4:] + else: + # A modern version which has a checksum. + if len(msg) < (4 + 12 + 4 + 4): + return (False, "message shorter than minimum size") + command = msg[4:4+12].split(b"\x00", 1)[0] + msglen = struct.unpack(" (4 + 12 + 4 + 4 + msglen): + return (False, "message longer than length (%d > %d)" % (len(msg), 4 + 12 + 4 + 4 + msglen)) + # Get the body for later diagnostics if command does not parse. + msg_body = msg[4+12+4+4:] + th = sha256(msg_body) + h = sha256(th) + if checksum != h[:4]: + return (False, "invalid checksum in message (recvd %s vs calc %s): " % (checksum, h[:4])) + # Checks ok, try to process the command. + if command in self.messagemap: + f = BytesIO(msg_body) + t = self.messagemap[command]() + t.deserialize(f) + deser_msg = t + else: + self.show_debug_msg("Unknown command: '" + command + "' " + + repr(msg)) + return (False, "unknown command in message: %s" % command) + except Exception as e: + raise + + # Return the deserialized message. + assert(deser_msg) + return (True, deser_msg) + def got_data(self): + ''' + It could be a complete message, or multiple complete messages, + or the beginning of a message, or some fragment of a partially + received message. Or any mixture, really, but for simplicity + we assume that no-one is injecting entirely bad data into the stream. + ''' try: while True: - if len(self.recvbuf) < 4: - return - if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: - raise ValueError("got garbage %s" % repr(self.recvbuf)) - if self.ver_recv < 209: - if len(self.recvbuf) < 4 + 12 + 4: - return - command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] - msglen = struct.unpack(">> buf1 = P2PBuffer(network_magic=b'foo') + >>> buf1.network_magic + b'foo' + >>> buf2 = P2PBuffer() + >>> buf2.network_magic is None + True + >>> buf1.recvbuf == buf2.recvbuf == b'' + True + ''' + # Input buffer into which unprocessed data will be placed. + self.recvbuf = b'' + + # Messages queue to which processed messages will be appended, + # waiting for collection. + self.messages = [] + + # Network magic (start bytes) identifying start of messages + self.network_magic = None + if network_magic: + self.network_magic = network_magic + + # Whether the get_next_message() function returns + # messages stripped of their magic or not + self.strip_magic = strip_magic + + # A stack of fragments that do not begin with + # correct magic. + self.bad_magic_fragments = [] + + # A stack of messages that begin with right magic, + # but have been returned by the application as + # being incomplete. + # If there is such a message in the list, and + # there are bad magic fragments, the bad magic + # fragments are added onto the incomplete message + # and this is pushed into messages when get_next_message + # is called, so it is returned first. + self.incomplete_messages = [] + + def store_incomplete_msg(self, data): + ''' + Store an incomplete message. + These must start with correct magic, + and they must not contain multiple messages. + Only one incomplete message can be processed + at a time. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> len(buf.incomplete_messages) + 0 + >>> buf.store_incomplete_msg(b'foo pretend this is incomplete') + >>> len(buf.incomplete_messages) + 1 + >>> buf.incomplete_messages + [b'foo pretend this is incomplete'] + ''' + with p2pbuffer_lock: + assert(data) + assert(type(data) == bytes) + assert(self._starts_with_magic(data)) + assert(not self.network_magic in data[1:]) + if len(self.incomplete_messages) > MAX_INCOMPLETE_IN_FLIGHT: + raise Exception("p2pbuffer: cannot handle more than 8 incomplete message at a time.") + else: + self.incomplete_messages.append(data) + + def push_bad_magic(self, data): + ''' + Push a bad-magic fragment onto the stack. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> len(buf.bad_magic_fragments) + 0 + >>> buf.push_bad_magic(b'gargamel') + >>> len(buf.bad_magic_fragments) + 1 + >>> buf.bad_magic_fragments + [b'gargamel'] + ''' + with p2pbuffer_lock: + assert(data) + assert(type(data) == bytes) + assert(not self._starts_with_magic(data)) + assert(not self.network_magic in data) + self.bad_magic_fragments.append(data) + + def pop_bad_magic(self): + ''' + Returns all the available bad magic fragments. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.push_bad_magic(b'gargamel') + >>> buf.push_bad_magic(b' and ') + >>> buf.push_bad_magic(b'azrael') + >>> len(buf.bad_magic_fragments) + 3 + >>> buf.bad_magic_fragments + [b'gargamel', b' and ', b'azrael'] + >>> buf.pop_bad_magic() + b'gargamel and azrael' + >>> len(buf.bad_magic_fragments) + 0 + ''' + with p2pbuffer_lock: + result = b'' + while len(self.bad_magic_fragments): + result += self.bad_magic_fragments.pop(0) + return result + + def clear(self): + ''' + Clear out all internal buffers. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.recv(b'foo is a word, foo is made of letters') + >>> buf.recvbuf + b'foo is a word, foo is made of letters' + >>> m = buf.get_next_message() + >>> buf.recvbuf + b'' + >>> len(buf.messages) + 1 + >>> buf.clear() + >>> len(buf.messages) + 0 + >>> buf.recvbuf == b'' + True + ''' + with p2pbuffer_lock: + self.clear_recv() + self.clear_msgs() + self.bad_magic_fragments = [] + + def clear_msgs(self): + ''' + Clear out the message queue + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.recv(b'foo a foo b foo c') + >>> buf._process_recvbuf() + >>> len(buf.messages) + 3 + >>> buf.clear_msgs() + >>> len(buf.messages) + 0 + ''' + with p2pbuffer_lock: + self.messages = [] + + def clear_recv(self): + ''' + Clear out the receive buffer. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.recv(b'foo is a word') + >>> buf.recvbuf + b'foo is a word' + >>> buf.clear_recv() + >>> buf.recvbuf == b'' + True + ''' + with p2pbuffer_lock: + self.recvbuf = b'' + + def recv(self, data=None): + ''' + Receive some data into the receive buffer. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.recv(b'foo is a word') + >>> buf.recvbuf + b'foo is a word' + >>> buf.recv(b' which I use often') + >>> buf.recvbuf + b'foo is a word which I use often' + >>> buf.recv(None) + ''' + if data: + with p2pbuffer_lock: + assert(type(data) == bytes) + self.recvbuf += data + if len(self.recvbuf) < len(self.network_magic): + return + else: + # Enough data to see if it begins with magic + if not self._starts_with_magic(self.recvbuf): + # No magic? + magicpos = self.recvbuf.find(self.network_magic) + if magicpos != -1: + # Data has piled up on it, so we can + # extract the bad-magic fragment + frag = self.recvbuf[:magicpos] + self.push_bad_magic(frag) + # Remove the bad data + self.recvbuf = self.recvbuf[magicpos:] + assert(self._starts_with_magic(self.recvbuf)) + else: + # The recvbuf has no magic at all + self.push_bad_magic(self.recvbuf) + self.recvbuf = b'' + else: + # Data starts with magic, proceed as normal + pass + else: + pass + + def set_magic(self, network_magic): + ''' + Change the network magic. + + >>> buf = P2PBuffer(network_magic="foo") + >>> buf.network_magic + 'foo' + >>> buf.set_magic('bar') + >>> buf.network_magic + 'bar' + >>> buf.set_magic(None) + Traceback (most recent call last): + ... + AssertionError + ''' + assert(network_magic) + self.network_magic = network_magic + + def _starts_with_magic(self, data): + ''' + Returns True if the given data has the correct network + magic at its start, otherwise returns False. + + >>> buf = P2PBuffer(network_magic=b'yum') + >>> buf._starts_with_magic(b'flakes') + False + >>> buf._starts_with_magic(b'yummy') + True + >>> buf._starts_with_magic(b'not yummy') + False + >>> buf._starts_with_magic(b'') + False + ''' + return data[:len(self.network_magic)] == self.network_magic + + def _process_recvbuf(self): + ''' + Process the receive buffer to extract as many whole messages + as possible. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> len(buf.recvbuf) + 0 + >>> len(buf.messages) + 0 + >>> buf.bad_magic_fragments + [] + >>> buf.incomplete_messages + [] + >>> buf.recv(b'foo bar foo baz foo fi foo') + >>> buf._process_recvbuf() + >>> len(buf.messages) + 3 + >>> buf.messages + [b' bar ', b' baz ', b' fi '] + >>> buf.recvbuf + b'' + >>> buf.incomplete_messages + [] + >>> buf.bad_magic_fragments + [] + >>> buf.get_next_message() + b' bar ' + >>> buf.get_next_message() + b' baz ' + >>> buf.get_next_message() + b' fi ' + >>> buf.get_next_message() + >>> buf.recv(b' fum foo') + >>> buf.get_next_message() + >>> buf.recvbuf + b'foo' + >>> buf.recv(b' faz') + >>> buf.get_next_message() + b' faz' + >>> buf.recvbuf == b'' + True + >>> buf.get_next_message() + >>> buf.recv(b'foofoofoo') + >>> buf.get_next_message() + b'' + >>> buf.get_next_message() + >>> buf.recvbuf + b'' + >>> buf.recv(b'a') + >>> buf.get_next_message() + ''' + # We need network magic to be set to process things. + assert(self.network_magic) + + with p2pbuffer_lock: + # If receive buffer is empty or not even as long as magic, return, + # as there cannot be a full message in it yet. + if len(self.recvbuf) <= len(self.network_magic): + return + # Check if buffer begins on correct magic. + if self._starts_with_magic(self.recvbuf): + # Good message start. + # Split out first message + magicpos = self.recvbuf[1:].find(self.network_magic) + if magicpos == -1: + # Straightforward - no other magic later + self.messages.append(self.recvbuf[len(self.network_magic):]) + self.recvbuf = b'' + else: + # There is more magic - process first msg and then recurse + magicpos += 1 + self.messages.append(self.recvbuf[len(self.network_magic):magicpos]) + self.recvbuf = self.recvbuf[magicpos:] + split_buf = self.recvbuf.split(self.network_magic) + for msg in split_buf: + if msg != b'': + self.messages.append(msg) + if split_buf[:-1] == b'': + self.recvbuf = self.network_msg + else: + self.recvbuf = b'' + else: + # Buffer did not start with correct magic. + # Check if magic occurs later. + assert(type(self.recvbuf) == bytes) + magicpos = self.recvbuf[1:].find(self.network_magic) + if magicpos == -1: + self.push_bad_magic(self.recvbuf) + self.recvbuf = b'' + else: + magicpos += 1 + nomagic_fragment = self.recvbuf[:magicpos] + if len(self.messages) > 0: + # Add the fragment onto last message if there is one + self.messages[len(self.messages) - 1] += nomagic_fragment + # Now discard the bit that does not have right magic from the buffer + if len(self.recvbuf[magicpos:]) > len(self.network_magic): + self.recvbuf = self.recvbuf[magicpos:] + # Call ourselves recursively on the remainder + #self._process_recvbuf() + + def get_next_message(self): + ''' + Get next message from the queue, if there is one. + If not, it returns None. + Before passing on messages, it processes the receive + buffer. + + >>> buf = P2PBuffer(network_magic=b'foo') + >>> buf.recv(b'foo the first msg') + >>> buf.recv(b'foo the second msg') + >>> buf.get_next_message() + b' the first msg' + >>> buf.get_next_message() + b' the second msg' + >>> buf.get_next_message() + >>> buf.recv(b'fo') + >>> buf.get_next_message() + ''' + def wrap_appropriate_magic(msg): + if self.strip_magic: + if self._starts_with_magic(msg): + return msg[len(self.network_magic):] + else: + return msg + else: + if not self._starts_with_magic(msg): + return self.network_magic + msg + else: + return msg + + with p2pbuffer_lock: + # If there is data in receive buffer, + # try to extract messages from it. + if len(self.recvbuf) > 0: + self._process_recvbuf() + + # If we have some messages, return first one. + if len(self.incomplete_messages) > 0 and len(self.bad_magic_fragments) > 0: + # Try to recombine first incomplete message with bad-magic + # fragments and pass that back to client in case it is a + # well-formed message. + franken_msg = self.incomplete_messages.pop() + self.pop_bad_magic() + return wrap_appropriate_magic(franken_msg) + else: + # no incomplete/bad fragment recombination required + if len(self.messages) > 0: + retmsg = wrap_appropriate_magic(self.messages.pop(0)) + return retmsg + else: + return None + + +if __name__ == "__main__": + import doctest + doctest.testmod()