diff --git a/test/functional/test_framework/netutil.py b/test/functional/test_framework/netutil.py index 1a2b30a30..643c2592e 100644 --- a/test/functional/test_framework/netutil.py +++ b/test/functional/test_framework/netutil.py @@ -1,182 +1,182 @@ #!/usr/bin/env python3 # Copyright (c) 2014-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Linux network utilities. Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal """ import array import os import socket import struct import sys from errno import EINVAL, ENOENT # STATE_ESTABLISHED = '01' # STATE_SYN_SENT = '02' # STATE_SYN_RECV = '03' # STATE_FIN_WAIT1 = '04' # STATE_FIN_WAIT2 = '05' # STATE_TIME_WAIT = '06' # STATE_CLOSE = '07' # STATE_CLOSE_WAIT = '08' # STATE_LAST_ACK = '09' STATE_LISTEN = '0A' # STATE_CLOSING = '0B' def get_socket_inodes(pid): ''' Get list of socket inodes for process pid. ''' base = f'/proc/{pid}/fd' inodes = [] for item in os.listdir(base): try: target = os.readlink(os.path.join(base, item)) except OSError as err: if err.errno == ENOENT: # The file which is gone in the meantime continue elif err.errno == EINVAL: # Not a link continue else: raise else: if target.startswith('socket:'): inodes.append(int(target[8:-1])) return inodes def _remove_empty(array): return [x for x in array if x != ''] def _convert_ip_port(array): host, port = array.split(':') # convert host from mangled-per-four-bytes form as used by kernel host = bytes.fromhex(host) host_out = '' for x in range(0, len(host) // 4): (val,) = struct.unpack('=I', host[x * 4:(x + 1) * 4]) host_out += f'{val:08x}' return host_out, int(port, 16) def netstat(typ='tcp'): ''' Function to return a list with status of tcp connections at linux systems To get pid of all network process running on system, you must run this script as superuser ''' with open(f"/proc/net/{typ}", 'r', encoding='utf8') as f: content = f.readlines() content.pop(0) result = [] for line in content: # Split lines and remove empty spaces. line_array = _remove_empty(line.split(' ')) tcp_id = line_array[0] l_addr = _convert_ip_port(line_array[1]) r_addr = _convert_ip_port(line_array[2]) state = line_array[3] # Need the inode to match with process pid. inode = int(line_array[9]) nline = [tcp_id, l_addr, r_addr, state, inode] result.append(nline) return result def get_bind_addrs(pid): ''' Get bind addresses as (host,port) tuples for process pid. ''' inodes = get_socket_inodes(pid) bind_addrs = [] for conn in netstat('tcp') + netstat('tcp6'): if conn[3] == STATE_LISTEN and conn[4] in inodes: bind_addrs.append(conn[1]) return bind_addrs # from: http://code.activestate.com/recipes/439093/ def all_interfaces(): ''' Return all interfaces that are up ''' # Linux only, so only import when required import fcntl is_64bits = sys.maxsize > 2**32 struct_size = 40 if is_64bits else 32 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) max_possible = 8 # initial value while True: - bytes = max_possible * struct_size - names = array.array('B', b'\0' * bytes) + inbytes = max_possible * struct_size + names = array.array('B', b'\0' * inbytes) outbytes = struct.unpack('iL', fcntl.ioctl( s.fileno(), 0x8912, # SIOCGIFCONF - struct.pack('iL', bytes, names.buffer_info()[0]) + struct.pack('iL', inbytes, names.buffer_info()[0]) ))[0] - if outbytes == bytes: + if outbytes == inbytes: max_possible *= 2 else: break namestr = names.tobytes() return [(namestr[i:i + 16].split(b'\0', 1)[0], socket.inet_ntoa(namestr[i + 20:i + 24])) for i in range(0, outbytes, struct_size)] def addr_to_hex(addr): ''' Convert string IPv4 or IPv6 address to binary address as returned by get_bind_addrs. Very naive implementation that certainly doesn't work for all IPv6 variants. ''' if '.' in addr: # IPv4 addr = [int(x) for x in addr.split('.')] elif ':' in addr: # IPv6 sub = [[], []] # prefix, suffix x = 0 addr = addr.split(':') for i, comp in enumerate(addr): if comp == '': # skip empty component at beginning or end if i == 0 or i == (len(addr) - 1): continue x += 1 # :: skips to suffix assert x < 2 else: # two bytes per component val = int(comp, 16) sub[x].append(val >> 8) sub[x].append(val & 0xff) nullbytes = 16 - len(sub[0]) - len(sub[1]) assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0) addr = sub[0] + ([0] * nullbytes) + sub[1] else: raise ValueError(f'Could not parse address {addr}') return bytearray(addr).hex() def test_ipv6_local(): ''' Check for (local) IPv6 support. ''' import socket # By using SOCK_DGRAM this will not actually make a connection, but it will # fail if there is no route to IPv6 localhost. have_ipv6 = True try: s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) s.connect(('::1', 0)) except socket.error: have_ipv6 = False return have_ipv6 diff --git a/test/functional/test_framework/script.py b/test/functional/test_framework/script.py index 4bb607b64..0b6e5e175 100644 --- a/test/functional/test_framework/script.py +++ b/test/functional/test_framework/script.py @@ -1,769 +1,769 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Functionality to build scripts, as well as SignatureHash(). This file is modified from python-bitcoinlib. """ import struct import unittest from typing import Dict, List from .messages import ( CTransaction, CTxOut, hash256, ser_string, ser_uint256, sha256, uint256_from_str, ) from .ripemd160 import ripemd160 MAX_SCRIPT_ELEMENT_SIZE = 520 OPCODE_NAMES: Dict["CScriptOp", str] = {} def hash160(s: bytes) -> bytes: return ripemd160(sha256(s)) def bn2vch(v): """Convert number to bitcoin-specific little endian format.""" # We need v.bit_length() bits, plus a sign bit for every nonzero number. n_bits = v.bit_length() + (v != 0) # The number of bytes for that is: n_bytes = (n_bits + 7) // 8 # Convert number to absolute value + sign in top bit. encoded_v = 0 if v == 0 else abs(v) | ((v < 0) << (n_bytes * 8 - 1)) # Serialize to bytes return encoded_v.to_bytes(n_bytes, 'little') _opcode_instances: List["CScriptOp"] = [] class CScriptOp(int): """A single script opcode""" __slots__ = () @staticmethod def encode_op_pushdata(d): """Encode a PUSHDATA op, returning bytes""" if len(d) < 0x4c: # OP_PUSHDATA return b'' + bytes([len(d)]) + d elif len(d) <= 0xff: # OP_PUSHDATA1 return b'\x4c' + bytes([len(d)]) + d elif len(d) <= 0xffff: # OP_PUSHDATA2 return b'\x4d' + struct.pack(b'>= 8 if r[-1] & 0x80: r.append(0x80 if neg else 0) elif neg: r[-1] |= 0x80 return bytes([len(r)]) + r @staticmethod def decode(vch): result = 0 # We assume valid push_size and minimal encoding value = vch[1:] if len(value) == 0: return result for i, byte in enumerate(value): result |= int(byte) << 8 * i if value[-1] >= 0x80: # Mask for all but the highest result bit num_mask = (2**(len(value) * 8) - 1) >> 1 result &= num_mask result *= -1 return result class CScript(bytes): """Serialized script A bytes subclass, so you can use this directly whenever bytes are accepted. Note that this means that indexing does *not* work - you'll get an index by byte rather than opcode. This format was chosen for efficiency so that the general case would not require creating a lot of little CScriptOP objects. iter(script) however does iterate by opcode. """ __slots__ = () @classmethod def __coerce_instance(cls, other): # Coerce other into bytes if isinstance(other, CScriptOp): other = bytes([other]) elif isinstance(other, CScriptNum): if (other.value == 0): other = bytes([CScriptOp(OP_0)]) else: other = CScriptNum.encode(other) elif isinstance(other, int): if 0 <= other <= 16: other = bytes([CScriptOp.encode_op_n(other)]) elif other == -1: other = bytes([OP_1NEGATE]) else: other = CScriptOp.encode_op_pushdata(bn2vch(other)) elif isinstance(other, (bytes, bytearray)): other = CScriptOp.encode_op_pushdata(other) return other def __add__(self, other): # add makes no sense for a CScript() raise NotImplementedError def join(self, iterable): # join makes no sense for a CScript() raise NotImplementedError def __new__(cls, value=b''): if isinstance(value, bytes) or isinstance(value, bytearray): return super().__new__(cls, value) else: def coerce_iterable(iterable): for instance in iterable: yield cls.__coerce_instance(instance) # Annoyingly on both python2 and python3 bytes.join() always # returns a bytes instance even when subclassed. return super().__new__( cls, b''.join(coerce_iterable(value))) def raw_iter(self): """Raw iteration Yields tuples of (opcode, data, sop_idx) so that the different possible PUSHDATA encodings can be accurately distinguished, as well as determining the exact opcode byte indexes. (sop_idx) """ i = 0 while i < len(self): sop_idx = i opcode = self[i] i += 1 if opcode > OP_PUSHDATA4: yield (opcode, None, sop_idx) else: datasize = None pushdata_type = None if opcode < OP_PUSHDATA1: pushdata_type = f'PUSHDATA({opcode})' datasize = opcode elif opcode == OP_PUSHDATA1: pushdata_type = 'PUSHDATA1' if i >= len(self): raise CScriptInvalidError( 'PUSHDATA1: missing data length') datasize = self[i] i += 1 elif opcode == OP_PUSHDATA2: pushdata_type = 'PUSHDATA2' if i + 1 >= len(self): raise CScriptInvalidError( 'PUSHDATA2: missing data length') datasize = self[i] + (self[i + 1] << 8) i += 2 elif opcode == OP_PUSHDATA4: pushdata_type = 'PUSHDATA4' if i + 3 >= len(self): raise CScriptInvalidError( 'PUSHDATA4: missing data length') datasize = self[i] + (self[i + 1] << 8) + \ (self[i + 2] << 16) + (self[i + 3] << 24) i += 4 else: assert False # shouldn't happen data = bytes(self[i:i + datasize]) # Check for truncation if len(data) < datasize: raise CScriptTruncatedPushDataError( f'{pushdata_type}: truncated data', data) i += datasize yield (opcode, data, sop_idx) def __iter__(self): """'Cooked' iteration Returns either a CScriptOP instance, an integer, or bytes, as appropriate. See raw_iter() if you need to distinguish the different possible PUSHDATA encodings. """ for (opcode, data, sop_idx) in self.raw_iter(): if data is not None: yield data else: opcode = CScriptOp(opcode) if opcode.is_small_int(): yield opcode.decode_op_n() else: yield CScriptOp(opcode) def __repr__(self): def _repr(o): if isinstance(o, bytes): return f"x('{o.hex()}')" else: return repr(o) ops = [] i = iter(self) while True: op = None try: op = _repr(next(i)) except CScriptTruncatedPushDataError as err: op = f'{_repr(err.data)}...' break except CScriptInvalidError as err: op = f'' break except StopIteration: break finally: if op is not None: ops.append(op) return f"CScript([{', '.join(ops)}])" SIGHASH_ALL = 1 SIGHASH_NONE = 2 SIGHASH_SINGLE = 3 SIGHASH_FORKID = 0x40 SIGHASH_ANYONECANPAY = 0x80 def FindAndDelete(script, sig): """Consensus critical, see FindAndDelete() in Satoshi codebase""" r = b'' last_sop_idx = sop_idx = 0 skip = True for (opcode, data, sop_idx) in script.raw_iter(): if not skip: r += script[last_sop_idx:sop_idx] last_sop_idx = sop_idx if script[sop_idx:sop_idx + len(sig)] == sig: skip = True else: skip = False if not skip: r += script[last_sop_idx:] return CScript(r) def SignatureHash(script, txTo, inIdx, hashtype): """Consensus-correct SignatureHash - Returns (hash, err) to precisely match the consensus-critical behavior of + Returns (sighash, err) to precisely match the consensus-critical behavior of the SIGHASH_SINGLE bug. (inIdx is *not* checked for validity) """ HASH_ONE = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' if inIdx >= len(txTo.vin): return (HASH_ONE, f"inIdx {inIdx} out of range ({len(txTo.vin)})") txtmp = CTransaction(txTo) for txin in txtmp.vin: txin.scriptSig = b'' txtmp.vin[inIdx].scriptSig = FindAndDelete( script, CScript([OP_CODESEPARATOR])) if (hashtype & 0x1f) == SIGHASH_NONE: txtmp.vout = [] for i in range(len(txtmp.vin)): if i != inIdx: txtmp.vin[i].nSequence = 0 elif (hashtype & 0x1f) == SIGHASH_SINGLE: outIdx = inIdx if outIdx >= len(txtmp.vout): return (HASH_ONE, f"outIdx {outIdx} out of range ({len(txtmp.vout)})") tmp = txtmp.vout[outIdx] txtmp.vout = [] for _ in range(outIdx): txtmp.vout.append(CTxOut(-1)) txtmp.vout.append(tmp) for i in range(len(txtmp.vin)): if i != inIdx: txtmp.vin[i].nSequence = 0 if hashtype & SIGHASH_ANYONECANPAY: tmp = txtmp.vin[inIdx] txtmp.vin = [] txtmp.vin.append(tmp) s = txtmp.serialize() s += struct.pack(b"