diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/messages.py old mode 100755 new mode 100644 similarity index 68% copy from test/functional/test_framework/mininode.py copy to test/functional/test_framework/messages.py index 42ae39b1f..7e21f0c1a --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/messages.py @@ -1,1610 +1,1201 @@ #!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik -# Copyright (c) 2010-2016 The Bitcoin Core developers +# Copyright (c) 2010-2017 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. -"""Bitcoin P2P network half-a-node. +"""Bitcoin test framework primitive and message strcutures -This python code was modified from ArtForz' public domain half-a-node, as -found in the mini-node branch of http://github.com/jgarzik/pynode. - -NodeConn: an object which manages p2p connectivity to a bitcoin node -NodeConnCB: a base class that describes the interface for receiving - callbacks with network messages from a NodeConn CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives + msg_block, msg_tx, msg_headers, etc.: data structures that represent network messages -ser_*, deser_*: functions that handle serialization/deserialization -""" -import asyncore +ser_*, deser_*: functions that handle serialization/deserialization.""" from codecs import encode -from collections import defaultdict import copy import hashlib from io import BytesIO -import logging import random import socket import struct -import sys import time -from threading import RLock, Thread from test_framework.siphash import siphash256 -from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until MIN_VERSION_SUPPORTED = 60001 -MY_VERSION = 70014 # past bip-31 for ping/pong +# past bip-31 for ping/pong +MY_VERSION = 70014 MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" # from version 70001 onwards, fRelay should be appended to version messages (BIP37) MY_RELAY = 1 MAX_INV_SZ = 50000 +MAX_BLOCK_BASE_SIZE = 1000000 -COIN = 100000000 # 1 btc in satoshis +# 1 BCH in satoshis +COIN = 100000000 NODE_NETWORK = (1 << 0) # NODE_GETUTXO = (1 << 1) # NODE_BLOOM = (1 << 2) NODE_WITNESS = (1 << 3) NODE_XTHIN = (1 << 4) NODE_BITCOIN_CASH = (1 << 5) # Howmuch data will be read from the network at once READ_BUFFER_SIZE = 8192 -logger = logging.getLogger("TestFramework.mininode") - -# Keep our own socket map for asyncore, so that we can track disconnects -# ourselves (to workaround an issue with closing an asyncore socket when -# using select) -mininode_socket_map = dict() - -# One lock for synchronizing all data access between the networking thread (see -# NetworkThread below) and the thread running the test logic. For simplicity, -# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, -# and whenever adding anything to the send buffer (in send_message()). This -# lock should be acquired in the thread running the test logic to synchronize -# access to any data shared with the NodeConnCB or NodeConn. -mininode_lock = RLock() - # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() def ripemd160(s): return hashlib.new('ripemd160', s).digest() def hash256(s): return sha256(sha256(s)) def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector. def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack("H", f.read(2))[0] def serialize(self): r = b"" r += struct.pack("H", self.port) return r def __repr__(self): return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, self.ip, self.port) class CInv(): typemap = { 0: "Error", 1: "TX", 2: "Block", 4: "CompactBlock" } def __init__(self, t=0, h=0): self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack(" 21000000 * COIN: return False return True def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s nLockTime=%i)" \ % (self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) class CBlockHeader(): def __init__(self, header=None): if header is None: self.set_null() else: self.nVersion = header.nVersion self.hashPrevBlock = header.hashPrevBlock self.hashMerkleRoot = header.hashMerkleRoot self.nTime = header.nTime self.nBits = header.nBits self.nNonce = header.nNonce self.sha256 = header.sha256 self.hash = header.hash self.calc_sha256() def set_null(self): self.nVersion = 1 self.hashPrevBlock = 0 self.hashMerkleRoot = 0 self.nTime = 0 self.nBits = 0 self.nNonce = 0 self.sha256 = None self.hash = None def deserialize(self, f): self.nVersion = struct.unpack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i + 1, len(hashes) - 1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes return uint256_from_str(hashes[0]) def calc_merkle_root(self): hashes = [] for tx in self.vtx: tx.calc_sha256() hashes.append(ser_uint256(tx.sha256)) return self.get_merkle_root(hashes) def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.nBits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hashMerkleRoot: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.nBits) while self.sha256 > target: self.nNonce += 1 self.rehash() def __repr__(self): return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) class PrefilledTransaction(): def __init__(self, index=0, tx=None): self.index = index self.tx = tx def deserialize(self, f): self.index = deser_compact_size(f) self.tx = CTransaction() self.tx.deserialize(f) def serialize(self): r = b"" r += ser_compact_size(self.index) r += self.tx.serialize() return r def __repr__(self): return "PrefilledTransaction(index=%d, tx=%s)" % (self.index, repr(self.tx)) # This is what we send on the wire, in a cmpctblock message. class P2PHeaderAndShortIDs(): def __init__(self): self.header = CBlockHeader() self.nonce = 0 self.shortids_length = 0 self.shortids = [] self.prefilled_txn_length = 0 self.prefilled_txn = [] def deserialize(self, f): self.header.deserialize(f) self.nonce = struct.unpack("= 106: self.addrFrom = CAddress() self.addrFrom.deserialize(f) self.nNonce = struct.unpack("= 209: self.nStartingHeight = struct.unpack("= 70001: # Relay field is optional for version 70001 onwards try: self.nRelay = struct.unpack(" class msg_headers(): command = b"headers" def __init__(self, headers=None): self.headers = headers if headers is not None else [] def deserialize(self, f): # comment in bitcoind indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks) def __repr__(self): return "msg_headers(headers=%s)" % repr(self.headers) class msg_reject(): command = b"reject" REJECT_MALFORMED = 1 def __init__(self): self.message = b"" self.code = 0 self.reason = b"" self.data = 0 def deserialize(self, f): self.message = deser_string(f) self.code = struct.unpack("= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( - message.nVersion, MIN_VERSION_SUPPORTED) - conn.send_message(msg_verack()) - conn.nServices = message.nServices - - # Connection helper methods - - def add_connection(self, conn): - self.connection = conn - - def wait_for_disconnect(self, timeout=60): - def test_function(): return not self.connected - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - # Message receiving helper methods - - def wait_for_block(self, blockhash, timeout=60): - def test_function(): return self.last_message.get( - "block") and self.last_message["block"].block.rehash() == blockhash - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_getdata(self, timeout=60): - def test_function(): return self.last_message.get("getdata") - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_getheaders(self, timeout=60): - def test_function(): return self.last_message.get("getheaders") - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_inv(self, expected_inv, timeout=60): - """Waits for an INV message and checks that the first inv object in the message was as expected.""" - if len(expected_inv) > 1: - raise NotImplementedError( - "wait_for_inv() will only verify the first inv object") - - def test_function(): return self.last_message.get("inv") and \ - self.last_message["inv"].inv[0].type == expected_inv[0].type and \ - self.last_message["inv"].inv[0].hash == expected_inv[0].hash - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - def wait_for_verack(self, timeout=60): - def test_function(): return self.message_count["verack"] - wait_until(test_function, timeout=timeout, lock=mininode_lock) - - # Message sending helper functions - - def send_message(self, message): - if self.connection: - self.connection.send_message(message) - else: - logger.error("Cannot send message. No connection to node!") - - def send_and_ping(self, message): - self.send_message(message) - self.sync_with_ping() - - # Sync up with the node - def sync_with_ping(self, timeout=60): - self.send_message(msg_ping(nonce=self.ping_counter)) - - def test_function(): - if not self.last_message.get("pong"): - return False - return self.last_message["pong"].nonce == self.ping_counter - wait_until(test_function, timeout=timeout, lock=mininode_lock) - self.ping_counter += 1 - - -class NodeConn(asyncore.dispatcher): - """The actual NodeConn class - - This class provides an interface for a p2p connection to a specified node.""" - messagemap = { - b"version": msg_version, - b"verack": msg_verack, - b"addr": msg_addr, - b"inv": msg_inv, - b"getdata": msg_getdata, - b"getblocks": msg_getblocks, - b"tx": msg_tx, - b"block": msg_block, - b"getaddr": msg_getaddr, - b"ping": msg_ping, - b"pong": msg_pong, - b"headers": msg_headers, - b"getheaders": msg_getheaders, - b"reject": msg_reject, - b"mempool": msg_mempool, - b"feefilter": msg_feefilter, - b"sendheaders": msg_sendheaders, - b"sendcmpct": msg_sendcmpct, - b"cmpctblock": msg_cmpctblock, - b"getblocktxn": msg_getblocktxn, - b"blocktxn": msg_blocktxn - } - - MAGIC_BYTES = { - "mainnet": b"\xe3\xe1\xf3\xe8", - "testnet3": b"\xf4\xe5\xf3\xf4", - "regtest": b"\xda\xb5\xbf\xfa", - } - - def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): - asyncore.dispatcher.__init__(self, map=mininode_socket_map) - self.dstaddr = dstaddr - self.dstport = dstport - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.sendbuf = b"" - self.recvbuf = b"" - self.ver_send = 209 - self.ver_recv = 209 - self.last_sent = 0 - self.state = "connecting" - self.network = net - self.cb = callback - self.disconnect = False - self.nServices = 0 - - if send_version: - # stuff version msg into sendbuf - vt = msg_version() - vt.nServices = services - vt.addrTo.ip = self.dstaddr - vt.addrTo.port = self.dstport - vt.addrFrom.ip = "0.0.0.0" - vt.addrFrom.port = 0 - self.send_message(vt, True) - - logger.info('Connecting to Bitcoin Node: %s:%d' % - (self.dstaddr, self.dstport)) - - try: - self.connect((dstaddr, dstport)) - except: - self.handle_close() - self.rpc = rpc - - def handle_connect(self): - if self.state != "connected": - logger.debug("Connected & Listening: %s:%d" % - (self.dstaddr, self.dstport)) - self.state = "connected" - self.cb.on_open(self) - - def handle_close(self): - logger.debug("Closing connection to: %s:%d" % - (self.dstaddr, self.dstport)) - self.state = "closed" - self.recvbuf = b"" - self.sendbuf = b"" - try: - self.close() - except: - pass - self.cb.on_close(self) - - def handle_read(self): - with mininode_lock: - t = self.recv(READ_BUFFER_SIZE) - if len(t) > 0: - self.recvbuf += t - - while True: - msg = self.got_data() - if msg == None: - break - self.got_message(msg) - - def readable(self): - return True - - def writable(self): - with mininode_lock: - pre_connection = self.state == "connecting" - length = len(self.sendbuf) - return (length > 0 or pre_connection) - - def handle_write(self): - with mininode_lock: - # asyncore does not expose socket connection, only the first read/write - # event, thus we must check connection manually here to know when we - # actually connect - if self.state == "connecting": - self.handle_connect() - if not self.writable(): - return - - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - - def got_data(self): - try: - with mininode_lock: - if len(self.recvbuf) < 4: - return None - if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: - raise ValueError("got garbage %s" % repr(self.recvbuf)) - if len(self.recvbuf) < 4 + 12 + 4 + 4: - return - command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] - msglen = struct.unpack(" 500: - log_message += "... (msg truncated)" - logger.debug(log_message) - - def disconnect_node(self): - self.disconnect = True - - -class NetworkThread(Thread): - - def run(self): - while mininode_socket_map: - # We check for whether to disconnect outside of the asyncore - # loop to workaround the behavior of asyncore when using - # select - disconnected = [] - for fd, obj in mininode_socket_map.items(): - if obj.disconnect: - disconnected.append(obj) - [obj.handle_close() for obj in disconnected] - asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) - logger.debug("Network thread closing") diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 42ae39b1f..456d69513 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -1,1610 +1,425 @@ #!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin P2P network half-a-node. This python code was modified from ArtForz' public domain half-a-node, as found in the mini-node branch of http://github.com/jgarzik/pynode. NodeConn: an object which manages p2p connectivity to a bitcoin node NodeConnCB: a base class that describes the interface for receiving callbacks with network messages from a NodeConn -CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: - data structures that should map to corresponding structures in - bitcoin/primitives -msg_block, msg_tx, msg_headers, etc.: - data structures that represent network messages -ser_*, deser_*: functions that handle serialization/deserialization """ - import asyncore -from codecs import encode from collections import defaultdict -import copy -import hashlib from io import BytesIO import logging -import random import socket import struct import sys import time from threading import RLock, Thread -from test_framework.siphash import siphash256 -from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB -from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until - -MIN_VERSION_SUPPORTED = 60001 -MY_VERSION = 70014 # past bip-31 for ping/pong -MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" -# from version 70001 onwards, fRelay should be appended to version messages (BIP37) -MY_RELAY = 1 - -MAX_INV_SZ = 50000 - -COIN = 100000000 # 1 btc in satoshis - -NODE_NETWORK = (1 << 0) -# NODE_GETUTXO = (1 << 1) -# NODE_BLOOM = (1 << 2) -NODE_WITNESS = (1 << 3) -NODE_XTHIN = (1 << 4) -NODE_BITCOIN_CASH = (1 << 5) - -# Howmuch data will be read from the network at once -READ_BUFFER_SIZE = 8192 +from test_framework.messages import * logger = logging.getLogger("TestFramework.mininode") # Keep our own socket map for asyncore, so that we can track disconnects # ourselves (to workaround an issue with closing an asyncore socket when # using select) mininode_socket_map = dict() # One lock for synchronizing all data access between the networking thread (see # NetworkThread below) and the thread running the test logic. For simplicity, # NodeConn acquires this lock whenever delivering a message to a NodeConnCB, # and whenever adding anything to the send buffer (in send_message()). This # lock should be acquired in the thread running the test logic to synchronize # access to any data shared with the NodeConnCB or NodeConn. mininode_lock = RLock() -# Serialization/deserialization tools - - -def sha256(s): - return hashlib.new('sha256', s).digest() - - -def ripemd160(s): - return hashlib.new('ripemd160', s).digest() - - -def hash256(s): - return sha256(sha256(s)) - - -def ser_compact_size(l): - r = b"" - if l < 253: - r = struct.pack("B", l) - elif l < 0x10000: - r = struct.pack(">= 32 - return rs - - -def uint256_from_str(s): - r = 0 - t = struct.unpack("> 24) & 0xFF - v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) - return v - - -def deser_vector(f, c): - nit = deser_compact_size(f) - r = [] - for i in range(nit): - t = c() - t.deserialize(f) - r.append(t) - return r - - -# ser_function_name: Allow for an alternate serialization function on the -# entries in the vector. -def ser_vector(l, ser_function_name=None): - r = ser_compact_size(len(l)) - for i in l: - if ser_function_name: - r += getattr(i, ser_function_name)() - else: - r += i.serialize() - return r - - -def deser_uint256_vector(f): - nit = deser_compact_size(f) - r = [] - for i in range(nit): - t = deser_uint256(f) - r.append(t) - return r - - -def ser_uint256_vector(l): - r = ser_compact_size(len(l)) - for i in l: - r += ser_uint256(i) - return r - - -def deser_string_vector(f): - nit = deser_compact_size(f) - r = [] - for i in range(nit): - t = deser_string(f) - r.append(t) - return r - - -def ser_string_vector(l): - r = ser_compact_size(len(l)) - for sv in l: - r += ser_string(sv) - return r - - -def deser_int_vector(f): - nit = deser_compact_size(f) - r = [] - for i in range(nit): - t = struct.unpack("H", f.read(2))[0] - - def serialize(self): - r = b"" - r += struct.pack("H", self.port) - return r - - def __repr__(self): - return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, - self.ip, self.port) - - -class CInv(): - typemap = { - 0: "Error", - 1: "TX", - 2: "Block", - 4: "CompactBlock" - } - - def __init__(self, t=0, h=0): - self.type = t - self.hash = h - - def deserialize(self, f): - self.type = struct.unpack(" 21000000 * COIN: - return False - return True - - def __repr__(self): - return "CTransaction(nVersion=%i vin=%s vout=%s nLockTime=%i)" \ - % (self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) - - -class CBlockHeader(): - def __init__(self, header=None): - if header is None: - self.set_null() - else: - self.nVersion = header.nVersion - self.hashPrevBlock = header.hashPrevBlock - self.hashMerkleRoot = header.hashMerkleRoot - self.nTime = header.nTime - self.nBits = header.nBits - self.nNonce = header.nNonce - self.sha256 = header.sha256 - self.hash = header.hash - self.calc_sha256() - - def set_null(self): - self.nVersion = 1 - self.hashPrevBlock = 0 - self.hashMerkleRoot = 0 - self.nTime = 0 - self.nBits = 0 - self.nNonce = 0 - self.sha256 = None - self.hash = None - - def deserialize(self, f): - self.nVersion = struct.unpack(" 1: - newhashes = [] - for i in range(0, len(hashes), 2): - i2 = min(i + 1, len(hashes) - 1) - newhashes.append(hash256(hashes[i] + hashes[i2])) - hashes = newhashes - return uint256_from_str(hashes[0]) - - def calc_merkle_root(self): - hashes = [] - for tx in self.vtx: - tx.calc_sha256() - hashes.append(ser_uint256(tx.sha256)) - return self.get_merkle_root(hashes) - - def is_valid(self): - self.calc_sha256() - target = uint256_from_compact(self.nBits) - if self.sha256 > target: - return False - for tx in self.vtx: - if not tx.is_valid(): - return False - if self.calc_merkle_root() != self.hashMerkleRoot: - return False - return True - - def solve(self): - self.rehash() - target = uint256_from_compact(self.nBits) - while self.sha256 > target: - self.nNonce += 1 - self.rehash() - - def __repr__(self): - return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ - % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, - time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) - - -class PrefilledTransaction(): - def __init__(self, index=0, tx=None): - self.index = index - self.tx = tx - - def deserialize(self, f): - self.index = deser_compact_size(f) - self.tx = CTransaction() - self.tx.deserialize(f) - - def serialize(self): - r = b"" - r += ser_compact_size(self.index) - r += self.tx.serialize() - return r - - def __repr__(self): - return "PrefilledTransaction(index=%d, tx=%s)" % (self.index, repr(self.tx)) - -# This is what we send on the wire, in a cmpctblock message. - - -class P2PHeaderAndShortIDs(): - def __init__(self): - self.header = CBlockHeader() - self.nonce = 0 - self.shortids_length = 0 - self.shortids = [] - self.prefilled_txn_length = 0 - self.prefilled_txn = [] - - def deserialize(self, f): - self.header.deserialize(f) - self.nonce = struct.unpack("= 106: - self.addrFrom = CAddress() - self.addrFrom.deserialize(f) - self.nNonce = struct.unpack("= 209: - self.nStartingHeight = struct.unpack("= 70001: - # Relay field is optional for version 70001 onwards - try: - self.nRelay = struct.unpack(" -class msg_headers(): - command = b"headers" - - def __init__(self, headers=None): - self.headers = headers if headers is not None else [] - - def deserialize(self, f): - # comment in bitcoind indicates these should be deserialized as blocks - blocks = deser_vector(f, CBlock) - for x in blocks: - self.headers.append(CBlockHeader(x)) - - def serialize(self): - blocks = [CBlock(x) for x in self.headers] - return ser_vector(blocks) - - def __repr__(self): - return "msg_headers(headers=%s)" % repr(self.headers) - - -class msg_reject(): - command = b"reject" - REJECT_MALFORMED = 1 - - def __init__(self): - self.message = b"" - self.code = 0 - self.reason = b"" - self.data = 0 - - def deserialize(self, f): - self.message = deser_string(f) - self.code = struct.unpack("= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format( message.nVersion, MIN_VERSION_SUPPORTED) conn.send_message(msg_verack()) conn.nServices = message.nServices # Connection helper methods def add_connection(self, conn): self.connection = conn def wait_for_disconnect(self, timeout=60): def test_function(): return not self.connected wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message receiving helper methods def wait_for_block(self, blockhash, timeout=60): def test_function(): return self.last_message.get( "block") and self.last_message["block"].block.rehash() == blockhash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getdata(self, timeout=60): def test_function(): return self.last_message.get("getdata") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getheaders(self, timeout=60): def test_function(): return self.last_message.get("getheaders") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_inv(self, expected_inv, timeout=60): """Waits for an INV message and checks that the first inv object in the message was as expected.""" if len(expected_inv) > 1: raise NotImplementedError( "wait_for_inv() will only verify the first inv object") def test_function(): return self.last_message.get("inv") and \ self.last_message["inv"].inv[0].type == expected_inv[0].type and \ self.last_message["inv"].inv[0].hash == expected_inv[0].hash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_verack(self, timeout=60): def test_function(): return self.message_count["verack"] wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message sending helper functions def send_message(self, message): if self.connection: self.connection.send_message(message) else: logger.error("Cannot send message. No connection to node!") def send_and_ping(self, message): self.send_message(message) self.sync_with_ping() # Sync up with the node def sync_with_ping(self, timeout=60): self.send_message(msg_ping(nonce=self.ping_counter)) def test_function(): if not self.last_message.get("pong"): return False return self.last_message["pong"].nonce == self.ping_counter wait_until(test_function, timeout=timeout, lock=mininode_lock) self.ping_counter += 1 class NodeConn(asyncore.dispatcher): """The actual NodeConn class This class provides an interface for a p2p connection to a specified node.""" messagemap = { b"version": msg_version, b"verack": msg_verack, b"addr": msg_addr, b"inv": msg_inv, b"getdata": msg_getdata, b"getblocks": msg_getblocks, b"tx": msg_tx, b"block": msg_block, b"getaddr": msg_getaddr, b"ping": msg_ping, b"pong": msg_pong, b"headers": msg_headers, b"getheaders": msg_getheaders, b"reject": msg_reject, b"mempool": msg_mempool, b"feefilter": msg_feefilter, b"sendheaders": msg_sendheaders, b"sendcmpct": msg_sendcmpct, b"cmpctblock": msg_cmpctblock, b"getblocktxn": msg_getblocktxn, b"blocktxn": msg_blocktxn } MAGIC_BYTES = { "mainnet": b"\xe3\xe1\xf3\xe8", "testnet3": b"\xf4\xe5\xf3\xf4", "regtest": b"\xda\xb5\xbf\xfa", } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.dstaddr = dstaddr self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.sendbuf = b"" self.recvbuf = b"" self.ver_send = 209 self.ver_recv = 209 self.last_sent = 0 self.state = "connecting" self.network = net self.cb = callback self.disconnect = False self.nServices = 0 if send_version: # stuff version msg into sendbuf vt = msg_version() vt.nServices = services vt.addrTo.ip = self.dstaddr vt.addrTo.port = self.dstport vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 self.send_message(vt, True) logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) try: self.connect((dstaddr, dstport)) except: self.handle_close() self.rpc = rpc def handle_connect(self): if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) self.state = "connected" self.cb.on_open(self) def handle_close(self): logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport)) self.state = "closed" self.recvbuf = b"" self.sendbuf = b"" try: self.close() except: pass self.cb.on_close(self) def handle_read(self): with mininode_lock: t = self.recv(READ_BUFFER_SIZE) if len(t) > 0: self.recvbuf += t while True: msg = self.got_data() if msg == None: break self.got_message(msg) def readable(self): return True def writable(self): with mininode_lock: pre_connection = self.state == "connecting" length = len(self.sendbuf) return (length > 0 or pre_connection) def handle_write(self): with mininode_lock: # asyncore does not expose socket connection, only the first read/write # event, thus we must check connection manually here to know when we # actually connect if self.state == "connecting": self.handle_connect() if not self.writable(): return try: sent = self.send(self.sendbuf) except: self.handle_close() return self.sendbuf = self.sendbuf[sent:] def got_data(self): try: with mininode_lock: if len(self.recvbuf) < 4: return None if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: raise ValueError("got garbage %s" % repr(self.recvbuf)) if len(self.recvbuf) < 4 + 12 + 4 + 4: return command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] msglen = struct.unpack(" 500: log_message += "... (msg truncated)" logger.debug(log_message) def disconnect_node(self): self.disconnect = True class NetworkThread(Thread): def run(self): while mininode_socket_map: # We check for whether to disconnect outside of the asyncore # loop to workaround the behavior of asyncore when using # select disconnected = [] for fd, obj in mininode_socket_map.items(): if obj.disconnect: disconnected.append(obj) [obj.handle_close() for obj in disconnected] asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) logger.debug("Network thread closing")