Convert bufferevents to use rate-limiting.

This requires the latest Git version of Libevent as of 24 March 2010.
In the future, we'll just say it requires Libevent 2.0.5-alpha or
later.

Since Libevent doesn't yet support hierarchical rate limit groups,
there isn't yet support for tracking relayed-bytes separately when
using the bufferevent system.  If a future version does add support
for hierarchical buckets, we can add that back in.
This commit is contained in:
Nick Mathewson 2010-02-22 13:59:34 -05:00
parent 98ec959c9c
commit ffd5070b04
9 changed files with 210 additions and 4 deletions

View File

@ -551,3 +551,27 @@ periodic_timer_free(periodic_timer_t *timer)
tor_free(timer); tor_free(timer);
} }
#ifdef USE_BUFFEREVENTS
/** Cached result of event_base_init_common_timeout() for a one-tick timeout;
 * NULL until tor_libevent_get_one_tick_timeout() is first called. */
static const struct timeval *one_tick = NULL;
/**
 * Return a timeout lasting one tick, i.e. 1/TOR_LIBEVENT_TICKS_PER_SECOND of
 * a second.  On the first call the value is registered with our event base
 * through event_base_init_common_timeout() and remembered; later calls
 * return the remembered pointer.
 */
const struct timeval *
tor_libevent_get_one_tick_timeout(void)
{
  if (PREDICT_UNLIKELY(one_tick == NULL)) {
    struct timeval tick_len;
    /* With one tick per second the tick is a whole second; otherwise it is
     * the matching fraction of a second, expressed in microseconds. */
    tick_len.tv_sec = (TOR_LIBEVENT_TICKS_PER_SECOND == 1) ? 1 : 0;
    tick_len.tv_usec = (TOR_LIBEVENT_TICKS_PER_SECOND == 1)
      ? 0 : 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND;
    one_tick = event_base_init_common_timeout(tor_libevent_get_base(),
                                              &tick_len);
  }
  return one_tick;
}
#endif

View File

@ -64,5 +64,10 @@ void tor_check_libevent_version(const char *m, int server,
void tor_check_libevent_header_compatibility(void); void tor_check_libevent_header_compatibility(void);
const char *tor_libevent_get_version_str(void); const char *tor_libevent_get_version_str(void);
#ifdef USE_BUFFEREVENTS
/** Number of rate-limiting ticks per second in the bufferevent build.  The
 * one-tick timeout lasts 1/TOR_LIBEVENT_TICKS_PER_SECOND of a second, and
 * per-second bandwidth rates are divided by this value to obtain per-tick
 * rates. */
#define TOR_LIBEVENT_TICKS_PER_SECOND 3
/** Return the (cached) timeout value representing one tick. */
const struct timeval *tor_libevent_get_one_tick_timeout(void);
#endif
#endif #endif

View File

@ -1699,7 +1699,6 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
state, state,
BEV_OPT_DEFER_CALLBACKS); BEV_OPT_DEFER_CALLBACKS);
#else #else
/* Disabled: just use filter for now. */
if (bufev_in) { if (bufev_in) {
evutil_socket_t s = bufferevent_getfd(bufev_in); evutil_socket_t s = bufferevent_getfd(bufev_in);
tor_assert(s == -1 || s == socket); tor_assert(s == -1 || s == socket);

View File

@ -1231,6 +1231,17 @@ options_act(or_options_t *old_options)
if (accounting_is_enabled(options)) if (accounting_is_enabled(options))
configure_accounting(time(NULL)); configure_accounting(time(NULL));
#ifdef USE_BUFFEREVENTS
/* If we're using the bufferevents implementation and our rate limits
* changed, we need to tell the rate-limiting system about it. */
if (!old_options ||
old_options->BandwidthRate != options->BandwidthRate ||
old_options->BandwidthBurst != options->BandwidthBurst ||
old_options->RelayBandwidthRate != options->RelayBandwidthRate ||
old_options->RelayBandwidthBurst != options->RelayBandwidthBurst)
connection_bucket_init();
#endif
/* Change the cell EWMA settings */ /* Change the cell EWMA settings */
cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus()); cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus());

View File

@ -49,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type,
static int connection_init_accepted_conn(connection_t *conn, static int connection_init_accepted_conn(connection_t *conn,
uint8_t listener_type); uint8_t listener_type);
static int connection_handle_listener_read(connection_t *conn, int new_type); static int connection_handle_listener_read(connection_t *conn, int new_type);
#ifndef USE_BUFFEREVENTS
static int connection_bucket_should_increase(int bucket, static int connection_bucket_should_increase(int bucket,
or_connection_t *conn); or_connection_t *conn);
#endif
static int connection_finished_flushing(connection_t *conn); static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn); static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn); static int connection_finished_connecting(connection_t *conn);
@ -199,6 +201,7 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_DIR: case CONN_TYPE_DIR:
case CONN_TYPE_CONTROL: case CONN_TYPE_CONTROL:
case CONN_TYPE_OR: case CONN_TYPE_OR:
case CONN_TYPE_CPUWORKER:
return 1; return 1;
default: default:
return 0; return 0;
@ -452,6 +455,8 @@ _connection_free(connection_t *conn)
tor_free(conn->read_event); /* Probably already freed by connection_free. */ tor_free(conn->read_event); /* Probably already freed by connection_free. */
tor_free(conn->write_event); /* Probably already freed by connection_free. */ tor_free(conn->write_event); /* Probably already freed by connection_free. */
IF_HAS_BUFFEREVENT(conn, { IF_HAS_BUFFEREVENT(conn, {
/* XXXX this is a workaround. */
bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL);
bufferevent_free(conn->bufev); bufferevent_free(conn->bufev);
conn->bufev = NULL; conn->bufev = NULL;
}); });
@ -481,6 +486,11 @@ _connection_free(connection_t *conn)
log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest"); log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
connection_or_remove_from_identity_map(TO_OR_CONN(conn)); connection_or_remove_from_identity_map(TO_OR_CONN(conn));
} }
#ifdef USE_BUFFEREVENTS
if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) {
ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg);
}
#endif
memset(mem, 0xCC, memlen); /* poison memory */ memset(mem, 0xCC, memlen); /* poison memory */
tor_free(mem); tor_free(mem);
@ -1945,6 +1955,9 @@ connection_is_rate_limited(connection_t *conn)
return 1; return 1;
} }
#ifdef USE_BUFFEREVENTS
/** Rate-limit group that connection_enable_rate_limiting() adds bufferevents
 * to.  NULL until connection_bucket_init() creates it. */
static struct bufferevent_rate_limit_group *global_rate_limit = NULL;
#else
extern int global_read_bucket, global_write_bucket; extern int global_read_bucket, global_write_bucket;
extern int global_relayed_read_bucket, global_relayed_write_bucket; extern int global_relayed_read_bucket, global_relayed_write_bucket;
@ -1952,11 +1965,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket;
* we are likely to run dry again this second, so be stingy with the * we are likely to run dry again this second, so be stingy with the
* tokens we just put in. */ * tokens we just put in. */
static int write_buckets_empty_last_second = 0; static int write_buckets_empty_last_second = 0;
#endif
/** How many seconds of no active local circuits will make the /** How many seconds of no active local circuits will make the
* connection revert to the "relayed" bandwidth class? */ * connection revert to the "relayed" bandwidth class? */
#define CLIENT_IDLE_TIME_FOR_PRIORITY 30 #define CLIENT_IDLE_TIME_FOR_PRIORITY 30
#ifndef USE_BUFFEREVENTS
/** Return 1 if <b>conn</b> should use tokens from the "relayed" /** Return 1 if <b>conn</b> should use tokens from the "relayed"
* bandwidth rates, else 0. Currently, only OR conns with bandwidth * bandwidth rates, else 0. Currently, only OR conns with bandwidth
* class 1, and directory conns that are serving data out, count. * class 1, and directory conns that are serving data out, count.
@ -2067,6 +2082,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
return connection_bucket_round_robin(base, priority, return connection_bucket_round_robin(base, priority,
global_bucket, conn_bucket); global_bucket, conn_bucket);
} }
#else
/** Return what bufferevent_get_max_to_read() reports for <b>conn</b>'s
 * bufferevent.  <b>now</b> is unused in this build. */
static ssize_t
connection_bucket_read_limit(connection_t *conn, time_t now)
{
  ssize_t max_to_read = bufferevent_get_max_to_read(conn->bufev);
  (void) now;
  return max_to_read;
}
/** Return what bufferevent_get_max_to_write() reports for <b>conn</b>'s
 * bufferevent.  <b>now</b> is unused in this build. */
ssize_t
connection_bucket_write_limit(connection_t *conn, time_t now)
{
  ssize_t max_to_write = bufferevent_get_max_to_write(conn->bufev);
  (void) now;
  return max_to_write;
}
#endif
/** Return 1 if the global write buckets are low enough that we /** Return 1 if the global write buckets are low enough that we
* shouldn't send <b>attempt</b> bytes of low-priority directory stuff * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
@ -2091,8 +2120,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority) global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{ {
#ifdef USE_BUFFEREVENTS
ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev);
#else
int smaller_bucket = global_write_bucket < global_relayed_write_bucket ? int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
global_write_bucket : global_relayed_write_bucket; global_write_bucket : global_relayed_write_bucket;
#endif
if (authdir_mode(get_options()) && priority>1) if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */ return 0; /* there's always room to answer v2 if we're an auth dir */
@ -2102,8 +2135,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
if (smaller_bucket < (int)attempt) if (smaller_bucket < (int)attempt)
return 1; /* not enough space no matter the priority */ return 1; /* not enough space no matter the priority */
#ifndef USE_BUFFEREVENTS
if (write_buckets_empty_last_second) if (write_buckets_empty_last_second)
return 1; /* we're already hitting our limits, no more please */ return 1; /* we're already hitting our limits, no more please */
#endif
if (priority == 1) { /* old-style v1 query */ if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */ /* Could we handle *two* of these requests within the next two seconds? */
@ -2119,6 +2154,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0; return 0;
} }
#ifndef USE_BUFFEREVENTS
/** We just read <b>num_read</b> and wrote <b>num_written</b> bytes /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
* onto <b>conn</b>. Decrement buckets appropriately. */ * onto <b>conn</b>. Decrement buckets appropriately. */
static void static void
@ -2362,6 +2398,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn)
return 1; return 1;
} }
#else
/** Bufferevent-build stand-in for the bucket-decrement step: it does nothing
 * and ignores all of its arguments, because Libevent does the bookkeeping. */
static void
connection_buckets_decrement(connection_t *conn, time_t now,
                             size_t num_read, size_t num_written)
{
  /* Nothing to do: Libevent handles this on our behalf. */
  (void) num_written;
  (void) num_read;
  (void) now;
  (void) conn;
}
/** Bufferevent-build stand-in for the periodic bucket refill: it does nothing
 * and ignores both arguments, because Libevent refills the buckets itself. */
void
connection_bucket_refill(int seconds_elapsed, time_t now)
{
  /* Nothing to do: Libevent handles this on our behalf. */
  (void) now;
  (void) seconds_elapsed;
}
/** Build a token-bucket configuration from the current options and apply it
 * to the global rate-limit group, creating that group on first use.
 *
 * RelayBandwidthRate/RelayBandwidthBurst are used when RelayBandwidthRate is
 * nonzero; otherwise BandwidthRate/BandwidthBurst are used.  The same rate
 * and burst are applied to both reading and writing.
 *
 * If no configuration can be built, the function returns without changing
 * the existing group (or without creating one). */
void
connection_bucket_init(void)
{
  or_options_t *options = get_options();
  const struct timeval *tick = tor_libevent_get_one_tick_timeout();
  struct ev_token_bucket_cfg *bucket_cfg;
  uint64_t rate, burst;

  if (options->RelayBandwidthRate) {
    rate = options->RelayBandwidthRate;
    burst = options->RelayBandwidthBurst;
  } else {
    rate = options->BandwidthRate;
    burst = options->BandwidthBurst;
  }

  /* The configured rate is per second; the bucket is refilled once per
   * tick, so scale it down to a per-tick rate. */
  rate /= TOR_LIBEVENT_TICKS_PER_SECOND;
  /* A configured rate smaller than TOR_LIBEVENT_TICKS_PER_SECOND rounds down
   * to zero here.  Round it up to one byte per tick instead.
   * NOTE(review): Libevent's ev_token_bucket_cfg_new() is understood to
   * return NULL for a rate below 1 -- confirm against the Libevent version
   * in use. */
  if (rate < 1)
    rate = 1;

  bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
                                       (uint32_t)rate, (uint32_t)burst,
                                       tick);
  if (!bucket_cfg) {
    /* Never hand a NULL configuration to the rate-limit group functions;
     * leave whatever is currently installed untouched. */
    return;
  }

  if (!global_rate_limit) {
    global_rate_limit =
      bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg);
  } else {
    bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg);
  }
  /* The configuration object is freed here, right after it has been handed
   * to the group, exactly as the original code did. */
  ev_token_bucket_cfg_free(bucket_cfg);
}
/** Set *<b>read_out</b> and *<b>written_out</b> to the totals that
 * bufferevent_rate_limit_group_get_totals() reports for the global
 * rate-limit group.  If the group has not been created yet, both are set
 * to 0. */
void
connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out)
{
  if (global_rate_limit != NULL) {
    bufferevent_rate_limit_group_get_totals(global_rate_limit,
                                            read_out, written_out);
    return;
  }
  *written_out = 0;
  *read_out = 0;
}
/** If <b>conn</b> has a bufferevent, add it to the global rate-limit group,
 * calling connection_bucket_init() first when the group does not exist yet.
 * A connection without a bufferevent is left untouched. */
void
connection_enable_rate_limiting(connection_t *conn)
{
  if (!conn->bufev)
    return;
  if (global_rate_limit == NULL)
    connection_bucket_init();
  bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit);
}
/** Bufferevent-build stand-in: does nothing and ignores <b>conn</b>. */
static void
connection_consider_empty_write_buckets(connection_t *conn)
{
  /* Intentionally empty in this build. */
  (void) conn;
}
/** Bufferevent-build stand-in: does nothing and ignores <b>conn</b>. */
static void
connection_consider_empty_read_buckets(connection_t *conn)
{
  /* Intentionally empty in this build. */
  (void) conn;
}
#endif
/** Read bytes from conn-\>s and process them. /** Read bytes from conn-\>s and process them.
* *

View File

@ -144,6 +144,9 @@ void connection_handle_read_cb(struct bufferevent *bufev, void *arg);
void connection_handle_write_cb(struct bufferevent *bufev, void *arg); void connection_handle_write_cb(struct bufferevent *bufev, void *arg);
void connection_handle_event_cb(struct bufferevent *bufev, short event, void connection_handle_event_cb(struct bufferevent *bufev, short event,
void *arg); void *arg);
void connection_get_rate_limit_totals(uint64_t *read_out,
uint64_t *written_out);
void connection_enable_rate_limiting(connection_t *conn);
#else #else
#define connection_type_uses_bufferevent(c) (0) #define connection_type_uses_bufferevent(c) (0)
#endif #endif

View File

@ -388,6 +388,21 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
conn->bandwidthrate = rate; conn->bandwidthrate = rate;
conn->bandwidthburst = burst; conn->bandwidthburst = burst;
#ifdef USE_BUFFEREVENTS
{
const struct timeval *tick = tor_libevent_get_one_tick_timeout();
struct ev_token_bucket_cfg *cfg, *old_cfg;
int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND;
cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick,
burst, tick);
old_cfg = conn->bucket_cfg;
if (conn->_base.bufev)
bufferevent_set_rate_limit(conn->_base.bufev, cfg);
if (old_cfg)
ev_token_bucket_cfg_free(old_cfg);
conn->bucket_cfg = cfg;
}
#else
if (reset) { /* set up the token buckets to be full */ if (reset) { /* set up the token buckets to be full */
conn->read_bucket = conn->write_bucket = burst; conn->read_bucket = conn->write_bucket = burst;
return; return;
@ -398,6 +413,7 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
conn->read_bucket = burst; conn->read_bucket = burst;
if (conn->write_bucket > burst) if (conn->write_bucket > burst)
conn->write_bucket = burst; conn->write_bucket = burst;
#endif
} }
/** Either our set of relays or our per-conn rate limits have changed. /** Either our set of relays or our per-conn rate limits have changed.
@ -879,6 +895,9 @@ connection_tls_start_handshake(or_connection_t *conn, int receiving)
return -1; return -1;
} }
conn->_base.bufev = b; conn->_base.bufev = b;
if (conn->bucket_cfg)
bufferevent_set_rate_limit(conn->_base.bufev, conn->bucket_cfg);
connection_enable_rate_limiting(TO_CONN(conn));
bufferevent_setcb(b, connection_handle_read_cb, bufferevent_setcb(b, connection_handle_read_cb,
connection_handle_write_cb, connection_handle_write_cb,
connection_or_handle_event_cb, connection_or_handle_event_cb,

View File

@ -76,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn);
/********* START VARIABLES **********/ /********* START VARIABLES **********/
#ifndef USE_BUFFEREVENTS
int global_read_bucket; /**< Max number of bytes I can read this second. */ int global_read_bucket; /**< Max number of bytes I can read this second. */
int global_write_bucket; /**< Max number of bytes I can write this second. */ int global_write_bucket; /**< Max number of bytes I can write this second. */
@ -83,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */
int global_relayed_read_bucket; int global_relayed_read_bucket;
/** Max number of relayed (bandwidth class 1) bytes I can write this second. */ /** Max number of relayed (bandwidth class 1) bytes I can write this second. */
int global_relayed_write_bucket; int global_relayed_write_bucket;
/** What was the read bucket before the last second_elapsed_callback() call? /** What was the read bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've read). */ * (used to determine how many bytes we've read). */
static int stats_prev_global_read_bucket; static int stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call? /** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */ * (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket; static int stats_prev_global_write_bucket;
#else
/** Read total reported by connection_get_rate_limit_totals() at the previous
 * second_elapsed_callback() call; subtracted from the current total to get
 * the bytes read since then. */
static uint64_t stats_prev_n_read = 0;
/** Written total reported by connection_get_rate_limit_totals() at the
 * previous second_elapsed_callback() call; subtracted from the current total
 * to get the bytes written since then. */
static uint64_t stats_prev_n_written = 0;
#endif
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read since we started the process? */ /** How many bytes have we read since we started the process? */
static uint64_t stats_n_bytes_read = 0; static uint64_t stats_n_bytes_read = 0;
@ -1395,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
size_t bytes_written; size_t bytes_written;
size_t bytes_read; size_t bytes_read;
int seconds_elapsed; int seconds_elapsed;
#ifdef USE_BUFFEREVENTS
uint64_t cur_read,cur_written;
#endif
or_options_t *options = get_options(); or_options_t *options = get_options();
(void)timer; (void)timer;
(void)arg; (void)arg;
@ -1406,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
update_approx_time(now); update_approx_time(now);
/* the second has rolled over. check more stuff. */ /* the second has rolled over. check more stuff. */
seconds_elapsed = current_second ? (int)(now - current_second) : 0;
#ifdef USE_BUFFEREVENTS
connection_get_rate_limit_totals(&cur_read, &cur_written);
bytes_written = ((size_t)cur_written) - stats_prev_n_written;
bytes_read = ((size_t)cur_read) - stats_prev_n_read;
#else
bytes_written = stats_prev_global_write_bucket - global_write_bucket; bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket; bytes_read = stats_prev_global_read_bucket - global_read_bucket;
seconds_elapsed = current_second ? (int)(now - current_second) : 0; #endif
stats_n_bytes_read += bytes_read; stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written; stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && seconds_elapsed >= 0) if (accounting_is_enabled(options) && seconds_elapsed >= 0)
@ -1418,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
if (seconds_elapsed > 0) if (seconds_elapsed > 0)
connection_bucket_refill(seconds_elapsed, now); connection_bucket_refill(seconds_elapsed, now);
#ifdef USE_BUFFEREVENTS
stats_prev_n_written = cur_written;
stats_prev_n_read = cur_read;
#else
stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket; stats_prev_global_write_bucket = global_write_bucket;
#endif
if (server_mode(options) && if (server_mode(options) &&
!we_are_hibernating() && !we_are_hibernating() &&
@ -1620,8 +1639,10 @@ do_main_loop(void)
/* Set up our buckets */ /* Set up our buckets */
connection_bucket_init(); connection_bucket_init();
#ifndef USE_BUFFEREVENTS
stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket; stats_prev_global_write_bucket = global_write_bucket;
#endif
/* initialize the bootstrap status events to know we're starting up */ /* initialize the bootstrap status events to know we're starting up */
control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);

View File

@ -1078,10 +1078,16 @@ typedef struct or_connection_t {
/* bandwidth* and *_bucket only used by ORs in OPEN state: */ /* bandwidth* and *_bucket only used by ORs in OPEN state: */
int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
#ifndef USE_BUFFEREVENTS
int read_bucket; /**< When this hits 0, stop receiving. Every second we int read_bucket; /**< When this hits 0, stop receiving. Every second we
* add 'bandwidthrate' to this, capping it at * add 'bandwidthrate' to this, capping it at
* bandwidthburst. (OPEN ORs only) */ * bandwidthburst. (OPEN ORs only) */
int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */ int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
#else
/** DOCDOC */
/* XXXX we could share this among all connections. */
struct ev_token_bucket_cfg *bucket_cfg;
#endif
int n_circuits; /**< How many circuits use this connection as p_conn or int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */ * n_conn ? */