Changeset View
Changeset View
Standalone View
Standalone View
contrib/zmq/zmq_sub.py
- This file was copied to contrib/zmq/zmq_sub27.py.
#!/usr/bin/env python2 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2014-2016 The Bitcoin Core developers | # Copyright (c) 2014-2017 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
""" | |||||
ZMQ example using python3's asyncio | |||||
Bitcoin should be started with the command line arguments: | |||||
bitcoind -testnet -daemon \ | |||||
-zmqpubrawtx=tcp://127.0.0.1:28332 \ | |||||
-zmqpubrawblock=tcp://127.0.0.1:28332 \ | |||||
-zmqpubhashtx=tcp://127.0.0.1:28332 \ | |||||
-zmqpubhashblock=tcp://127.0.0.1:28332 | |||||
We use the asyncio library here. `self.handle()` installs itself as a | |||||
future at the end of the function. Since it never returns with the event | |||||
loop having an empty stack of futures, this creates an infinite loop. An | |||||
alternative is to wrap the contents of `handle` inside `while True`. | |||||
A blocking example using python 2.7 can be obtained from the git history: | |||||
https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py | |||||
""" | |||||
import binascii | import binascii | ||||
import asyncio | |||||
import zmq | import zmq | ||||
import zmq.asyncio | |||||
import signal | |||||
import struct | import struct | ||||
import sys | |||||
if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5): | |||||
print("This example only works with Python 3.5 and greater") | |||||
sys.exit(1) | |||||
port = 28332 | port = 28332 | ||||
ip = "172.31.9.161" | |||||
zmqContext = zmq.Context() | class ZMQHandler(): | ||||
zmqSubSocket = zmqContext.socket(zmq.SUB) | def __init__(self): | ||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") | self.loop = asyncio.get_event_loop() | ||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") | self.zmqContext = zmq.asyncio.Context() | ||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") | |||||
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") | self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) | ||||
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") | ||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") | |||||
try: | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") | ||||
while True: | self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") | ||||
msg = zmqSubSocket.recv_multipart() | self.zmqSubSocket.connect(f"tcp://{ip}:{port}") | ||||
topic = str(msg[0]) | |||||
async def handle(self): | |||||
msg = await self.zmqSubSocket.recv_multipart() | |||||
topic = msg[0] | |||||
body = msg[1] | body = msg[1] | ||||
sequence = "Unknown" | sequence = "Unknown" | ||||
if len(msg[-1]) == 4: | if len(msg[-1]) == 4: | ||||
msgSequence = struct.unpack('<I', msg[-1])[-1] | msgSequence = struct.unpack('<I', msg[-1])[-1] | ||||
sequence = str(msgSequence) | sequence = str(msgSequence) | ||||
if topic == "hashblock": | if topic == b"hashblock": | ||||
print '- HASH BLOCK (' + sequence + ') -' | print(f'- HASH BLOCK ({sequence}) -') | ||||
print binascii.hexlify(body) | print(binascii.hexlify(body)) | ||||
elif topic == "hashtx": | elif topic == b"hashtx": | ||||
print '- HASH TX (' + sequence + ') -' | print(f'- HASH TX ({sequence}) -') | ||||
print binascii.hexlify(body) | print(binascii.hexlify(body)) | ||||
elif topic == "rawblock": | elif topic == b"rawblock": | ||||
print '- RAW BLOCK HEADER (' + sequence + ') -' | print(f'- RAW BLOCK HEADER ({sequence}) -') | ||||
print binascii.hexlify(body[:80]) | print(binascii.hexlify(body[:80])) | ||||
elif topic == "rawtx": | elif topic == b"rawtx": | ||||
print '- RAW TX (' + sequence + ') -' | print(f'- RAW TX ({sequence}) -') | ||||
print binascii.hexlify(body) | print(binascii.hexlify(body)) | ||||
# schedule ourselves to receive the next message | |||||
asyncio.ensure_future(self.handle()) | |||||
def start(self): | |||||
self.loop.add_signal_handler(signal.SIGINT, self.stop) | |||||
self.loop.create_task(self.handle()) | |||||
self.loop.run_forever() | |||||
def stop(self): | |||||
self.loop.stop() | |||||
self.zmqContext.destroy() | |||||
except KeyboardInterrupt: | daemon = ZMQHandler() | ||||
zmqContext.destroy() | daemon.start() |