diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index aed948265..20e5b3c8d 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -1,88 +1,94 @@ #!/usr/bin/env python3 # Copyright (c) 2014-2017 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ ZMQ example using python3's asyncio Bitcoin should be started with the command line arguments: bitcoind -testnet -daemon \ -zmqpubrawtx=tcp://127.0.0.1:28332 \ -zmqpubrawblock=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubhashblock=tcp://127.0.0.1:28332 + -zmqpubhashblock=tcp://127.0.0.1:28332 \ + -zmqpubsequence=tcp://127.0.0.1:28332 We use the asyncio library here. `self.handle()` installs itself as a future at the end of the function. Since it never returns with the event loop having an empty stack of futures, this creates an infinite loop. An alternative is to wrap the contents of `handle` inside `while True`. A blocking example using python 2.7 can be obtained from the git history: https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py """ import binascii import asyncio import zmq import zmq.asyncio import signal import struct import sys if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5): print("This example only works with Python 3.5 and greater") sys.exit(1) port = 28332 -ip = "172.31.9.161" +ip = "127.0.0.1" class ZMQHandler(): def __init__(self): self.loop = asyncio.get_event_loop() self.zmqContext = zmq.asyncio.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence") self.zmqSubSocket.connect(f"tcp://{ip}:{port}") async def handle(self): - msg = await self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] + topic, body, seq = await self.zmqSubSocket.recv_multipart() sequence = "Unknown" - if len(msg[-1]) == 4: - msgSequence = struct.unpack('<I', msg[-1])[-1] - sequence = str(msgSequence) + if len(seq) == 4: + sequence = str(struct.unpack('<I', seq)[-1]) if topic == b"hashblock": print(f'- HASH BLOCK ({sequence}) -') print(binascii.hexlify(body)) elif topic == b"hashtx": print(f'- HASH TX ({sequence}) -') print(binascii.hexlify(body)) elif topic == b"rawblock": print(f'- RAW BLOCK HEADER ({sequence}) -') print(binascii.hexlify(body[:80])) elif topic == b"rawtx": print(f'- RAW TX ({sequence}) -') print(binascii.hexlify(body)) + elif topic == b"sequence": + hash = binascii.hexlify(body[:32]) + label = chr(body[32]) + mempool_sequence = (None if len(body) != 32 + 1 + 8 else + struct.unpack("<Q", body[32 + 1:])[0]) + print('- SEQUENCE (' + sequence + ') -') + print(hash, label, mempool_sequence) # schedule ourselves to receive the next message asyncio.ensure_future(self.handle()) def start(self): self.loop.add_signal_handler(signal.SIGINT, self.stop) self.loop.create_task(self.handle()) self.loop.run_forever() def stop(self): self.loop.stop() self.zmqContext.destroy() daemon = ZMQHandler() daemon.start() diff --git a/doc/zmq.md b/doc/zmq.md index 8638a6990..025c85160 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -1,135 +1,152 @@ # Block and Transaction Broadcasting with ZeroMQ [ZeroMQ](https://zeromq.org/) is a lightweight wrapper around TCP connections, inter-process communication, and shared-memory, providing various message-oriented semantics such as publish/subscribe, request/reply, and push/pull. The Bitcoin ABC daemon can be configured to act as a trusted "border router", implementing the bitcoin wire protocol and relay, making consensus decisions, maintaining the local blockchain database, broadcasting locally generated transactions into the network, and providing a queryable RPC interface to interact on a polled basis for requesting blockchain related data. However, there exists only a limited service to notify external software of events like the arrival of new blocks or transactions. The ZeroMQ facility implements a notification interface through a set of specific notifiers. Currently there are notifiers that publish blocks and transactions. This read-only facility requires only the connection of a corresponding ZeroMQ subscriber port in receiving software; it is not authenticated nor is there any two-way protocol involvement. Therefore, subscribers should validate the received data since it may be out of date, incomplete or even invalid. ZeroMQ sockets are self-connecting and self-healing; that is, connections made between two endpoints will be automatically restored after an outage, and either end may be freely started or stopped in any order. Because ZeroMQ is message oriented, subscribers receive transactions and blocks all-at-once and do not need to implement any sort of buffering or reassembly. ## Prerequisites The ZeroMQ feature in Bitcoin ABC requires the ZeroMQ API >= 4.1.5 [libzmq](https://github.com/zeromq/libzmq/releases). For version information, see [dependencies.md](dependencies.md). Typically, it is packaged by distributions as something like *libzmq3-dev*. The C++ wrapper for ZeroMQ is *not* needed. In order to run the example Python client scripts in the `contrib/zmq/` directory, one must also install [PyZMQ](https://github.com/zeromq/pyzmq) (generally with `pip install pyzmq`), though this is not necessary for daemon operation. ## Enabling By default, the ZeroMQ feature is automatically compiled. To disable, use -DBUILD_BITCOIN_ZMQ=OFF to `cmake` when building bitcoind: $ cmake -GNinja .. -DBUILD_BITCOIN_ZMQ=OFF [...] To actually enable operation, one must set the appropriate options on the command line or in the configuration file. ## Usage Currently, the following notifications are supported: -zmqpubhashtx=address -zmqpubhashblock=address -zmqpubrawblock=address -zmqpubrawtx=address + -zmqpubsequence=address The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: -zmqpubhashtxhwm=n -zmqpubhashblockhwm=n -zmqpubrawblockhwm=n -zmqpubrawtxhwm=n + -zmqpubsequencehwm=address The high water mark value must be an integer greater than or equal to 0. For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the transaction hash (32 -bytes). +bytes) for all but `sequence` topic. For `sequence`, the body +is structured as the following based on the type of message: + + <32-byte hash>C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + +Where the 8-byte uints correspond to the mempool sequence number. These options can also be provided in bitcoin.conf. ZeroMQ endpoint specifiers for TCP (and others) are documented in the [ZeroMQ API](http://api.zeromq.org/4-0:_start). Client side, then, the ZeroMQ subscriber socket must have the ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without doing so will result in no messages arriving. Please see [`contrib/zmq/zmq_sub.py`](/contrib/zmq/zmq_sub.py) for a working example. The ZMQ_PUB socket's ZMQ_TCP_KEEPALIVE option is enabled. This means that the underlying SO_KEEPALIVE option is enabled when using a TCP transport. The effective TCP keepalive values are managed through the underlying operating system configuration and must be configured prior to connection establishment. For example, when running on GNU/Linux, one might use the following to lower the keepalive setting to 10 minutes: sudo sysctl -w net.ipv4.tcp_keepalive_time=600 Setting the keepalive values appropriately for your operating environment may improve connectivity in situations where long-lived connections are silently dropped by network middle boxes. ## Remarks From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB sockets don't even have a read function. Thus, there is no state introduced into bitcoind directly. Furthermore, no information is broadcast that wasn't already received from the public P2P network. No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. -Note that when the block chain tip changes, a reorganisation may occur -and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. Also note -that no notification occurs if the tip was in the active chain - this -is the case after calling invalidateblock RPC. +Note that for `*block` topics, when the block chain tip changes, +a reorganisation may occur and just the tip will be notified. +It is up to the subscriber to retrieve the chain from the last known +block to the new tip. Also note that no notification will occur if the tip +was in the active chain--as would be the case after calling invalidateblock RPC. +In contrast, the `sequence` topic publishes all block connections and +disconnections. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Bitcoind appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. + +The `sequence` topic refers specifically to the mempool sequence +number, which is also published along with all mempool events. This +is a different sequence value than in ZMQ itself in order to allow a total +ordering of mempool events to be constructed.