Use byte_slice for sending zmq messages - removes data copy within zmq
This commit is contained in:
parent
378cdeaeae
commit
da99157462
@ -42,7 +42,12 @@ namespace epee
|
|||||||
|
|
||||||
struct release_byte_slice
|
struct release_byte_slice
|
||||||
{
|
{
|
||||||
void operator()(byte_slice_data*) const noexcept;
|
//! For use with `zmq_message_init_data`, use second arg for buffer pointer.
|
||||||
|
static void call(void*, void* ptr) noexcept;
|
||||||
|
void operator()(byte_slice_data* ptr) const noexcept
|
||||||
|
{
|
||||||
|
call(nullptr, ptr);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*! Inspired by slices in golang. Storage is thread-safe reference counted,
|
/*! Inspired by slices in golang. Storage is thread-safe reference counted,
|
||||||
@ -140,6 +145,9 @@ namespace epee
|
|||||||
\throw std::out_of_range If `size() < end`.
|
\throw std::out_of_range If `size() < end`.
|
||||||
\return Slice starting at `data() + begin` of size `end - begin`. */
|
\return Slice starting at `data() + begin` of size `end - begin`. */
|
||||||
byte_slice get_slice(std::size_t begin, std::size_t end) const;
|
byte_slice get_slice(std::size_t begin, std::size_t end) const;
|
||||||
|
|
||||||
|
//! \post `empty()` \return Ownership of ref-counted buffer.
|
||||||
|
std::unique_ptr<byte_slice_data, release_byte_slice> take_buffer() noexcept;
|
||||||
};
|
};
|
||||||
} // epee
|
} // epee
|
||||||
|
|
||||||
|
@ -49,12 +49,16 @@ namespace epee
|
|||||||
std::atomic<std::size_t> ref_count;
|
std::atomic<std::size_t> ref_count;
|
||||||
};
|
};
|
||||||
|
|
||||||
void release_byte_slice::operator()(byte_slice_data* ptr) const noexcept
|
void release_byte_slice::call(void*, void* ptr) noexcept
|
||||||
{
|
{
|
||||||
if (ptr && --(ptr->ref_count) == 0)
|
if (ptr)
|
||||||
{
|
{
|
||||||
ptr->~byte_slice_data();
|
byte_slice_data* self = static_cast<byte_slice_data*>(ptr);
|
||||||
free(ptr);
|
if (--(self->ref_count) == 0)
|
||||||
|
{
|
||||||
|
self->~byte_slice_data();
|
||||||
|
free(self);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,4 +210,11 @@ namespace epee
|
|||||||
return {};
|
return {};
|
||||||
return {storage_.get(), {portion_.begin() + begin, end - begin}};
|
return {storage_.get(), {portion_.begin() + begin, end - begin}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<byte_slice_data, release_byte_slice> byte_slice::take_buffer() noexcept
|
||||||
|
{
|
||||||
|
std::unique_ptr<byte_slice_data, release_byte_slice> out{std::move(storage_)};
|
||||||
|
portion_ = nullptr;
|
||||||
|
return out;
|
||||||
|
}
|
||||||
} // epee
|
} // epee
|
||||||
|
@ -33,6 +33,8 @@
|
|||||||
#include <limits>
|
#include <limits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
#include "byte_slice.h"
|
||||||
|
|
||||||
namespace net
|
namespace net
|
||||||
{
|
{
|
||||||
namespace zmq
|
namespace zmq
|
||||||
@ -183,6 +185,22 @@ namespace zmq
|
|||||||
{
|
{
|
||||||
return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
|
return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expect<void> send(epee::byte_slice&& payload, void* socket, int flags) noexcept
|
||||||
|
{
|
||||||
|
void* const data = const_cast<std::uint8_t*>(payload.data());
|
||||||
|
const std::size_t size = payload.size();
|
||||||
|
auto buffer = payload.take_buffer(); // clears `payload` from callee
|
||||||
|
|
||||||
|
zmq_msg_t msg{};
|
||||||
|
MONERO_ZMQ_CHECK(zmq_msg_init_data(std::addressof(msg), data, size, epee::release_byte_slice::call, buffer.get()));
|
||||||
|
buffer.release(); // zmq will now decrement byte_slice ref-count
|
||||||
|
|
||||||
|
expect<void> sent = retry_op(zmq_msg_send, std::addressof(msg), socket, flags);
|
||||||
|
if (!sent) // beware if removing `noexcept` from this function - possible leak here
|
||||||
|
zmq_msg_close(std::addressof(msg));
|
||||||
|
return sent;
|
||||||
|
}
|
||||||
} // zmq
|
} // zmq
|
||||||
} // net
|
} // net
|
||||||
|
|
||||||
|
@ -53,6 +53,11 @@
|
|||||||
#define MONERO_ZMQ_THROW(msg) \
|
#define MONERO_ZMQ_THROW(msg) \
|
||||||
MONERO_THROW( ::net::zmq::get_error_code(), msg )
|
MONERO_THROW( ::net::zmq::get_error_code(), msg )
|
||||||
|
|
||||||
|
namespace epee
|
||||||
|
{
|
||||||
|
class byte_slice;
|
||||||
|
}
|
||||||
|
|
||||||
namespace net
|
namespace net
|
||||||
{
|
{
|
||||||
namespace zmq
|
namespace zmq
|
||||||
@ -132,5 +137,24 @@ namespace zmq
|
|||||||
\param flags See `zmq_send` for possible flags.
|
\param flags See `zmq_send` for possible flags.
|
||||||
\return `success()` if sent, otherwise ZMQ error. */
|
\return `success()` if sent, otherwise ZMQ error. */
|
||||||
expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
|
expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
|
||||||
|
|
||||||
|
/*! Sends `payload` on `socket`. Blocks until the entire message is queued
|
||||||
|
for sending, or until `zmq_term` is called on the `zmq_context`
|
||||||
|
associated with `socket`. If the context is terminated,
|
||||||
|
`make_error_code(ETERM)` is returned.
|
||||||
|
|
||||||
|
\note This will automatically retry on `EINTR`, so exiting on
|
||||||
|
interrupts requires context termination.
|
||||||
|
\note If non-blocking behavior is requested on `socket` or by `flags`,
|
||||||
|
then `net::zmq::make_error_code(EAGAIN)` will be returned if this
|
||||||
|
would block.
|
||||||
|
|
||||||
|
\param payload sent as one message on `socket`.
|
||||||
|
\param socket Handle created with `zmq_socket`.
|
||||||
|
\param flags See `zmq_msg_send` for possible flags.
|
||||||
|
|
||||||
|
\post `payload.emtpy()` - ownership is transferred to zmq.
|
||||||
|
\return `success()` if sent, otherwise ZMQ error. */
|
||||||
|
expect<void> send(epee::byte_slice&& payload, void* socket, int flags = 0) noexcept;
|
||||||
} // zmq
|
} // zmq
|
||||||
} // net
|
} // net
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
#include <boost/uuid/nil_generator.hpp>
|
#include <boost/uuid/nil_generator.hpp>
|
||||||
|
#include <boost/utility/string_ref.hpp>
|
||||||
// likely included by daemon_handler.h's includes,
|
// likely included by daemon_handler.h's includes,
|
||||||
// but including here for clarity
|
// but including here for clarity
|
||||||
#include "cryptonote_core/cryptonote_core.h"
|
#include "cryptonote_core/cryptonote_core.h"
|
||||||
@ -48,7 +49,7 @@ namespace rpc
|
|||||||
{
|
{
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
using handler_function = std::string(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg);
|
using handler_function = epee::byte_slice(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& msg);
|
||||||
struct handler_map
|
struct handler_map
|
||||||
{
|
{
|
||||||
const char* method_name;
|
const char* method_name;
|
||||||
@ -66,7 +67,7 @@ namespace rpc
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<typename Message>
|
template<typename Message>
|
||||||
std::string handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters)
|
epee::byte_slice handle_message(DaemonHandler& handler, const rapidjson::Value& id, const rapidjson::Value& parameters)
|
||||||
{
|
{
|
||||||
typename Message::Request request{};
|
typename Message::Request request{};
|
||||||
request.fromJson(parameters);
|
request.fromJson(parameters);
|
||||||
@ -903,7 +904,7 @@ namespace rpc
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string DaemonHandler::handle(const std::string& request)
|
epee::byte_slice DaemonHandler::handle(const std::string& request)
|
||||||
{
|
{
|
||||||
MDEBUG("Handling RPC request: " << request);
|
MDEBUG("Handling RPC request: " << request);
|
||||||
|
|
||||||
@ -916,8 +917,11 @@ namespace rpc
|
|||||||
if (matched_handler == std::end(handlers) || matched_handler->method_name != request_type)
|
if (matched_handler == std::end(handlers) || matched_handler->method_name != request_type)
|
||||||
return BAD_REQUEST(request_type, req_full.getID());
|
return BAD_REQUEST(request_type, req_full.getID());
|
||||||
|
|
||||||
std::string response = matched_handler->call(*this, req_full.getID(), req_full.getMessage());
|
epee::byte_slice response = matched_handler->call(*this, req_full.getID(), req_full.getMessage());
|
||||||
MDEBUG("Returning RPC response: " << response);
|
|
||||||
|
const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
|
||||||
|
MDEBUG("Returning RPC response: " << response_view);
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "byte_slice.h"
|
||||||
#include "daemon_messages.h"
|
#include "daemon_messages.h"
|
||||||
#include "daemon_rpc_version.h"
|
#include "daemon_rpc_version.h"
|
||||||
#include "rpc_handler.h"
|
#include "rpc_handler.h"
|
||||||
@ -132,7 +133,7 @@ class DaemonHandler : public RpcHandler
|
|||||||
|
|
||||||
void handle(const GetOutputDistribution::Request& req, GetOutputDistribution::Response& res);
|
void handle(const GetOutputDistribution::Request& req, GetOutputDistribution::Response& res);
|
||||||
|
|
||||||
std::string handle(const std::string& request);
|
epee::byte_slice handle(const std::string& request) override final;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ cryptonote::rpc::error FullMessage::getError()
|
|||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id)
|
epee::byte_slice FullMessage::getRequest(const std::string& request, const Message& message, const unsigned id)
|
||||||
{
|
{
|
||||||
rapidjson::StringBuffer buffer;
|
rapidjson::StringBuffer buffer;
|
||||||
{
|
{
|
||||||
@ -172,11 +172,11 @@ std::string FullMessage::getRequest(const std::string& request, const Message& m
|
|||||||
if (!dest.IsComplete())
|
if (!dest.IsComplete())
|
||||||
throw std::logic_error{"Invalid JSON tree generated"};
|
throw std::logic_error{"Invalid JSON tree generated"};
|
||||||
}
|
}
|
||||||
return std::string{buffer.GetString(), buffer.GetSize()};
|
return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
std::string FullMessage::getResponse(const Message& message, const rapidjson::Value& id)
|
epee::byte_slice FullMessage::getResponse(const Message& message, const rapidjson::Value& id)
|
||||||
{
|
{
|
||||||
rapidjson::StringBuffer buffer;
|
rapidjson::StringBuffer buffer;
|
||||||
{
|
{
|
||||||
@ -207,17 +207,17 @@ std::string FullMessage::getResponse(const Message& message, const rapidjson::Va
|
|||||||
if (!dest.IsComplete())
|
if (!dest.IsComplete())
|
||||||
throw std::logic_error{"Invalid JSON tree generated"};
|
throw std::logic_error{"Invalid JSON tree generated"};
|
||||||
}
|
}
|
||||||
return std::string{buffer.GetString(), buffer.GetSize()};
|
return epee::byte_slice{{buffer.GetString(), buffer.GetSize()}};
|
||||||
}
|
}
|
||||||
|
|
||||||
// convenience functions for bad input
|
// convenience functions for bad input
|
||||||
std::string BAD_REQUEST(const std::string& request)
|
epee::byte_slice BAD_REQUEST(const std::string& request)
|
||||||
{
|
{
|
||||||
rapidjson::Value invalid;
|
rapidjson::Value invalid;
|
||||||
return BAD_REQUEST(request, invalid);
|
return BAD_REQUEST(request, invalid);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
|
epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
|
||||||
{
|
{
|
||||||
Message fail;
|
Message fail;
|
||||||
fail.status = Message::STATUS_BAD_REQUEST;
|
fail.status = Message::STATUS_BAD_REQUEST;
|
||||||
@ -225,7 +225,7 @@ std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id)
|
|||||||
return FullMessage::getResponse(fail, id);
|
return FullMessage::getResponse(fail, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string BAD_JSON(const std::string& error_details)
|
epee::byte_slice BAD_JSON(const std::string& error_details)
|
||||||
{
|
{
|
||||||
rapidjson::Value invalid;
|
rapidjson::Value invalid;
|
||||||
Message fail;
|
Message fail;
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#include <rapidjson/writer.h>
|
#include <rapidjson/writer.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
#include "byte_slice.h"
|
||||||
#include "rpc/message_data_structs.h"
|
#include "rpc/message_data_structs.h"
|
||||||
|
|
||||||
namespace cryptonote
|
namespace cryptonote
|
||||||
@ -85,8 +86,8 @@ namespace rpc
|
|||||||
|
|
||||||
cryptonote::rpc::error getError();
|
cryptonote::rpc::error getError();
|
||||||
|
|
||||||
static std::string getRequest(const std::string& request, const Message& message, unsigned id);
|
static epee::byte_slice getRequest(const std::string& request, const Message& message, unsigned id);
|
||||||
static std::string getResponse(const Message& message, const rapidjson::Value& id);
|
static epee::byte_slice getResponse(const Message& message, const rapidjson::Value& id);
|
||||||
private:
|
private:
|
||||||
|
|
||||||
FullMessage() = default;
|
FullMessage() = default;
|
||||||
@ -99,10 +100,10 @@ namespace rpc
|
|||||||
|
|
||||||
|
|
||||||
// convenience functions for bad input
|
// convenience functions for bad input
|
||||||
std::string BAD_REQUEST(const std::string& request);
|
epee::byte_slice BAD_REQUEST(const std::string& request);
|
||||||
std::string BAD_REQUEST(const std::string& request, const rapidjson::Value& id);
|
epee::byte_slice BAD_REQUEST(const std::string& request, const rapidjson::Value& id);
|
||||||
|
|
||||||
std::string BAD_JSON(const std::string& error_details);
|
epee::byte_slice BAD_JSON(const std::string& error_details);
|
||||||
|
|
||||||
|
|
||||||
} // namespace rpc
|
} // namespace rpc
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include "byte_slice.h"
|
||||||
#include "crypto/hash.h"
|
#include "crypto/hash.h"
|
||||||
|
|
||||||
namespace cryptonote
|
namespace cryptonote
|
||||||
@ -54,7 +55,7 @@ class RpcHandler
|
|||||||
RpcHandler() { }
|
RpcHandler() { }
|
||||||
virtual ~RpcHandler() { }
|
virtual ~RpcHandler() { }
|
||||||
|
|
||||||
virtual std::string handle(const std::string& request) = 0;
|
virtual epee::byte_slice handle(const std::string& request) = 0;
|
||||||
|
|
||||||
static boost::optional<output_distribution_data>
|
static boost::optional<output_distribution_data>
|
||||||
get_output_distribution(const std::function<bool(uint64_t, uint64_t, uint64_t, uint64_t&, std::vector<uint64_t>&, uint64_t&)> &f, uint64_t amount, uint64_t from_height, uint64_t to_height, const std::function<crypto::hash(uint64_t)> &get_hash, bool cumulative, uint64_t blockchain_height);
|
get_output_distribution(const std::function<bool(uint64_t, uint64_t, uint64_t, uint64_t&, std::vector<uint64_t>&, uint64_t&)> &f, uint64_t amount, uint64_t from_height, uint64_t to_height, const std::function<crypto::hash(uint64_t)> &get_hash, bool cumulative, uint64_t blockchain_height);
|
||||||
|
@ -28,10 +28,13 @@
|
|||||||
|
|
||||||
#include "zmq_server.h"
|
#include "zmq_server.h"
|
||||||
|
|
||||||
|
#include <boost/utility/string_ref.hpp>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <system_error>
|
#include <system_error>
|
||||||
|
|
||||||
|
#include "byte_slice.h"
|
||||||
|
|
||||||
namespace cryptonote
|
namespace cryptonote
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -73,10 +76,11 @@ void ZmqServer::serve()
|
|||||||
{
|
{
|
||||||
const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
|
const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
|
||||||
MDEBUG("Received RPC request: \"" << message << "\"");
|
MDEBUG("Received RPC request: \"" << message << "\"");
|
||||||
const std::string& response = handler.handle(message);
|
epee::byte_slice response = handler.handle(message);
|
||||||
|
|
||||||
MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
|
const boost::string_ref response_view{reinterpret_cast<const char*>(response.data()), response.size()};
|
||||||
MDEBUG("Sent RPC reply: \"" << response << "\"");
|
MDEBUG("Sending RPC reply: \"" << response_view << "\"");
|
||||||
|
MONERO_UNWRAP(net::zmq::send(std::move(response), socket.get()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::system_error& e)
|
catch (const std::system_error& e)
|
||||||
|
@ -1702,6 +1702,45 @@ TEST(zmq, read_write)
|
|||||||
EXPECT_EQ(message, *received);
|
EXPECT_EQ(message, *received);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(zmq, read_write_slice)
|
||||||
|
{
|
||||||
|
net::zmq::context context{zmq_init(1)};
|
||||||
|
ASSERT_NE(nullptr, context);
|
||||||
|
|
||||||
|
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
|
||||||
|
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
|
||||||
|
ASSERT_NE(nullptr, send_socket);
|
||||||
|
ASSERT_NE(nullptr, recv_socket);
|
||||||
|
|
||||||
|
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
|
||||||
|
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
|
||||||
|
|
||||||
|
std::string message;
|
||||||
|
message.resize(1024);
|
||||||
|
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
|
||||||
|
|
||||||
|
{
|
||||||
|
epee::byte_slice slice_message{{epee::strspan<std::uint8_t>(message)}};
|
||||||
|
ASSERT_TRUE(bool(net::zmq::send(std::move(slice_message), send_socket.get())));
|
||||||
|
EXPECT_TRUE(slice_message.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
const expect<std::string> received = net::zmq::receive(recv_socket.get());
|
||||||
|
ASSERT_TRUE(bool(received));
|
||||||
|
EXPECT_EQ(message, *received);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(zmq, write_slice_fail)
|
||||||
|
{
|
||||||
|
std::string message;
|
||||||
|
message.resize(1024);
|
||||||
|
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
|
||||||
|
|
||||||
|
epee::byte_slice slice_message{std::move(message)};
|
||||||
|
EXPECT_FALSE(bool(net::zmq::send(std::move(slice_message), nullptr)));
|
||||||
|
EXPECT_TRUE(slice_message.empty());
|
||||||
|
}
|
||||||
|
|
||||||
TEST(zmq, read_write_multipart)
|
TEST(zmq, read_write_multipart)
|
||||||
{
|
{
|
||||||
net::zmq::context context{zmq_init(1)};
|
net::zmq::context context{zmq_init(1)};
|
||||||
|
Loading…
Reference in New Issue
Block a user