Don't timeout a slow operation that's making progress

If we got at least MIN_BYTES_WANTED (default 512) during any network
poll, reset the timeout to allow more time for data to arrive.
This commit is contained in:
Howard Chu 2017-06-04 22:36:09 +01:00
parent 340830de5b
commit cf3a376cb5
No known key found for this signature in database
GPG Key ID: FD2A70B44AB11BA7

View File

@ -42,6 +42,10 @@
#undef MONERO_DEFAULT_LOG_CATEGORY #undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net" #define MONERO_DEFAULT_LOG_CATEGORY "net"
#ifndef MIN_BYTES_WANTED
#define MIN_BYTES_WANTED 512
#endif
namespace epee namespace epee
{ {
namespace levin namespace levin
@ -139,26 +143,23 @@ public:
virtual bool is_timer_started() const=0; virtual bool is_timer_started() const=0;
virtual void cancel()=0; virtual void cancel()=0;
virtual bool cancel_timer()=0; virtual bool cancel_timer()=0;
virtual void reset_timer()=0;
virtual void timeout_handler(const boost::system::error_code& error)=0;
}; };
template <class callback_t> template <class callback_t>
struct anvoke_handler: invoke_response_handler_base struct anvoke_handler: invoke_response_handler_base
{ {
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command) anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
:m_cb(cb), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false), :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command) m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
{ {
if(m_con.start_outer_call()) if(m_con.start_outer_call())
{ {
MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout)); m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
m_timer.async_wait([&con, command, cb](const boost::system::error_code& ec) m_timer.async_wait([this](const boost::system::error_code& ec)
{ {
if(ec == boost::asio::error::operation_aborted) timeout_handler(ec);
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command);
std::string fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
}); });
m_timer_started = true; m_timer_started = true;
} }
@ -171,7 +172,18 @@ public:
bool m_timer_started; bool m_timer_started;
bool m_cancel_timer_called; bool m_cancel_timer_called;
bool m_timer_cancelled; bool m_timer_cancelled;
uint64_t m_timeout;
int m_command; int m_command;
virtual void timeout_handler(const boost::system::error_code& error)
{
if(error == boost::asio::error::operation_aborted)
return;
MINFO(m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << m_command << " timeout: " << m_timeout);
std::string fake;
m_cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, m_con.get_context_ref());
m_con.close();
m_con.finish_outer_call();
}
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context) virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
{ {
if(!cancel_timer()) if(!cancel_timer())
@ -203,6 +215,18 @@ public:
} }
return m_timer_cancelled; return m_timer_cancelled;
} }
virtual void reset_timer()
{
boost::system::error_code ignored_ec;
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
{
m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
m_timer.async_wait([this](const boost::system::error_code& ec)
{
timeout_handler(ec);
});
}
}
}; };
critical_section m_invoke_response_handlers_lock; critical_section m_invoke_response_handlers_lock;
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers; std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
@ -342,6 +366,13 @@ public:
if(m_cache_in_buffer.size() < m_current_head.m_cb) if(m_cache_in_buffer.size() < m_current_head.m_cb)
{ {
is_continue = false; is_continue = false;
if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
{
//async call scenario
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
response_handler->reset_timer();
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
}
break; break;
} }
{ {
@ -595,9 +626,15 @@ public:
<< ", ver=" << head.m_protocol_version); << ", ver=" << head.m_protocol_version);
uint64_t ticks_start = misc_utils::get_tick_count(); uint64_t ticks_start = misc_utils::get_tick_count();
size_t prev_size = 0;
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released) while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
{ {
if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
{
prev_size = m_cache_in_buffer.size();
ticks_start = misc_utils::get_tick_count();
}
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout) if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
{ {
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection "); MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");