Merge pull request #6418
e5214a2ca
Adding ZMQ/Pub support for txpool_add and chain_main events (Lee Clagett)
This commit is contained in:
commit
d9deb2c2fe
61
ZMQ.md
Normal file
61
ZMQ.md
Normal file
@ -0,0 +1,61 @@
|
||||
# The Current/Future Status of ZMQ in Monero
|
||||
|
||||
## ZMQ Pub/Sub
|
||||
Client `ZMQ_SUB` sockets must "subscribe" to topics before it receives any data.
|
||||
This allows filtering on the server side, so network traffic is reduced. Monero
|
||||
allows for filtering on: (1) format, (2) context, and (3) event.
|
||||
|
||||
* **format** refers to the _wire_ format (i.e. JSON) used to send event
|
||||
information.
|
||||
* **context** allows for a reduction in fields for the event, so the
|
||||
daemon doesn't waste cycles serializing fields that get ignored.
|
||||
* **event** refers to status changes occurring within the daemon (i.e. new
|
||||
block to main chain).
|
||||
|
||||
* Formats:
|
||||
* `json`
|
||||
* Contexts:
|
||||
* `full` - the entire block or transaction is transmitted (the hash can be
|
||||
computed remotely).
|
||||
* `minimal` - the bare minimum for a remote client to react to an event is
|
||||
sent.
|
||||
* Events:
|
||||
* `chain_main` - changes to the primary/main blockchain.
|
||||
* `txpool_add` - new _publicly visible_ transactions in the mempool.
|
||||
Includes previously unseen transactions in a block but _not_ the
|
||||
`miner_tx`. Does not "re-publish" after a reorg. Includes `do_not_relay`
|
||||
transactions.
|
||||
|
||||
The subscription topics are formatted as `format-context-event`, with prefix
|
||||
matching supported by both Monero and ZMQ. The `format`, `context` and `event`
|
||||
will _never_ have hyphens or colons in their name. For example, subscribing to
|
||||
`json-minimal-chain_main` will send minimal information in JSON when changes
|
||||
to the main/primary blockchain occur. Whereas, subscribing to `json-minimal`
|
||||
will send minimal information in JSON on all available events supported by the
|
||||
daemon.
|
||||
|
||||
The Monero daemon will ensure that events prefixed by `chain` will be sent in
|
||||
"chain-order" - the `prev_id` (hash) field will _always_ refer to a previous
|
||||
block. On rollbacks/reorgs, the event will reference an earlier block in the
|
||||
chain instead of the last block. The Monero daemon also ensures that
|
||||
`txpool_add` events are sent before `chain_*` events - the `chain_*` messages
|
||||
will only serialize miner transactions since the other transactions were
|
||||
previously published via `txpool_add`. This prevents transactions from being
|
||||
serialized twice, even when the transaction was first observed in a block.
|
||||
|
||||
ZMQ Pub/Sub will drop messages if the network is congested, so the above rules
|
||||
for send order are used for detecting lost messages. A missing gap in `height`
|
||||
or `prev_id` for `chain_*` events indicates a lost pub message. Missing
|
||||
`txpool_add` messages can only be detected at the next `chain_` message.
|
||||
|
||||
Since blockchain events can be dropped, clients will likely want to have a
|
||||
timeout against `chain_main` events. The `GetLastBlockHeader` RPC is useful
|
||||
for checking the current chain state. Dropped messages should be rare in most
|
||||
conditions.
|
||||
|
||||
The Monero daemon will send a `txpool_add` pub exactly once for each
|
||||
transaction, even after a reorg or restarts. Clients should use the
|
||||
`GetTransactionPool` after a reorg to get all transactions that have been put
|
||||
back into the tx pool or been invalidated due to a double-spend.
|
||||
|
||||
|
@ -62,7 +62,7 @@ static void replace(std::vector<std::string> &v, const char *tag, const char *s)
|
||||
boost::replace_all(str, tag, s);
|
||||
}
|
||||
|
||||
int Notify::notify(const char *tag, const char *s, ...)
|
||||
int Notify::notify(const char *tag, const char *s, ...) const
|
||||
{
|
||||
std::vector<std::string> margs = args;
|
||||
|
||||
|
@ -38,8 +38,12 @@ class Notify
|
||||
{
|
||||
public:
|
||||
Notify(const char *spec);
|
||||
Notify(const Notify&) = default;
|
||||
Notify(Notify&&) = default;
|
||||
Notify& operator=(const Notify&) = default;
|
||||
Notify& operator=(Notify&&) = default;
|
||||
|
||||
int notify(const char *tag, const char *s, ...);
|
||||
int notify(const char *tag, const char *s, ...) const;
|
||||
|
||||
private:
|
||||
std::string filename;
|
||||
@ -47,3 +51,4 @@ private:
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
46
src/cryptonote_basic/events.h
Normal file
46
src/cryptonote_basic/events.h
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright (c) 2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "crypto/hash.h"
|
||||
#include "cryptonote_basic/cryptonote_basic.h"
|
||||
|
||||
namespace cryptonote
|
||||
{
|
||||
/*! Transactions are expensive to move or copy (lots of 32-byte internal
|
||||
buffers). This allows `cryptonote::core` to do a single notification for
|
||||
a vector of transactions, without having to move/copy duplicate or invalid
|
||||
transactions. */
|
||||
struct txpool_event
|
||||
{
|
||||
cryptonote::transaction tx;
|
||||
crypto::hash hash;
|
||||
bool res; //!< Listeners must ignore `tx` when this is false.
|
||||
};
|
||||
}
|
36
src/cryptonote_basic/fwd.h
Normal file
36
src/cryptonote_basic/fwd.h
Normal file
@ -0,0 +1,36 @@
|
||||
// Copyright (c) 2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace cryptonote
|
||||
{
|
||||
struct block;
|
||||
class transaction;
|
||||
struct txpool_event;
|
||||
}
|
@ -1234,10 +1234,15 @@ bool Blockchain::switch_to_alternative_blockchain(std::list<block_extended_info>
|
||||
reorg_notify->notify("%s", std::to_string(split_height).c_str(), "%h", std::to_string(m_db->height()).c_str(),
|
||||
"%n", std::to_string(m_db->height() - split_height).c_str(), "%d", std::to_string(discarded_blocks).c_str(), NULL);
|
||||
|
||||
std::shared_ptr<tools::Notify> block_notify = m_block_notify;
|
||||
if (block_notify)
|
||||
for (const auto &bei: alt_chain)
|
||||
block_notify->notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bei.bl)).c_str(), NULL);
|
||||
for (const auto& notifier : m_block_notifiers)
|
||||
{
|
||||
std::size_t notify_height = split_height;
|
||||
for (const auto& bei: alt_chain)
|
||||
{
|
||||
notifier(notify_height, {std::addressof(bei.bl), 1});
|
||||
++notify_height;
|
||||
}
|
||||
}
|
||||
|
||||
MGINFO_GREEN("REORGANIZE SUCCESS! on height: " << split_height << ", new blockchain size: " << m_db->height());
|
||||
return true;
|
||||
@ -4236,12 +4241,9 @@ leave:
|
||||
get_difficulty_for_next_block(); // just to cache it
|
||||
invalidate_block_template_cache();
|
||||
|
||||
if (notify)
|
||||
{
|
||||
std::shared_ptr<tools::Notify> block_notify = m_block_notify;
|
||||
if (block_notify)
|
||||
block_notify->notify("%s", epee::string_tools::pod_to_hex(id).c_str(), NULL);
|
||||
}
|
||||
|
||||
for (const auto& notifier: m_block_notifiers)
|
||||
notifier(new_height - 1, {std::addressof(bl), 1});
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -5132,6 +5134,15 @@ void Blockchain::set_user_options(uint64_t maxthreads, bool sync_on_blocks, uint
|
||||
m_max_prepare_blocks_threads = maxthreads;
|
||||
}
|
||||
|
||||
void Blockchain::add_block_notify(boost::function<void(std::uint64_t, epee::span<const block>)>&& notify)
|
||||
{
|
||||
if (notify)
|
||||
{
|
||||
CRITICAL_REGION_LOCAL(m_blockchain_lock);
|
||||
m_block_notifiers.push_back(std::move(notify));
|
||||
}
|
||||
}
|
||||
|
||||
void Blockchain::safesyncmode(const bool onoff)
|
||||
{
|
||||
/* all of this is no-op'd if the user set a specific
|
||||
|
@ -30,6 +30,7 @@
|
||||
|
||||
#pragma once
|
||||
#include <boost/asio/io_service.hpp>
|
||||
#include <boost/function/function_fwd.hpp>
|
||||
#include <boost/serialization/serialization.hpp>
|
||||
#include <boost/serialization/version.hpp>
|
||||
#include <boost/serialization/list.hpp>
|
||||
@ -764,7 +765,7 @@ namespace cryptonote
|
||||
*
|
||||
* @param notify the notify object to call at every new block
|
||||
*/
|
||||
void set_block_notify(const std::shared_ptr<tools::Notify> ¬ify) { m_block_notify = notify; }
|
||||
void add_block_notify(boost::function<void(std::uint64_t, epee::span<const block>)> &¬ify);
|
||||
|
||||
/**
|
||||
* @brief sets a reorg notify object to call for every reorg
|
||||
@ -1125,7 +1126,11 @@ namespace cryptonote
|
||||
|
||||
bool m_batch_success;
|
||||
|
||||
std::shared_ptr<tools::Notify> m_block_notify;
|
||||
/* `boost::function` is used because the implementation never allocates if
|
||||
the callable object has a single `std::shared_ptr` or `std::weap_ptr`
|
||||
internally. Whereas, the libstdc++ `std::function` will allocate. */
|
||||
|
||||
std::vector<boost::function<void(std::uint64_t, epee::span<const block>)>> m_block_notifiers;
|
||||
std::shared_ptr<tools::Notify> m_reorg_notify;
|
||||
|
||||
// for prepare_handle_incoming_blocks
|
||||
|
@ -41,6 +41,7 @@ using namespace epee;
|
||||
#include "common/download.h"
|
||||
#include "common/threadpool.h"
|
||||
#include "common/command_line.h"
|
||||
#include "cryptonote_basic/events.h"
|
||||
#include "warnings.h"
|
||||
#include "crypto/crypto.h"
|
||||
#include "cryptonote_config.h"
|
||||
@ -51,6 +52,7 @@ using namespace epee;
|
||||
#include "ringct/rctTypes.h"
|
||||
#include "blockchain_db/blockchain_db.h"
|
||||
#include "ringct/rctSigs.h"
|
||||
#include "rpc/zmq_pub.h"
|
||||
#include "common/notify.h"
|
||||
#include "hardforks/hardforks.h"
|
||||
#include "version.h"
|
||||
@ -262,6 +264,13 @@ namespace cryptonote
|
||||
{
|
||||
m_blockchain_storage.set_enforce_dns_checkpoints(enforce_dns);
|
||||
}
|
||||
//-----------------------------------------------------------------------------------
|
||||
void core::set_txpool_listener(boost::function<void(std::vector<txpool_event>)> zmq_pub)
|
||||
{
|
||||
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
|
||||
m_zmq_pub = std::move(zmq_pub);
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::update_checkpoints(const bool skip_dns /* = false */)
|
||||
{
|
||||
@ -614,7 +623,20 @@ namespace cryptonote
|
||||
try
|
||||
{
|
||||
if (!command_line::is_arg_defaulted(vm, arg_block_notify))
|
||||
m_blockchain_storage.set_block_notify(std::shared_ptr<tools::Notify>(new tools::Notify(command_line::get_arg(vm, arg_block_notify).c_str())));
|
||||
{
|
||||
struct hash_notify
|
||||
{
|
||||
tools::Notify cmdline;
|
||||
|
||||
void operator()(std::uint64_t, epee::span<const block> blocks) const
|
||||
{
|
||||
for (const block bl : blocks)
|
||||
cmdline.notify("%s", epee::string_tools::pod_to_hex(get_block_hash(bl)).c_str(), NULL);
|
||||
}
|
||||
};
|
||||
|
||||
m_blockchain_storage.add_block_notify(hash_notify{{command_line::get_arg(vm, arg_block_notify).c_str()}});
|
||||
}
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
@ -957,8 +979,7 @@ namespace cryptonote
|
||||
return false;
|
||||
}
|
||||
|
||||
struct result { bool res; cryptonote::transaction tx; crypto::hash hash; };
|
||||
std::vector<result> results(tx_blobs.size());
|
||||
std::vector<txpool_event> results(tx_blobs.size());
|
||||
|
||||
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
|
||||
|
||||
@ -1023,6 +1044,7 @@ namespace cryptonote
|
||||
if (!tx_info.empty())
|
||||
handle_incoming_tx_accumulated_batch(tx_info, tx_relay == relay_method::block);
|
||||
|
||||
bool valid_events = false;
|
||||
bool ok = true;
|
||||
it = tx_blobs.begin();
|
||||
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
|
||||
@ -1045,10 +1067,18 @@ namespace cryptonote
|
||||
{MERROR_VER("Transaction verification impossible: " << results[i].hash);}
|
||||
|
||||
if(tvc[i].m_added_to_pool)
|
||||
{
|
||||
MDEBUG("tx added: " << results[i].hash);
|
||||
valid_events = true;
|
||||
}
|
||||
else
|
||||
results[i].res = false;
|
||||
}
|
||||
return ok;
|
||||
|
||||
if (valid_events && m_zmq_pub && matches_category(tx_relay, relay_category::legacy))
|
||||
m_zmq_pub(std::move(results));
|
||||
|
||||
return ok;
|
||||
CATCH_ENTRY_L0("core::handle_incoming_txs()", false);
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
|
@ -32,9 +32,11 @@
|
||||
|
||||
#include <ctime>
|
||||
|
||||
#include <boost/function.hpp>
|
||||
#include <boost/program_options/options_description.hpp>
|
||||
#include <boost/program_options/variables_map.hpp>
|
||||
|
||||
#include "cryptonote_basic/fwd.h"
|
||||
#include "cryptonote_core/i_core_events.h"
|
||||
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
|
||||
#include "cryptonote_protocol/enums.h"
|
||||
@ -48,6 +50,7 @@
|
||||
#include "warnings.h"
|
||||
#include "crypto/hash.h"
|
||||
#include "span.h"
|
||||
#include "rpc/fwd.h"
|
||||
|
||||
PUSH_WARNINGS
|
||||
DISABLE_VS_WARNINGS(4355)
|
||||
@ -445,6 +448,13 @@ namespace cryptonote
|
||||
*/
|
||||
void set_enforce_dns_checkpoints(bool enforce_dns);
|
||||
|
||||
/**
|
||||
* @brief set a listener for txes being added to the txpool
|
||||
*
|
||||
* @param callable to notify, or empty function to disable.
|
||||
*/
|
||||
void set_txpool_listener(boost::function<void(std::vector<txpool_event>)> zmq_pub);
|
||||
|
||||
/**
|
||||
* @brief set whether or not to enable or disable DNS checkpoints
|
||||
*
|
||||
@ -1098,7 +1108,12 @@ namespace cryptonote
|
||||
bool m_fluffy_blocks_enabled;
|
||||
bool m_offline;
|
||||
|
||||
/* `boost::function` is used because the implementation never allocates if
|
||||
the callable object has a single `std::shared_ptr` or `std::weap_ptr`
|
||||
internally. Whereas, the libstdc++ `std::function` will allocate. */
|
||||
|
||||
std::shared_ptr<tools::Notify> m_block_rate_notify;
|
||||
boost::function<void(std::vector<txpool_event>)> m_zmq_pub;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -121,6 +121,10 @@ namespace daemon_args
|
||||
return val;
|
||||
}
|
||||
};
|
||||
const command_line::arg_descriptor<std::vector<std::string>> arg_zmq_pub = {
|
||||
"zmq-pub"
|
||||
, "Address for ZMQ pub - tcp://ip:port or ipc://path"
|
||||
};
|
||||
|
||||
const command_line::arg_descriptor<bool> arg_zmq_rpc_disabled = {
|
||||
"no-zmq"
|
||||
|
@ -34,10 +34,12 @@
|
||||
#include "misc_log_ex.h"
|
||||
#include "daemon/daemon.h"
|
||||
#include "rpc/daemon_handler.h"
|
||||
#include "rpc/zmq_pub.h"
|
||||
#include "rpc/zmq_server.h"
|
||||
|
||||
#include "common/password.h"
|
||||
#include "common/util.h"
|
||||
#include "cryptonote_basic/events.h"
|
||||
#include "daemon/core.h"
|
||||
#include "daemon/p2p.h"
|
||||
#include "daemon/protocol.h"
|
||||
@ -56,6 +58,17 @@ using namespace epee;
|
||||
|
||||
namespace daemonize {
|
||||
|
||||
struct zmq_internals
|
||||
{
|
||||
explicit zmq_internals(t_core& core, t_p2p& p2p)
|
||||
: rpc_handler{core.get(), p2p.get()}
|
||||
, server{rpc_handler}
|
||||
{}
|
||||
|
||||
cryptonote::rpc::DaemonHandler rpc_handler;
|
||||
cryptonote::rpc::ZmqServer server;
|
||||
};
|
||||
|
||||
struct t_internals {
|
||||
private:
|
||||
t_protocol protocol;
|
||||
@ -63,6 +76,7 @@ public:
|
||||
t_core core;
|
||||
t_p2p p2p;
|
||||
std::vector<std::unique_ptr<t_rpc>> rpcs;
|
||||
std::unique_ptr<zmq_internals> zmq;
|
||||
|
||||
t_internals(
|
||||
boost::program_options::variables_map const & vm
|
||||
@ -70,6 +84,7 @@ public:
|
||||
: core{vm}
|
||||
, protocol{vm, core, command_line::get_arg(vm, cryptonote::arg_offline)}
|
||||
, p2p{vm, protocol}
|
||||
, zmq{nullptr}
|
||||
{
|
||||
// Handle circular dependencies
|
||||
protocol.set_p2p_endpoint(p2p.get());
|
||||
@ -86,6 +101,28 @@ public:
|
||||
auto restricted_rpc_port = command_line::get_arg(vm, restricted_rpc_port_arg);
|
||||
rpcs.emplace_back(new t_rpc{vm, core, p2p, true, restricted_rpc_port, "restricted", true});
|
||||
}
|
||||
|
||||
if (!command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled))
|
||||
{
|
||||
zmq.reset(new zmq_internals{core, p2p});
|
||||
|
||||
const std::string zmq_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port);
|
||||
const std::string zmq_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip);
|
||||
|
||||
if (!zmq->server.init_rpc(zmq_address, zmq_port))
|
||||
throw std::runtime_error{"Failed to add TCP socket(" + zmq_address + ":" + zmq_port + ") to ZMQ RPC Server"};
|
||||
|
||||
std::shared_ptr<cryptonote::listener::zmq_pub> shared;
|
||||
const std::vector<std::string> zmq_pub = command_line::get_arg(vm, daemon_args::arg_zmq_pub);
|
||||
if (!zmq_pub.empty() && !(shared = zmq->server.init_pub(epee::to_span(zmq_pub))))
|
||||
throw std::runtime_error{"Failed to initialize zmq_pub"};
|
||||
|
||||
if (shared)
|
||||
{
|
||||
core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared});
|
||||
core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -103,9 +140,6 @@ t_daemon::t_daemon(
|
||||
: mp_internals{new t_internals{vm}},
|
||||
public_rpc_port(public_rpc_port)
|
||||
{
|
||||
zmq_rpc_bind_port = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_port);
|
||||
zmq_rpc_bind_address = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_bind_ip);
|
||||
zmq_rpc_disabled = command_line::get_arg(vm, daemon_args::arg_zmq_rpc_disabled);
|
||||
}
|
||||
|
||||
t_daemon::~t_daemon() = default;
|
||||
@ -169,31 +203,8 @@ bool t_daemon::run(bool interactive)
|
||||
rpc_commands->start_handling(std::bind(&daemonize::t_daemon::stop_p2p, this));
|
||||
}
|
||||
|
||||
cryptonote::rpc::DaemonHandler rpc_daemon_handler(mp_internals->core.get(), mp_internals->p2p.get());
|
||||
cryptonote::rpc::ZmqServer zmq_server(rpc_daemon_handler);
|
||||
|
||||
if (!zmq_rpc_disabled)
|
||||
{
|
||||
if (!zmq_server.addTCPSocket(zmq_rpc_bind_address, zmq_rpc_bind_port))
|
||||
{
|
||||
LOG_ERROR(std::string("Failed to add TCP Socket (") + zmq_rpc_bind_address
|
||||
+ ":" + zmq_rpc_bind_port + ") to ZMQ RPC Server");
|
||||
|
||||
if (rpc_commands)
|
||||
rpc_commands->stop_handling();
|
||||
|
||||
for(auto& rpc : mp_internals->rpcs)
|
||||
rpc->stop();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
MINFO("Starting ZMQ server...");
|
||||
zmq_server.run();
|
||||
|
||||
MINFO(std::string("ZMQ server started at ") + zmq_rpc_bind_address
|
||||
+ ":" + zmq_rpc_bind_port + ".");
|
||||
}
|
||||
if (mp_internals->zmq)
|
||||
mp_internals->zmq->server.run();
|
||||
else
|
||||
MINFO("ZMQ server disabled");
|
||||
|
||||
@ -208,8 +219,8 @@ bool t_daemon::run(bool interactive)
|
||||
if (rpc_commands)
|
||||
rpc_commands->stop_handling();
|
||||
|
||||
if (!zmq_rpc_disabled)
|
||||
zmq_server.stop();
|
||||
if (mp_internals->zmq)
|
||||
mp_internals->zmq->server.stop();
|
||||
|
||||
for(auto& rpc : mp_internals->rpcs)
|
||||
rpc->stop();
|
||||
|
@ -44,9 +44,6 @@ private:
|
||||
private:
|
||||
std::unique_ptr<t_internals> mp_internals;
|
||||
uint16_t public_rpc_port;
|
||||
std::string zmq_rpc_bind_address;
|
||||
std::string zmq_rpc_bind_port;
|
||||
bool zmq_rpc_disabled;
|
||||
public:
|
||||
t_daemon(
|
||||
boost::program_options::variables_map const & vm,
|
||||
|
@ -154,6 +154,7 @@ int main(int argc, char const * argv[])
|
||||
command_line::add_arg(core_settings, daemon_args::arg_public_node);
|
||||
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_ip);
|
||||
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_bind_port);
|
||||
command_line::add_arg(core_settings, daemon_args::arg_zmq_pub);
|
||||
command_line::add_arg(core_settings, daemon_args::arg_zmq_rpc_disabled);
|
||||
|
||||
daemonizer::init_options(hidden_options, visible_options);
|
||||
|
@ -158,20 +158,6 @@ namespace zmq
|
||||
return unsigned(max_out) < added ? max_out : int(added);
|
||||
}
|
||||
};
|
||||
|
||||
template<typename F, typename... T>
|
||||
expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
|
||||
{
|
||||
for (;;)
|
||||
{
|
||||
if (0 <= op(args...))
|
||||
return success();
|
||||
|
||||
const int error = zmq_errno();
|
||||
if (error != EINTR)
|
||||
return make_error_code(error);
|
||||
}
|
||||
}
|
||||
} // anonymous
|
||||
|
||||
expect<std::string> receive(void* const socket, const int flags)
|
||||
|
@ -26,6 +26,8 @@
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <system_error>
|
||||
@ -105,6 +107,26 @@ namespace zmq
|
||||
//! Unique ZMQ socket handle, calls `zmq_close` on destruction.
|
||||
using socket = std::unique_ptr<void, close>;
|
||||
|
||||
/*! Retry a ZMQ function on `EINTR` errors. `F` must return an int with
|
||||
values less than 0 on error.
|
||||
|
||||
\param op The ZMQ function to execute + retry
|
||||
\param args Forwarded to `op`. Must be resuable in case of retry.
|
||||
\return All errors except for `EINTR`. */
|
||||
template<typename F, typename... T>
|
||||
expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
|
||||
{
|
||||
for (;;)
|
||||
{
|
||||
if (0 <= op(args...))
|
||||
return success();
|
||||
|
||||
const int error = zmq_errno();
|
||||
if (error != EINTR)
|
||||
return make_error_code(error);
|
||||
}
|
||||
}
|
||||
|
||||
/*! Read all parts of the next message on `socket`. Blocks until the entire
|
||||
next message (all parts) are read, or until `zmq_term` is called on the
|
||||
`zmq_context` associated with `socket`. If the context is terminated,
|
||||
|
@ -45,8 +45,11 @@ set(daemon_messages_sources
|
||||
message.cpp
|
||||
daemon_messages.cpp)
|
||||
|
||||
set(rpc_pub_sources zmq_pub.cpp)
|
||||
|
||||
set(daemon_rpc_server_sources
|
||||
daemon_handler.cpp
|
||||
zmq_pub.cpp
|
||||
zmq_server.cpp)
|
||||
|
||||
|
||||
@ -59,8 +62,9 @@ set(rpc_headers
|
||||
rpc_version_str.h
|
||||
rpc_handler.h)
|
||||
|
||||
set(daemon_rpc_server_headers)
|
||||
set(rpc_pub_headers zmq_pub.h)
|
||||
|
||||
set(daemon_rpc_server_headers)
|
||||
|
||||
set(rpc_daemon_private_headers
|
||||
bootstrap_daemon.h
|
||||
@ -83,6 +87,8 @@ set(daemon_rpc_server_private_headers
|
||||
monero_private_headers(rpc
|
||||
${rpc_private_headers})
|
||||
|
||||
set(rpc_pub_private_headers)
|
||||
|
||||
monero_private_headers(daemon_rpc_server
|
||||
${daemon_rpc_server_private_headers})
|
||||
|
||||
@ -97,6 +103,11 @@ monero_add_library(rpc
|
||||
${rpc_headers}
|
||||
${rpc_private_headers})
|
||||
|
||||
monero_add_library(rpc_pub
|
||||
${rpc_pub_sources}
|
||||
${rpc_pub_headers}
|
||||
${rpc_pub_private_headers})
|
||||
|
||||
monero_add_library(daemon_messages
|
||||
${daemon_messages_sources}
|
||||
${daemon_messages_headers}
|
||||
@ -131,6 +142,14 @@ target_link_libraries(rpc
|
||||
PRIVATE
|
||||
${EXTRA_LIBRARIES})
|
||||
|
||||
target_link_libraries(rpc_pub
|
||||
PUBLIC
|
||||
epee
|
||||
net
|
||||
cryptonote_basic
|
||||
serialization
|
||||
${Boost_THREAD_LIBRARY})
|
||||
|
||||
target_link_libraries(daemon_messages
|
||||
LINK_PRIVATE
|
||||
cryptonote_core
|
||||
@ -142,6 +161,7 @@ target_link_libraries(daemon_messages
|
||||
target_link_libraries(daemon_rpc_server
|
||||
LINK_PRIVATE
|
||||
rpc
|
||||
rpc_pub
|
||||
cryptonote_core
|
||||
cryptonote_protocol
|
||||
version
|
||||
|
37
src/rpc/fwd.h
Normal file
37
src/rpc/fwd.h
Normal file
@ -0,0 +1,37 @@
|
||||
// Copyright (c) 2019-2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace cryptonote
|
||||
{
|
||||
namespace listener
|
||||
{
|
||||
class zmq_pub;
|
||||
}
|
||||
}
|
478
src/rpc/zmq_pub.cpp
Normal file
478
src/rpc/zmq_pub.cpp
Normal file
@ -0,0 +1,478 @@
|
||||
// Copyright (c) 2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#include "zmq_pub.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/thread/locks.hpp>
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <rapidjson/writer.h>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "common/expect.h"
|
||||
#include "crypto/crypto.h"
|
||||
#include "cryptonote_basic/cryptonote_format_utils.h"
|
||||
#include "cryptonote_basic/events.h"
|
||||
#include "misc_log_ex.h"
|
||||
#include "serialization/json_object.h"
|
||||
|
||||
#undef MONERO_DEFAULT_LOG_CATEGORY
|
||||
#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
|
||||
|
||||
namespace
|
||||
{
|
||||
constexpr const char txpool_signal[] = "tx_signal";
|
||||
|
||||
using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span<const cryptonote::block>);
|
||||
using txpool_writer = void(epee::byte_stream&, epee::span<const cryptonote::txpool_event>);
|
||||
|
||||
template<typename F>
|
||||
struct context
|
||||
{
|
||||
char const* const name;
|
||||
F* generate_pub;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
bool operator<(const context<T>& lhs, const context<T>& rhs) noexcept
|
||||
{
|
||||
return std::strcmp(lhs.name, rhs.name) < 0;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
bool operator<(const context<T>& lhs, const boost::string_ref rhs) noexcept
|
||||
{
|
||||
return lhs.name < rhs;
|
||||
}
|
||||
|
||||
struct is_valid
|
||||
{
|
||||
bool operator()(const cryptonote::txpool_event& event) const noexcept
|
||||
{
|
||||
return event.res;
|
||||
}
|
||||
};
|
||||
|
||||
template<typename T, std::size_t N>
|
||||
void verify_sorted(const std::array<context<T>, N>& elems, const char* name)
|
||||
{
|
||||
auto unsorted = std::is_sorted_until(elems.begin(), elems.end());
|
||||
if (unsorted != elems.end())
|
||||
throw std::logic_error{name + std::string{" array is not properly sorted, see: "} + unsorted->name};
|
||||
}
|
||||
|
||||
void write_header(epee::byte_stream& buf, const boost::string_ref name)
|
||||
{
|
||||
buf.write(name.data(), name.size());
|
||||
buf.put(':');
|
||||
}
|
||||
|
||||
//! \return `name:...` where `...` is JSON and `name` is directly copied (no quotes - not JSON).
|
||||
template<typename T>
|
||||
void json_pub(epee::byte_stream& buf, const T value)
|
||||
{
|
||||
rapidjson::Writer<epee::byte_stream> dest{buf};
|
||||
using cryptonote::json::toJsonValue;
|
||||
toJsonValue(dest, value);
|
||||
}
|
||||
|
||||
//! Object for "minimal" block serialization
|
||||
struct minimal_chain
|
||||
{
|
||||
const std::uint64_t height;
|
||||
const epee::span<const cryptonote::block> blocks;
|
||||
};
|
||||
|
||||
//! Object for "minimal" tx serialization
|
||||
struct minimal_txpool
|
||||
{
|
||||
const cryptonote::transaction& tx;
|
||||
};
|
||||
|
||||
void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_chain self)
|
||||
{
|
||||
namespace adapt = boost::adaptors;
|
||||
|
||||
const auto to_block_id = [](const cryptonote::block& bl)
|
||||
{
|
||||
crypto::hash id;
|
||||
if (!get_block_hash(bl, id))
|
||||
MERROR("ZMQ/Pub failure: get_block_hash");
|
||||
return id;
|
||||
};
|
||||
|
||||
assert(!self.blocks.empty()); // checked in zmq_pub::send_chain_main
|
||||
|
||||
dest.StartObject();
|
||||
INSERT_INTO_JSON_OBJECT(dest, first_height, self.height);
|
||||
INSERT_INTO_JSON_OBJECT(dest, first_prev_id, self.blocks[0].prev_id);
|
||||
INSERT_INTO_JSON_OBJECT(dest, ids, (self.blocks | adapt::transformed(to_block_id)));
|
||||
dest.EndObject();
|
||||
}
|
||||
|
||||
void toJsonValue(rapidjson::Writer<epee::byte_stream>& dest, const minimal_txpool self)
|
||||
{
|
||||
crypto::hash id{};
|
||||
std::size_t blob_size = 0;
|
||||
if (!get_transaction_hash(self.tx, id, blob_size))
|
||||
{
|
||||
MERROR("ZMQ/Pub failure: get_transaction_hash");
|
||||
return;
|
||||
}
|
||||
|
||||
dest.StartObject();
|
||||
INSERT_INTO_JSON_OBJECT(dest, id, id);
|
||||
INSERT_INTO_JSON_OBJECT(dest, blob_size, blob_size);
|
||||
dest.EndObject();
|
||||
}
|
||||
|
||||
void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
|
||||
{
|
||||
json_pub(buf, blocks);
|
||||
}
|
||||
|
||||
void json_minimal_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
|
||||
{
|
||||
json_pub(buf, minimal_chain{height, blocks});
|
||||
}
|
||||
|
||||
// boost::adaptors are in place "views" - no copy/move takes place
|
||||
// moving transactions (via sort, etc.), is expensive!
|
||||
|
||||
void json_full_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
|
||||
{
|
||||
namespace adapt = boost::adaptors;
|
||||
const auto to_full_tx = [](const cryptonote::txpool_event& event)
|
||||
{
|
||||
return event.tx;
|
||||
};
|
||||
json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_full_tx)));
|
||||
}
|
||||
|
||||
void json_minimal_txpool(epee::byte_stream& buf, epee::span<const cryptonote::txpool_event> txes)
|
||||
{
|
||||
namespace adapt = boost::adaptors;
|
||||
const auto to_minimal_tx = [](const cryptonote::txpool_event& event)
|
||||
{
|
||||
return minimal_txpool{event.tx};
|
||||
};
|
||||
json_pub(buf, (txes | adapt::filtered(is_valid{}) | adapt::transformed(to_minimal_tx)));
|
||||
}
|
||||
|
||||
constexpr const std::array<context<chain_writer>, 2> chain_contexts =
|
||||
{{
|
||||
{u8"json-full-chain_main", json_full_chain},
|
||||
{u8"json-minimal-chain_main", json_minimal_chain}
|
||||
}};
|
||||
|
||||
constexpr const std::array<context<txpool_writer>, 2> txpool_contexts =
|
||||
{{
|
||||
{u8"json-full-txpool_add", json_full_txpool},
|
||||
{u8"json-minimal-txpool_add", json_minimal_txpool}
|
||||
}};
|
||||
|
||||
template<typename T, std::size_t N>
|
||||
epee::span<const context<T>> get_range(const std::array<context<T>, N>& contexts, const boost::string_ref value)
|
||||
{
|
||||
const auto not_prefix = [](const boost::string_ref lhs, const context<T>& rhs)
|
||||
{
|
||||
return !(boost::string_ref{rhs.name}.starts_with(lhs));
|
||||
};
|
||||
|
||||
const auto lower = std::lower_bound(contexts.begin(), contexts.end(), value);
|
||||
const auto upper = std::upper_bound(lower, contexts.end(), value, not_prefix);
|
||||
return {lower, std::size_t(upper - lower)};
|
||||
}
|
||||
|
||||
template<std::size_t N, typename T>
|
||||
void add_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
|
||||
{
|
||||
assert(range.size() <= N);
|
||||
assert(range.begin() - first <= N - range.size());
|
||||
|
||||
for (const auto& ctx : range)
|
||||
{
|
||||
const std::size_t i = std::addressof(ctx) - first;
|
||||
subs[i] = std::min(std::numeric_limits<std::size_t>::max() - 1, subs[i]) + 1;
|
||||
}
|
||||
}
|
||||
|
||||
template<std::size_t N, typename T>
|
||||
void remove_subscriptions(std::array<std::size_t, N>& subs, const epee::span<const context<T>> range, context<T> const* const first)
|
||||
{
|
||||
assert(range.size() <= N);
|
||||
assert(range.begin() - first <= N - range.size());
|
||||
|
||||
for (const auto& ctx : range)
|
||||
{
|
||||
const std::size_t i = std::addressof(ctx) - first;
|
||||
subs[i] = std::max(std::size_t(1), subs[i]) - 1;
|
||||
}
|
||||
}
|
||||
|
||||
template<std::size_t N, typename T, typename... U>
|
||||
std::array<epee::byte_slice, N> make_pubs(const std::array<std::size_t, N>& subs, const std::array<context<T>, N>& contexts, U&&... args)
|
||||
{
|
||||
epee::byte_stream buf{};
|
||||
|
||||
std::size_t last_offset = 0;
|
||||
std::array<std::size_t, N> offsets{{}};
|
||||
for (std::size_t i = 0; i < N; ++i)
|
||||
{
|
||||
if (subs[i])
|
||||
{
|
||||
write_header(buf, contexts[i].name);
|
||||
contexts[i].generate_pub(buf, std::forward<U>(args)...);
|
||||
offsets[i] = buf.size() - last_offset;
|
||||
last_offset = buf.size();
|
||||
}
|
||||
}
|
||||
|
||||
epee::byte_slice bytes{std::move(buf)};
|
||||
std::array<epee::byte_slice, N> out;
|
||||
for (std::size_t i = 0; i < N; ++i)
|
||||
out[i] = bytes.take_slice(offsets[i]);
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
template<std::size_t N>
|
||||
std::size_t send_messages(void* const socket, std::array<epee::byte_slice, N>& messages)
|
||||
{
|
||||
std::size_t count = 0;
|
||||
for (epee::byte_slice& message : messages)
|
||||
{
|
||||
if (!message.empty())
|
||||
{
|
||||
const expect<void> sent = net::zmq::send(std::move(message), socket, ZMQ_DONTWAIT);
|
||||
if (!sent)
|
||||
MERROR("Failed to send ZMQ/Pub message: " << sent.error().message());
|
||||
else
|
||||
++count;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
expect<bool> relay_block_pub(void* const relay, void* const pub) noexcept
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init(std::addressof(msg));
|
||||
MONERO_CHECK(net::zmq::retry_op(zmq_msg_recv, std::addressof(msg), relay, ZMQ_DONTWAIT));
|
||||
|
||||
const boost::string_ref payload{
|
||||
reinterpret_cast<const char*>(zmq_msg_data(std::addressof(msg))),
|
||||
zmq_msg_size(std::addressof(msg))
|
||||
};
|
||||
|
||||
if (payload == txpool_signal)
|
||||
{
|
||||
zmq_msg_close(std::addressof(msg));
|
||||
return false;
|
||||
}
|
||||
|
||||
// forward block messages (serialized on P2P thread for now)
|
||||
const expect<void> sent = net::zmq::retry_op(zmq_msg_send, std::addressof(msg), pub, ZMQ_DONTWAIT);
|
||||
if (!sent)
|
||||
{
|
||||
zmq_msg_close(std::addressof(msg));
|
||||
return sent.error();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} // anonymous
|
||||
|
||||
namespace cryptonote { namespace listener
|
||||
{
|
||||
|
||||
zmq_pub::zmq_pub(void* context)
|
||||
: relay_(),
|
||||
chain_subs_{{0}},
|
||||
txpool_subs_{{0}},
|
||||
sync_()
|
||||
{
|
||||
if (!context)
|
||||
throw std::logic_error{"ZMQ context cannot be NULL"};
|
||||
|
||||
verify_sorted(chain_contexts, "chain_contexts");
|
||||
verify_sorted(txpool_contexts, "txpool_contexts");
|
||||
|
||||
relay_.reset(zmq_socket(context, ZMQ_PAIR));
|
||||
if (!relay_)
|
||||
MONERO_ZMQ_THROW("Failed to create relay socket");
|
||||
if (zmq_connect(relay_.get(), relay_endpoint()) != 0)
|
||||
MONERO_ZMQ_THROW("Failed to connect relay socket");
|
||||
}
|
||||
|
||||
zmq_pub::~zmq_pub()
|
||||
{}
|
||||
|
||||
bool zmq_pub::sub_request(boost::string_ref message)
|
||||
{
|
||||
if (!message.empty())
|
||||
{
|
||||
const char tag = message[0];
|
||||
message.remove_prefix(1);
|
||||
|
||||
const auto chain_range = get_range(chain_contexts, message);
|
||||
const auto txpool_range = get_range(txpool_contexts, message);
|
||||
|
||||
if (!chain_range.empty() || !txpool_range.empty())
|
||||
{
|
||||
MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " <<
|
||||
chain_range.size() << " chain topic(s) and " << txpool_range.size() << " txpool topic(s)");
|
||||
|
||||
const boost::lock_guard<boost::mutex> lock{sync_};
|
||||
switch (tag)
|
||||
{
|
||||
case 0:
|
||||
remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
|
||||
remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
|
||||
return true;
|
||||
case 1:
|
||||
add_subscriptions(chain_subs_, chain_range, chain_contexts.begin());
|
||||
add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin());
|
||||
return true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
MERROR("Invalid ZMQ/Sub message");
|
||||
return false;
|
||||
}
|
||||
|
||||
bool zmq_pub::relay_to_pub(void* const relay, void* const pub)
|
||||
{
|
||||
const expect<bool> relayed = relay_block_pub(relay, pub);
|
||||
if (!relayed)
|
||||
{
|
||||
MERROR("Error relaying ZMQ/Pub: " << relayed.error().message());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!*relayed)
|
||||
{
|
||||
std::array<std::size_t, 2> subs;
|
||||
std::vector<cryptonote::txpool_event> events;
|
||||
{
|
||||
const boost::lock_guard<boost::mutex> lock{sync_};
|
||||
if (txes_.empty())
|
||||
return false;
|
||||
|
||||
subs = txpool_subs_;
|
||||
events = std::move(txes_.front());
|
||||
txes_.pop_front();
|
||||
}
|
||||
auto messages = make_pubs(subs, txpool_contexts, epee::to_span(events));
|
||||
send_messages(pub, messages);
|
||||
MDEBUG("Sent txpool ZMQ/Pub");
|
||||
}
|
||||
else
|
||||
MDEBUG("Sent chain_main ZMQ/Pub");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::size_t zmq_pub::send_chain_main(const std::uint64_t height, const epee::span<const cryptonote::block> blocks)
|
||||
{
|
||||
if (blocks.empty())
|
||||
return 0;
|
||||
|
||||
/* Block format only sends one block at a time - multiple block notifications
|
||||
are less common and only occur on rollbacks. */
|
||||
|
||||
boost::unique_lock<boost::mutex> guard{sync_};
|
||||
|
||||
const auto subs_copy = chain_subs_;
|
||||
guard.unlock();
|
||||
|
||||
for (const std::size_t sub : subs_copy)
|
||||
{
|
||||
if (sub)
|
||||
{
|
||||
/* cryptonote_core/blockchain.cpp cannot "give" us the block like core
|
||||
does for txpool events. Since copying the block is expensive anyway,
|
||||
serialization is done right here on the p2p thread (for now). */
|
||||
|
||||
auto messages = make_pubs(subs_copy, chain_contexts, height, blocks);
|
||||
guard.lock();
|
||||
return send_messages(relay_.get(), messages);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::size_t zmq_pub::send_txpool_add(std::vector<txpool_event> txes)
|
||||
{
|
||||
if (txes.empty())
|
||||
return 0;
|
||||
|
||||
const boost::lock_guard<boost::mutex> lock{sync_};
|
||||
for (const std::size_t sub : txpool_subs_)
|
||||
{
|
||||
if (sub)
|
||||
{
|
||||
const expect<void> sent = net::zmq::retry_op(zmq_send_const, relay_.get(), txpool_signal, sizeof(txpool_signal) - 1, ZMQ_DONTWAIT);
|
||||
if (sent)
|
||||
txes_.emplace_back(std::move(txes));
|
||||
else
|
||||
MERROR("ZMQ/Pub failure, relay queue error: " << sent.error().message());
|
||||
return bool(sent);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq_pub::chain_main::operator()(const std::uint64_t height, epee::span<const cryptonote::block> blocks) const
|
||||
{
|
||||
const std::shared_ptr<zmq_pub> self = self_.lock();
|
||||
if (self)
|
||||
self->send_chain_main(height, blocks);
|
||||
else
|
||||
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
|
||||
}
|
||||
|
||||
void zmq_pub::txpool_add::operator()(std::vector<cryptonote::txpool_event> txes) const
|
||||
{
|
||||
const std::shared_ptr<zmq_pub> self = self_.lock();
|
||||
if (self)
|
||||
self->send_txpool_add(std::move(txes));
|
||||
else
|
||||
MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed");
|
||||
}
|
||||
|
||||
}}
|
110
src/rpc/zmq_pub.h
Normal file
110
src/rpc/zmq_pub.h
Normal file
@ -0,0 +1,110 @@
|
||||
// Copyright (c) 2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <array>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/utility/string_ref.hpp>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "cryptonote_basic/fwd.h"
|
||||
#include "net/zmq.h"
|
||||
#include "span.h"
|
||||
|
||||
namespace cryptonote { namespace listener
|
||||
{
|
||||
/*! \brief Sends ZMQ PUB messages on cryptonote events
|
||||
|
||||
Clients must ensure that all transaction(s) are notified before any blocks
|
||||
they are contained in, and must ensure that each block is notified in chain
|
||||
order. An external lock **must** be held by clients during the entire
|
||||
txpool check and notification sequence and (a possibly second) lock is held
|
||||
during the entire block check and notification sequence. Otherwise, events
|
||||
could be sent in a different order than processed. */
|
||||
class zmq_pub
|
||||
{
|
||||
/* Each socket has its own internal queue. So we can only use one socket, else
|
||||
the messages being published are not guaranteed to be in the same order
|
||||
pushed. */
|
||||
|
||||
net::zmq::socket relay_;
|
||||
std::deque<std::vector<txpool_event>> txes_;
|
||||
std::array<std::size_t, 2> chain_subs_;
|
||||
std::array<std::size_t, 2> txpool_subs_;
|
||||
boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays.
|
||||
|
||||
public:
|
||||
//! \return Name of ZMQ_PAIR endpoint for pub notifications
|
||||
static constexpr const char* relay_endpoint() noexcept { return "inproc://pub_relay"; }
|
||||
|
||||
explicit zmq_pub(void* context);
|
||||
|
||||
zmq_pub(const zmq_pub&) = delete;
|
||||
zmq_pub(zmq_pub&&) = delete;
|
||||
|
||||
~zmq_pub();
|
||||
|
||||
zmq_pub& operator=(const zmq_pub&) = delete;
|
||||
zmq_pub& operator=(zmq_pub&&) = delete;
|
||||
|
||||
//! Process a client subscription request (from XPUB sockets). Thread-safe.
|
||||
bool sub_request(const boost::string_ref message);
|
||||
|
||||
/*! Forward ZMQ messages sent to `relay` via `send_chain_main` or
|
||||
`send_txpool_add` to `pub`. Used by `ZmqServer`. */
|
||||
bool relay_to_pub(void* relay, void* pub);
|
||||
|
||||
/*! Send a `ZMQ_PUB` notification for a change to the main chain.
|
||||
Thread-safe.
|
||||
\return Number of ZMQ messages sent to relay. */
|
||||
std::size_t send_chain_main(std::uint64_t height, epee::span<const cryptonote::block> blocks);
|
||||
|
||||
/*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local
|
||||
pool. Thread-safe.
|
||||
\return Number of ZMQ messages sent to relay. */
|
||||
std::size_t send_txpool_add(std::vector<cryptonote::txpool_event> txes);
|
||||
|
||||
//! Callable for `send_chain_main` with weak ownership to `zmq_pub` object.
|
||||
struct chain_main
|
||||
{
|
||||
std::weak_ptr<zmq_pub> self_;
|
||||
void operator()(std::uint64_t height, epee::span<const cryptonote::block> blocks) const;
|
||||
};
|
||||
|
||||
//! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object.
|
||||
struct txpool_add
|
||||
{
|
||||
std::weak_ptr<zmq_pub> self_;
|
||||
void operator()(std::vector<cryptonote::txpool_event> txes) const;
|
||||
};
|
||||
};
|
||||
}}
|
@ -29,10 +29,16 @@
|
||||
#include "zmq_server.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <utility>
|
||||
#include <stdexcept>
|
||||
#include <system_error>
|
||||
|
||||
#include "byte_slice.h"
|
||||
#include "rpc/zmq_pub.h"
|
||||
|
||||
#undef MONERO_DEFAULT_LOG_CATEGORY
|
||||
#define MONERO_DEFAULT_LOG_CATEGORY "net.zmq"
|
||||
|
||||
namespace cryptonote
|
||||
{
|
||||
@ -42,14 +48,57 @@ namespace
|
||||
constexpr const int num_zmq_threads = 1;
|
||||
constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
|
||||
constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
|
||||
}
|
||||
|
||||
net::zmq::socket init_socket(void* context, int type, epee::span<const std::string> addresses)
|
||||
{
|
||||
if (context == nullptr)
|
||||
throw std::logic_error{"NULL context provided"};
|
||||
|
||||
net::zmq::socket out{};
|
||||
out.reset(zmq_socket(context, type));
|
||||
if (!out)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("Failed to create ZMQ socket");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(out.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
|
||||
if (zmq_setsockopt(out.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
for (const std::string& address : addresses)
|
||||
{
|
||||
if (zmq_bind(out.get(), address.c_str()) < 0)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("ZMQ bind failed");
|
||||
return nullptr;
|
||||
}
|
||||
MINFO("ZMQ now listening at " << address);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
} // anonymous
|
||||
|
||||
namespace rpc
|
||||
{
|
||||
|
||||
ZmqServer::ZmqServer(RpcHandler& h) :
|
||||
handler(h),
|
||||
context(zmq_init(num_zmq_threads))
|
||||
context(zmq_init(num_zmq_threads)),
|
||||
rep_socket(nullptr),
|
||||
pub_socket(nullptr),
|
||||
relay_socket(nullptr),
|
||||
shared_state(nullptr)
|
||||
{
|
||||
if (!context)
|
||||
MONERO_ZMQ_THROW("Unable to create ZMQ context");
|
||||
@ -64,22 +113,59 @@ void ZmqServer::serve()
|
||||
try
|
||||
{
|
||||
// socket must close before `zmq_term` will exit.
|
||||
const net::zmq::socket socket = std::move(rep_socket);
|
||||
if (!socket)
|
||||
const net::zmq::socket rep = std::move(rep_socket);
|
||||
const net::zmq::socket pub = std::move(pub_socket);
|
||||
const net::zmq::socket relay = std::move(relay_socket);
|
||||
const std::shared_ptr<listener::zmq_pub> state = std::move(shared_state);
|
||||
|
||||
const unsigned init_count = unsigned(bool(pub)) + bool(relay) + bool(state);
|
||||
if (!rep || (init_count && init_count != 3))
|
||||
{
|
||||
MERROR("ZMQ RPC server reply socket is null");
|
||||
MERROR("ZMQ RPC server socket is null");
|
||||
return;
|
||||
}
|
||||
|
||||
MINFO("ZMQ Server started");
|
||||
|
||||
const int read_flags = pub ? ZMQ_DONTWAIT : 0;
|
||||
std::array<zmq_pollitem_t, 3> sockets =
|
||||
{{
|
||||
{relay.get(), 0, ZMQ_POLLIN, 0},
|
||||
{pub.get(), 0, ZMQ_POLLIN, 0},
|
||||
{rep.get(), 0, ZMQ_POLLIN, 0}
|
||||
}};
|
||||
|
||||
/* This uses XPUB to watch for subscribers, to reduce CPU cycles for
|
||||
serialization when the data will be dropped. This is important for block
|
||||
serialization, which is done on the p2p threads currently (see
|
||||
zmq_pub.cpp).
|
||||
|
||||
XPUB sockets are not thread-safe, so the p2p thread cannot write into
|
||||
the socket while we read here for subscribers. A ZMQ_PAIR socket is
|
||||
used for inproc notification. No data is every copied to kernel, it is
|
||||
all userspace messaging. */
|
||||
|
||||
while (1)
|
||||
{
|
||||
const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
|
||||
MDEBUG("Received RPC request: \"" << message << "\"");
|
||||
epee::byte_slice response = handler.handle(message);
|
||||
if (pub)
|
||||
MONERO_UNWRAP(net::zmq::retry_op(zmq_poll, sockets.data(), sockets.size(), -1));
|
||||
|
||||
const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
|
||||
MDEBUG("Sending RPC reply: \"" << response_view << "\"");
|
||||
MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get()));
|
||||
if (sockets[0].revents)
|
||||
state->relay_to_pub(relay.get(), pub.get());
|
||||
|
||||
if (sockets[1].revents)
|
||||
state->sub_request(MONERO_UNWRAP(net::zmq::receive(pub.get(), ZMQ_DONTWAIT)));
|
||||
|
||||
if (!pub || sockets[2].revents)
|
||||
{
|
||||
const std::string message = MONERO_UNWRAP(net::zmq::receive(rep.get(), read_flags));
|
||||
MDEBUG("Received RPC request: \"" << message << "\"");
|
||||
epee::byte_slice response = handler.handle(message);
|
||||
|
||||
const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
|
||||
MDEBUG("Sending RPC reply: \"" << response_view << "\"");
|
||||
MONERO_UNWRAP(net::zmq::send(std::move(response), rep.get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const std::system_error& e)
|
||||
@ -97,38 +183,12 @@ void ZmqServer::serve()
|
||||
}
|
||||
}
|
||||
|
||||
bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
|
||||
{
|
||||
MERROR("ZmqServer::addIPCSocket not yet implemented!");
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
|
||||
void* ZmqServer::init_rpc(boost::string_ref address, boost::string_ref port)
|
||||
{
|
||||
if (!context)
|
||||
{
|
||||
MERROR("ZMQ RPC Server already shutdown");
|
||||
return false;
|
||||
}
|
||||
|
||||
rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
|
||||
if (!rep_socket)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
|
||||
return false;
|
||||
}
|
||||
|
||||
static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
|
||||
if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
|
||||
return false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (address.empty())
|
||||
@ -141,12 +201,34 @@ bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
|
||||
bind_address += ":";
|
||||
bind_address.append(port.data(), port.size());
|
||||
|
||||
if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
|
||||
rep_socket = init_socket(context.get(), ZMQ_REP, {std::addressof(bind_address), 1});
|
||||
return bool(rep_socket) ? context.get() : nullptr;
|
||||
}
|
||||
|
||||
std::shared_ptr<listener::zmq_pub> ZmqServer::init_pub(epee::span<const std::string> addresses)
|
||||
{
|
||||
try
|
||||
{
|
||||
MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
|
||||
return false;
|
||||
shared_state = std::make_shared<listener::zmq_pub>(context.get());
|
||||
pub_socket = init_socket(context.get(), ZMQ_XPUB, addresses);
|
||||
if (!pub_socket)
|
||||
throw std::runtime_error{"Unable to initialize ZMQ_XPUB socket"};
|
||||
|
||||
const std::string relay_address[] = {listener::zmq_pub::relay_endpoint()};
|
||||
relay_socket = init_socket(context.get(), ZMQ_PAIR, relay_address);
|
||||
if (!relay_socket)
|
||||
throw std::runtime_error{"Unable to initialize ZMQ_PAIR relay"};
|
||||
}
|
||||
return true;
|
||||
catch (const std::runtime_error& e)
|
||||
{
|
||||
shared_state = nullptr;
|
||||
pub_socket = nullptr;
|
||||
relay_socket = nullptr;
|
||||
MERROR("Failed to create ZMQ/Pub listener: " << e.what());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return shared_state;
|
||||
}
|
||||
|
||||
void ZmqServer::run()
|
||||
@ -163,7 +245,6 @@ void ZmqServer::stop()
|
||||
run_thread.join();
|
||||
}
|
||||
|
||||
|
||||
} // namespace cryptonote
|
||||
|
||||
} // namespace rpc
|
||||
|
@ -30,10 +30,16 @@
|
||||
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/utility/string_ref.hpp>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "common/command_line.h"
|
||||
#include "cryptonote_basic/fwd.h"
|
||||
#include "net/zmq.h"
|
||||
#include "rpc_handler.h"
|
||||
#include "rpc/fwd.h"
|
||||
#include "rpc/rpc_handler.h"
|
||||
#include "span.h"
|
||||
|
||||
namespace cryptonote
|
||||
{
|
||||
@ -41,7 +47,7 @@ namespace cryptonote
|
||||
namespace rpc
|
||||
{
|
||||
|
||||
class ZmqServer
|
||||
class ZmqServer final
|
||||
{
|
||||
public:
|
||||
|
||||
@ -49,12 +55,13 @@ class ZmqServer
|
||||
|
||||
~ZmqServer();
|
||||
|
||||
static void init_options(boost::program_options::options_description& desc);
|
||||
|
||||
void serve();
|
||||
|
||||
bool addIPCSocket(boost::string_ref address, boost::string_ref port);
|
||||
bool addTCPSocket(boost::string_ref address, boost::string_ref port);
|
||||
//! \return ZMQ context on success, `nullptr` on failure
|
||||
void* init_rpc(boost::string_ref address, boost::string_ref port);
|
||||
|
||||
//! \return `nullptr` on errors.
|
||||
std::shared_ptr<listener::zmq_pub> init_pub(epee::span<const std::string> addresses);
|
||||
|
||||
void run();
|
||||
void stop();
|
||||
@ -67,9 +74,11 @@ class ZmqServer
|
||||
boost::thread run_thread;
|
||||
|
||||
net::zmq::socket rep_socket;
|
||||
net::zmq::socket pub_socket;
|
||||
net::zmq::socket relay_socket;
|
||||
std::shared_ptr<listener::zmq_pub> shared_state;
|
||||
};
|
||||
|
||||
|
||||
} // namespace cryptonote
|
||||
|
||||
} // namespace rpc
|
||||
|
@ -356,7 +356,7 @@ inline typename std::enable_if<sfinae::is_vector_like<Vec>::value, void>::type t
|
||||
dest.StartArray();
|
||||
for (const auto& t : vec)
|
||||
toJsonValue(dest, t);
|
||||
dest.EndArray(vec.size());
|
||||
dest.EndArray();
|
||||
}
|
||||
|
||||
template <typename Vec>
|
||||
|
@ -109,6 +109,7 @@ target_link_libraries(unit_tests
|
||||
cryptonote_protocol
|
||||
cryptonote_core
|
||||
daemon_messages
|
||||
daemon_rpc_server
|
||||
blockchain_db
|
||||
lmdb_lib
|
||||
rpc
|
||||
|
@ -15,7 +15,7 @@
|
||||
#include "serialization/json_object.h"
|
||||
|
||||
|
||||
namespace
|
||||
namespace test
|
||||
{
|
||||
cryptonote::transaction
|
||||
make_miner_transaction(cryptonote::account_public_address const& to)
|
||||
@ -82,7 +82,10 @@ namespace
|
||||
|
||||
return tx;
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
template<typename T>
|
||||
T test_json(const T& value)
|
||||
{
|
||||
@ -109,7 +112,7 @@ TEST(JsonSerialization, MinerTransaction)
|
||||
{
|
||||
cryptonote::account_base acct;
|
||||
acct.generate();
|
||||
const auto miner_tx = make_miner_transaction(acct.get_keys().m_account_address);
|
||||
const auto miner_tx = test::make_miner_transaction(acct.get_keys().m_account_address);
|
||||
|
||||
crypto::hash tx_hash{};
|
||||
ASSERT_TRUE(cryptonote::get_transaction_hash(miner_tx, tx_hash));
|
||||
@ -137,8 +140,8 @@ TEST(JsonSerialization, RegularTransaction)
|
||||
cryptonote::account_base acct2;
|
||||
acct2.generate();
|
||||
|
||||
const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = make_transaction(
|
||||
const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = test::make_transaction(
|
||||
acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, false, false
|
||||
);
|
||||
|
||||
@ -168,8 +171,8 @@ TEST(JsonSerialization, RingctTransaction)
|
||||
cryptonote::account_base acct2;
|
||||
acct2.generate();
|
||||
|
||||
const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = make_transaction(
|
||||
const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = test::make_transaction(
|
||||
acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, false
|
||||
);
|
||||
|
||||
@ -199,8 +202,8 @@ TEST(JsonSerialization, BulletproofTransaction)
|
||||
cryptonote::account_base acct2;
|
||||
acct2.generate();
|
||||
|
||||
const auto miner_tx = make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = make_transaction(
|
||||
const auto miner_tx = test::make_miner_transaction(acct1.get_keys().m_account_address);
|
||||
const auto tx = test::make_transaction(
|
||||
acct1.get_keys(), {miner_tx}, {acct2.get_keys().m_account_address}, true, true
|
||||
);
|
||||
|
||||
|
42
tests/unit_tests/json_serialization.h
Normal file
42
tests/unit_tests/json_serialization.h
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright (c) 2020, The Monero Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace test
|
||||
{
|
||||
cryptonote::transaction make_miner_transaction(cryptonote::account_public_address const& to);
|
||||
|
||||
cryptonote::transaction
|
||||
make_transaction(
|
||||
cryptonote::account_keys const& from,
|
||||
std::vector<cryptonote::transaction> const& sources,
|
||||
std::vector<cryptonote::account_public_address> const& destinations,
|
||||
bool rct,
|
||||
bool bulletproof);
|
||||
}
|
@ -26,11 +26,25 @@
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#include <boost/preprocessor/stringize.hpp>
|
||||
#include <gtest/gtest.h>
|
||||
#include <rapidjson/document.h>
|
||||
|
||||
#include "cryptonote_basic/account.h"
|
||||
#include "cryptonote_basic/cryptonote_basic.h"
|
||||
#include "cryptonote_basic/events.h"
|
||||
#include "cryptonote_basic/cryptonote_format_utils.h"
|
||||
#include "json_serialization.h"
|
||||
#include "net/zmq.h"
|
||||
#include "rpc/message.h"
|
||||
#include "rpc/zmq_pub.h"
|
||||
#include "rpc/zmq_server.h"
|
||||
#include "serialization/json_object.h"
|
||||
|
||||
#define MASSERT(...) \
|
||||
if (!(__VA_ARGS__)) \
|
||||
return testing::AssertionFailure() << BOOST_PP_STRINGIZE(__VA_ARGS__)
|
||||
|
||||
TEST(ZmqFullMessage, InvalidRequest)
|
||||
{
|
||||
EXPECT_THROW(
|
||||
@ -53,3 +67,711 @@ TEST(ZmqFullMessage, Request)
|
||||
cryptonote::rpc::FullMessage parsed{request, true};
|
||||
EXPECT_STREQ("foo", parsed.getRequestType().c_str());
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
using published_json = std::pair<std::string, rapidjson::Document>;
|
||||
|
||||
constexpr const char inproc_pub[] = "inproc://dummy_pub";
|
||||
|
||||
net::zmq::socket create_socket(void* ctx, const char* address)
|
||||
{
|
||||
net::zmq::socket sock{zmq_socket(ctx, ZMQ_PAIR)};
|
||||
if (!sock)
|
||||
MONERO_ZMQ_THROW("failed to create socket");
|
||||
if (zmq_bind(sock.get(), address) != 0)
|
||||
MONERO_ZMQ_THROW("socket bind failure");
|
||||
return sock;
|
||||
}
|
||||
|
||||
std::vector<std::string> get_messages(void* socket, int count = -1)
|
||||
{
|
||||
std::vector<std::string> out;
|
||||
for ( ; count || count < 0; --count)
|
||||
{
|
||||
expect<std::string> next = net::zmq::receive(socket, (count < 0 ? ZMQ_DONTWAIT : 0));
|
||||
if (next == net::zmq::make_error_code(EAGAIN))
|
||||
return out;
|
||||
out.push_back(std::move(*next));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
std::vector<published_json> get_published(void* socket, int count = -1)
|
||||
{
|
||||
std::vector<published_json> out;
|
||||
|
||||
const auto messages = get_messages(socket, count);
|
||||
out.reserve(messages.size());
|
||||
|
||||
for (const std::string& message : messages)
|
||||
{
|
||||
const char* split = std::strchr(message.c_str(), ':');
|
||||
if (!split)
|
||||
throw std::runtime_error{"Invalid ZMQ/Pub message"};
|
||||
|
||||
out.emplace_back();
|
||||
out.back().first = {message.c_str(), split};
|
||||
if (out.back().second.Parse(split + 1).HasParseError())
|
||||
throw std::runtime_error{"Failed to parse ZMQ/Pub message"};
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
testing::AssertionResult compare_full_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub)
|
||||
{
|
||||
MASSERT(pub.first == "json-full-txpool_add");
|
||||
MASSERT(pub.second.IsArray());
|
||||
MASSERT(pub.second.Size() <= events.size());
|
||||
|
||||
std::size_t i = 0;
|
||||
for (const cryptonote::txpool_event& event : events)
|
||||
{
|
||||
MASSERT(i <= pub.second.Size());
|
||||
if (!event.res)
|
||||
continue;
|
||||
|
||||
cryptonote::transaction tx{};
|
||||
cryptonote::json::fromJsonValue(pub.second[i], tx);
|
||||
|
||||
crypto::hash id{};
|
||||
MASSERT(cryptonote::get_transaction_hash(event.tx, id));
|
||||
MASSERT(cryptonote::get_transaction_hash(tx, id));
|
||||
MASSERT(event.tx.hash == tx.hash);
|
||||
++i;
|
||||
}
|
||||
return testing::AssertionSuccess();
|
||||
}
|
||||
|
||||
testing::AssertionResult compare_minimal_txpool(epee::span<const cryptonote::txpool_event> events, const published_json& pub)
|
||||
{
|
||||
MASSERT(pub.first == "json-minimal-txpool_add");
|
||||
MASSERT(pub.second.IsArray());
|
||||
MASSERT(pub.second.Size() <= events.size());
|
||||
|
||||
std::size_t i = 0;
|
||||
for (const cryptonote::txpool_event& event : events)
|
||||
{
|
||||
MASSERT(i <= pub.second.Size());
|
||||
if (!event.res)
|
||||
continue;
|
||||
|
||||
std::size_t actual_size = 0;
|
||||
crypto::hash actual_id{};
|
||||
|
||||
MASSERT(pub.second[i].IsObject());
|
||||
GET_FROM_JSON_OBJECT(pub.second[i], actual_id, id);
|
||||
GET_FROM_JSON_OBJECT(pub.second[i], actual_size, blob_size);
|
||||
|
||||
std::size_t expected_size = 0;
|
||||
crypto::hash expected_id{};
|
||||
MASSERT(cryptonote::get_transaction_hash(event.tx, expected_id, expected_size));
|
||||
MASSERT(expected_size == actual_size);
|
||||
MASSERT(expected_id == actual_id);
|
||||
++i;
|
||||
}
|
||||
return testing::AssertionSuccess();
|
||||
}
|
||||
|
||||
testing::AssertionResult compare_full_block(const epee::span<const cryptonote::block> expected, const published_json& pub)
|
||||
{
|
||||
MASSERT(pub.first == "json-full-chain_main");
|
||||
MASSERT(pub.second.IsArray());
|
||||
|
||||
std::vector<cryptonote::block> actual;
|
||||
cryptonote::json::fromJsonValue(pub.second, actual);
|
||||
|
||||
MASSERT(expected.size() == actual.size());
|
||||
|
||||
for (std::size_t i = 0; i < expected.size(); ++i)
|
||||
{
|
||||
crypto::hash id;
|
||||
MASSERT(cryptonote::get_block_hash(expected[i], id));
|
||||
MASSERT(cryptonote::get_block_hash(actual[i], id));
|
||||
MASSERT(expected[i].hash == actual[i].hash);
|
||||
}
|
||||
|
||||
return testing::AssertionSuccess();
|
||||
}
|
||||
|
||||
testing::AssertionResult compare_minimal_block(std::size_t height, const epee::span<const cryptonote::block> expected, const published_json& pub)
|
||||
{
|
||||
MASSERT(pub.first == "json-minimal-chain_main");
|
||||
MASSERT(pub.second.IsObject());
|
||||
MASSERT(!expected.empty());
|
||||
|
||||
std::size_t actual_height = 0;
|
||||
crypto::hash actual_id{};
|
||||
crypto::hash actual_prev_id{};
|
||||
std::vector<crypto::hash> actual_ids{};
|
||||
GET_FROM_JSON_OBJECT(pub.second, actual_height, first_height);
|
||||
GET_FROM_JSON_OBJECT(pub.second, actual_prev_id, first_prev_id);
|
||||
GET_FROM_JSON_OBJECT(pub.second, actual_ids, ids);
|
||||
|
||||
MASSERT(height == actual_height);
|
||||
MASSERT(expected[0].prev_id == actual_prev_id);
|
||||
MASSERT(expected.size() == actual_ids.size());
|
||||
|
||||
for (std::size_t i = 0; i < expected.size(); ++i)
|
||||
{
|
||||
crypto::hash id;
|
||||
MASSERT(cryptonote::get_block_hash(expected[i], id));
|
||||
MASSERT(id == actual_ids[i]);
|
||||
}
|
||||
|
||||
return testing::AssertionSuccess();
|
||||
}
|
||||
|
||||
struct zmq_base : public testing::Test
|
||||
{
|
||||
cryptonote::account_base acct;
|
||||
|
||||
zmq_base()
|
||||
: testing::Test(), acct()
|
||||
{
|
||||
acct.generate();
|
||||
}
|
||||
|
||||
cryptonote::transaction make_miner_transaction()
|
||||
{
|
||||
return test::make_miner_transaction(acct.get_keys().m_account_address);
|
||||
}
|
||||
|
||||
cryptonote::transaction make_transaction(const std::vector<cryptonote::account_public_address>& destinations)
|
||||
{
|
||||
return test::make_transaction(acct.get_keys(), {make_miner_transaction()}, destinations, true, true);
|
||||
}
|
||||
|
||||
cryptonote::transaction make_transaction()
|
||||
{
|
||||
cryptonote::account_base temp_account;
|
||||
temp_account.generate();
|
||||
return make_transaction({temp_account.get_keys().m_account_address});
|
||||
}
|
||||
|
||||
cryptonote::block make_block()
|
||||
{
|
||||
cryptonote::block block{};
|
||||
block.major_version = 1;
|
||||
block.minor_version = 3;
|
||||
block.timestamp = 100;
|
||||
block.prev_id = crypto::rand<crypto::hash>();
|
||||
block.nonce = 100;
|
||||
block.miner_tx = make_miner_transaction();
|
||||
return block;
|
||||
}
|
||||
};
|
||||
|
||||
struct zmq_pub : public zmq_base
|
||||
{
|
||||
net::zmq::context ctx;
|
||||
net::zmq::socket relay;
|
||||
net::zmq::socket dummy_pub;
|
||||
net::zmq::socket dummy_client;
|
||||
std::shared_ptr<cryptonote::listener::zmq_pub> pub;
|
||||
|
||||
zmq_pub()
|
||||
: zmq_base(),
|
||||
ctx(zmq_init(1)),
|
||||
relay(create_socket(ctx.get(), cryptonote::listener::zmq_pub::relay_endpoint())),
|
||||
dummy_pub(create_socket(ctx.get(), inproc_pub)),
|
||||
dummy_client(zmq_socket(ctx.get(), ZMQ_PAIR)),
|
||||
pub(std::make_shared<cryptonote::listener::zmq_pub>(ctx.get()))
|
||||
{
|
||||
if (!dummy_client)
|
||||
MONERO_ZMQ_THROW("failed to create socket");
|
||||
if (zmq_connect(dummy_client.get(), inproc_pub) != 0)
|
||||
MONERO_ZMQ_THROW("failed to connect to dummy pub");
|
||||
}
|
||||
|
||||
virtual void TearDown() override final
|
||||
{
|
||||
EXPECT_EQ(0u, get_messages(relay.get()).size());
|
||||
EXPECT_EQ(0u, get_messages(dummy_client.get()).size());
|
||||
}
|
||||
|
||||
template<std::size_t N>
|
||||
bool sub_request(const char (&topic)[N])
|
||||
{
|
||||
return pub->sub_request({topic, N - 1});
|
||||
}
|
||||
};
|
||||
|
||||
struct dummy_handler final : cryptonote::rpc::RpcHandler
|
||||
{
|
||||
dummy_handler()
|
||||
: cryptonote::rpc::RpcHandler()
|
||||
{}
|
||||
|
||||
virtual epee::byte_slice handle(const std::string& request) override final
|
||||
{
|
||||
throw std::logic_error{"not implemented"};
|
||||
}
|
||||
};
|
||||
|
||||
struct zmq_server : public zmq_base
|
||||
{
|
||||
dummy_handler handler;
|
||||
cryptonote::rpc::ZmqServer server;
|
||||
std::shared_ptr<cryptonote::listener::zmq_pub> pub;
|
||||
net::zmq::socket sub;
|
||||
|
||||
zmq_server()
|
||||
: zmq_base(),
|
||||
handler(),
|
||||
server(handler),
|
||||
pub(),
|
||||
sub()
|
||||
{
|
||||
void* ctx = server.init_rpc({}, {});
|
||||
if (!ctx)
|
||||
throw std::runtime_error{"init_rpc failure"};
|
||||
|
||||
const std::string endpoint = inproc_pub;
|
||||
pub = server.init_pub({std::addressof(endpoint), 1});
|
||||
if (!pub)
|
||||
throw std::runtime_error{"failed to initiaze zmq/pub"};
|
||||
|
||||
sub.reset(zmq_socket(ctx, ZMQ_SUB));
|
||||
if (!sub)
|
||||
MONERO_ZMQ_THROW("failed to create socket");
|
||||
if (zmq_connect(sub.get(), inproc_pub) != 0)
|
||||
MONERO_ZMQ_THROW("failed to connect to dummy pub");
|
||||
|
||||
server.run();
|
||||
}
|
||||
|
||||
virtual void TearDown() override final
|
||||
{
|
||||
EXPECT_EQ(0u, get_messages(sub.get()).size());
|
||||
sub.reset();
|
||||
pub.reset();
|
||||
server.stop();
|
||||
}
|
||||
|
||||
template<std::size_t N>
|
||||
void subscribe(const char (&topic)[N])
|
||||
{
|
||||
if (zmq_setsockopt(sub.get(), ZMQ_SUBSCRIBE, topic, N - 1) != 0)
|
||||
MONERO_ZMQ_THROW("failed to subscribe");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, InvalidContext)
|
||||
{
|
||||
EXPECT_THROW(cryptonote::listener::zmq_pub{nullptr}, std::logic_error);
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, NoBlocking)
|
||||
{
|
||||
EXPECT_FALSE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, DefaultDrop)
|
||||
{
|
||||
EXPECT_EQ(0u, pub->send_txpool_add({{make_transaction(), {}, true}}));
|
||||
|
||||
const cryptonote::block bl = make_block();
|
||||
EXPECT_EQ(0u,pub->send_chain_main(5, {std::addressof(bl), 1}));
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(5, {std::addressof(bl), 1}));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonFullTxpool)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-full-txpool_add";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
EXPECT_NO_THROW(pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonMinimalTxpool)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-minimal-txpool_add";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
EXPECT_NO_THROW(pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonFullChain)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-full-chain_main";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonMinimalChain)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-minimal-chain_main";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front()));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonFullAll)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-full";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
{
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
}
|
||||
{
|
||||
const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonMinimalAll)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json-minimal";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
{
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
}
|
||||
{
|
||||
const std::array<cryptonote::block, 2> blocks{{make_block(), make_block()}};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_chain_main(100, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.front()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(1u, pubs.size());
|
||||
ASSERT_LE(1u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.front()));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonAll)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
{
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_EQ(1u, pub->send_txpool_add(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
|
||||
|
||||
events.at(0).res = false;
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(events));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_txpool(epee::to_span(events), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.back()));
|
||||
}
|
||||
{
|
||||
const std::array<cryptonote::block, 1> blocks{{make_block()}};
|
||||
|
||||
EXPECT_EQ(2u, pub->send_chain_main(100, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
auto pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_block(100, epee::to_span(blocks), pubs.back()));
|
||||
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
EXPECT_TRUE(pub->relay_to_pub(relay.get(), dummy_pub.get()));
|
||||
|
||||
pubs = get_published(dummy_client.get());
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_full_block(epee::to_span(blocks), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_block(533, epee::to_span(blocks), pubs.back()));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonChainWeakPtrSkip)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
const std::array<cryptonote::block, 1> blocks{{make_block()}};
|
||||
|
||||
pub.reset();
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::chain_main{pub}(533, epee::to_span(blocks)));
|
||||
}
|
||||
|
||||
TEST_F(zmq_pub, JsonTxpoolWeakPtrSkip)
|
||||
{
|
||||
static constexpr const char topic[] = "\1json";
|
||||
|
||||
ASSERT_TRUE(sub_request(topic));
|
||||
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
pub.reset();
|
||||
EXPECT_NO_THROW(cryptonote::listener::zmq_pub::txpool_add{pub}(std::move(events)));
|
||||
}
|
||||
|
||||
TEST_F(zmq_server, pub)
|
||||
{
|
||||
subscribe("json-minimal");
|
||||
|
||||
std::vector<cryptonote::txpool_event> events
|
||||
{
|
||||
{make_transaction(), {}, true}, {make_transaction(), {}, true}
|
||||
};
|
||||
|
||||
const std::array<cryptonote::block, 1> blocks{{make_block()}};
|
||||
|
||||
ASSERT_EQ(1u, pub->send_txpool_add(events));
|
||||
ASSERT_EQ(1u, pub->send_chain_main(200, epee::to_span(blocks)));
|
||||
|
||||
auto pubs = get_published(sub.get(), 2);
|
||||
EXPECT_EQ(2u, pubs.size());
|
||||
ASSERT_LE(2u, pubs.size());
|
||||
EXPECT_TRUE(compare_minimal_txpool(epee::to_span(events), pubs.front()));
|
||||
EXPECT_TRUE(compare_minimal_block(200, epee::to_span(blocks), pubs.back()));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user