Changeset View
Changeset View
Standalone View
Standalone View
contrib/zmq/zmq_sub.py
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2014-2017 The Bitcoin Core developers | # Copyright (c) 2014-2017 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
""" | """ | ||||
ZMQ example using python3's asyncio | ZMQ example using python3's asyncio | ||||
Bitcoin should be started with the command line arguments: | Bitcoin should be started with the command line arguments: | ||||
bitcoind -testnet -daemon \ | bitcoind -testnet -daemon \ | ||||
-zmqpubrawtx=tcp://127.0.0.1:28332 \ | -zmqpubrawtx=tcp://127.0.0.1:28332 \ | ||||
-zmqpubrawblock=tcp://127.0.0.1:28332 \ | -zmqpubrawblock=tcp://127.0.0.1:28332 \ | ||||
-zmqpubhashtx=tcp://127.0.0.1:28332 \ | -zmqpubhashtx=tcp://127.0.0.1:28332 \ | ||||
-zmqpubhashblock=tcp://127.0.0.1:28332 | -zmqpubhashblock=tcp://127.0.0.1:28332 \ | ||||
-zmqpubsequence=tcp://127.0.0.1:28332 | |||||
We use the asyncio library here. `self.handle()` installs itself as a | We use the asyncio library here. `self.handle()` installs itself as a | ||||
future at the end of the function. Since it never returns with the event | future at the end of the function. Since it never returns with the event | ||||
loop having an empty stack of futures, this creates an infinite loop. An | loop having an empty stack of futures, this creates an infinite loop. An | ||||
alternative is to wrap the contents of `handle` inside `while True`. | alternative is to wrap the contents of `handle` inside `while True`. | ||||
A blocking example using python 2.7 can be obtained from the git history: | A blocking example using python 2.7 can be obtained from the git history: | ||||
https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py | https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py | ||||
""" | """ | ||||
import binascii | import binascii | ||||
import asyncio | import asyncio | ||||
import zmq | import zmq | ||||
import zmq.asyncio | import zmq.asyncio | ||||
import signal | import signal | ||||
import struct | import struct | ||||
import sys | import sys | ||||
if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5): | if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5): | ||||
print("This example only works with Python 3.5 and greater") | print("This example only works with Python 3.5 and greater") | ||||
sys.exit(1) | sys.exit(1) | ||||
port = 28332 | port = 28332 | ||||
ip = "172.31.9.161" | ip = "127.0.0.1" | ||||
class ZMQHandler(): | class ZMQHandler(): | ||||
def __init__(self): | def __init__(self): | ||||
self.loop = asyncio.get_event_loop() | self.loop = asyncio.get_event_loop() | ||||
self.zmqContext = zmq.asyncio.Context() | self.zmqContext = zmq.asyncio.Context() | ||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) | self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) | ||||
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) | self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence") | |||||
self.zmqSubSocket.connect(f"tcp://{ip}:{port}") | self.zmqSubSocket.connect(f"tcp://{ip}:{port}") | ||||
async def handle(self): | async def handle(self): | ||||
msg = await self.zmqSubSocket.recv_multipart() | topic, body, seq = await self.zmqSubSocket.recv_multipart() | ||||
topic = msg[0] | |||||
body = msg[1] | |||||
sequence = "Unknown" | sequence = "Unknown" | ||||
if len(msg[-1]) == 4: | if len(seq) == 4: | ||||
msgSequence = struct.unpack('<I', msg[-1])[-1] | sequence = str(struct.unpack('<I', seq)[-1]) | ||||
sequence = str(msgSequence) | |||||
if topic == b"hashblock": | if topic == b"hashblock": | ||||
print(f'- HASH BLOCK ({sequence}) -') | print(f'- HASH BLOCK ({sequence}) -') | ||||
print(binascii.hexlify(body)) | print(binascii.hexlify(body)) | ||||
elif topic == b"hashtx": | elif topic == b"hashtx": | ||||
print(f'- HASH TX ({sequence}) -') | print(f'- HASH TX ({sequence}) -') | ||||
print(binascii.hexlify(body)) | print(binascii.hexlify(body)) | ||||
elif topic == b"rawblock": | elif topic == b"rawblock": | ||||
print(f'- RAW BLOCK HEADER ({sequence}) -') | print(f'- RAW BLOCK HEADER ({sequence}) -') | ||||
print(binascii.hexlify(body[:80])) | print(binascii.hexlify(body[:80])) | ||||
elif topic == b"rawtx": | elif topic == b"rawtx": | ||||
print(f'- RAW TX ({sequence}) -') | print(f'- RAW TX ({sequence}) -') | ||||
print(binascii.hexlify(body)) | print(binascii.hexlify(body)) | ||||
elif topic == b"sequence": | |||||
hash = binascii.hexlify(body[:32]) | |||||
label = chr(body[32]) | |||||
mempool_sequence = (None if len(body) != 32 + 1 + 8 else | |||||
struct.unpack("<Q", body[32 + 1:])[0]) | |||||
print('- SEQUENCE (' + sequence + ') -') | |||||
print(hash, label, mempool_sequence) | |||||
# schedule ourselves to receive the next message | # schedule ourselves to receive the next message | ||||
asyncio.ensure_future(self.handle()) | asyncio.ensure_future(self.handle()) | ||||
def start(self): | def start(self): | ||||
self.loop.add_signal_handler(signal.SIGINT, self.stop) | self.loop.add_signal_handler(signal.SIGINT, self.stop) | ||||
self.loop.create_task(self.handle()) | self.loop.create_task(self.handle()) | ||||
self.loop.run_forever() | self.loop.run_forever() | ||||
def stop(self): | def stop(self): | ||||
self.loop.stop() | self.loop.stop() | ||||
self.zmqContext.destroy() | self.zmqContext.destroy() | ||||
daemon = ZMQHandler() | daemon = ZMQHandler() | ||||
daemon.start() | daemon.start() |