monero/tests/net_load_tests/clt.cpp
moneromooo-monero 7dbf76d0da
Fix an object lifetime bug in net load tests
The commands handler must not be destroyed before the config
object, or we'll be accessing freed memory.

An earlier attempt at using boost::shared_ptr to control object
lifetime turned out to be very invasive, though would be a
better solution in theory.
2017-10-09 16:46:42 +01:00

638 lines
28 KiB
C++

// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers
#include <atomic>
#include <chrono>
#include <functional>
#include <numeric>
#include <boost/thread/thread.hpp>
#include <vector>
#include "gtest/gtest.h"
#include "include_base_utils.h"
#include "misc_language.h"
#include "misc_log_ex.h"
#include "storages/levin_abstract_invoke2.h"
#include "net_load_tests.h"
using namespace net_load_tests;
namespace
{
const size_t CONNECTION_COUNT = 100000;
const size_t CONNECTION_TIMEOUT = 10000;
const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
const size_t RESERVED_CONN_CNT = 1;
template<typename t_predicate>
bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
{
for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
{
if (predicate())
return true;
//std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
epee::misc_utils::sleep_no_w(static_cast<long>(sleep_ms));
}
return false;
}
class t_connection_opener_1
{
public:
t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
: m_tcp_server(tcp_server)
, m_open_request_target(open_request_target)
, m_next_id(0)
, m_error_count(0)
, m_connections(open_request_target)
{
for (auto& conn_id : m_connections)
conn_id = boost::uuids::nil_uuid();
}
bool open()
{
size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
if (m_open_request_target <= id)
return false;
bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_connections[id] = context.m_connection_id;
}
else
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
});
if (!r)
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
return true;
}
bool close(size_t id)
{
if (!m_connections[id].is_nil())
{
m_tcp_server.get_config_object().close(m_connections[id]);
return true;
}
else
{
return false;
}
}
size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
private:
test_tcp_server& m_tcp_server;
size_t m_open_request_target;
std::atomic<size_t> m_next_id;
std::atomic<size_t> m_error_count;
std::vector<boost::uuids::uuid> m_connections;
};
class t_connection_opener_2
{
public:
t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
: m_tcp_server(tcp_server)
, m_open_request_target(open_request_target)
, m_open_request_count(0)
, m_error_count(0)
, m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
{
}
bool open_and_close()
{
size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
if (m_open_request_target <= req_count)
return false;
bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_open_close_test_helper.handle_new_connection(context.m_connection_id);
}
else
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
});
if (!r)
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
return true;
}
void close_remaining_connections()
{
m_open_close_test_helper.close_remaining_connections();
}
size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
private:
test_tcp_server& m_tcp_server;
size_t m_open_request_target;
std::atomic<size_t> m_open_request_count;
std::atomic<size_t> m_error_count;
open_close_test_helper m_open_close_test_helper;
};
class net_load_test_clt : public ::testing::Test
{
public:
net_load_test_clt()
: m_tcp_server(epee::net_utils::e_connection_type_RPC) // RPC disables network limit for unit tests
{
}
protected:
virtual void SetUp()
{
m_thread_count = (std::max)(min_thread_count, boost::thread::hardware_concurrency() / 2);
m_tcp_server.get_config_object().set_handler(&m_commands_handler);
m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));
// Connect to server
std::atomic<int> conn_status(0);
m_cmd_conn_id = boost::uuids::nil_uuid();
ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_cmd_conn_id = context.m_connection_id;
}
else
{
LOG_ERROR("Connection error: " << ec.message());
}
conn_status.store(1, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
ASSERT_FALSE(m_cmd_conn_id.is_nil());
conn_status.store(0, std::memory_order_seq_cst);
CMD_RESET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
conn_status.store(code, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
}
virtual void TearDown()
{
m_tcp_server.send_stop_signal();
ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
}
static void TearDownTestCase()
{
// Stop server
test_levin_commands_handler *commands_handler_ptr = new test_levin_commands_handler();
test_levin_commands_handler &commands_handler = *commands_handler_ptr;
test_tcp_server tcp_server(epee::net_utils::e_connection_type_RPC);
tcp_server.get_config_object().set_handler(commands_handler_ptr, [](epee::levin::levin_commands_handler<test_connection_context> *handler)->void { delete handler; });
tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
if (!tcp_server.run_server(2, false)) return;
// Connect to server and invoke shutdown command
std::atomic<int> conn_status(0);
boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
cmd_conn_id = context.m_connection_id;
conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
});
if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
if (1 != conn_status.load(std::memory_order_seq_cst)) return;
epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
}
template<typename Func>
static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
{
func();
}
template<typename Func>
static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
{
func(thread_index);
}
template<typename Func>
void parallel_exec(const Func& func)
{
unit_test::call_counter properly_finished_threads;
std::vector<boost::thread> threads(m_thread_count);
for (size_t i = 0; i < threads.size(); ++i)
{
threads[i] = boost::thread([&, i] {
call_func(i, func, 0);
properly_finished_threads.inc();
});
}
for (auto& th : threads)
th.join();
ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
}
void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
{
std::atomic<int> req_status(0);
CMD_GET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
if (0 < code)
{
statistics = rsp;
}
else
{
LOG_ERROR("Get server statistics error: " << code);
}
req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
}
template <typename t_predicate>
bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
{
for (size_t i = 0; i < 30; ++i)
{
get_server_statistics(statistics);
if (predicate(statistics))
{
return true;
}
//std::this_thread::sleep_for(std::chrono::seconds(1));
epee::misc_utils::sleep_no_w(1000);
}
return false;
}
void ask_for_data_requests(size_t request_size = 0)
{
CMD_SEND_DATA_REQUESTS::request req;
req.request_size = request_size;
epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
}
protected:
test_tcp_server m_tcp_server;
test_levin_commands_handler m_commands_handler;
size_t m_thread_count;
boost::uuids::uuid m_cmd_conn_id;
};
}
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
{
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
// Close connections
parallel_exec([&](size_t thread_idx) {
for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
{
connection_opener.close(i);
}
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check all connections are closed
ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
CMD_GET_STATISTICS::response srv_stat;
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
// It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Request data from server, it causes to close rest connections
ask_for_data_requests();
// Wait for server to close rest connections
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status. All connections should be closed
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
{
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
// Wait for server accepts all connections
CMD_GET_STATISTICS::response srv_stat;
int last_new_connection_counter = -1;
busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
});
// Close connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for all opened connections to close
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// It's OK, if server didn't close all connections, because it could accept not all our connections
ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
}
});
if (!r)
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
}
return true;
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check
ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}
TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
{
static const size_t MAX_OPENED_CONN_COUNT = 100;
// Open/close connections
t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
parallel_exec([&] {
while (connection_opener.open_and_close());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
// Wait for all close requests to complete
EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
// Check
ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
connection_opener.close_remaining_connections();
// Wait for all close requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
ASSERT_EQ(0, connection_opener.opened_connection_count());
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
CMD_GET_STATISTICS::response srv_stat;
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
// It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Request data from server, it causes to close rest connections
ask_for_data_requests();
// Wait for server to close rest connections
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status. All connections should be closed
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}
TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
{
static const size_t MAX_OPENED_CONN_COUNT = 100;
// Init test
std::atomic<int> test_state(0);
CMD_START_OPEN_CLOSE_TEST::request req_start;
req_start.open_request_target = CONNECTION_COUNT;
req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
}));
// Wait for server response
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
// Wait for server accepts all connections
CMD_GET_STATISTICS::response srv_stat;
int last_new_connection_counter = -1;
busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
});
// Ask server to close rest connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for almost all connections to be closed by server
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
// It's OK, if there are opened connections, because server could accept not all our connections
ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
}
});
if (!r)
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
}
return true;
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check
ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}
int main(int argc, char** argv)
{
epee::debug::get_set_enable_assert(true, false);
//set up logging options
mlog_configure(mlog_get_default_log_path("net_load_tests_clt.log"), true);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}