Changeset View
Changeset View
Standalone View
Standalone View
contrib/message-capture/message-capture-parser.py
Show All 9 Lines | |||||
import json | import json | ||||
import os | import os | ||||
import shutil | import shutil | ||||
import sys | import sys | ||||
from io import BytesIO | from io import BytesIO | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Any, Dict, List, Optional, Union | from typing import Any, Dict, List, Optional, Union | ||||
sys.path.append( | sys.path.append(os.path.join(os.path.dirname(__file__), "../../test/functional")) | ||||
os.path.join( | |||||
os.path.dirname(__file__), | |||||
'../../test/functional')) | |||||
from test_framework.messages import ser_uint256 # noqa: E402 | from test_framework.messages import ser_uint256 # noqa: E402 | ||||
from test_framework.p2p import MESSAGEMAP # noqa: E402 | from test_framework.p2p import MESSAGEMAP # noqa: E402 | ||||
TIME_SIZE = 8 | TIME_SIZE = 8 | ||||
LENGTH_SIZE = 4 | LENGTH_SIZE = 4 | ||||
MSGTYPE_SIZE = 12 | MSGTYPE_SIZE = 12 | ||||
Show All 26 Lines | |||||
class MessageTypeNotPrintableError(Exception): | class MessageTypeNotPrintableError(Exception): | ||||
pass | pass | ||||
class ProgressBar: | class ProgressBar: | ||||
def __init__(self, total: float): | def __init__(self, total: float): | ||||
self.total = total | self.total = total | ||||
self.running = 0. | self.running = 0.0 | ||||
def set_progress(self, progress: float): | def set_progress(self, progress: float): | ||||
cols = shutil.get_terminal_size()[0] | cols = shutil.get_terminal_size()[0] | ||||
if cols <= 12: | if cols <= 12: | ||||
return | return | ||||
max_blocks = cols - 9 | max_blocks = cols - 9 | ||||
num_blocks = int(max_blocks * progress) | num_blocks = int(max_blocks * progress) | ||||
print('\r[ {}{} ] {:3.0f}%' | print( | ||||
.format('#' * num_blocks, | "\r[ {}{} ] {:3.0f}%".format( | ||||
' ' * (max_blocks - num_blocks), | "#" * num_blocks, " " * (max_blocks - num_blocks), progress * 100 | ||||
progress * 100), | ), | ||||
end='') | end="", | ||||
) | |||||
def update(self, more: float): | def update(self, more: float): | ||||
self.running += more | self.running += more | ||||
self.set_progress(self.running / self.total) | self.set_progress(self.running / self.total) | ||||
def to_jsonable(obj: Any) -> Any: | def to_jsonable(obj: Any) -> Any: | ||||
if hasattr(obj, "__dict__"): | if hasattr(obj, "__dict__"): | ||||
return obj.__dict__ | return obj.__dict__ | ||||
elif hasattr(obj, "__slots__"): | elif hasattr(obj, "__slots__"): | ||||
ret: Dict[str, Any] = {} | ret: Dict[str, Any] = {} | ||||
for slot in obj.__slots__: | for slot in obj.__slots__: | ||||
val = getattr(obj, slot, None) | val = getattr(obj, slot, None) | ||||
if slot in HASH_INTS and isinstance(val, int): | if slot in HASH_INTS and isinstance(val, int): | ||||
ret[slot] = ser_uint256(val).hex() | ret[slot] = ser_uint256(val).hex() | ||||
elif slot in HASH_INT_VECTORS and isinstance(val, list) and isinstance(val[0], int): | elif ( | ||||
slot in HASH_INT_VECTORS | |||||
and isinstance(val, list) | |||||
and isinstance(val[0], int) | |||||
): | |||||
ret[slot] = [ser_uint256(a).hex() for a in val] | ret[slot] = [ser_uint256(a).hex() for a in val] | ||||
else: | else: | ||||
ret[slot] = to_jsonable(val) | ret[slot] = to_jsonable(val) | ||||
return ret | return ret | ||||
elif isinstance(obj, list): | elif isinstance(obj, list): | ||||
return [to_jsonable(a) for a in obj] | return [to_jsonable(a) for a in obj] | ||||
elif isinstance(obj, bytes): | elif isinstance(obj, bytes): | ||||
return obj.hex() | return obj.hex() | ||||
else: | else: | ||||
return obj | return obj | ||||
def process_file(path: str, messages: List[Any], recv: bool, | def process_file( | ||||
progress_bar: Optional[ProgressBar]) -> None: | path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar] | ||||
with open(path, 'rb') as f_in: | ) -> None: | ||||
with open(path, "rb") as f_in: | |||||
if progress_bar: | if progress_bar: | ||||
bytes_read = 0 | bytes_read = 0 | ||||
while True: | while True: | ||||
if progress_bar: | if progress_bar: | ||||
# Update progress bar | # Update progress bar | ||||
diff = f_in.tell() - bytes_read - 1 | diff = f_in.tell() - bytes_read - 1 | ||||
progress_bar.update(diff) | progress_bar.update(diff) | ||||
bytes_read = f_in.tell() - 1 | bytes_read = f_in.tell() - 1 | ||||
# Read the Header | # Read the Header | ||||
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) | tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE) | ||||
if not tmp_header_raw: | if not tmp_header_raw: | ||||
break | break | ||||
tmp_header = BytesIO(tmp_header_raw) | tmp_header = BytesIO(tmp_header_raw) | ||||
time = int.from_bytes(tmp_header.read(TIME_SIZE), "little") | time = int.from_bytes(tmp_header.read(TIME_SIZE), "little") | ||||
msgtype: bytes = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0] | msgtype: bytes = tmp_header.read(MSGTYPE_SIZE).split(b"\x00", 1)[0] | ||||
length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") | length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") | ||||
# Start converting the message to a dictionary | # Start converting the message to a dictionary | ||||
msg_dict: Dict[str, Union[int, str]] = {} | msg_dict: Dict[str, Union[int, str]] = {} | ||||
msg_dict["direction"] = "recv" if recv else "sent" | msg_dict["direction"] = "recv" if recv else "sent" | ||||
msg_dict["time"] = time | msg_dict["time"] = time | ||||
# "size" is less readable here, but more readable in the output | # "size" is less readable here, but more readable in the output | ||||
msg_dict["size"] = length | msg_dict["size"] = length | ||||
Show All 10 Lines | with open(path, "rb") as f_in: | ||||
msg_dict["msgtype"] = msgtype_tmp | msg_dict["msgtype"] = msgtype_tmp | ||||
except (UnicodeDecodeError, MessageTypeNotPrintableError): | except (UnicodeDecodeError, MessageTypeNotPrintableError): | ||||
msg_dict["msgtype"] = "UNREADABLE" | msg_dict["msgtype"] = "UNREADABLE" | ||||
msg_dict["body"] = msg_ser.read().hex() | msg_dict["body"] = msg_ser.read().hex() | ||||
msg_dict["error"] = "Unrecognized message type." | msg_dict["error"] = "Unrecognized message type." | ||||
messages.append(msg_dict) | messages.append(msg_dict) | ||||
print( | print( | ||||
f"WARNING - Unrecognized message type {msgtype!r} in {path}", | f"WARNING - Unrecognized message type {msgtype!r} in {path}", | ||||
file=sys.stderr) | file=sys.stderr, | ||||
) | |||||
continue | continue | ||||
# Deserialize the message | # Deserialize the message | ||||
msg = MESSAGEMAP[msgtype]() | msg = MESSAGEMAP[msgtype]() | ||||
msg_dict["msgtype"] = msgtype.decode() | msg_dict["msgtype"] = msgtype.decode() | ||||
try: | try: | ||||
msg.deserialize(msg_ser) | msg.deserialize(msg_ser) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
raise | raise | ||||
except Exception: | except Exception: | ||||
# Unable to deserialize message body | # Unable to deserialize message body | ||||
msg_ser.seek(0, os.SEEK_SET) | msg_ser.seek(0, os.SEEK_SET) | ||||
msg_dict["body"] = msg_ser.read().hex() | msg_dict["body"] = msg_ser.read().hex() | ||||
msg_dict["error"] = "Unable to deserialize message." | msg_dict["error"] = "Unable to deserialize message." | ||||
messages.append(msg_dict) | messages.append(msg_dict) | ||||
print( | print( | ||||
f"WARNING - Unable to deserialize message in {path}", | f"WARNING - Unable to deserialize message in {path}", | ||||
file=sys.stderr) | file=sys.stderr, | ||||
) | |||||
continue | continue | ||||
# Convert body of message into a jsonable object | # Convert body of message into a jsonable object | ||||
if length: | if length: | ||||
msg_dict["body"] = to_jsonable(msg) | msg_dict["body"] = to_jsonable(msg) | ||||
messages.append(msg_dict) | messages.append(msg_dict) | ||||
if progress_bar: | if progress_bar: | ||||
# Update the progress bar to the end of the current file | # Update the progress bar to the end of the current file | ||||
# in case we exited the loop early | # in case we exited the loop early | ||||
# Go to end of file | # Go to end of file | ||||
f_in.seek(0, os.SEEK_END) | f_in.seek(0, os.SEEK_END) | ||||
diff = f_in.tell() - bytes_read - 1 | diff = f_in.tell() - bytes_read - 1 | ||||
progress_bar.update(diff) | progress_bar.update(diff) | ||||
def main(): | def main(): | ||||
parser = argparse.ArgumentParser( | parser = argparse.ArgumentParser( | ||||
description=__doc__, | description=__doc__, | ||||
epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format( | epilog=( | ||||
sys.argv[0]), | f"EXAMPLE \n\t{sys.argv[0]} -o out.json <data-dir>/message_capture/**/*.dat" | ||||
formatter_class=argparse.RawTextHelpFormatter) | ), | ||||
parser.add_argument( | formatter_class=argparse.RawTextHelpFormatter, | ||||
"capturepaths", | ) | ||||
nargs='+', | |||||
help="binary message capture files to parse.") | |||||
parser.add_argument( | parser.add_argument( | ||||
"-o", "--output", | "capturepaths", nargs="+", help="binary message capture files to parse." | ||||
help="output file. If unset print to stdout") | ) | ||||
parser.add_argument("-o", "--output", help="output file. If unset print to stdout") | |||||
parser.add_argument( | parser.add_argument( | ||||
"-n", "--no-progress-bar", | "-n", | ||||
action='store_true', | "--no-progress-bar", | ||||
help="disable the progress bar. Automatically set if the output is not a terminal") | action="store_true", | ||||
help=( | |||||
"disable the progress bar. Automatically set if the output is not a" | |||||
" terminal" | |||||
), | |||||
) | |||||
args = parser.parse_args() | args = parser.parse_args() | ||||
capturepaths = [Path.cwd() / Path(capturepath) | capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths] | ||||
for capturepath in args.capturepaths] | |||||
output = Path.cwd() / Path(args.output) if args.output else False | output = Path.cwd() / Path(args.output) if args.output else False | ||||
use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty() | use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty() | ||||
messages: List[Any] = [] | messages: List[Any] = [] | ||||
if use_progress_bar: | if use_progress_bar: | ||||
total_size = sum(capture.stat().st_size for capture in capturepaths) | total_size = sum(capture.stat().st_size for capture in capturepaths) | ||||
progress_bar = ProgressBar(total_size) | progress_bar = ProgressBar(total_size) | ||||
else: | else: | ||||
progress_bar = None | progress_bar = None | ||||
for capture in capturepaths: | for capture in capturepaths: | ||||
process_file( | process_file(str(capture), messages, "recv" in capture.stem, progress_bar) | ||||
str(capture), | |||||
messages, | |||||
"recv" in capture.stem, | |||||
progress_bar) | |||||
messages.sort(key=lambda msg: msg['time']) | messages.sort(key=lambda msg: msg["time"]) | ||||
if use_progress_bar: | if use_progress_bar: | ||||
progress_bar.set_progress(1) | progress_bar.set_progress(1) | ||||
jsonrep = json.dumps(messages) | jsonrep = json.dumps(messages) | ||||
if output: | if output: | ||||
with open(str(output), 'w+', encoding="utf8") as f_out: | with open(str(output), "w+", encoding="utf8") as f_out: | ||||
f_out.write(jsonrep) | f_out.write(jsonrep) | ||||
else: | else: | ||||
print(jsonrep) | print(jsonrep) | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |