Changeset View
Changeset View
Standalone View
Standalone View
qa/rpc-tests/test_framework/p2pbuffer.py
- This file was added.
#!/usr/bin/env python3 | |||||
# Copyright (c) 2017 The Bitcoin developers | |||||
# Distributed under the MIT software license, see the accompanying | |||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | |||||
""" | |||||
P2P input buffer class. | |||||
""" | |||||
from threading import RLock | |||||
p2pbuffer_lock = RLock() | |||||
# Impose a limit on number of incomplete messages that we keep. | |||||
# 8 is a totally arbitrary number, picked because 1 didn't work | |||||
# with certain tests - clearly some tests need a few. | |||||
# If there are too many incompletes flying around in any test, | |||||
# something must be pretty wrong though, so look at the details before | |||||
# raising this parameter. | |||||
MAX_INCOMPLETE_IN_FLIGHT=8 | |||||
class P2PBuffer(object): | |||||
''' | |||||
The P2PBuffer class receives p2p data packets (or fragments thereof) | |||||
and attempts to filter out bad / incomplete data while supplying a | |||||
stream of well-formed packets. | |||||
''' | |||||
def __init__(self, network_magic=None, strip_magic=True): | |||||
''' | |||||
>>> buf1 = P2PBuffer(network_magic=b'foo') | |||||
>>> buf1.network_magic | |||||
b'foo' | |||||
>>> buf2 = P2PBuffer() | |||||
>>> buf2.network_magic is None | |||||
True | |||||
>>> buf1.recvbuf == buf2.recvbuf == b'' | |||||
True | |||||
''' | |||||
# Input buffer into which unprocessed data will be placed. | |||||
self.recvbuf = b'' | |||||
# Messages queue to which processed messages will be appended, | |||||
# waiting for collection. | |||||
self.messages = [] | |||||
# Network magic (start bytes) identifying start of messages | |||||
self.network_magic = None | |||||
if network_magic: | |||||
self.network_magic = network_magic | |||||
# Whether the get_next_message() function returns | |||||
# messages stripped of their magic or not | |||||
self.strip_magic = strip_magic | |||||
# A stack of fragments that do not begin with | |||||
# correct magic. | |||||
self.bad_magic_fragments = [] | |||||
# A stack of messages that begin with right magic, | |||||
# but have been returned by the application as | |||||
# being incomplete. | |||||
# If there is such a message in the list, and | |||||
# there are bad magic fragments, the bad magic | |||||
# fragments are added onto the incomplete message | |||||
# and this is pushed into messages when get_next_message | |||||
# is called, so it is returned first. | |||||
self.incomplete_messages = [] | |||||
def store_incomplete_msg(self, data): | |||||
''' | |||||
Store an incomplete message. | |||||
These must start with correct magic, | |||||
and they must not contain multiple messages. | |||||
Only one incomplete message can be processed | |||||
at a time. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> len(buf.incomplete_messages) | |||||
0 | |||||
>>> buf.store_incomplete_msg(b'foo pretend this is incomplete') | |||||
>>> len(buf.incomplete_messages) | |||||
1 | |||||
>>> buf.incomplete_messages | |||||
[b'foo pretend this is incomplete'] | |||||
''' | |||||
with p2pbuffer_lock: | |||||
assert(data) | |||||
assert(type(data) == bytes) | |||||
assert(self._starts_with_magic(data)) | |||||
assert(not self.network_magic in data[1:]) | |||||
if len(self.incomplete_messages) > MAX_INCOMPLETE_IN_FLIGHT: | |||||
raise Exception("p2pbuffer: cannot handle more than 8 incomplete message at a time.") | |||||
else: | |||||
self.incomplete_messages.append(data) | |||||
def push_bad_magic(self, data): | |||||
''' | |||||
Push a bad-magic fragment onto the stack. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> len(buf.bad_magic_fragments) | |||||
0 | |||||
>>> buf.push_bad_magic(b'gargamel') | |||||
>>> len(buf.bad_magic_fragments) | |||||
1 | |||||
>>> buf.bad_magic_fragments | |||||
[b'gargamel'] | |||||
''' | |||||
with p2pbuffer_lock: | |||||
assert(data) | |||||
assert(type(data) == bytes) | |||||
assert(not self._starts_with_magic(data)) | |||||
assert(not self.network_magic in data) | |||||
self.bad_magic_fragments.append(data) | |||||
def pop_bad_magic(self): | |||||
''' | |||||
Returns all the available bad magic fragments. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.push_bad_magic(b'gargamel') | |||||
>>> buf.push_bad_magic(b' and ') | |||||
>>> buf.push_bad_magic(b'azrael') | |||||
>>> len(buf.bad_magic_fragments) | |||||
3 | |||||
>>> buf.bad_magic_fragments | |||||
[b'gargamel', b' and ', b'azrael'] | |||||
>>> buf.pop_bad_magic() | |||||
b'gargamel and azrael' | |||||
>>> len(buf.bad_magic_fragments) | |||||
0 | |||||
''' | |||||
with p2pbuffer_lock: | |||||
result = b'' | |||||
while len(self.bad_magic_fragments): | |||||
result += self.bad_magic_fragments.pop(0) | |||||
return result | |||||
def clear(self): | |||||
''' | |||||
Clear out all internal buffers. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.recv(b'foo is a word, foo is made of letters') | |||||
>>> buf.recvbuf | |||||
b'foo is a word, foo is made of letters' | |||||
>>> m = buf.get_next_message() | |||||
>>> buf.recvbuf | |||||
b'' | |||||
>>> len(buf.messages) | |||||
1 | |||||
>>> buf.clear() | |||||
>>> len(buf.messages) | |||||
0 | |||||
>>> buf.recvbuf == b'' | |||||
True | |||||
''' | |||||
with p2pbuffer_lock: | |||||
self.clear_recv() | |||||
self.clear_msgs() | |||||
self.bad_magic_fragments = [] | |||||
def clear_msgs(self): | |||||
''' | |||||
Clear out the message queue | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.recv(b'foo a foo b foo c') | |||||
>>> buf._process_recvbuf() | |||||
>>> len(buf.messages) | |||||
3 | |||||
>>> buf.clear_msgs() | |||||
>>> len(buf.messages) | |||||
0 | |||||
''' | |||||
with p2pbuffer_lock: | |||||
self.messages = [] | |||||
def clear_recv(self): | |||||
''' | |||||
Clear out the receive buffer. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.recv(b'foo is a word') | |||||
>>> buf.recvbuf | |||||
b'foo is a word' | |||||
>>> buf.clear_recv() | |||||
>>> buf.recvbuf == b'' | |||||
True | |||||
''' | |||||
with p2pbuffer_lock: | |||||
self.recvbuf = b'' | |||||
def recv(self, data=None): | |||||
''' | |||||
Receive some data into the receive buffer. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.recv(b'foo is a word') | |||||
>>> buf.recvbuf | |||||
b'foo is a word' | |||||
>>> buf.recv(b' which I use often') | |||||
>>> buf.recvbuf | |||||
b'foo is a word which I use often' | |||||
>>> buf.recv(None) | |||||
''' | |||||
if data: | |||||
with p2pbuffer_lock: | |||||
assert(type(data) == bytes) | |||||
self.recvbuf += data | |||||
if len(self.recvbuf) < len(self.network_magic): | |||||
return | |||||
else: | |||||
# Enough data to see if it begins with magic | |||||
if not self._starts_with_magic(self.recvbuf): | |||||
# No magic? | |||||
magicpos = self.recvbuf.find(self.network_magic) | |||||
if magicpos != -1: | |||||
# Data has piled up on it, so we can | |||||
# extract the bad-magic fragment | |||||
frag = self.recvbuf[:magicpos] | |||||
self.push_bad_magic(frag) | |||||
# Remove the bad data | |||||
self.recvbuf = self.recvbuf[magicpos:] | |||||
assert(self._starts_with_magic(self.recvbuf)) | |||||
else: | |||||
# The recvbuf has no magic at all | |||||
self.push_bad_magic(self.recvbuf) | |||||
self.recvbuf = b'' | |||||
else: | |||||
# Data starts with magic, proceed as normal | |||||
pass | |||||
else: | |||||
pass | |||||
def set_magic(self, network_magic): | |||||
''' | |||||
Change the network magic. | |||||
>>> buf = P2PBuffer(network_magic="foo") | |||||
>>> buf.network_magic | |||||
'foo' | |||||
>>> buf.set_magic('bar') | |||||
>>> buf.network_magic | |||||
'bar' | |||||
>>> buf.set_magic(None) | |||||
Traceback (most recent call last): | |||||
... | |||||
AssertionError | |||||
''' | |||||
assert(network_magic) | |||||
self.network_magic = network_magic | |||||
def _starts_with_magic(self, data): | |||||
''' | |||||
Returns True if the given data has the correct network | |||||
magic at its start, otherwise returns False. | |||||
>>> buf = P2PBuffer(network_magic=b'yum') | |||||
>>> buf._starts_with_magic(b'flakes') | |||||
False | |||||
>>> buf._starts_with_magic(b'yummy') | |||||
True | |||||
>>> buf._starts_with_magic(b'not yummy') | |||||
False | |||||
>>> buf._starts_with_magic(b'') | |||||
False | |||||
''' | |||||
return data[:len(self.network_magic)] == self.network_magic | |||||
def _process_recvbuf(self): | |||||
''' | |||||
Process the receive buffer to extract as many whole messages | |||||
as possible. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> len(buf.recvbuf) | |||||
0 | |||||
>>> len(buf.messages) | |||||
0 | |||||
>>> buf.bad_magic_fragments | |||||
[] | |||||
>>> buf.incomplete_messages | |||||
[] | |||||
>>> buf.recv(b'foo bar foo baz foo fi foo') | |||||
>>> buf._process_recvbuf() | |||||
>>> len(buf.messages) | |||||
3 | |||||
>>> buf.messages | |||||
[b' bar ', b' baz ', b' fi '] | |||||
>>> buf.recvbuf | |||||
b'' | |||||
>>> buf.incomplete_messages | |||||
[] | |||||
>>> buf.bad_magic_fragments | |||||
[] | |||||
>>> buf.get_next_message() | |||||
b' bar ' | |||||
>>> buf.get_next_message() | |||||
b' baz ' | |||||
>>> buf.get_next_message() | |||||
b' fi ' | |||||
>>> buf.get_next_message() | |||||
>>> buf.recv(b' fum foo') | |||||
>>> buf.get_next_message() | |||||
>>> buf.recvbuf | |||||
b'foo' | |||||
>>> buf.recv(b' faz') | |||||
>>> buf.get_next_message() | |||||
b' faz' | |||||
>>> buf.recvbuf == b'' | |||||
True | |||||
>>> buf.get_next_message() | |||||
>>> buf.recv(b'foofoofoo') | |||||
>>> buf.get_next_message() | |||||
b'' | |||||
>>> buf.get_next_message() | |||||
>>> buf.recvbuf | |||||
b'' | |||||
>>> buf.recv(b'a') | |||||
>>> buf.get_next_message() | |||||
''' | |||||
# We need network magic to be set to process things. | |||||
assert(self.network_magic) | |||||
with p2pbuffer_lock: | |||||
# If receive buffer is empty or not even as long as magic, return, | |||||
# as there cannot be a full message in it yet. | |||||
if len(self.recvbuf) <= len(self.network_magic): | |||||
return | |||||
# Check if buffer begins on correct magic. | |||||
if self._starts_with_magic(self.recvbuf): | |||||
# Good message start. | |||||
# Split out first message | |||||
magicpos = self.recvbuf[1:].find(self.network_magic) | |||||
if magicpos == -1: | |||||
# Straightforward - no other magic later | |||||
self.messages.append(self.recvbuf[len(self.network_magic):]) | |||||
self.recvbuf = b'' | |||||
else: | |||||
# There is more magic - process first msg and then recurse | |||||
magicpos += 1 | |||||
self.messages.append(self.recvbuf[len(self.network_magic):magicpos]) | |||||
self.recvbuf = self.recvbuf[magicpos:] | |||||
split_buf = self.recvbuf.split(self.network_magic) | |||||
for msg in split_buf: | |||||
if msg != b'': | |||||
self.messages.append(msg) | |||||
if split_buf[:-1] == b'': | |||||
self.recvbuf = self.network_msg | |||||
else: | |||||
self.recvbuf = b'' | |||||
else: | |||||
# Buffer did not start with correct magic. | |||||
# Check if magic occurs later. | |||||
assert(type(self.recvbuf) == bytes) | |||||
magicpos = self.recvbuf[1:].find(self.network_magic) | |||||
if magicpos == -1: | |||||
self.push_bad_magic(self.recvbuf) | |||||
self.recvbuf = b'' | |||||
else: | |||||
magicpos += 1 | |||||
nomagic_fragment = self.recvbuf[:magicpos] | |||||
if len(self.messages) > 0: | |||||
# Add the fragment onto last message if there is one | |||||
self.messages[len(self.messages) - 1] += nomagic_fragment | |||||
# Now discard the bit that does not have right magic from the buffer | |||||
if len(self.recvbuf[magicpos:]) > len(self.network_magic): | |||||
self.recvbuf = self.recvbuf[magicpos:] | |||||
# Call ourselves recursively on the remainder | |||||
#self._process_recvbuf() | |||||
def get_next_message(self): | |||||
''' | |||||
Get next message from the queue, if there is one. | |||||
If not, it returns None. | |||||
Before passing on messages, it processes the receive | |||||
buffer. | |||||
>>> buf = P2PBuffer(network_magic=b'foo') | |||||
>>> buf.recv(b'foo the first msg') | |||||
>>> buf.recv(b'foo the second msg') | |||||
>>> buf.get_next_message() | |||||
b' the first msg' | |||||
>>> buf.get_next_message() | |||||
b' the second msg' | |||||
>>> buf.get_next_message() | |||||
>>> buf.recv(b'fo') | |||||
>>> buf.get_next_message() | |||||
''' | |||||
def wrap_appropriate_magic(msg): | |||||
if self.strip_magic: | |||||
if self._starts_with_magic(msg): | |||||
return msg[len(self.network_magic):] | |||||
else: | |||||
return msg | |||||
else: | |||||
if not self._starts_with_magic(msg): | |||||
return self.network_magic + msg | |||||
else: | |||||
return msg | |||||
with p2pbuffer_lock: | |||||
# If there is data in receive buffer, | |||||
# try to extract messages from it. | |||||
if len(self.recvbuf) > 0: | |||||
self._process_recvbuf() | |||||
# If we have some messages, return first one. | |||||
if len(self.incomplete_messages) > 0 and len(self.bad_magic_fragments) > 0: | |||||
# Try to recombine first incomplete message with bad-magic | |||||
# fragments and pass that back to client in case it is a | |||||
# well-formed message. | |||||
franken_msg = self.incomplete_messages.pop() + self.pop_bad_magic() | |||||
return wrap_appropriate_magic(franken_msg) | |||||
else: | |||||
# no incomplete/bad fragment recombination required | |||||
if len(self.messages) > 0: | |||||
retmsg = wrap_appropriate_magic(self.messages.pop(0)) | |||||
return retmsg | |||||
else: | |||||
return None | |||||
if __name__ == "__main__": | |||||
import doctest | |||||
doctest.testmod() |