diff --git a/changes/feature3630 b/changes/feature3630 new file mode 100644 index 0000000000..dab46c8790 --- /dev/null +++ b/changes/feature3630 @@ -0,0 +1,8 @@ + o Major features (networking): + - Add a new TokenBucketRefillInterval option to refill token buckets + more frequently than once per second. This should improve network + performance, alleviate queueing problems, and make traffic less + bursty. Implements proposal 183; closes ticket 3630. Design by + Florian Tschorsch and Björn Scheuermann; implementation by + Florian Tschorsch. + diff --git a/doc/tor.1.txt b/doc/tor.1.txt index 4edee80cea..4ae721e4a9 100644 --- a/doc/tor.1.txt +++ b/doc/tor.1.txt @@ -737,6 +737,13 @@ The following options are useful only for clients (that is, if unattached waiting for an appropriate circuit, before we fail it. (Default: 2 minutes.) +**TokenBucketRefillInterval** __NUM__ [**msec**|**second**]:: + Set the refill interval of Tor's token bucket to NUM milliseconds. + NUM must be between 1 and 1000, inclusive. Note that the configured + bandwidth limits are still expressed in bytes per second: this + option only affects the frequency with which Tor checks to see whether + previously exhausted connections may read again. (Default: 10 msec.) + **TrackHostExits** __host__,__.domain__,__...__:: For each value in the comma separated list, Tor will track recent connections to hosts that match this value and attempt to reuse the same diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c index beae9502da..3201738701 100644 --- a/src/common/compat_libevent.c +++ b/src/common/compat_libevent.c @@ -169,6 +169,7 @@ struct event_base *the_event_base = NULL; #ifdef USE_BUFFEREVENTS static int using_iocp_bufferevents = 0; +static void tor_libevent_set_tick_timeout(int msec_per_tick); int tor_libevent_using_iocp_bufferevents(void) @@ -236,6 +237,10 @@ tor_libevent_initialize(tor_libevent_cfg *torcfg) "You have a *VERY* old version of libevent. It is likely to be buggy; " "please build Tor with a more recent version."); #endif + +#ifdef USE_BUFFEREVENTS + tor_libevent_set_tick_timeout(torcfg->msec_per_tick); +#endif } /** Return the current Libevent event base that we're set up to use. */ @@ -598,26 +603,29 @@ static const struct timeval *one_tick = NULL; /** * Return a special timeout to be passed whenever libevent's O(1) timeout * implementation should be used. Only use this when the timer is supposed - * to fire after 1 / TOR_LIBEVENT_TICKS_PER_SECOND seconds have passed. + * to fire after msec_per_tick ticks have elapsed. */ const struct timeval * tor_libevent_get_one_tick_timeout(void) { - if (PREDICT_UNLIKELY(one_tick == NULL)) { - struct event_base *base = tor_libevent_get_base(); - struct timeval tv; - if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) { - tv.tv_sec = 1; - tv.tv_usec = 0; - } else { - tv.tv_sec = 0; - tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND; - } - one_tick = event_base_init_common_timeout(base, &tv); - } + tor_assert(one_tick); return one_tick; } +/** Initialize the common timeout that we'll use to refill the buckets every + * time a tick elapses. */ +static void +tor_libevent_set_tick_timeout(int msec_per_tick) +{ + struct event_base *base = tor_libevent_get_base(); + struct timeval tv; + + tor_assert(! one_tick); + tv.tv_sec = msec_per_tick / 1000; + tv.tv_usec = (msec_per_tick % 1000) * 1000; + one_tick = event_base_init_common_timeout(base, &tv); +} + static struct bufferevent * tor_get_root_bufferevent(struct bufferevent *bev) { diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h index 15b0fc273b..0247297177 100644 --- a/src/common/compat_libevent.h +++ b/src/common/compat_libevent.h @@ -62,6 +62,7 @@ int tor_event_base_loopexit(struct event_base *base, struct timeval *tv); typedef struct tor_libevent_cfg { int disable_iocp; int num_cpus; + int msec_per_tick; } tor_libevent_cfg; void tor_libevent_initialize(tor_libevent_cfg *cfg); @@ -73,7 +74,6 @@ void tor_check_libevent_header_compatibility(void); const char *tor_libevent_get_version_str(void); #ifdef USE_BUFFEREVENTS -#define TOR_LIBEVENT_TICKS_PER_SECOND 3 const struct timeval *tor_libevent_get_one_tick_timeout(void); int tor_libevent_using_iocp_bufferevents(void); int tor_set_bufferevent_rate_limit(struct bufferevent *bev, diff --git a/src/or/config.c b/src/or/config.c index bc77b3a77e..e22f539ade 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -386,6 +386,7 @@ static config_var_t _option_vars[] = { OBSOLETE("SysLog"), V(TestSocks, BOOL, "0"), OBSOLETE("TestVia"), + V(TokenBucketRefillInterval, MSEC_INTERVAL, "10 msec"), V(TrackHostExits, CSV, NULL), V(TrackHostExitsExpire, INTERVAL, "30 minutes"), OBSOLETE("TrafficShaping"), @@ -3165,6 +3166,11 @@ options_validate(or_options_t *old_options, or_options_t *options, REJECT("TransPort and TransListenAddress are disabled in this build."); #endif + if (options->TokenBucketRefillInterval <= 0 + || options->TokenBucketRefillInterval > 1000) { + REJECT("TokenBucketRefillInterval must be between 1 and 1000 inclusive."); + } + if (options->AccountingMax && (is_listening_on_low_port(options->ORPort, options->ORListenAddress) || is_listening_on_low_port(options->DirPort, options->DirListenAddress))) @@ -3967,6 +3973,12 @@ options_transition_allowed(const or_options_t *old, return -1; } + if (old->TokenBucketRefillInterval != new_val->TokenBucketRefillInterval) { + *msg = tor_strdup("While Tor is running, changing TokenBucketRefill" + "Interval is not allowed"); + return -1; + } + if (old->DisableIOCP != new_val->DisableIOCP) { *msg = tor_strdup("While Tor is running, changing DisableIOCP " "is not allowed."); @@ -5633,6 +5645,7 @@ init_libevent(const or_options_t *options) memset(&cfg, 0, sizeof(cfg)); cfg.disable_iocp = options->DisableIOCP; cfg.num_cpus = get_num_cpus(options); + cfg.msec_per_tick = options->TokenBucketRefillInterval; tor_libevent_initialize(&cfg); diff --git a/src/or/connection.c b/src/or/connection.c index 009203095e..ad63a9d802 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2388,22 +2388,23 @@ connection_bucket_init(void) } } -/** Refill a single bucket called name with bandwidth rate - * rate and bandwidth burst burst, assuming that - * seconds_elapsed seconds have passed since the last call. - **/ +/** Refill a single bucket called name with bandwidth rate per + * second rate and bandwidth burst burst, assuming that + * milliseconds_elapsed milliseconds have passed since the last + * call. */ static void connection_bucket_refill_helper(int *bucket, int rate, int burst, - int seconds_elapsed, const char *name) + int milliseconds_elapsed, + const char *name) { int starting_bucket = *bucket; - if (starting_bucket < burst && seconds_elapsed) { - if (((burst - starting_bucket)/seconds_elapsed) < rate) { + if (starting_bucket < burst && milliseconds_elapsed > 0) { + int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000; + if ((burst - starting_bucket) < incr) { *bucket = burst; /* We would overflow the bucket; just set it to * the maximum. */ } else { - int incr = rate*seconds_elapsed; - *bucket += incr; + *bucket += (int)incr; if (*bucket > burst || *bucket < starting_bucket) { /* If we overflow the burst, or underflow our starting bucket, * cap the bucket value to burst. */ @@ -2416,41 +2417,46 @@ connection_bucket_refill_helper(int *bucket, int rate, int burst, } } -/** A second has rolled over; increment buckets appropriately. */ +/** Time has passed; increment buckets appropriately. */ void -connection_bucket_refill(int seconds_elapsed, time_t now) +connection_bucket_refill(int milliseconds_elapsed, time_t now) { const or_options_t *options = get_options(); smartlist_t *conns = get_connection_array(); - int relayrate, relayburst; + int bandwidthrate, bandwidthburst, relayrate, relayburst; + + bandwidthrate = (int)options->BandwidthRate; + bandwidthburst = (int)options->BandwidthBurst; if (options->RelayBandwidthRate) { relayrate = (int)options->RelayBandwidthRate; relayburst = (int)options->RelayBandwidthBurst; } else { - relayrate = (int)options->BandwidthRate; - relayburst = (int)options->BandwidthBurst; + relayrate = bandwidthrate; + relayburst = bandwidthburst; } - tor_assert(seconds_elapsed >= 0); + tor_assert(milliseconds_elapsed >= 0); write_buckets_empty_last_second = global_relayed_write_bucket <= 0 || global_write_bucket <= 0; /* refill the global buckets */ connection_bucket_refill_helper(&global_read_bucket, - (int)options->BandwidthRate, - (int)options->BandwidthBurst, - seconds_elapsed, "global_read_bucket"); + bandwidthrate, bandwidthburst, + milliseconds_elapsed, + "global_read_bucket"); connection_bucket_refill_helper(&global_write_bucket, - (int)options->BandwidthRate, - (int)options->BandwidthBurst, - seconds_elapsed, "global_write_bucket"); + bandwidthrate, bandwidthburst, + milliseconds_elapsed, + "global_write_bucket"); connection_bucket_refill_helper(&global_relayed_read_bucket, - relayrate, relayburst, seconds_elapsed, + relayrate, relayburst, + milliseconds_elapsed, "global_relayed_read_bucket"); connection_bucket_refill_helper(&global_relayed_write_bucket, - relayrate, relayburst, seconds_elapsed, + relayrate, relayburst, + milliseconds_elapsed, "global_relayed_write_bucket"); /* refill the per-connection buckets */ @@ -2458,18 +2464,20 @@ connection_bucket_refill(int seconds_elapsed, time_t now) { if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); + int orbandwidthrate = or_conn->bandwidthrate; + int orbandwidthburst = or_conn->bandwidthburst; if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { connection_bucket_refill_helper(&or_conn->read_bucket, - or_conn->bandwidthrate, - or_conn->bandwidthburst, - seconds_elapsed, + orbandwidthrate, + orbandwidthburst, + milliseconds_elapsed, "or_conn->read_bucket"); } if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) { connection_bucket_refill_helper(&or_conn->write_bucket, - or_conn->bandwidthrate, - or_conn->bandwidthburst, - seconds_elapsed, + orbandwidthrate, + orbandwidthburst, + milliseconds_elapsed, "or_conn->write_bucket"); } } @@ -2553,7 +2561,10 @@ connection_bucket_init(void) burst = options->BandwidthBurst; } - rate /= TOR_LIBEVENT_TICKS_PER_SECOND; + /* This can't overflow, since TokenBucketRefillInterval <= 1000, + * and rate started out less than INT32_MAX. */ + rate = (rate * options->TokenBucketRefillInterval) / 1000; + bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst, (uint32_t)rate, (uint32_t)burst, tick); diff --git a/src/or/connection_or.c b/src/or/connection_or.c index a75444e1ed..29f0f8de72 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -580,7 +580,12 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, { const struct timeval *tick = tor_libevent_get_one_tick_timeout(); struct ev_token_bucket_cfg *cfg, *old_cfg; - int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND; + int64_t rate64 = (((int64_t)rate) * options->TokenBucketRefillInterval) + / 1000; + /* This can't overflow, since TokenBucketRefillInterval <= 1000, + * and rate started out less than INT_MAX. */ + int rate_per_tick = (int) rate64; + cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick, burst, tick); old_cfg = conn->bucket_cfg; diff --git a/src/or/main.c b/src/or/main.c index ad5558e649..1a95e31c04 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -91,10 +91,10 @@ static int stats_prev_global_read_bucket; /** What was the write bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've written). */ static int stats_prev_global_write_bucket; -#else +#endif + static uint64_t stats_prev_n_read = 0; static uint64_t stats_prev_n_written = 0; -#endif /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /** How many bytes have we read since we started the process? */ @@ -1507,9 +1507,6 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) size_t bytes_written; size_t bytes_read; int seconds_elapsed; -#ifdef USE_BUFFEREVENTS - uint64_t cur_read,cur_written; -#endif const or_options_t *options = get_options(); (void)timer; (void)arg; @@ -1523,30 +1520,28 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) /* the second has rolled over. check more stuff. */ seconds_elapsed = current_second ? (int)(now - current_second) : 0; #ifdef USE_BUFFEREVENTS - connection_get_rate_limit_totals(&cur_read, &cur_written); - bytes_written = (size_t)(cur_written - stats_prev_n_written); - bytes_read = (size_t)(cur_read - stats_prev_n_read); + { + uint64_t cur_read,cur_written; + connection_get_rate_limit_totals(&cur_read, &cur_written); + bytes_written = (size_t)(cur_written - stats_prev_n_written); + bytes_read = (size_t)(cur_read - stats_prev_n_read); + stats_n_bytes_read += bytes_read; + stats_n_bytes_written += bytes_written; + if (accounting_is_enabled(options) && seconds_elapsed >= 0) + accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed); + stats_prev_n_written = cur_written; + stats_prev_n_read = cur_read; + } #else - bytes_written = stats_prev_global_write_bucket - global_write_bucket; - bytes_read = stats_prev_global_read_bucket - global_read_bucket; + bytes_read = (size_t)(stats_n_bytes_read - stats_prev_n_read); + bytes_written = (size_t)(stats_n_bytes_written - stats_prev_n_written); + stats_prev_n_read = stats_n_bytes_read; + stats_prev_n_written = stats_n_bytes_written; #endif - stats_n_bytes_read += bytes_read; - stats_n_bytes_written += bytes_written; - if (accounting_is_enabled(options) && seconds_elapsed >= 0) - accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed); + control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written); control_event_stream_bandwidth_used(); - if (seconds_elapsed > 0) - connection_bucket_refill(seconds_elapsed, now); -#ifdef USE_BUFFEREVENTS - stats_prev_n_written = cur_written; - stats_prev_n_read = cur_read; -#else - stats_prev_global_read_bucket = global_read_bucket; - stats_prev_global_write_bucket = global_write_bucket; -#endif - if (server_mode(options) && !we_are_hibernating() && seconds_elapsed > 0 && @@ -1594,6 +1589,57 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg) current_second = now; /* remember which second it is, for next time */ } +#ifndef USE_BUFFEREVENTS +/** Timer: used to invoke refill_callback(). */ +static periodic_timer_t *refill_timer = NULL; + +/** Libevent callback: invoked periodically to refill token buckets + * and count r/w bytes. It is only used when bufferevents are disabled. */ +static void +refill_callback(periodic_timer_t *timer, void *arg) +{ + static struct timeval current_millisecond; + struct timeval now; + + size_t bytes_written; + size_t bytes_read; + int milliseconds_elapsed = 0; + int seconds_rolled_over = 0; + + const or_options_t *options = get_options(); + + (void)timer; + (void)arg; + + tor_gettimeofday(&now); + + /* If this is our first time, no time has passed. */ + if (current_millisecond.tv_sec) { + long mdiff = tv_mdiff(¤t_millisecond, &now); + if (mdiff > INT_MAX) + mdiff = INT_MAX; + milliseconds_elapsed = (int)mdiff; + seconds_rolled_over = (int)(now.tv_sec - current_millisecond.tv_sec); + } + + bytes_written = stats_prev_global_write_bucket - global_write_bucket; + bytes_read = stats_prev_global_read_bucket - global_read_bucket; + + stats_n_bytes_read += bytes_read; + stats_n_bytes_written += bytes_written; + if (accounting_is_enabled(options) && milliseconds_elapsed >= 0) + accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); + + if (milliseconds_elapsed > 0) + connection_bucket_refill(milliseconds_elapsed, now.tv_sec); + + stats_prev_global_read_bucket = global_read_bucket; + stats_prev_global_write_bucket = global_write_bucket; + + current_millisecond = now; /* remember what time it is, for next time */ +} +#endif + #ifndef MS_WINDOWS /** Called when a possibly ignorable libevent error occurs; ensures that we * don't get into an infinite loop by ignoring too many errors from @@ -1791,6 +1837,22 @@ do_main_loop(void) tor_assert(second_timer); } +#ifndef USE_BUFFEREVENTS + if (!refill_timer) { + struct timeval refill_interval; + int msecs = get_options()->TokenBucketRefillInterval; + + refill_interval.tv_sec = msecs/1000; + refill_interval.tv_usec = (msecs%1000)*1000; + + refill_timer = periodic_timer_new(tor_libevent_get_base(), + &refill_interval, + refill_callback, + NULL); + tor_assert(refill_timer); + } +#endif + for (;;) { if (nt_service_is_stopping()) return 0; diff --git a/src/or/or.h b/src/or/or.h index 56f34fc4fe..b05c767bfe 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -3107,6 +3107,8 @@ typedef struct { * log whether it was DNS-leaking or not? */ int HardwareAccel; /**< Boolean: Should we enable OpenSSL hardware * acceleration where available? */ + /** Token Bucket Refill resolution in milliseconds. */ + int TokenBucketRefillInterval; char *AccelName; /**< Optional hardware acceleration engine name. */ char *AccelDir; /**< Optional hardware acceleration engine search dir. */ int UseEntryGuards; /**< Boolean: Do we try to enter from a smallish number