New torrc option to allow bucket refill intervals of less than 1 sec

Implements bug3630.
This commit is contained in:
Florian Tschorsch 2011-09-07 20:21:53 -04:00 committed by Nick Mathewson
parent 40288e1e66
commit 6b1d8321ae
5 changed files with 141 additions and 52 deletions

View File

@ -737,6 +737,14 @@ The following options are useful only for clients (that is, if
unattached waiting for an appropriate circuit, before we fail it. (Default: unattached waiting for an appropriate circuit, before we fail it. (Default:
2 minutes.) 2 minutes.)
**TokenBucketRefillInterval** __NUM__::
Set the refill interval of Tor's token bucket to NUM milliseconds.
NUM must be positive and either a divisor or a multiple of 1 second.
Note that this option retains the configured bandwidth limits and refills
token buckets only in ratio to the interval. This option will be ignored
when Tor was built with Libevent's bufferevents enabled. (Default: 1 second)
**TrackHostExits** __host__,__.domain__,__...__:: **TrackHostExits** __host__,__.domain__,__...__::
For each value in the comma separated list, Tor will track recent For each value in the comma separated list, Tor will track recent
connections to hosts that match this value and attempt to reuse the same connections to hosts that match this value and attempt to reuse the same

View File

@ -386,6 +386,7 @@ static config_var_t _option_vars[] = {
OBSOLETE("SysLog"), OBSOLETE("SysLog"),
V(TestSocks, BOOL, "0"), V(TestSocks, BOOL, "0"),
OBSOLETE("TestVia"), OBSOLETE("TestVia"),
V(TokenBucketRefillInterval, MSEC_INTERVAL, "10 msec"),
V(TrackHostExits, CSV, NULL), V(TrackHostExits, CSV, NULL),
V(TrackHostExitsExpire, INTERVAL, "30 minutes"), V(TrackHostExitsExpire, INTERVAL, "30 minutes"),
OBSOLETE("TrafficShaping"), OBSOLETE("TrafficShaping"),
@ -1382,6 +1383,13 @@ options_act(const or_options_t *old_options)
if (accounting_is_enabled(options)) if (accounting_is_enabled(options))
configure_accounting(time(NULL)); configure_accounting(time(NULL));
if (options->TokenBucketRefillInterval < 0
|| options->TokenBucketRefillInterval > 1000) {
log_warn(LD_CONFIG, "Token bucket refill interval must be in the range "
"of [0:1000]");
return -1;
}
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
/* If we're using the bufferevents implementation and our rate limits /* If we're using the bufferevents implementation and our rate limits
* changed, we need to tell the rate-limiting system about it. */ * changed, we need to tell the rate-limiting system about it. */

View File

@ -2389,20 +2389,21 @@ connection_bucket_init(void)
} }
/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate /** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate
* <b>rate</b> and bandwidth burst <b>burst</b>, assuming that * per millisecond <b>rate</b> and bandwidth burst per refill interval
* <b>seconds_elapsed</b> seconds have passed since the last call. * <b>burst</b>, assuming that <b>milliseconds_elapsed</b> milliseconds
**/ * have passed since the last call. */
static void static void
connection_bucket_refill_helper(int *bucket, int rate, int burst, connection_bucket_refill_helper(int *bucket, int rate, int burst,
int seconds_elapsed, const char *name) int milliseconds_elapsed,
const char *name)
{ {
int starting_bucket = *bucket; int starting_bucket = *bucket;
if (starting_bucket < burst && seconds_elapsed) { if (starting_bucket < burst && milliseconds_elapsed) {
if (((burst - starting_bucket)/seconds_elapsed) < rate) { if (((burst - starting_bucket)/milliseconds_elapsed) < rate) {
*bucket = burst; /* We would overflow the bucket; just set it to *bucket = burst; /* We would overflow the bucket; just set it to
* the maximum. */ * the maximum. */
} else { } else {
int incr = rate*seconds_elapsed; int incr = rate*milliseconds_elapsed;
*bucket += incr; *bucket += incr;
if (*bucket > burst || *bucket < starting_bucket) { if (*bucket > burst || *bucket < starting_bucket) {
/* If we overflow the burst, or underflow our starting bucket, /* If we overflow the burst, or underflow our starting bucket,
@ -2416,41 +2417,46 @@ connection_bucket_refill_helper(int *bucket, int rate, int burst,
} }
} }
/** A second has rolled over; increment buckets appropriately. */ /** Time has passed; increment buckets appropriately. */
void void
connection_bucket_refill(int seconds_elapsed, time_t now) connection_bucket_refill(int milliseconds_elapsed, time_t now)
{ {
const or_options_t *options = get_options(); const or_options_t *options = get_options();
smartlist_t *conns = get_connection_array(); smartlist_t *conns = get_connection_array();
int relayrate, relayburst; int bandwidthrate, bandwidthburst, relayrate, relayburst;
bandwidthrate = (int)options->BandwidthRate / 1000;
bandwidthburst = (int)options->BandwidthBurst;
if (options->RelayBandwidthRate) { if (options->RelayBandwidthRate) {
relayrate = (int)options->RelayBandwidthRate; relayrate = (int)options->RelayBandwidthRate / 1000;
relayburst = (int)options->RelayBandwidthBurst; relayburst = (int)options->RelayBandwidthBurst;
} else { } else {
relayrate = (int)options->BandwidthRate; relayrate = bandwidthrate;
relayburst = (int)options->BandwidthBurst; relayburst = bandwidthburst;
} }
tor_assert(seconds_elapsed >= 0); tor_assert(milliseconds_elapsed >= 0);
write_buckets_empty_last_second = write_buckets_empty_last_second =
global_relayed_write_bucket <= 0 || global_write_bucket <= 0; global_relayed_write_bucket <= 0 || global_write_bucket <= 0;
/* refill the global buckets */ /* refill the global buckets */
connection_bucket_refill_helper(&global_read_bucket, connection_bucket_refill_helper(&global_read_bucket,
(int)options->BandwidthRate, bandwidthrate, bandwidthburst,
(int)options->BandwidthBurst, milliseconds_elapsed,
seconds_elapsed, "global_read_bucket"); "global_read_bucket");
connection_bucket_refill_helper(&global_write_bucket, connection_bucket_refill_helper(&global_write_bucket,
(int)options->BandwidthRate, bandwidthrate, bandwidthburst,
(int)options->BandwidthBurst, milliseconds_elapsed,
seconds_elapsed, "global_write_bucket"); "global_read_bucket");
connection_bucket_refill_helper(&global_relayed_read_bucket, connection_bucket_refill_helper(&global_relayed_read_bucket,
relayrate, relayburst, seconds_elapsed, relayrate, relayburst,
milliseconds_elapsed,
"global_relayed_read_bucket"); "global_relayed_read_bucket");
connection_bucket_refill_helper(&global_relayed_write_bucket, connection_bucket_refill_helper(&global_relayed_write_bucket,
relayrate, relayburst, seconds_elapsed, relayrate, relayburst,
milliseconds_elapsed,
"global_relayed_write_bucket"); "global_relayed_write_bucket");
/* refill the per-connection buckets */ /* refill the per-connection buckets */
@ -2458,18 +2464,20 @@ connection_bucket_refill(int seconds_elapsed, time_t now)
{ {
if (connection_speaks_cells(conn)) { if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn); or_connection_t *or_conn = TO_OR_CONN(conn);
int orbandwidthrate = or_conn->bandwidthrate / 1000;
int orbandwidthburst = or_conn->bandwidthburst;
if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->read_bucket, connection_bucket_refill_helper(&or_conn->read_bucket,
or_conn->bandwidthrate, orbandwidthrate,
or_conn->bandwidthburst, orbandwidthburst,
seconds_elapsed, milliseconds_elapsed,
"or_conn->read_bucket"); "or_conn->read_bucket");
} }
if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) { if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->write_bucket, connection_bucket_refill_helper(&or_conn->write_bucket,
or_conn->bandwidthrate, orbandwidthrate,
or_conn->bandwidthburst, orbandwidthburst,
seconds_elapsed, milliseconds_elapsed,
"or_conn->write_bucket"); "or_conn->write_bucket");
} }
} }

View File

@ -91,10 +91,10 @@ static int stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call? /** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */ * (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket; static int stats_prev_global_write_bucket;
#else #endif
static uint64_t stats_prev_n_read = 0; static uint64_t stats_prev_n_read = 0;
static uint64_t stats_prev_n_written = 0; static uint64_t stats_prev_n_written = 0;
#endif
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read since we started the process? */ /** How many bytes have we read since we started the process? */
@ -1507,9 +1507,6 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
size_t bytes_written; size_t bytes_written;
size_t bytes_read; size_t bytes_read;
int seconds_elapsed; int seconds_elapsed;
#ifdef USE_BUFFEREVENTS
uint64_t cur_read,cur_written;
#endif
const or_options_t *options = get_options(); const or_options_t *options = get_options();
(void)timer; (void)timer;
(void)arg; (void)arg;
@ -1523,30 +1520,28 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
/* the second has rolled over. check more stuff. */ /* the second has rolled over. check more stuff. */
seconds_elapsed = current_second ? (int)(now - current_second) : 0; seconds_elapsed = current_second ? (int)(now - current_second) : 0;
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
{
uint64_t cur_read,cur_written;
connection_get_rate_limit_totals(&cur_read, &cur_written); connection_get_rate_limit_totals(&cur_read, &cur_written);
bytes_written = (size_t)(cur_written - stats_prev_n_written); bytes_written = (size_t)(cur_written - stats_prev_n_written);
bytes_read = (size_t)(cur_read - stats_prev_n_read); bytes_read = (size_t)(cur_read - stats_prev_n_read);
#else
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
#endif
stats_n_bytes_read += bytes_read; stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written; stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && seconds_elapsed >= 0) if (accounting_is_enabled(options) && seconds_elapsed >= 0)
accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed); accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed);
control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written);
control_event_stream_bandwidth_used();
if (seconds_elapsed > 0)
connection_bucket_refill(seconds_elapsed, now);
#ifdef USE_BUFFEREVENTS
stats_prev_n_written = cur_written; stats_prev_n_written = cur_written;
stats_prev_n_read = cur_read; stats_prev_n_read = cur_read;
}
#else #else
stats_prev_global_read_bucket = global_read_bucket; bytes_read = (size_t)(stats_n_bytes_read - stats_prev_n_read);
stats_prev_global_write_bucket = global_write_bucket; bytes_written = (size_t)(stats_n_bytes_written - stats_prev_n_written);
stats_prev_n_read = stats_n_bytes_read;
stats_prev_n_written = stats_n_bytes_written;
#endif #endif
control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written);
control_event_stream_bandwidth_used();
if (server_mode(options) && if (server_mode(options) &&
!we_are_hibernating() && !we_are_hibernating() &&
seconds_elapsed > 0 && seconds_elapsed > 0 &&
@ -1594,6 +1589,57 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
current_second = now; /* remember which second it is, for next time */ current_second = now; /* remember which second it is, for next time */
} }
#ifndef USE_BUFFEREVENTS
/** Timer: used to invoke refill_callback(). */
static periodic_timer_t *refill_timer = NULL;
/** Libevent callback: invoked periodically to refill token buckets
* and count r/w bytes. It is only used when bufferevents are disabled. */
static void
refill_callback(periodic_timer_t *timer, void *arg)
{
static struct timeval current_millisecond;
struct timeval now;
size_t bytes_written;
size_t bytes_read;
int milliseconds_elapsed = 0;
int seconds_rolled_over = 0;
const or_options_t *options = get_options();
(void)timer;
(void)arg;
tor_gettimeofday(&now);
/* If this is our first time, no time has passed. */
if (current_millisecond.tv_sec) {
long mdiff = tv_mdiff(&current_millisecond, &now);
if (mdiff > INT_MAX)
mdiff = INT_MAX;
milliseconds_elapsed = (int)mdiff;
seconds_rolled_over = (int)(now.tv_sec - current_millisecond.tv_sec);
}
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && milliseconds_elapsed >= 0)
accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over);
if (milliseconds_elapsed > 0)
connection_bucket_refill(milliseconds_elapsed, now.tv_sec);
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
current_millisecond = now; /* remember what time it is, for next time */
}
#endif
#ifndef MS_WINDOWS #ifndef MS_WINDOWS
/** Called when a possibly ignorable libevent error occurs; ensures that we /** Called when a possibly ignorable libevent error occurs; ensures that we
* don't get into an infinite loop by ignoring too many errors from * don't get into an infinite loop by ignoring too many errors from
@ -1791,6 +1837,22 @@ do_main_loop(void)
tor_assert(second_timer); tor_assert(second_timer);
} }
#ifndef USE_BUFFEREVENTS
if (!refill_timer) {
struct timeval refill_interval;
int msecs = get_options()->TokenBucketRefillInterval;
refill_interval.tv_sec = msecs/1000;
refill_interval.tv_usec = (msecs%1000)*1000;
refill_timer = periodic_timer_new(tor_libevent_get_base(),
&refill_interval,
refill_callback,
NULL);
tor_assert(refill_timer);
}
#endif
for (;;) { for (;;) {
if (nt_service_is_stopping()) if (nt_service_is_stopping())
return 0; return 0;

View File

@ -3107,6 +3107,9 @@ typedef struct {
* log whether it was DNS-leaking or not? */ * log whether it was DNS-leaking or not? */
int HardwareAccel; /**< Boolean: Should we enable OpenSSL hardware int HardwareAccel; /**< Boolean: Should we enable OpenSSL hardware
* acceleration where available? */ * acceleration where available? */
/** Token Bucket Refill resolution in milliseconds (ignored when
* bufferevents are enabled) */
int TokenBucketRefillInterval;
char *AccelName; /**< Optional hardware acceleration engine name. */ char *AccelName; /**< Optional hardware acceleration engine name. */
char *AccelDir; /**< Optional hardware acceleration engine search dir. */ char *AccelDir; /**< Optional hardware acceleration engine search dir. */
int UseEntryGuards; /**< Boolean: Do we try to enter from a smallish number int UseEntryGuards; /**< Boolean: Do we try to enter from a smallish number