Merge branch 'feature3630-rebased'

This commit is contained in:
Nick Mathewson 2011-09-22 15:54:40 -04:00
commit 5a8dcca8f7
9 changed files with 185 additions and 69 deletions

8
changes/feature3630 Normal file
View File

@ -0,0 +1,8 @@
o Major features (networking):
- Add a new TokenBucketRefillInterval option to refill token buckets
more frequently than once per second. This should improve network
performance, alleviate queueing problems, and make traffic less
bursty. Implements proposal 183; closes ticket 3630. Design by
Florian Tschorsch and Björn Scheuermann; implementation by
Florian Tschorsch.

View File

@ -737,6 +737,13 @@ The following options are useful only for clients (that is, if
unattached waiting for an appropriate circuit, before we fail it. (Default: unattached waiting for an appropriate circuit, before we fail it. (Default:
2 minutes.) 2 minutes.)
**TokenBucketRefillInterval** __NUM__ [**msec**|**second**]::
Set the refill interval of Tor's token bucket to NUM milliseconds.
NUM must be between 1 and 1000, inclusive. Note that the configured
bandwidth limits are still expressed in bytes per second: this
option only affects the frequency with which Tor checks to see whether
previously exhausted connections may read again. (Default: 10 msec.)
**TrackHostExits** __host__,__.domain__,__...__:: **TrackHostExits** __host__,__.domain__,__...__::
For each value in the comma separated list, Tor will track recent For each value in the comma separated list, Tor will track recent
connections to hosts that match this value and attempt to reuse the same connections to hosts that match this value and attempt to reuse the same

View File

@ -169,6 +169,7 @@ struct event_base *the_event_base = NULL;
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
static int using_iocp_bufferevents = 0; static int using_iocp_bufferevents = 0;
static void tor_libevent_set_tick_timeout(int msec_per_tick);
int int
tor_libevent_using_iocp_bufferevents(void) tor_libevent_using_iocp_bufferevents(void)
@ -236,6 +237,10 @@ tor_libevent_initialize(tor_libevent_cfg *torcfg)
"You have a *VERY* old version of libevent. It is likely to be buggy; " "You have a *VERY* old version of libevent. It is likely to be buggy; "
"please build Tor with a more recent version."); "please build Tor with a more recent version.");
#endif #endif
#ifdef USE_BUFFEREVENTS
tor_libevent_set_tick_timeout(torcfg->msec_per_tick);
#endif
} }
/** Return the current Libevent event base that we're set up to use. */ /** Return the current Libevent event base that we're set up to use. */
@ -598,25 +603,28 @@ static const struct timeval *one_tick = NULL;
/** /**
* Return a special timeout to be passed whenever libevent's O(1) timeout * Return a special timeout to be passed whenever libevent's O(1) timeout
* implementation should be used. Only use this when the timer is supposed * implementation should be used. Only use this when the timer is supposed
* to fire after 1 / TOR_LIBEVENT_TICKS_PER_SECOND seconds have passed. * to fire after msec_per_tick ticks have elapsed.
*/ */
const struct timeval * const struct timeval *
tor_libevent_get_one_tick_timeout(void) tor_libevent_get_one_tick_timeout(void)
{ {
if (PREDICT_UNLIKELY(one_tick == NULL)) { tor_assert(one_tick);
return one_tick;
}
/** Initialize the common timeout that we'll use to refill the buckets every
* time a tick elapses. */
static void
tor_libevent_set_tick_timeout(int msec_per_tick)
{
struct event_base *base = tor_libevent_get_base(); struct event_base *base = tor_libevent_get_base();
struct timeval tv; struct timeval tv;
if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) {
tv.tv_sec = 1; tor_assert(! one_tick);
tv.tv_usec = 0; tv.tv_sec = msec_per_tick / 1000;
} else { tv.tv_usec = (msec_per_tick % 1000) * 1000;
tv.tv_sec = 0;
tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND;
}
one_tick = event_base_init_common_timeout(base, &tv); one_tick = event_base_init_common_timeout(base, &tv);
} }
return one_tick;
}
static struct bufferevent * static struct bufferevent *
tor_get_root_bufferevent(struct bufferevent *bev) tor_get_root_bufferevent(struct bufferevent *bev)

View File

@ -62,6 +62,7 @@ int tor_event_base_loopexit(struct event_base *base, struct timeval *tv);
typedef struct tor_libevent_cfg { typedef struct tor_libevent_cfg {
int disable_iocp; int disable_iocp;
int num_cpus; int num_cpus;
int msec_per_tick;
} tor_libevent_cfg; } tor_libevent_cfg;
void tor_libevent_initialize(tor_libevent_cfg *cfg); void tor_libevent_initialize(tor_libevent_cfg *cfg);
@ -73,7 +74,6 @@ void tor_check_libevent_header_compatibility(void);
const char *tor_libevent_get_version_str(void); const char *tor_libevent_get_version_str(void);
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
#define TOR_LIBEVENT_TICKS_PER_SECOND 3
const struct timeval *tor_libevent_get_one_tick_timeout(void); const struct timeval *tor_libevent_get_one_tick_timeout(void);
int tor_libevent_using_iocp_bufferevents(void); int tor_libevent_using_iocp_bufferevents(void);
int tor_set_bufferevent_rate_limit(struct bufferevent *bev, int tor_set_bufferevent_rate_limit(struct bufferevent *bev,

View File

@ -386,6 +386,7 @@ static config_var_t _option_vars[] = {
OBSOLETE("SysLog"), OBSOLETE("SysLog"),
V(TestSocks, BOOL, "0"), V(TestSocks, BOOL, "0"),
OBSOLETE("TestVia"), OBSOLETE("TestVia"),
V(TokenBucketRefillInterval, MSEC_INTERVAL, "10 msec"),
V(TrackHostExits, CSV, NULL), V(TrackHostExits, CSV, NULL),
V(TrackHostExitsExpire, INTERVAL, "30 minutes"), V(TrackHostExitsExpire, INTERVAL, "30 minutes"),
OBSOLETE("TrafficShaping"), OBSOLETE("TrafficShaping"),
@ -3165,6 +3166,11 @@ options_validate(or_options_t *old_options, or_options_t *options,
REJECT("TransPort and TransListenAddress are disabled in this build."); REJECT("TransPort and TransListenAddress are disabled in this build.");
#endif #endif
if (options->TokenBucketRefillInterval <= 0
|| options->TokenBucketRefillInterval > 1000) {
REJECT("TokenBucketRefillInterval must be between 1 and 1000 inclusive.");
}
if (options->AccountingMax && if (options->AccountingMax &&
(is_listening_on_low_port(options->ORPort, options->ORListenAddress) || (is_listening_on_low_port(options->ORPort, options->ORListenAddress) ||
is_listening_on_low_port(options->DirPort, options->DirListenAddress))) is_listening_on_low_port(options->DirPort, options->DirListenAddress)))
@ -3967,6 +3973,12 @@ options_transition_allowed(const or_options_t *old,
return -1; return -1;
} }
if (old->TokenBucketRefillInterval != new_val->TokenBucketRefillInterval) {
*msg = tor_strdup("While Tor is running, changing TokenBucketRefill"
"Interval is not allowed");
return -1;
}
if (old->DisableIOCP != new_val->DisableIOCP) { if (old->DisableIOCP != new_val->DisableIOCP) {
*msg = tor_strdup("While Tor is running, changing DisableIOCP " *msg = tor_strdup("While Tor is running, changing DisableIOCP "
"is not allowed."); "is not allowed.");
@ -5633,6 +5645,7 @@ init_libevent(const or_options_t *options)
memset(&cfg, 0, sizeof(cfg)); memset(&cfg, 0, sizeof(cfg));
cfg.disable_iocp = options->DisableIOCP; cfg.disable_iocp = options->DisableIOCP;
cfg.num_cpus = get_num_cpus(options); cfg.num_cpus = get_num_cpus(options);
cfg.msec_per_tick = options->TokenBucketRefillInterval;
tor_libevent_initialize(&cfg); tor_libevent_initialize(&cfg);

View File

@ -2388,22 +2388,23 @@ connection_bucket_init(void)
} }
} }
/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate /** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate per
* <b>rate</b> and bandwidth burst <b>burst</b>, assuming that * second <b>rate</b> and bandwidth burst <b>burst</b>, assuming that
* <b>seconds_elapsed</b> seconds have passed since the last call. * <b>milliseconds_elapsed</b> milliseconds have passed since the last
**/ * call. */
static void static void
connection_bucket_refill_helper(int *bucket, int rate, int burst, connection_bucket_refill_helper(int *bucket, int rate, int burst,
int seconds_elapsed, const char *name) int milliseconds_elapsed,
const char *name)
{ {
int starting_bucket = *bucket; int starting_bucket = *bucket;
if (starting_bucket < burst && seconds_elapsed) { if (starting_bucket < burst && milliseconds_elapsed > 0) {
if (((burst - starting_bucket)/seconds_elapsed) < rate) { int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000;
if ((burst - starting_bucket) < incr) {
*bucket = burst; /* We would overflow the bucket; just set it to *bucket = burst; /* We would overflow the bucket; just set it to
* the maximum. */ * the maximum. */
} else { } else {
int incr = rate*seconds_elapsed; *bucket += (int)incr;
*bucket += incr;
if (*bucket > burst || *bucket < starting_bucket) { if (*bucket > burst || *bucket < starting_bucket) {
/* If we overflow the burst, or underflow our starting bucket, /* If we overflow the burst, or underflow our starting bucket,
* cap the bucket value to burst. */ * cap the bucket value to burst. */
@ -2416,41 +2417,46 @@ connection_bucket_refill_helper(int *bucket, int rate, int burst,
} }
} }
/** A second has rolled over; increment buckets appropriately. */ /** Time has passed; increment buckets appropriately. */
void void
connection_bucket_refill(int seconds_elapsed, time_t now) connection_bucket_refill(int milliseconds_elapsed, time_t now)
{ {
const or_options_t *options = get_options(); const or_options_t *options = get_options();
smartlist_t *conns = get_connection_array(); smartlist_t *conns = get_connection_array();
int relayrate, relayburst; int bandwidthrate, bandwidthburst, relayrate, relayburst;
bandwidthrate = (int)options->BandwidthRate;
bandwidthburst = (int)options->BandwidthBurst;
if (options->RelayBandwidthRate) { if (options->RelayBandwidthRate) {
relayrate = (int)options->RelayBandwidthRate; relayrate = (int)options->RelayBandwidthRate;
relayburst = (int)options->RelayBandwidthBurst; relayburst = (int)options->RelayBandwidthBurst;
} else { } else {
relayrate = (int)options->BandwidthRate; relayrate = bandwidthrate;
relayburst = (int)options->BandwidthBurst; relayburst = bandwidthburst;
} }
tor_assert(seconds_elapsed >= 0); tor_assert(milliseconds_elapsed >= 0);
write_buckets_empty_last_second = write_buckets_empty_last_second =
global_relayed_write_bucket <= 0 || global_write_bucket <= 0; global_relayed_write_bucket <= 0 || global_write_bucket <= 0;
/* refill the global buckets */ /* refill the global buckets */
connection_bucket_refill_helper(&global_read_bucket, connection_bucket_refill_helper(&global_read_bucket,
(int)options->BandwidthRate, bandwidthrate, bandwidthburst,
(int)options->BandwidthBurst, milliseconds_elapsed,
seconds_elapsed, "global_read_bucket"); "global_read_bucket");
connection_bucket_refill_helper(&global_write_bucket, connection_bucket_refill_helper(&global_write_bucket,
(int)options->BandwidthRate, bandwidthrate, bandwidthburst,
(int)options->BandwidthBurst, milliseconds_elapsed,
seconds_elapsed, "global_write_bucket"); "global_write_bucket");
connection_bucket_refill_helper(&global_relayed_read_bucket, connection_bucket_refill_helper(&global_relayed_read_bucket,
relayrate, relayburst, seconds_elapsed, relayrate, relayburst,
milliseconds_elapsed,
"global_relayed_read_bucket"); "global_relayed_read_bucket");
connection_bucket_refill_helper(&global_relayed_write_bucket, connection_bucket_refill_helper(&global_relayed_write_bucket,
relayrate, relayburst, seconds_elapsed, relayrate, relayburst,
milliseconds_elapsed,
"global_relayed_write_bucket"); "global_relayed_write_bucket");
/* refill the per-connection buckets */ /* refill the per-connection buckets */
@ -2458,18 +2464,20 @@ connection_bucket_refill(int seconds_elapsed, time_t now)
{ {
if (connection_speaks_cells(conn)) { if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn); or_connection_t *or_conn = TO_OR_CONN(conn);
int orbandwidthrate = or_conn->bandwidthrate;
int orbandwidthburst = or_conn->bandwidthburst;
if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->read_bucket, connection_bucket_refill_helper(&or_conn->read_bucket,
or_conn->bandwidthrate, orbandwidthrate,
or_conn->bandwidthburst, orbandwidthburst,
seconds_elapsed, milliseconds_elapsed,
"or_conn->read_bucket"); "or_conn->read_bucket");
} }
if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) { if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->write_bucket, connection_bucket_refill_helper(&or_conn->write_bucket,
or_conn->bandwidthrate, orbandwidthrate,
or_conn->bandwidthburst, orbandwidthburst,
seconds_elapsed, milliseconds_elapsed,
"or_conn->write_bucket"); "or_conn->write_bucket");
} }
} }
@ -2553,7 +2561,10 @@ connection_bucket_init(void)
burst = options->BandwidthBurst; burst = options->BandwidthBurst;
} }
rate /= TOR_LIBEVENT_TICKS_PER_SECOND; /* This can't overflow, since TokenBucketRefillInterval <= 1000,
* and rate started out less than INT32_MAX. */
rate = (rate * options->TokenBucketRefillInterval) / 1000;
bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst, bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
(uint32_t)rate, (uint32_t)burst, (uint32_t)rate, (uint32_t)burst,
tick); tick);

View File

@ -580,7 +580,12 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
{ {
const struct timeval *tick = tor_libevent_get_one_tick_timeout(); const struct timeval *tick = tor_libevent_get_one_tick_timeout();
struct ev_token_bucket_cfg *cfg, *old_cfg; struct ev_token_bucket_cfg *cfg, *old_cfg;
int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND; int64_t rate64 = (((int64_t)rate) * options->TokenBucketRefillInterval)
/ 1000;
/* This can't overflow, since TokenBucketRefillInterval <= 1000,
* and rate started out less than INT_MAX. */
int rate_per_tick = (int) rate64;
cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick, cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick,
burst, tick); burst, tick);
old_cfg = conn->bucket_cfg; old_cfg = conn->bucket_cfg;

View File

@ -91,10 +91,10 @@ static int stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call? /** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */ * (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket; static int stats_prev_global_write_bucket;
#else #endif
static uint64_t stats_prev_n_read = 0; static uint64_t stats_prev_n_read = 0;
static uint64_t stats_prev_n_written = 0; static uint64_t stats_prev_n_written = 0;
#endif
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read since we started the process? */ /** How many bytes have we read since we started the process? */
@ -1507,9 +1507,6 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
size_t bytes_written; size_t bytes_written;
size_t bytes_read; size_t bytes_read;
int seconds_elapsed; int seconds_elapsed;
#ifdef USE_BUFFEREVENTS
uint64_t cur_read,cur_written;
#endif
const or_options_t *options = get_options(); const or_options_t *options = get_options();
(void)timer; (void)timer;
(void)arg; (void)arg;
@ -1523,30 +1520,28 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
/* the second has rolled over. check more stuff. */ /* the second has rolled over. check more stuff. */
seconds_elapsed = current_second ? (int)(now - current_second) : 0; seconds_elapsed = current_second ? (int)(now - current_second) : 0;
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
{
uint64_t cur_read,cur_written;
connection_get_rate_limit_totals(&cur_read, &cur_written); connection_get_rate_limit_totals(&cur_read, &cur_written);
bytes_written = (size_t)(cur_written - stats_prev_n_written); bytes_written = (size_t)(cur_written - stats_prev_n_written);
bytes_read = (size_t)(cur_read - stats_prev_n_read); bytes_read = (size_t)(cur_read - stats_prev_n_read);
#else
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
#endif
stats_n_bytes_read += bytes_read; stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written; stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && seconds_elapsed >= 0) if (accounting_is_enabled(options) && seconds_elapsed >= 0)
accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed); accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed);
control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written);
control_event_stream_bandwidth_used();
if (seconds_elapsed > 0)
connection_bucket_refill(seconds_elapsed, now);
#ifdef USE_BUFFEREVENTS
stats_prev_n_written = cur_written; stats_prev_n_written = cur_written;
stats_prev_n_read = cur_read; stats_prev_n_read = cur_read;
}
#else #else
stats_prev_global_read_bucket = global_read_bucket; bytes_read = (size_t)(stats_n_bytes_read - stats_prev_n_read);
stats_prev_global_write_bucket = global_write_bucket; bytes_written = (size_t)(stats_n_bytes_written - stats_prev_n_written);
stats_prev_n_read = stats_n_bytes_read;
stats_prev_n_written = stats_n_bytes_written;
#endif #endif
control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written);
control_event_stream_bandwidth_used();
if (server_mode(options) && if (server_mode(options) &&
!we_are_hibernating() && !we_are_hibernating() &&
seconds_elapsed > 0 && seconds_elapsed > 0 &&
@ -1594,6 +1589,57 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
current_second = now; /* remember which second it is, for next time */ current_second = now; /* remember which second it is, for next time */
} }
#ifndef USE_BUFFEREVENTS
/** Timer: used to invoke refill_callback(). */
static periodic_timer_t *refill_timer = NULL;
/** Libevent callback: invoked periodically to refill token buckets
* and count r/w bytes. It is only used when bufferevents are disabled. */
static void
refill_callback(periodic_timer_t *timer, void *arg)
{
static struct timeval current_millisecond;
struct timeval now;
size_t bytes_written;
size_t bytes_read;
int milliseconds_elapsed = 0;
int seconds_rolled_over = 0;
const or_options_t *options = get_options();
(void)timer;
(void)arg;
tor_gettimeofday(&now);
/* If this is our first time, no time has passed. */
if (current_millisecond.tv_sec) {
long mdiff = tv_mdiff(&current_millisecond, &now);
if (mdiff > INT_MAX)
mdiff = INT_MAX;
milliseconds_elapsed = (int)mdiff;
seconds_rolled_over = (int)(now.tv_sec - current_millisecond.tv_sec);
}
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && milliseconds_elapsed >= 0)
accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over);
if (milliseconds_elapsed > 0)
connection_bucket_refill(milliseconds_elapsed, now.tv_sec);
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
current_millisecond = now; /* remember what time it is, for next time */
}
#endif
#ifndef MS_WINDOWS #ifndef MS_WINDOWS
/** Called when a possibly ignorable libevent error occurs; ensures that we /** Called when a possibly ignorable libevent error occurs; ensures that we
* don't get into an infinite loop by ignoring too many errors from * don't get into an infinite loop by ignoring too many errors from
@ -1791,6 +1837,22 @@ do_main_loop(void)
tor_assert(second_timer); tor_assert(second_timer);
} }
#ifndef USE_BUFFEREVENTS
if (!refill_timer) {
struct timeval refill_interval;
int msecs = get_options()->TokenBucketRefillInterval;
refill_interval.tv_sec = msecs/1000;
refill_interval.tv_usec = (msecs%1000)*1000;
refill_timer = periodic_timer_new(tor_libevent_get_base(),
&refill_interval,
refill_callback,
NULL);
tor_assert(refill_timer);
}
#endif
for (;;) { for (;;) {
if (nt_service_is_stopping()) if (nt_service_is_stopping())
return 0; return 0;

View File

@ -3107,6 +3107,8 @@ typedef struct {
* log whether it was DNS-leaking or not? */ * log whether it was DNS-leaking or not? */
int HardwareAccel; /**< Boolean: Should we enable OpenSSL hardware int HardwareAccel; /**< Boolean: Should we enable OpenSSL hardware
* acceleration where available? */ * acceleration where available? */
/** Token Bucket Refill resolution in milliseconds. */
int TokenBucketRefillInterval;
char *AccelName; /**< Optional hardware acceleration engine name. */ char *AccelName; /**< Optional hardware acceleration engine name. */
char *AccelDir; /**< Optional hardware acceleration engine search dir. */ char *AccelDir; /**< Optional hardware acceleration engine search dir. */
int UseEntryGuards; /**< Boolean: Do we try to enter from a smallish number int UseEntryGuards; /**< Boolean: Do we try to enter from a smallish number