diff --git a/contrib/message-capture/message-capture-docs.md b/contrib/message-capture/message-capture-docs.md new file mode 100644 --- /dev/null +++ b/contrib/message-capture/message-capture-docs.md @@ -0,0 +1,25 @@ +# Per-Peer Message Capture + +## Purpose + +This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?" + +## Usage and Functionality + +* Run `bitcoind` with the `-capturemessages` option. +* Look in the `message_capture` folder in your datadir. + * Typically this will be `~/.bitcoin/message_capture`. + * See that there are many folders inside, one for each peer, named with its IP address and port. + * Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`). +* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments. + * See the `-h` option for help. + * To see all messages, both sent and received, for all peers use: + ``` + ./contrib/message-capture/message-capture-parser.py -o out.json \ + ~/.bitcoin/message_capture/**/*.dat + ``` + * Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order. + * If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`. +* View the resulting output. + * The output file is `JSON` formatted. + * Suggestion: use `jq` to view the output, with `jq . 
out.json` diff --git a/contrib/message-capture/message-capture-parser.py b/contrib/message-capture/message-capture-parser.py new file mode 100755 --- /dev/null +++ b/contrib/message-capture/message-capture-parser.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# Copyright (c) 2020 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Parse message capture binary files. +To be used in conjunction with -capturemessages. +""" + +import argparse +import os +import shutil +import sys +from io import BytesIO +import json +from pathlib import Path +from typing import Any, Dict, List, Optional + +sys.path.append( + os.path.join( + os.path.dirname(__file__), + '../../test/functional')) + +from test_framework.messages import ser_uint256 # noqa: E402 +from test_framework.p2p import MESSAGEMAP # noqa: E402 + +TIME_SIZE = 8 +LENGTH_SIZE = 4 +MSGTYPE_SIZE = 12 + +# The test framework classes store hashes as large ints in many cases. +# These are variables of type uint256 in core. +# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes. +# As such, they are itemized here. +# Any variables with these names that are of type int are actually uint256 variables. 
+# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py) +HASH_INTS = [ + "blockhash", + "block_hash", + "hash", + "hashMerkleRoot", + "hashPrevBlock", + "hashstop", + "limited_proofid", + "prev_header", + "sha256", + "stop_hash", +] + +HASH_INT_VECTORS = [ + "hashes", + "headers", + "vHave", + "vHash", +] + + +class ProgressBar: + def __init__(self, total: float): + self.total = total + self.running = 0 + + def set_progress(self, progress: float): + cols = shutil.get_terminal_size()[0] + if cols <= 12: + return + max_blocks = cols - 9 + num_blocks = int(max_blocks * progress) + print('\r[ {}{} ] {:3.0f}%' + .format('#' * num_blocks, + ' ' * (max_blocks - num_blocks), + progress * 100), + end='') + + def update(self, more: float): + self.running += more + self.set_progress(self.running / self.total) + + +def to_jsonable(obj: Any) -> Any: + if hasattr(obj, "__dict__"): + return obj.__dict__ + elif hasattr(obj, "__slots__"): + ret: Dict[str, Any] = {} + for slot in obj.__slots__: + val = getattr(obj, slot, None) + if slot in HASH_INTS and isinstance(val, int): + ret[slot] = ser_uint256(val).hex() + elif slot in HASH_INT_VECTORS and isinstance(val[0], int): + ret[slot] = [ser_uint256(a).hex() for a in val] + else: + ret[slot] = to_jsonable(val) + return ret + elif isinstance(obj, list): + return [to_jsonable(a) for a in obj] + elif isinstance(obj, bytes): + return obj.hex() + else: + return obj + + +def process_file(path: str, messages: List[Any], recv: bool, + progress_bar: Optional[ProgressBar]) -> None: + with open(path, 'rb') as f_in: + if progress_bar: + bytes_read = 0 + + while True: + if progress_bar: + # Update progress bar + diff = f_in.tell() - bytes_read - 1 + progress_bar.update(diff) + bytes_read = f_in.tell() - 1 + + # Read the Header + tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) + if not tmp_header_raw: + break + tmp_header = BytesIO(tmp_header_raw) + time = 
int.from_bytes(tmp_header.read(TIME_SIZE), "little") + msgtype: bytes = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0] + length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") + + # Start converting the message to a dictionary + msg_dict = {} + msg_dict["direction"] = "recv" if recv else "sent" + msg_dict["time"] = time + # "size" is less readable here, but more readable in the output + msg_dict["size"] = length + + msg_ser = BytesIO(f_in.read(length)) + + # Determine message type + if msgtype not in MESSAGEMAP: + # Unrecognized message type + try: + msgtype_tmp = msgtype.decode() + if not msgtype_tmp.isprintable(): + raise UnicodeDecodeError + msg_dict["msgtype"] = msgtype_tmp + except UnicodeDecodeError: + msg_dict["msgtype"] = "UNREADABLE" + msg_dict["body"] = msg_ser.read().hex() + msg_dict["error"] = "Unrecognized message type." + messages.append(msg_dict) + print( + f"WARNING - Unrecognized message type {msgtype} in {path}", + file=sys.stderr) + continue + + # Deserialize the message + msg = MESSAGEMAP[msgtype]() + msg_dict["msgtype"] = msgtype.decode() + + try: + msg.deserialize(msg_ser) + except KeyboardInterrupt: + raise + except Exception: + # Unable to deserialize message body + msg_ser.seek(0, os.SEEK_SET) + msg_dict["body"] = msg_ser.read().hex() + msg_dict["error"] = "Unable to deserialize message." 
+ messages.append(msg_dict) + print( + f"WARNING - Unable to deserialize message in {path}", + file=sys.stderr) + continue + + # Convert body of message into a jsonable object + if length: + msg_dict["body"] = to_jsonable(msg) + messages.append(msg_dict) + + if progress_bar: + # Update the progress bar to the end of the current file + # in case we exited the loop early + # Go to end of file + f_in.seek(0, os.SEEK_END) + diff = f_in.tell() - bytes_read - 1 + progress_bar.update(diff) + + +def main(): + parser = argparse.ArgumentParser( + description=__doc__, + epilog="EXAMPLE \n\t{0} -o out.json /message_capture/**/*.dat".format( + sys.argv[0]), + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument( + "capturepaths", + nargs='+', + help="binary message capture files to parse.") + parser.add_argument( + "-o", "--output", + help="output file. If unset print to stdout") + parser.add_argument( + "-n", "--no-progress-bar", + action='store_true', + help="disable the progress bar. 
Automatically set if the output is not a terminal") + args = parser.parse_args() + capturepaths = [Path.cwd() / Path(capturepath) + for capturepath in args.capturepaths] + output = Path.cwd() / Path(args.output) if args.output else False + use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty() + + messages: List[Any] = [] + if use_progress_bar: + total_size = sum(capture.stat().st_size for capture in capturepaths) + progress_bar = ProgressBar(total_size) + else: + progress_bar = None + + for capture in capturepaths: + process_file( + str(capture), + messages, + "recv" in capture.stem, + progress_bar) + + messages.sort(key=lambda msg: msg['time']) + + if use_progress_bar: + progress_bar.set_progress(1) + jsonrep = json.dumps(messages) + if output: + with open(str(output), 'w+', encoding="utf8") as f_out: + f_out.write(jsonrep) + else: + print(jsonrep) + + +if __name__ == "__main__": + main() diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -1041,7 +1041,9 @@ argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); - + argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", + ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, + OptionsCategory::DEBUG_TEST); argsman.AddArg("-debug=", strprintf("Output debugging information (default: %u, " "supplying is optional)", @@ -1866,19 +1868,19 @@ // Trim requested connection counts, to fit into system limitations // in std::min(...) 
to work around FreeBSD compilation issue // described in #2695 - nFD = RaiseFileDescriptorLimit(nMaxConnections + nBind + - MIN_CORE_FILEDESCRIPTORS + - MAX_ADDNODE_CONNECTIONS); + nFD = RaiseFileDescriptorLimit( + nMaxConnections + nBind + MIN_CORE_FILEDESCRIPTORS + + MAX_ADDNODE_CONNECTIONS + NUM_FDS_MESSAGE_CAPTURE); #ifdef USE_POLL int fd_max = nFD; #else int fd_max = FD_SETSIZE; #endif - nMaxConnections = - std::max(std::min(nMaxConnections, fd_max - nBind - - MIN_CORE_FILEDESCRIPTORS - - MAX_ADDNODE_CONNECTIONS), - 0); + nMaxConnections = std::max( + std::min(nMaxConnections, + fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - + MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), + 0); if (nFD < MIN_CORE_FILEDESCRIPTORS) { return InitError(_("Not enough file descriptors available.")); } diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -94,6 +95,8 @@ static const bool DEFAULT_BLOCKSONLY = false; /** -peertimeout default */ static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60; +/** Number of file descriptors required for message capture **/ +static const int NUM_FDS_MESSAGE_CAPTURE = 1; static const bool DEFAULT_FORCEDNSSEED = false; static const bool DEFAULT_DNSSEED = true; @@ -1438,6 +1441,10 @@ std::string getSubVersionEB(uint64_t MaxBlockSize); std::string userAgent(const Config &config); +/** Dump binary message to file, with timestamp */ +void CaptureMessage(const CAddress &addr, const std::string &msg_type, + const Span &data, bool is_incoming); + struct NodeEvictionCandidate { NodeId id; std::chrono::seconds m_connected; diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -3503,6 +3503,9 @@ size_t nMessageSize = msg.data.size(); LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.m_type), nMessageSize, pnode->GetId()); + if (gArgs.GetBoolArg("-capturemessages", false)) { + CaptureMessage(pnode->addr, 
msg.m_type, msg.data, /*incoming=*/false); + } // make sure we use the appropriate network transport format std::vector serializedHeader; @@ -3629,3 +3632,32 @@ // Size compliance is checked at startup, it is safe to not check it again return FormatUserAgent(client_name, client_version, uacomments); } + +void CaptureMessage(const CAddress &addr, const std::string &msg_type, + const Span &data, bool is_incoming) { + // Note: This function captures the message at the time of processing, + // not at socket receive/send time. + // This ensures that the messages are always in order from an application + // layer (processing) perspective. + auto now = GetTime(); + + // Windows folder names can not include a colon + std::string clean_addr = addr.ToString(); + std::replace(clean_addr.begin(), clean_addr.end(), ':', '_'); + + fs::path base_path = gArgs.GetDataDirNet() / "message_capture" / clean_addr; + fs::create_directories(base_path); + + fs::path path = + base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat"); + CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION); + + ser_writedata64(f, now.count()); + f.write(msg_type.data(), msg_type.length()); + for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) { + f << '\0'; + } + uint32_t size = data.size(); + ser_writedata32(f, size); + f.write((const char *)data.data(), data.size()); +} diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5866,6 +5866,11 @@ } CNetMessage &msg(msgs.front()); + if (gArgs.GetBoolArg("-capturemessages", false)) { + CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), + /*incoming=*/true); + } + msg.SetVersion(pfrom->GetCommonVersion()); // Check network magic diff --git a/test/functional/p2p_message_capture.py b/test/functional/p2p_message_capture.py new file mode 100755 --- /dev/null +++ b/test/functional/p2p_message_capture.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +# 
Copyright (c) 2020 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test per-peer message capture capability. + +Additionally, the output of contrib/message-capture/message-capture-parser.py +should be verified manually. +""" + +import glob +import os +from io import BytesIO + +from test_framework.p2p import MESSAGEMAP, P2PDataStore +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal + +TIME_SIZE = 8 +LENGTH_SIZE = 4 +MSGTYPE_SIZE = 12 + + +def mini_parser(dat_file): + """Parse a data file created by CaptureMessage. + + From the data file we'll only check the structure. + + We won't care about things like: + - Deserializing the payload of the message + - This is managed by the deserialize methods in test_framework.messages + - The order of the messages + - There's no reason why we can't, say, change the order of the messages + in the handshake + - Message Type + - We can add new message types + + We're ignoring these because they're simply too brittle to test here. 
+ """ + with open(dat_file, 'rb') as f_in: + # This should have at least one message in it + assert(os.fstat(f_in.fileno()).st_size >= + TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) + while True: + tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) + if not tmp_header_raw: + break + tmp_header = BytesIO(tmp_header_raw) + int.from_bytes(tmp_header.read(TIME_SIZE), "little") + raw_msgtype = tmp_header.read(MSGTYPE_SIZE) + msgtype = raw_msgtype.split(b'\x00', 1)[0] + remainder = raw_msgtype.split(b'\x00', 1)[1] + assert(len(msgtype) > 0) + assert(msgtype in MESSAGEMAP) + assert(len(remainder) == 0 or not remainder.decode().isprintable()) + length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") + data = f_in.read(length) + assert_equal(len(data), length) + + +class MessageCaptureTest(BitcoinTestFramework): + def set_test_params(self): + self.num_nodes = 1 + self.extra_args = [["-capturemessages"]] + self.setup_clean_chain = True + + def run_test(self): + capturedir = os.path.join( + self.nodes[0].datadir, + "regtest/message_capture") + # Connect a node so that the handshake occurs + self.nodes[0].add_p2p_connection(P2PDataStore()) + self.nodes[0].disconnect_p2ps() + recv_file = glob.glob(os.path.join(capturedir, "*/msgs_recv.dat"))[0] + mini_parser(recv_file) + sent_file = glob.glob(os.path.join(capturedir, "*/msgs_sent.dat"))[0] + mini_parser(sent_file) + + +if __name__ == '__main__': + MessageCaptureTest().main() diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -831,6 +831,8 @@ class AvalancheStake: + __slots__ = ("utxo", "amount", "height", "pubkey", "is_coinbase") + def __init__(self, utxo=None, amount=0, height=0, pubkey=b"", is_coinbase=False): self.utxo: COutPoint = utxo or COutPoint() @@ -868,6 +870,8 @@ class AvalancheSignedStake: + __slots__ = ("stake", "sig") + def __init__(self, 
stake=None, sig=b""): self.stake: AvalancheStake = stake or AvalancheStake() self.sig: bytes = sig @@ -1460,7 +1464,7 @@ # for cases where a user needs tighter control over what is sent over the wire # note that the user must supply the name of the msgtype, and the data class msg_generic: - __slots__ = ("msgtype", "data") + __slots__ = ("data") def __init__(self, msgtype, data=None): self.msgtype = msgtype