diff --git a/test/functional/abc-p2p-compactblocks.py b/test/functional/abc-p2p-compactblocks.py index 4cc40bd22..dbbf53ed5 100755 --- a/test/functional/abc-p2p-compactblocks.py +++ b/test/functional/abc-p2p-compactblocks.py @@ -1,383 +1,381 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ This test checks simple acceptance of bigger blocks via p2p. It is derived from the much more complex p2p-fullblocktest. The intention is that small tests can be derived from this one, or this one can be extended, to cover the checks done for bigger blocks (e.g. sigops limits). """ from test_framework.test_framework import ComparisonTestFramework from test_framework.util import * from test_framework.comptool import TestManager, TestInstance, RejectResult from test_framework.blocktools import * import time from test_framework.script import * from test_framework.cdefs import (ONE_MEGABYTE, LEGACY_MAX_BLOCK_SIZE, MAX_BLOCK_SIGOPS_PER_MB, MAX_TX_SIGOPS_COUNT) from collections import deque # far into the future MONOLITH_START_TIME = 2000000000 class PreviousSpendableOutput(): def __init__(self, tx=CTransaction(), n=-1): self.tx = tx self.n = n # the output we're spending # TestNode: A peer we use to send messages to bitcoind, and store responses. class TestNode(NodeConnCB): def __init__(self): self.last_sendcmpct = None self.last_cmpctblock = None self.last_getheaders = None self.last_headers = None super().__init__() def on_sendcmpct(self, conn, message): self.last_sendcmpct = message def on_cmpctblock(self, conn, message): self.last_cmpctblock = message self.last_cmpctblock.header_and_shortids.header.calc_sha256() def on_getheaders(self, conn, message): self.last_getheaders = message def on_headers(self, conn, message): self.last_headers = message for x in self.last_headers.headers: x.calc_sha256() def clear_block_data(self): with mininode_lock: self.last_sendcmpct = None self.last_cmpctblock = None class FullBlockTest(ComparisonTestFramework): # Can either run this test as 1 node with expected answers, or two and compare them. # Change the "outcome" variable from each TestInstance object to only do # the comparison. def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = True self.block_heights = {} self.tip = None self.blocks = {} self.excessive_block_size = 16 * ONE_MEGABYTE self.extra_args = [['-norelaypriority', '-whitelist=127.0.0.1', '-limitancestorcount=999999', '-limitancestorsize=999999', '-limitdescendantcount=999999', '-limitdescendantsize=999999', '-maxmempool=99999', "-monolithactivationtime=%d" % MONOLITH_START_TIME, "-excessiveblocksize=%d" % self.excessive_block_size]] def add_options(self, parser): super().add_options(parser) parser.add_option( "--runbarelyexpensive", dest="runbarelyexpensive", default=True) def run_test(self): self.test = TestManager(self, self.options.tmpdir) self.test.add_all_connections(self.nodes) # Start up network handling in another thread NetworkThread().start() # Set the blocksize to 2MB as initial condition self.nodes[0].setexcessiveblock(self.excessive_block_size) self.nodes[0].setmocktime(MONOLITH_START_TIME) self.test.run() def add_transactions_to_block(self, block, tx_list): [tx.rehash() for tx in tx_list] block.vtx.extend(tx_list) # this is a little handier to use than the version in blocktools.py def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])): tx = create_transaction(spend_tx, n, b"", value, script) return tx def next_block(self, number, spend=None, script=CScript([OP_TRUE]), block_size=0, extra_txns=0): if self.tip == None: base_block_hash = self.genesis_hash block_time = int(time.time()) + 1 else: base_block_hash = self.tip.sha256 block_time = self.tip.nTime + 1 # First create the coinbase height = self.block_heights[base_block_hash] + 1 coinbase = create_coinbase(height) coinbase.rehash() if spend == None: # We need to have something to spend to fill the block. assert_equal(block_size, 0) block = create_block(base_block_hash, coinbase, block_time) else: # all but one satoshi to fees coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1 coinbase.rehash() block = create_block(base_block_hash, coinbase, block_time) # Make sure we have plenty engough to spend going forward. spendable_outputs = deque([spend]) def get_base_transaction(): # Create the new transaction tx = CTransaction() # Spend from one of the spendable outputs spend = spendable_outputs.popleft() tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n))) # Add spendable outputs for i in range(4): tx.vout.append(CTxOut(0, CScript([OP_TRUE]))) spendable_outputs.append(PreviousSpendableOutput(tx, i)) return tx tx = get_base_transaction() # Make it the same format as transaction added for padding and save the size. # It's missing the padding output, so we add a constant to account for it. tx.rehash() base_tx_size = len(tx.serialize()) + 18 # If a specific script is required, add it. if script != None: tx.vout.append(CTxOut(1, script)) # Put some random data into the first transaction of the chain to randomize ids. tx.vout.append( CTxOut(0, CScript([random.randint(0, 256), OP_RETURN]))) # Add the transaction to the block self.add_transactions_to_block(block, [tx]) # Add transaction until we reach the expected transaction count for _ in range(extra_txns): self.add_transactions_to_block(block, [get_base_transaction()]) # If we have a block size requirement, just fill # the block until we get there current_block_size = len(block.serialize()) while current_block_size < block_size: # We will add a new transaction. That means the size of # the field enumerating how many transaction go in the block # may change. current_block_size -= len(ser_compact_size(len(block.vtx))) current_block_size += len(ser_compact_size(len(block.vtx) + 1)) # Create the new transaction tx = get_base_transaction() # Add padding to fill the block. script_length = block_size - current_block_size - base_tx_size if script_length > 510000: if script_length < 1000000: # Make sure we don't find ourselves in a position where we # need to generate a transaction smaller than what we expected. script_length = script_length // 2 else: script_length = 500000 script_output = CScript([b'\x00' * script_length]) tx.vout.append(CTxOut(0, script_output)) # Add the tx to the list of transactions to be included # in the block. self.add_transactions_to_block(block, [tx]) current_block_size += len(tx.serialize()) # Now that we added a bunch of transaction, we need to recompute # the merkle root. block.hashMerkleRoot = block.calc_merkle_root() # Check that the block size is what's expected if block_size > 0: assert_equal(len(block.serialize()), block_size) # Do PoW, which is cheap on regnet block.solve() self.tip = block self.block_heights[block.sha256] = height assert number not in self.blocks self.blocks[number] = block return block def get_tests(self): self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) self.block_heights[self.genesis_hash] = 0 spendable_outputs = [] # save the current tip so it can be spent by a later block def save_spendable_output(): spendable_outputs.append(self.tip) # get an output that we previously marked as spendable def get_spendable_output(): return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0) # returns a test case that asserts that the current tip was accepted def accepted(): return TestInstance([[self.tip, True]]) # returns a test case that asserts that the current tip was rejected def rejected(reject=None): if reject is None: return TestInstance([[self.tip, False]]) else: return TestInstance([[self.tip, reject]]) # move the tip back to a previous block def tip(number): self.tip = self.blocks[number] # adds transactions to the block and updates state def update_block(block_number, new_transactions): block = self.blocks[block_number] self.add_transactions_to_block(block, new_transactions) old_sha256 = block.sha256 block.hashMerkleRoot = block.calc_merkle_root() block.solve() # Update the internal state just like in next_block self.tip = block if block.sha256 != old_sha256: self.block_heights[ block.sha256] = self.block_heights[old_sha256] del self.block_heights[old_sha256] self.blocks[block_number] = block return block # shorthand for functions block = self.next_block # Create a new block block(0) save_spendable_output() yield accepted() # Now we need that block to mature so we can spend the coinbase. test = TestInstance(sync_every_block=False) for i in range(99): block(5000 + i) test.blocks_and_transactions.append([self.tip, True]) save_spendable_output() # Fork block bfork = block(5555) bfork.nTime = MONOLITH_START_TIME update_block(5555, []) test.blocks_and_transactions.append([self.tip, True]) # Get to one block of the May 15, 2018 HF activation for i in range(5): block(5100 + i) test.blocks_and_transactions.append([self.tip, True]) # Send it all to the node at once. yield test # collect spendable outputs now to avoid cluttering the code later on out = [] for i in range(100): out.append(get_spendable_output()) # Check that compact block also work for big blocks node = self.nodes[0] peer = TestNode() peer.add_connection(NodeConn('127.0.0.1', p2p_port(0), node, peer)) - # Start up network handling in another thread and wait for connection - # to be etablished - NetworkThread().start() + # Wait for connection to be etablished peer.wait_for_verack() # Wait for SENDCMPCT def received_sendcmpct(): return (peer.last_sendcmpct != None) wait_until(received_sendcmpct, timeout=30) sendcmpct = msg_sendcmpct() sendcmpct.version = 1 sendcmpct.announce = True peer.send_and_ping(sendcmpct) # Exchange headers def received_getheaders(): return (peer.last_getheaders != None) wait_until(received_getheaders, timeout=30) # Return the favor peer.send_message(peer.last_getheaders) # Wait for the header list def received_headers(): return (peer.last_headers != None) wait_until(received_headers, timeout=30) # It's like we know about the same headers ! peer.send_message(peer.last_headers) # Send a block b1 = block(1, spend=out[0], block_size=ONE_MEGABYTE + 1) yield accepted() # Checks the node to forward it via compact block def received_block(): return (peer.last_cmpctblock != None) wait_until(received_block, timeout=30) # Was it our block ? cmpctblk_header = peer.last_cmpctblock.header_and_shortids.header cmpctblk_header.calc_sha256() assert(cmpctblk_header.sha256 == b1.sha256) # Send a large block with numerous transactions. peer.clear_block_data() b2 = block(2, spend=out[1], extra_txns=70000, block_size=self.excessive_block_size - 1000) yield accepted() # Checks the node forwards it via compact block wait_until(received_block, timeout=30) # Was it our block ? cmpctblk_header = peer.last_cmpctblock.header_and_shortids.header cmpctblk_header.calc_sha256() assert(cmpctblk_header.sha256 == b2.sha256) # In order to avoid having to resend a ton of transactions, we invalidate # b2, which will send all its transactions in the mempool. node.invalidateblock(node.getbestblockhash()) # Let's send a compact block and see if the node accepts it. # Let's modify b2 and use it so that we can reuse the mempool. tx = b2.vtx[0] tx.vout.append(CTxOut(0, CScript([random.randint(0, 256), OP_RETURN]))) tx.rehash() b2.vtx[0] = tx b2.hashMerkleRoot = b2.calc_merkle_root() b2.solve() # Now we create the compact block and send it comp_block = HeaderAndShortIDs() comp_block.initialize_from_block(b2) peer.send_and_ping(msg_cmpctblock(comp_block.to_p2p())) # Check that compact block is received properly assert(int(node.getbestblockhash(), 16) == b2.sha256) if __name__ == '__main__': FullBlockTest().main() diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 1cc5bd960..3ba69df7c 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -1,1753 +1,1762 @@ #!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. - -# -# mininode.py - Bitcoin P2P network half-a-node -# -# This python code was modified from ArtForz' public domain half-a-node, as -# found in the mini-node branch of http://github.com/jgarzik/pynode. -# -# NodeConn: an object which manages p2p connectivity to a bitcoin node -# NodeConnCB: a base class that describes the interface for receiving -# callbacks with network messages from a NodeConn -# CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: -# data structures that should map to corresponding structures in -# bitcoin/primitives -# msg_block, msg_tx, msg_headers, etc.: -# data structures that represent network messages -# ser_*, deser_*: functions that handle serialization/deserialization - +"""Bitcoin P2P network half-a-node. + +This python code was modified from ArtForz' public domain half-a-node, as +found in the mini-node branch of http://github.com/jgarzik/pynode. + +NodeConn: an object which manages p2p connectivity to a bitcoin node +NodeConnCB: a base class that describes the interface for receiving + callbacks with network messages from a NodeConn +CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: + data structures that should map to corresponding structures in + bitcoin/primitives +msg_block, msg_tx, msg_headers, etc.: + data structures that represent network messages +ser_*, deser_*: functions that handle serialization/deserialization +""" import asyncore from codecs import encode from collections import defaultdict import copy import hashlib from io import BytesIO import logging import random import socket import struct import sys import time from threading import RLock, Thread from test_framework.siphash import siphash256 from test_framework.cdefs import MAX_BLOCK_SIGOPS_PER_MB from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until BIP0031_VERSION = 60000 MY_VERSION = 70014 # past bip-31 for ping/pong MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" # from version 70001 onwards, fRelay should be appended to version messages (BIP37) MY_RELAY = 1 MAX_INV_SZ = 50000 COIN = 100000000 # 1 btc in satoshis NODE_NETWORK = (1 << 0) NODE_GETUTXO = (1 << 1) NODE_BLOOM = (1 << 2) NODE_WITNESS = (1 << 3) NODE_XTHIN = (1 << 4) NODE_BITCOIN_CASH = (1 << 5) # Howmuch data will be read from the network at once READ_BUFFER_SIZE = 8192 logger = logging.getLogger("TestFramework.mininode") # Keep our own socket map for asyncore, so that we can track disconnects # ourselves (to workaround an issue with closing an asyncore socket when # using select) mininode_socket_map = dict() # One lock for synchronizing all data access between the networking thread (see # NetworkThread below) and the thread running the test logic. For simplicity, -# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB, +# NodeConn acquires this lock whenever delivering a message to a NodeConnCB, # and whenever adding anything to the send buffer (in send_message()). This # lock should be acquired in the thread running the test logic to synchronize # access to any data shared with the NodeConnCB or NodeConn. mininode_lock = RLock() # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() def ripemd160(s): return hashlib.new('ripemd160', s).digest() def hash256(s): return sha256(sha256(s)) def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector. def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack("H", f.read(2))[0] def serialize(self): r = b"" r += struct.pack("H", self.port) return r def __repr__(self): return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, self.ip, self.port) class CInv(): typemap = { 0: "Error", 1: "TX", 2: "Block", 4: "CompactBlock" } def __init__(self, t=0, h=0): self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack(" 21000000 * COIN: return False return True def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s nLockTime=%i)" \ % (self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) class CBlockHeader(): def __init__(self, header=None): if header is None: self.set_null() else: self.nVersion = header.nVersion self.hashPrevBlock = header.hashPrevBlock self.hashMerkleRoot = header.hashMerkleRoot self.nTime = header.nTime self.nBits = header.nBits self.nNonce = header.nNonce self.sha256 = header.sha256 self.hash = header.hash self.calc_sha256() def set_null(self): self.nVersion = 1 self.hashPrevBlock = 0 self.hashMerkleRoot = 0 self.nTime = 0 self.nBits = 0 self.nNonce = 0 self.sha256 = None self.hash = None def deserialize(self, f): self.nVersion = struct.unpack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i + 1, len(hashes) - 1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes return uint256_from_str(hashes[0]) def calc_merkle_root(self): hashes = [] for tx in self.vtx: tx.calc_sha256() hashes.append(ser_uint256(tx.sha256)) return self.get_merkle_root(hashes) def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.nBits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hashMerkleRoot: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.nBits) while self.sha256 > target: self.nNonce += 1 self.rehash() def __repr__(self): return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) class CUnsignedAlert(): def __init__(self): self.nVersion = 1 self.nRelayUntil = 0 self.nExpiration = 0 self.nID = 0 self.nCancel = 0 self.setCancel = [] self.nMinVer = 0 self.nMaxVer = 0 self.setSubVer = [] self.nPriority = 0 self.strComment = b"" self.strStatusBar = b"" self.strReserved = b"" def deserialize(self, f): self.nVersion = struct.unpack("= 106: self.addrFrom = CAddress() self.addrFrom.deserialize(f) self.nNonce = struct.unpack("= 209: self.nStartingHeight = struct.unpack("= 70001: # Relay field is optional for version 70001 onwards try: self.nRelay = struct.unpack(" class msg_headers(): command = b"headers" def __init__(self): self.headers = [] def deserialize(self, f): # comment in bitcoind indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks) def __repr__(self): return "msg_headers(headers=%s)" % repr(self.headers) class msg_reject(): command = b"reject" REJECT_MALFORMED = 1 def __init__(self): self.message = b"" self.code = 0 self.reason = b"" self.data = 0 def deserialize(self, f): self.message = deser_string(f) self.code = struct.unpack(" BIP0031_VERSION: conn.send_message(msg_pong(message.nonce)) def on_verack(self, conn, message): conn.ver_recv = conn.ver_send self.verack_received = True def on_version(self, conn, message): if message.nVersion >= 209: conn.send_message(msg_verack()) conn.ver_send = min(MY_VERSION, message.nVersion) if message.nVersion < 209: conn.ver_recv = conn.ver_send conn.nServices = message.nServices # Connection helper methods def add_connection(self, conn): self.connection = conn def wait_for_disconnect(self, timeout=60): def test_function(): return not self.connected wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message receiving helper methods def wait_for_block(self, blockhash, timeout=60): def test_function(): return self.last_message.get( "block") and self.last_message["block"].block.rehash() == blockhash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getdata(self, timeout=60): def test_function(): return self.last_message.get("getdata") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getheaders(self, timeout=60): def test_function(): return self.last_message.get("getheaders") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_inv(self, expected_inv, timeout=60): """Waits for an INV message and checks that the first inv object in the message was as expected.""" if len(expected_inv) > 1: raise NotImplementedError( "wait_for_inv() will only verify the first inv object") def test_function(): return self.last_message.get("inv") and \ self.last_message["inv"].inv[0].type == expected_inv[0].type and \ self.last_message["inv"].inv[0].hash == expected_inv[0].hash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_verack(self, timeout=60): def test_function(): return self.message_count["verack"] wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message sending helper functions def send_message(self, message): if self.connection: self.connection.send_message(message) else: logger.error("Cannot send message. No connection to node!") def send_and_ping(self, message): self.send_message(message) self.sync_with_ping() # Sync up with the node def sync_with_ping(self, timeout=60): self.send_message(msg_ping(nonce=self.ping_counter)) - def test_function(): return self.last_message.get( - "pong") and self.last_message["pong"].nonce == self.ping_counter + def test_function(): + if not self.last_message.get("pong"): + return False + return self.last_message["pong"].nonce == self.ping_counter wait_until(test_function, timeout=timeout, lock=mininode_lock) self.ping_counter += 1 # The actual NodeConn class # This class provides an interface for a p2p connection to a specified node class NodeConn(asyncore.dispatcher): messagemap = { b"version": msg_version, b"verack": msg_verack, b"addr": msg_addr, b"alert": msg_alert, b"inv": msg_inv, b"getdata": msg_getdata, b"getblocks": msg_getblocks, b"tx": msg_tx, b"block": msg_block, b"getaddr": msg_getaddr, b"ping": msg_ping, b"pong": msg_pong, b"headers": msg_headers, b"getheaders": msg_getheaders, b"reject": msg_reject, b"mempool": msg_mempool, b"feefilter": msg_feefilter, b"sendheaders": msg_sendheaders, b"sendcmpct": msg_sendcmpct, b"cmpctblock": msg_cmpctblock, b"getblocktxn": msg_getblocktxn, b"blocktxn": msg_blocktxn } MAGIC_BYTES = { "mainnet": b"\xe3\xe1\xf3\xe8", "testnet3": b"\xf4\xe5\xf3\xf4", "regtest": b"\xda\xb5\xbf\xfa", } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.dstaddr = dstaddr self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.sendbuf = b"" self.recvbuf = b"" self.ver_send = 209 self.ver_recv = 209 self.last_sent = 0 self.state = "connecting" self.network = net self.cb = callback self.disconnect = False self.nServices = 0 if send_version: # stuff version msg into sendbuf vt = msg_version() vt.nServices = services vt.addrTo.ip = self.dstaddr vt.addrTo.port = self.dstport vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 self.send_message(vt, True) logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) try: self.connect((dstaddr, dstport)) except: self.handle_close() self.rpc = rpc def handle_connect(self): if self.state != "connected": - logger.debug("Connected & Listening: \n") + logger.debug("Connected & Listening: %s:%d" % + (self.dstaddr, self.dstport)) self.state = "connected" self.cb.on_open(self) def handle_close(self): - logger.debug("Closing Connection to %s:%d... " % + logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport)) self.state = "closed" self.recvbuf = b"" self.sendbuf = b"" try: self.close() except: pass self.cb.on_close(self) def handle_read(self): - try: - with mininode_lock: - t = self.recv(READ_BUFFER_SIZE) - if len(t) > 0: - self.recvbuf += t - except: - pass + with mininode_lock: + t = self.recv(READ_BUFFER_SIZE) + if len(t) > 0: + self.recvbuf += t while True: msg = self.got_data() if msg == None: break self.got_message(msg) def readable(self): return True def writable(self): with mininode_lock: pre_connection = self.state == "connecting" length = len(self.sendbuf) return (length > 0 or pre_connection) def handle_write(self): with mininode_lock: # asyncore does not expose socket connection, only the first read/write # event, thus we must check connection manually here to know when we # actually connect if self.state == "connecting": self.handle_connect() if not self.writable(): return try: sent = self.send(self.sendbuf) except: self.handle_close() return self.sendbuf = self.sendbuf[sent:] def got_data(self): try: with mininode_lock: if len(self.recvbuf) < 4: return None if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: raise ValueError("got garbage %s" % repr(self.recvbuf)) if self.ver_recv < 209: if len(self.recvbuf) < 4 + 12 + 4: return None command = self.recvbuf[4:4 + 12].split(b"\x00", 1)[0] msglen = struct.unpack( "= 209: th = sha256(data) h = sha256(th) tmsg += h[:4] tmsg += data with mininode_lock: self.sendbuf += tmsg self.last_sent = time.time() def got_message(self, message): if message.command == b"version": if message.nVersion <= BIP0031_VERSION: self.messagemap[b'ping'] = msg_ping_prebip31 if self.last_sent + 30 * 60 < time.time(): self.send_message(self.messagemap[b'ping']()) - logger.debug("Received message from %s:%d: %s" % - (self.dstaddr, self.dstport, repr(message))) + self._log_message("receive", message) self.cb.deliver(self, message) + def _log_message(self, direction, msg): + if direction == "send": + log_message = "Send message to " + elif direction == "receive": + log_message = "Received message from " + log_message += "%s:%d: %s" % (self.dstaddr, + self.dstport, repr(msg)[:500]) + if len(log_message) > 500: + log_message += "... (msg truncated)" + logger.debug(log_message) + def disconnect_node(self): self.disconnect = True class NetworkThread(Thread): def run(self): while mininode_socket_map: # We check for whether to disconnect outside of the asyncore # loop to workaround the behavior of asyncore when using # select disconnected = [] for fd, obj in mininode_socket_map.items(): if obj.disconnect: disconnected.append(obj) [obj.handle_close() for obj in disconnected] asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) logger.debug("Network thread closing") # An exception we can raise if we detect a potential disconnect # (p2p or rpc) before the test is complete class EarlyDisconnectError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value)