Page MenuHomePhabricator

No OneTemporary

diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
index decf29d42a..6268123dd8 100755
--- a/contrib/zmq/zmq_sub.py
+++ b/contrib/zmq/zmq_sub.py
@@ -1,37 +1,41 @@
#!/usr/bin/env python2
import array
import binascii
import zmq
+import struct
port = 28332
zmqContext = zmq.Context()
zmqSubSocket = zmqContext.socket(zmq.SUB)
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
try:
while True:
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]
-
+ sequence = "Unknown";
+ if len(msg[-1]) == 4:
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ sequence = str(msgSequence)
if topic == "hashblock":
- print "- HASH BLOCK -"
+ print '- HASH BLOCK ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "hashtx":
- print '- HASH TX -'
+ print '- HASH TX ('+sequence+') -'
print binascii.hexlify(body)
elif topic == "rawblock":
- print "- RAW BLOCK HEADER -"
+ print '- RAW BLOCK HEADER ('+sequence+') -'
print binascii.hexlify(body[:80])
elif topic == "rawtx":
- print '- RAW TX -'
+ print '- RAW TX ('+sequence+') -'
print binascii.hexlify(body)
except KeyboardInterrupt:
zmqContext.destroy()
diff --git a/doc/release-notes.md b/doc/release-notes.md
index 806d174ebf..9fb1a13072 100644
--- a/doc/release-notes.md
+++ b/doc/release-notes.md
@@ -1,76 +1,85 @@
(note: this is a temporary file, to be added-to by anybody, and moved to
release-notes at release time)
Notable changes
===============
Example item
----------------
bitcoin-cli: arguments privacy
--------------------------------
The RPC command line client gained a new argument, `-stdin`
to read extra arguments from standard input, one per line until EOF/Ctrl-D.
For example:
$ echo -e "mysecretcode\n120" | src/bitcoin-cli -stdin walletpassphrase
It is recommended to use this for sensitive information such as wallet
passphrases, as command-line arguments can usually be read from the process
table by any user on the system.
0.13.0 Change log
=================
Detailed release notes follow. This overview includes changes that affect
behavior, not code moves, refactors and string updates. For convenience in locating
the code changes and accompanying discussion, both the pull request and
git merge commit are mentioned.
### RPC and REST
Asm script outputs now contain OP_CHECKLOCKTIMEVERIFY in place of OP_NOP2
-------------------------------------------------------------------------
OP_NOP2 has been renamed to OP_CHECKLOCKTIMEVERIFY by [BIP
65](https://github.com/bitcoin/bips/blob/master/bip-0065.mediawiki)
The following outputs are affected by this change:
- RPC `getrawtransaction` (in verbose mode)
- RPC `decoderawtransaction`
- RPC `decodescript`
- REST `/rest/tx/` (JSON format)
- REST `/rest/block/` (JSON format when including extended tx details)
- `bitcoin-tx -json`
+### ZMQ
+
+Each ZMQ notification now contains an up-counting sequence number that allows
+listeners to detect lost notifications.
+The sequence number is always the last element in a multi-part ZMQ notification and
+therefore backward compatible.
+Each message type has its own counter.
+(https://github.com/bitcoin/bitcoin/pull/7762)
+
### Configuration and command-line options
### Block and transaction handling
### P2P protocol and network code
The p2p alert system has been removed in #7692 and the 'alert' message is no longer supported.
Fee filtering of invs (BIP 133)
------------------------------------
The optional new p2p message "feefilter" is implemented and the protocol
version is bumped to 70013. Upon receiving a feefilter message from a peer,
a node will not send invs for any transactions which do not meet the filter
feerate. [BIP 133](https://github.com/bitcoin/bips/blob/master/bip-0133.mediawiki)
### Validation
### Build system
### Wallet
### GUI
### Tests
### Miscellaneous
diff --git a/doc/zmq.md b/doc/zmq.md
index 902d1124c7..8d795a388a 100644
--- a/doc/zmq.md
+++ b/doc/zmq.md
@@ -1,101 +1,106 @@
# Block and Transaction Broadcasting With ZeroMQ
[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP
connections, inter-process communication, and shared-memory,
providing various message-oriented semantics such as publish/subscribe,
request/reply, and push/pull.
The Bitcoin Core daemon can be configured to act as a trusted "border
router", implementing the bitcoin wire protocol and relay, making
consensus decisions, maintaining the local blockchain database,
broadcasting locally generated transactions into the network, and
providing a queryable RPC interface to interact on a polled basis for
requesting blockchain related data. However, there exists only a
limited service to notify external software of events like the arrival
of new blocks or transactions.
The ZeroMQ facility implements a notification interface through a set
of specific notifiers. Currently there are notifiers that publish
blocks and transactions. This read-only facility requires only the
connection of a corresponding ZeroMQ subscriber port in receiving
software; it is not authenticated nor is there any two-way protocol
involvement. Therefore, subscribers should validate the received data
since it may be out of date, incomplete or even invalid.
ZeroMQ sockets are self-connecting and self-healing; that is,
connections made between two endpoints will be automatically restored
after an outage, and either end may be freely started or stopped in
any order.
Because ZeroMQ is message oriented, subscribers receive transactions
and blocks all-at-once and do not need to implement any sort of
buffering or reassembly.
## Prerequisites
The ZeroMQ feature in Bitcoin Core requires ZeroMQ API version 4.x or
newer. Typically, it is packaged by distributions as something like
*libzmq3-dev*. The C++ wrapper for ZeroMQ is *not* needed.
In order to run the example Python client scripts in contrib/ one must
also install *python-zmq*, though this is not necessary for daemon
operation.
## Enabling
By default, the ZeroMQ feature is automatically compiled in if the
necessary prerequisites are found. To disable, use --disable-zmq
during the *configure* step of building bitcoind:
$ ./configure --disable-zmq (other options)
To actually enable operation, one must set the appropriate options on
the commandline or in the configuration file.
## Usage
Currently, the following notifications are supported:
-zmqpubhashtx=address
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
For instance:
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the hexadecimal transaction hash (32
bytes).
These options can also be provided in bitcoin.conf.
ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org/4-0:_start).
Client side, then, the ZeroMQ subscriber socket must have the
ZMQ_SUBSCRIBE option set to one or either of these prefixes (for
instance, just `hash`); without doing so will result in no messages
arriving. Please see `contrib/zmq/zmq_sub.py` for a working example.
## Remarks
From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB
sockets don't even have a read function. Thus, there is no state
introduced into bitcoind directly. Furthermore, no information is
broadcast that wasn't already received from the public P2P network.
No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip.
+
+There are several possibilities that ZMQ notification can get lost
+during transmission depending on the communication type your are
+using. Bitcoind appends an up-counting sequence number to each
+notification which allows listeners to detect lost notifications.
diff --git a/qa/rpc-tests/zmq_test.py b/qa/rpc-tests/zmq_test.py
index 3a8d62ef2e..97850bea3c 100755
--- a/qa/rpc-tests/zmq_test.py
+++ b/qa/rpc-tests/zmq_test.py
@@ -1,90 +1,103 @@
#!/usr/bin/env python2
# Copyright (c) 2015 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test ZMQ interface
#
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
import zmq
import binascii
+import struct
try:
import http.client as httplib
except ImportError:
import httplib
try:
import urllib.parse as urlparse
except ImportError:
import urlparse
class ZMQTest (BitcoinTestFramework):
port = 28332
def setup_nodes(self):
self.zmqContext = zmq.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock")
self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % self.port)
return start_nodes(4, self.options.tmpdir, extra_args=[
['-zmqpubhashtx=tcp://127.0.0.1:'+str(self.port), '-zmqpubhashblock=tcp://127.0.0.1:'+str(self.port)],
[],
[],
[]
])
def run_test(self):
self.sync_all()
genhashes = self.nodes[0].generate(1)
self.sync_all()
print "listen..."
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
+ assert_equal(topic, b"hashtx")
body = msg[1]
+ nseq = msg[2]
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, 0) #must be sequence 0 on hashtx
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, 0) #must be sequence 0 on hashblock
blkhash = bytes_to_hex_str(body)
assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
n = 10
genhashes = self.nodes[1].generate(n)
self.sync_all()
zmqHashes = []
+ blockcount = 0
for x in range(0,n*2):
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
if topic == b"hashblock":
zmqHashes.append(bytes_to_hex_str(body))
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, blockcount+1)
+ blockcount += 1
for x in range(0,n):
assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
#test tx from a second node
hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
self.sync_all()
# now we should receive a zmq msg because the tx was broadcast
msg = self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
hashZMQ = ""
if topic == b"hashtx":
hashZMQ = bytes_to_hex_str(body)
+ msgSequence = struct.unpack('<I', msg[-1])[-1]
+ assert_equal(msgSequence, blockcount+1)
assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq
if __name__ == '__main__':
ZMQTest ().main ()
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index a9ce9c48c4..b6c907980f 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -1,178 +1,191 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "chainparams.h"
#include "zmqpublishnotifier.h"
#include "main.h"
#include "util.h"
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
va_list args;
va_start(args, size);
while (1)
{
zmq_msg_t msg;
int rc = zmq_msg_init_size(&msg, size);
if (rc != 0)
{
zmqError("Unable to initialize ZMQ msg");
return -1;
}
void *buf = zmq_msg_data(&msg);
memcpy(buf, data, size);
data = va_arg(args, const void*);
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
if (rc == -1)
{
zmqError("Unable to send ZMQ msg");
zmq_msg_close(&msg);
return -1;
}
zmq_msg_close(&msg);
if (!data)
break;
size = va_arg(args, size_t);
}
return 0;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
// check if address is being used by other publish notifier
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i==mapPublishNotifiers.end())
{
psocket = zmq_socket(pcontext, ZMQ_PUB);
if (!psocket)
{
zmqError("Failed to create socket");
return false;
}
int rc = zmq_bind(psocket, address.c_str());
if (rc!=0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
return false;
}
// register this notifier for the address, so it can be reused for other publish notifier
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
else
{
LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
}
void CZMQAbstractPublishNotifier::Shutdown()
{
assert(psocket);
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it)
{
if (it->second==this)
{
mapPublishNotifiers.erase(it);
break;
}
}
if (count == 1)
{
LogPrint("zmq", "Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
}
psocket = 0;
}
+bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
+{
+ assert(psocket);
+
+ /* send three parts, command & data & a LE 4byte sequence number */
+ unsigned char msgseq[sizeof(uint32_t)];
+ WriteLE32(&msgseq[0], nSequence);
+ int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
+ if (rc == -1)
+ return false;
+
+ /* increment memory only sequence number after sending */
+ nSequence++;
+
+ return true;
+}
+
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, MSG_HASHBLOCK, 9, data, 32, 0);
- return rc == 0;
+ return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
- int rc = zmq_send_multipart(psocket, MSG_HASHTX, 6, data, 32, 0);
- return rc == 0;
+ return SendMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
- int rc = zmq_send_multipart(psocket, MSG_RAWBLOCK, 8, &(*ss.begin()), ss.size(), 0);
- return rc == 0;
+ return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
- int rc = zmq_send_multipart(psocket, MSG_RAWTX, 5, &(*ss.begin()), ss.size(), 0);
- return rc == 0;
+ return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index 44d5cbea67..22f02a3d0d 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -1,43 +1,55 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#include "zmqabstractnotifier.h"
class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
+private:
+ uint32_t nSequence; //! upcounting per message sequence number
+
public:
+
+ /* send zmq multipart message
+ parts:
+ * command
+ * data
+ * message sequence number
+ */
+ bool SendMessage(const char *command, const void* data, size_t size);
+
bool Initialize(void *pcontext);
void Shutdown();
};
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const CBlockIndex *pindex);
};
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const CBlockIndex *pindex);
};
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H

File Metadata

Mime Type
text/x-diff
Expires
Sun, Mar 2, 12:01 (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5187757
Default Alt Text
(19 KB)

Event Timeline