From ffd5070b04b3db4409d8e3dc933ffc7d12b5219d Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 22 Feb 2010 13:59:34 -0500 Subject: [PATCH] Convert bufferevents to use rate-limiting. This requires the latest Git version of Libevent as of 24 March 2010. In the future, we'll just say it requires Libevent 2.0.5-alpha or later. Since Libevent doesn't yet support hierarchical rate limit groups, there isn't yet support for tracking relayed-bytes separately when using the bufferevent system. If a future version does add support for hierarchical buckets, we can add that back in. --- src/common/compat_libevent.c | 24 +++++++ src/common/compat_libevent.h | 5 ++ src/common/tortls.c | 3 +- src/or/config.c | 11 ++++ src/or/connection.c | 118 +++++++++++++++++++++++++++++++++++ src/or/connection.h | 3 + src/or/connection_or.c | 19 ++++++ src/or/main.c | 25 +++++++- src/or/or.h | 6 ++ 9 files changed, 210 insertions(+), 4 deletions(-) diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c index 250fa2bdb7..2ae280e669 100644 --- a/src/common/compat_libevent.c +++ b/src/common/compat_libevent.c @@ -551,3 +551,27 @@ periodic_timer_free(periodic_timer_t *timer) tor_free(timer); } +#ifdef USE_BUFFEREVENTS +static const struct timeval *one_tick = NULL; +/** + DOCDOC +*/ +const struct timeval *tor_libevent_get_one_tick_timeout(void) +{ + + if (PREDICT_UNLIKELY(one_tick == NULL)) { + struct event_base *base = tor_libevent_get_base(); + struct timeval tv; + if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) { + tv.tv_sec = 1; + tv.tv_usec = 0; + } else { + tv.tv_sec = 0; + tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND; + } + one_tick = event_base_init_common_timeout(base, &tv); + } + return one_tick; +} +#endif + diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h index a4011e37af..f483d6ee6d 100644 --- a/src/common/compat_libevent.h +++ b/src/common/compat_libevent.h @@ -64,5 +64,10 @@ void tor_check_libevent_version(const char *m, int server, void tor_check_libevent_header_compatibility(void); const char *tor_libevent_get_version_str(void); +#ifdef USE_BUFFEREVENTS +#define TOR_LIBEVENT_TICKS_PER_SECOND 3 +const struct timeval *tor_libevent_get_one_tick_timeout(void); +#endif + #endif diff --git a/src/common/tortls.c b/src/common/tortls.c index 06533ca43b..3ae3ef8835 100644 --- a/src/common/tortls.c +++ b/src/common/tortls.c @@ -1699,7 +1699,6 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in, state, BEV_OPT_DEFER_CALLBACKS); #else - /* Disabled: just use filter for now. */ if (bufev_in) { evutil_socket_t s = bufferevent_getfd(bufev_in); tor_assert(s == -1 || s == socket); @@ -1715,7 +1714,7 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in, tls->ssl, state, 0); - //BEV_OPT_DEFER_CALLBACKS); + //BEV_OPT_DEFER_CALLBACKS); #endif return out; } diff --git a/src/or/config.c b/src/or/config.c index 8febe7a56b..fa2eb73beb 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1231,6 +1231,17 @@ options_act(or_options_t *old_options) if (accounting_is_enabled(options)) configure_accounting(time(NULL)); +#ifdef USE_BUFFEREVENTS + /* If we're using the bufferevents implementation and our rate limits + * changed, we need to tell the rate-limiting system about it. */ + if (!old_options || + old_options->BandwidthRate != options->BandwidthRate || + old_options->BandwidthBurst != options->BandwidthBurst || + old_options->RelayBandwidthRate != options->RelayBandwidthRate || + old_options->RelayBandwidthBurst != options->RelayBandwidthBurst) + connection_bucket_init(); +#endif + /* Change the cell EWMA settings */ cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus()); diff --git a/src/or/connection.c b/src/or/connection.c index 2944a0d4bb..b6f0d5d69d 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -49,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type, static int connection_init_accepted_conn(connection_t *conn, uint8_t listener_type); static int connection_handle_listener_read(connection_t *conn, int new_type); +#ifndef USE_BUFFEREVENTS static int connection_bucket_should_increase(int bucket, or_connection_t *conn); +#endif static int connection_finished_flushing(connection_t *conn); static int connection_flushed_some(connection_t *conn); static int connection_finished_connecting(connection_t *conn); @@ -199,6 +201,7 @@ connection_type_uses_bufferevent(connection_t *conn) case CONN_TYPE_DIR: case CONN_TYPE_CONTROL: case CONN_TYPE_OR: + case CONN_TYPE_CPUWORKER: return 1; default: return 0; @@ -452,6 +455,8 @@ _connection_free(connection_t *conn) tor_free(conn->read_event); /* Probably already freed by connection_free. */ tor_free(conn->write_event); /* Probably already freed by connection_free. */ IF_HAS_BUFFEREVENT(conn, { + /* XXXX this is a workaround. */ + bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL); bufferevent_free(conn->bufev); conn->bufev = NULL; }); @@ -481,6 +486,11 @@ _connection_free(connection_t *conn) log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest"); connection_or_remove_from_identity_map(TO_OR_CONN(conn)); } +#ifdef USE_BUFFEREVENTS + if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) { + ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg); + } +#endif memset(mem, 0xCC, memlen); /* poison memory */ tor_free(mem); @@ -1945,6 +1955,9 @@ connection_is_rate_limited(connection_t *conn) return 1; } +#ifdef USE_BUFFEREVENTS +static struct bufferevent_rate_limit_group *global_rate_limit = NULL; +#else extern int global_read_bucket, global_write_bucket; extern int global_relayed_read_bucket, global_relayed_write_bucket; @@ -1952,11 +1965,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket; * we are likely to run dry again this second, so be stingy with the * tokens we just put in. */ static int write_buckets_empty_last_second = 0; +#endif /** How many seconds of no active local circuits will make the * connection revert to the "relayed" bandwidth class? */ #define CLIENT_IDLE_TIME_FOR_PRIORITY 30 +#ifndef USE_BUFFEREVENTS /** Return 1 if conn should use tokens from the "relayed" * bandwidth rates, else 0. Currently, only OR conns with bandwidth * class 1, and directory conns that are serving data out, count. @@ -2067,6 +2082,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now) return connection_bucket_round_robin(base, priority, global_bucket, conn_bucket); } +#else +static ssize_t +connection_bucket_read_limit(connection_t *conn, time_t now) +{ + (void) now; + return bufferevent_get_max_to_read(conn->bufev); +} +ssize_t +connection_bucket_write_limit(connection_t *conn, time_t now) +{ + (void) now; + return bufferevent_get_max_to_write(conn->bufev); +} +#endif /** Return 1 if the global write buckets are low enough that we * shouldn't send attempt bytes of low-priority directory stuff @@ -2091,8 +2120,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { +#ifdef USE_BUFFEREVENTS + ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev); +#else int smaller_bucket = global_write_bucket < global_relayed_write_bucket ? global_write_bucket : global_relayed_write_bucket; +#endif if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ @@ -2102,8 +2135,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) if (smaller_bucket < (int)attempt) return 1; /* not enough space no matter the priority */ +#ifndef USE_BUFFEREVENTS if (write_buckets_empty_last_second) return 1; /* we're already hitting our limits, no more please */ +#endif if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ @@ -2119,6 +2154,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) return 0; } +#ifndef USE_BUFFEREVENTS /** We just read num_read and wrote num_written bytes * onto conn. Decrement buckets appropriately. */ static void @@ -2362,6 +2398,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn) return 1; } +#else + +static void +connection_buckets_decrement(connection_t *conn, time_t now, + size_t num_read, size_t num_written) +{ + (void) conn; + (void) now; + (void) num_read; + (void) num_written; + /* Libevent does this for us. */ +} +void +connection_bucket_refill(int seconds_elapsed, time_t now) +{ + (void) seconds_elapsed; + (void) now; + /* Libevent does this for us. */ +} +void +connection_bucket_init(void) +{ + or_options_t *options = get_options(); + const struct timeval *tick = tor_libevent_get_one_tick_timeout(); + struct ev_token_bucket_cfg *bucket_cfg; + + uint64_t rate, burst; + if (options->RelayBandwidthRate) { + rate = options->RelayBandwidthRate; + burst = options->RelayBandwidthBurst; + } else { + rate = options->BandwidthRate; + burst = options->BandwidthBurst; + } + + rate /= TOR_LIBEVENT_TICKS_PER_SECOND; + bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst, + (uint32_t)rate, (uint32_t)burst, + tick); + + if (!global_rate_limit) { + global_rate_limit = + bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg); + } else { + bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg); + } + ev_token_bucket_cfg_free(bucket_cfg); +} + +void +connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out) +{ + if (global_rate_limit == NULL) { + *read_out = *written_out = 0; + } else { + bufferevent_rate_limit_group_get_totals( + global_rate_limit, read_out, written_out); + } +} + +/** DOCDOC */ +void +connection_enable_rate_limiting(connection_t *conn) +{ + if (conn->bufev) { + if (!global_rate_limit) + connection_bucket_init(); + bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit); + } +} + +static void +connection_consider_empty_write_buckets(connection_t *conn) +{ + (void) conn; +} +static void +connection_consider_empty_read_buckets(connection_t *conn) +{ + (void) conn; +} +#endif /** Read bytes from conn-\>s and process them. * diff --git a/src/or/connection.h b/src/or/connection.h index 83aec0ef00..a40b1a5fb5 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -144,6 +144,9 @@ void connection_handle_read_cb(struct bufferevent *bufev, void *arg); void connection_handle_write_cb(struct bufferevent *bufev, void *arg); void connection_handle_event_cb(struct bufferevent *bufev, short event, void *arg); +void connection_get_rate_limit_totals(uint64_t *read_out, + uint64_t *written_out); +void connection_enable_rate_limiting(connection_t *conn); #else #define connection_type_uses_bufferevent(c) (0) #endif diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 2fbc230bcd..044197d83a 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -388,6 +388,21 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, conn->bandwidthrate = rate; conn->bandwidthburst = burst; +#ifdef USE_BUFFEREVENTS + { + const struct timeval *tick = tor_libevent_get_one_tick_timeout(); + struct ev_token_bucket_cfg *cfg, *old_cfg; + int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND; + cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick, + burst, tick); + old_cfg = conn->bucket_cfg; + if (conn->_base.bufev) + bufferevent_set_rate_limit(conn->_base.bufev, cfg); + if (old_cfg) + ev_token_bucket_cfg_free(old_cfg); + conn->bucket_cfg = cfg; + } +#else if (reset) { /* set up the token buckets to be full */ conn->read_bucket = conn->write_bucket = burst; return; @@ -398,6 +413,7 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, conn->read_bucket = burst; if (conn->write_bucket > burst) conn->write_bucket = burst; +#endif } /** Either our set of relays or our per-conn rate limits have changed. @@ -879,6 +895,9 @@ connection_tls_start_handshake(or_connection_t *conn, int receiving) return -1; } conn->_base.bufev = b; + if (conn->bucket_cfg) + bufferevent_set_rate_limit(conn->_base.bufev, conn->bucket_cfg); + connection_enable_rate_limiting(TO_CONN(conn)); bufferevent_setcb(b, connection_handle_read_cb, connection_handle_write_cb, connection_or_handle_event_cb, diff --git a/src/or/main.c b/src/or/main.c index f6f26b05f9..1979529f72 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -76,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn); /********* START VARIABLES **********/ +#ifndef USE_BUFFEREVENTS int global_read_bucket; /**< Max number of bytes I can read this second. */ int global_write_bucket; /**< Max number of bytes I can write this second. */ @@ -83,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */ int global_relayed_read_bucket; /** Max number of relayed (bandwidth class 1) bytes I can write this second. */ int global_relayed_write_bucket; - /** What was the read bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've read). */ static int stats_prev_global_read_bucket; /** What was the write bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've written). */ static int stats_prev_global_write_bucket; +#else +static uint64_t stats_prev_n_read = 0; +static uint64_t stats_prev_n_written = 0; +#endif + /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /** How many bytes have we read since we started the process? */ static uint64_t stats_n_bytes_read = 0; @@ -1395,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) size_t bytes_written; size_t bytes_read; int seconds_elapsed; +#ifdef USE_BUFFEREVENTS + uint64_t cur_read,cur_written; +#endif or_options_t *options = get_options(); (void)timer; (void)arg; @@ -1406,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) update_approx_time(now); /* the second has rolled over. check more stuff. */ + seconds_elapsed = current_second ? (int)(now - current_second) : 0; +#ifdef USE_BUFFEREVENTS + connection_get_rate_limit_totals(&cur_read, &cur_written); + bytes_written = ((size_t)cur_written) - stats_prev_n_written; + bytes_read = ((size_t)cur_read) - stats_prev_n_read; +#else bytes_written = stats_prev_global_write_bucket - global_write_bucket; bytes_read = stats_prev_global_read_bucket - global_read_bucket; - seconds_elapsed = current_second ? (int)(now - current_second) : 0; +#endif stats_n_bytes_read += bytes_read; stats_n_bytes_written += bytes_written; if (accounting_is_enabled(options) && seconds_elapsed >= 0) @@ -1418,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) if (seconds_elapsed > 0) connection_bucket_refill(seconds_elapsed, now); +#ifdef USE_BUFFEREVENTS + stats_prev_n_written = cur_written; + stats_prev_n_read = cur_read; +#else stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_write_bucket = global_write_bucket; +#endif if (server_mode(options) && !we_are_hibernating() && @@ -1620,8 +1639,10 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); +#ifndef USE_BUFFEREVENTS stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_write_bucket = global_write_bucket; +#endif /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); diff --git a/src/or/or.h b/src/or/or.h index 8f875731f6..f4f511ad00 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1078,10 +1078,16 @@ typedef struct or_connection_t { /* bandwidth* and *_bucket only used by ORs in OPEN state: */ int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ +#ifndef USE_BUFFEREVENTS int read_bucket; /**< When this hits 0, stop receiving. Every second we * add 'bandwidthrate' to this, capping it at * bandwidthburst. (OPEN ORs only) */ int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */ +#else + /** DOCDOC */ + /* XXXX we could share this among all connections. */ + struct ev_token_bucket_cfg *bucket_cfg; +#endif int n_circuits; /**< How many circuits use this connection as p_conn or * n_conn ? */