Add a separate set of token buckets for relayed traffic. Right

now that's just defined as answers to directory requests.


svn:r9881
This commit is contained in:
Roger Dingledine 2007-03-20 02:55:31 +00:00
parent 4ab0b979b2
commit b4f743562f
5 changed files with 203 additions and 93 deletions

View File

@ -65,8 +65,7 @@ bandwidth usage to that same value. (Default: 3 MB)
.TP
\fBBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
Limit the maximum token bucket size (also known as the burst) to the
given number of bytes in each direction. This value should be at least
twice your BandwidthRate. (Default: 6 MB)
given number of bytes in each direction. (Default: 6 MB)
.LP
.TP
\fBMaxAdvertisedBandwidth \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
@ -77,6 +76,20 @@ advertised bandwidth rate) can thus reduce the CPU demands on their
server without impacting network performance.
.LP
.TP
\fBRelayBandwidthRate \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
If defined, a separate token bucket limits the average incoming bandwidth
usage for _relayed traffic_ on this node to the specified number of
bytes per second, and the average outgoing bandwidth usage to that same
value. Relayed traffic is currently defined as answers to directory
requests, but that may change. (Default: 0)
.LP
.TP
\fBRelayBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
Limit the maximum token bucket size (also known as the burst) for
_relayed traffic_ to the
given number of bytes in each direction. (Default: 0)
.LP
.TP
\fBConnLimit \fR\fINUM\fP
The minimum number of file descriptors that must be available to
the Tor process before it will start. Tor will ask the OS for as

View File

@ -216,6 +216,8 @@ static config_var_t _option_vars[] = {
VAR("RecommendedClientVersions", LINELIST, RecommendedClientVersions, NULL),
VAR("RecommendedServerVersions", LINELIST, RecommendedServerVersions, NULL),
VAR("RedirectExit", LINELIST, RedirectExit, NULL),
VAR("RelayBandwidthBurst", MEMUNIT, RelayBandwidthBurst, "0"),
VAR("RelayBandwidthRate", MEMUNIT, RelayBandwidthRate, "0"),
VAR("RendExcludeNodes", STRING, RendExcludeNodes, NULL),
VAR("RendNodes", STRING, RendNodes, NULL),
VAR("RendPostPeriod", INTERVAL, RendPostPeriod, "1 hour"),
@ -2666,6 +2668,19 @@ options_validate(or_options_t *old_options, or_options_t *options,
*msg = tor_strdup(r >= 0 ? buf : "internal error");
return -1;
}
if (options->RelayBandwidthRate > options->RelayBandwidthBurst)
REJECT("RelayBandwidthBurst must be at least equal "
"to RelayBandwidthRate.");
if (options->RelayBandwidthRate &&
options->RelayBandwidthRate < ROUTER_REQUIRED_MIN_BANDWIDTH) {
r = tor_snprintf(buf, sizeof(buf),
"RelayBandwidthRate is set to %d bytes/second. "
"For servers, it must be at least %d.",
(int)options->RelayBandwidthRate,
ROUTER_REQUIRED_MIN_BANDWIDTH);
*msg = tor_strdup(r >= 0 ? buf : "internal error");
return -1;
}
}
if (options->BandwidthRate > options->BandwidthBurst)

View File

@ -1105,11 +1105,28 @@ connection_is_rate_limited(connection_t *conn)
}
extern int global_read_bucket, global_write_bucket;
extern int global_relayed_read_bucket, global_relayed_write_bucket;
/** Did our global write bucket run dry last second? If so, we are
* likely to run dry again this second, so be stingy with the tokens
* we just put in. */
static int global_write_bucket_empty_last_second = 0;
/** Did our either global write bucket run dry last second? If so,
* we are likely to run dry again this second, so be stingy with the
* tokens we just put in. */
static int write_buckets_empty_last_second = 0;
/** Return 1 if <b>conn</b> should use tokens from the "relayed"
* bandwidth rates, else 0. Currently, only OR conns with bandwidth
* class 1, and directory conns that are serving data out, count.
*/
static int
connection_counts_as_relayed_traffic(connection_t *conn)
{
#if 0
if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bandwidth_class)
return 1;
#endif
if (conn->type == CONN_TYPE_DIR && DIR_CONN_IS_SERVER(conn))
return 1;
return 0;
}
/** Helper function to decide how many bytes out of <b>global_bucket</b>
* we're willing to use for this transaction. <b>base</b> is the size
@ -1153,16 +1170,25 @@ connection_bucket_read_limit(connection_t *conn)
CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
int conn_bucket = -1;
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
int global_bucket = global_read_bucket;
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
conn_bucket = or_conn->read_bucket;
if (conn->state == OR_CONN_STATE_OPEN)
conn_bucket = or_conn->read_bucket;
}
if (!connection_is_rate_limited(conn)) {
/* be willing to read on local conns even if our buckets are empty */
return conn_bucket>=0 ? conn_bucket : 1<<14;
}
if (connection_counts_as_relayed_traffic(conn) &&
global_relayed_read_bucket <= global_read_bucket)
global_bucket = global_relayed_read_bucket;
return connection_bucket_round_robin(base, priority,
global_read_bucket, conn_bucket);
global_bucket, conn_bucket);
}
/** How many bytes at most can we write onto this connection? */
@ -1172,24 +1198,31 @@ connection_bucket_write_limit(connection_t *conn)
int base = connection_speaks_cells(conn) ?
CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
int global_bucket = global_write_bucket;
if (!connection_is_rate_limited(conn)) {
/* be willing to write to local conns even if our buckets are empty */
return conn->outbuf_flushlen;
}
return connection_bucket_round_robin(base, priority, global_write_bucket,
if (connection_counts_as_relayed_traffic(conn) &&
global_relayed_write_bucket <= global_write_bucket)
global_bucket = global_relayed_write_bucket;
return connection_bucket_round_robin(base, priority, global_bucket,
conn->outbuf_flushlen);
}
/** Return 1 if the global write bucket is low enough that we shouldn't
* send <b>attempt</b> bytes of low-priority directory stuff out to
* <b>conn</b>. Else return 0.
/** Return 1 if the global write buckets are low enough that we
* shouldn't send <b>attempt</b> bytes of low-priority directory stuff
* out to <b>conn</b>. Else return 0.
* Priority is 1 for v1 requests (directories and running-routers),
* and 2 for v2 requests (statuses and descriptors). But see FFFF in
* directory_handle_command_get() for why we don't use priority 2 yet.
*
* There are a lot of parameters we could use here:
* - global_relayed_write_bucket. Low is bad.
* - global_write_bucket. Low is bad.
* - bandwidthrate. Low is bad.
* - bandwidthburst. Not a big factor?
@ -1203,22 +1236,26 @@ connection_bucket_write_limit(connection_t *conn)
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
global_write_bucket : global_relayed_write_bucket;
if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */
if (!connection_is_rate_limited(conn))
return 0; /* local conns don't get limited */
if (global_write_bucket < (int)attempt)
if (smaller_bucket < (int)attempt)
return 1; /* not enough space no matter the priority */
if (global_write_bucket_empty_last_second)
if (write_buckets_empty_last_second)
return 1; /* we're already hitting our limits, no more please */
if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */
int64_t can_write = (int64_t)global_write_bucket
+ 2*get_options()->BandwidthRate;
or_options_t *options = get_options();
int64_t can_write = (int64_t)smaller_bucket
+ 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
options->BandwidthRate);
if (can_write < 2*(int64_t)attempt)
return 1;
} else { /* v2 query */
@ -1227,14 +1264,28 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0;
}
/** We just read num_read onto conn. Decrement buckets appropriately. */
/** We just read num_read and wrote num_written onto conn.
* Decrement buckets appropriately. */
static void
connection_read_bucket_decrement(connection_t *conn, int num_read)
connection_buckets_decrement(connection_t *conn, time_t now,
int num_read, int num_written)
{
global_read_bucket -= num_read;
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
TO_OR_CONN(conn)->read_bucket -= num_read;
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
if (num_read > 0)
rep_hist_note_bytes_read(num_read, now);
if (num_written > 0)
rep_hist_note_bytes_written(num_written, now);
if (connection_counts_as_relayed_traffic(conn)) {
global_relayed_read_bucket -= num_read;
global_relayed_write_bucket -= num_written;
}
global_read_bucket -= num_read;
global_write_bucket -= num_written;
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
TO_OR_CONN(conn)->read_bucket -= num_read;
}
/** If we have exhausted our global buckets, or the buckets for conn,
@ -1242,21 +1293,23 @@ connection_read_bucket_decrement(connection_t *conn, int num_read)
static void
connection_consider_empty_read_buckets(connection_t *conn)
{
const char *reason;
if (global_read_bucket <= 0) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"global read bucket exhausted. Pausing."));
conn->wants_to_read = 1;
connection_stop_reading(conn);
return;
}
if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
TO_OR_CONN(conn)->read_bucket <= 0) {
LOG_FN_CONN(conn,
(LOG_DEBUG,LD_NET,"read bucket exhausted. Pausing."));
conn->wants_to_read = 1;
connection_stop_reading(conn);
}
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn) &&
global_relayed_read_bucket <= 0) {
reason = "global relayed read bucket exhausted. Pausing.";
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
TO_OR_CONN(conn)->read_bucket <= 0) {
reason = "connection read bucket exhausted. Pausing.";
} else
return; /* all good, no need to stop it */
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
conn->wants_to_read = 1;
connection_stop_reading(conn);
}
/** If we have exhausted our global buckets, or the buckets for conn,
@ -1264,26 +1317,28 @@ connection_consider_empty_read_buckets(connection_t *conn)
static void
connection_consider_empty_write_buckets(connection_t *conn)
{
const char *reason;
if (global_write_bucket <= 0) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"global write bucket exhausted. Pausing."));
conn->wants_to_write = 1;
connection_stop_writing(conn);
return;
}
reason = "global write bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn) &&
global_relayed_write_bucket <= 0) {
reason = "global relayed write bucket exhausted. Pausing.";
#if 0
if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
TO_OR_CONN(conn)->write_bucket <= 0) {
LOG_FN_CONN(conn,
(LOG_DEBUG,LD_NET,"write bucket exhausted. Pausing."));
conn->wants_to_write = 1;
connection_stop_writing(conn);
}
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
TO_OR_CONN(conn)->write_bucket <= 0) {
reason = "connection write bucket exhausted. Pausing.";
#endif
} else
return; /* all good, no need to stop it */
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
conn->wants_to_write = 1;
connection_stop_writing(conn);
}
/** Initialize the global read bucket to options->BandwidthBurst. */
/** Initialize the global read bucket to options-\>BandwidthBurst. */
void
connection_bucket_init(void)
{
@ -1291,8 +1346,28 @@ connection_bucket_init(void)
/* start it at max traffic */
global_read_bucket = (int)options->BandwidthBurst;
global_write_bucket = (int)options->BandwidthBurst;
if (options->RelayBandwidthRate) {
global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
} else {
global_relayed_read_bucket = (int)options->BandwidthBurst;
global_relayed_write_bucket = (int)options->BandwidthBurst;
}
}
static void
connection_bucket_refill_helper(int *bucket, int rate, int burst,
int seconds_elapsed, const char *name)
{
if (*bucket < burst) {
*bucket += rate*seconds_elapsed;
if (*bucket > burst)
*bucket = burst;
log(LOG_DEBUG, LD_NET,"%s now %d.", name, *bucket);
}
}
/** A second has rolled over; increment buckets appropriately. */
void
connection_bucket_refill(int seconds_elapsed)
@ -1301,23 +1376,36 @@ connection_bucket_refill(int seconds_elapsed)
connection_t *conn;
connection_t **carray;
or_options_t *options = get_options();
int relayrate, relayburst;
if (options->RelayBandwidthRate) {
relayrate = (int)options->RelayBandwidthRate;
relayburst = (int)options->RelayBandwidthBurst;
} else {
relayrate = (int)options->BandwidthRate;
relayburst = (int)options->BandwidthBurst;
}
tor_assert(seconds_elapsed >= 0);
write_buckets_empty_last_second =
global_relayed_write_bucket == 0 || global_write_bucket == 0;
/* refill the global buckets */
if (global_read_bucket < (int)options->BandwidthBurst) {
global_read_bucket += (int)options->BandwidthRate*seconds_elapsed;
if (global_read_bucket > (int)options->BandwidthBurst)
global_read_bucket = (int)options->BandwidthBurst;
log(LOG_DEBUG, LD_NET,"global_read_bucket now %d.", global_read_bucket);
}
if (global_write_bucket < (int)options->BandwidthBurst) {
global_write_bucket_empty_last_second = global_write_bucket == 0;
global_write_bucket += (int)options->BandwidthRate*seconds_elapsed;
if (global_write_bucket > (int)options->BandwidthBurst)
global_write_bucket = (int)options->BandwidthBurst;
log(LOG_DEBUG, LD_NET,"global_write_bucket now %d.", global_write_bucket);
}
connection_bucket_refill_helper(&global_read_bucket,
(int)options->BandwidthRate,
(int)options->BandwidthBurst,
seconds_elapsed, "global_read_bucket");
connection_bucket_refill_helper(&global_write_bucket,
(int)options->BandwidthRate,
(int)options->BandwidthBurst,
seconds_elapsed, "global_write_bucket");
connection_bucket_refill_helper(&global_relayed_read_bucket,
relayrate, relayburst, seconds_elapsed,
"global_relayed_read_bucket");
connection_bucket_refill_helper(&global_relayed_write_bucket,
relayrate, relayburst, seconds_elapsed,
"global_relayed_write_bucket");
/* refill the per-connection buckets */
get_connection_array(&carray,&n);
@ -1337,19 +1425,25 @@ connection_bucket_refill(int seconds_elapsed)
if (conn->wants_to_read == 1 /* it's marked to turn reading back on now */
&& global_read_bucket > 0 /* and we're allowed to read */
&& (!connection_counts_as_relayed_traffic(conn) ||
global_relayed_read_bucket > 0) /* even if we're relayed traffic */
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
TO_OR_CONN(conn)->read_bucket > 0)) {
/* and either a non-cell conn or a cell conn with non-empty bucket */
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for read",conn->s));
"waking up conn (fd %d) for read", conn->s));
conn->wants_to_read = 0;
connection_start_reading(conn);
}
if (conn->wants_to_write == 1 &&
global_write_bucket > 0) { /* and we're allowed to write */
if (conn->wants_to_write == 1
&& global_write_bucket > 0 /* and we're allowed to write */
&& (!connection_counts_as_relayed_traffic(conn) ||
global_relayed_write_bucket > 0)) {
/* even if we're relayed traffic */
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for write",conn->s));
"waking up conn (fd %d) for write", conn->s));
conn->wants_to_write = 0;
connection_start_writing(conn);
}
@ -1561,18 +1655,7 @@ connection_read_to_buf(connection_t *conn, int *max_to_read)
edge_conn->n_read += n_read;
}
if (connection_is_rate_limited(conn)) {
/* For non-local IPs, remember if we flushed any bytes over the wire. */
time_t now = time(NULL);
if (n_read > 0) {
rep_hist_note_bytes_read(n_read, now);
connection_read_bucket_decrement(conn, n_read);
}
if (n_written > 0) {
rep_hist_note_bytes_written(n_written, now);
global_write_bucket -= n_written;
}
}
connection_buckets_decrement(conn, time(NULL), n_read, n_written);
if (more_to_read && result == at_most) {
bytes_in_buf = buf_capacity(conn->inbuf) - buf_datalen(conn->inbuf);
@ -1762,18 +1845,7 @@ connection_handle_write(connection_t *conn, int force)
edge_conn->n_written += n_written;
}
if (connection_is_rate_limited(conn)) {
/* For non-local IPs, remember if we flushed any bytes over the wire. */
time_t now = time(NULL);
if (n_written > 0) {
rep_hist_note_bytes_written(n_written, now);
global_write_bucket -= n_written;
}
if (n_read > 0) {
rep_hist_note_bytes_read(n_read, now);
connection_read_bucket_decrement(conn, n_read);
}
}
connection_buckets_decrement(conn, time(NULL), n_read, n_written);
if (result > 0) {
/* If we wrote any bytes from our buffer, then call the appropriate

View File

@ -34,12 +34,18 @@ static int conn_close_if_marked(int i);
int global_read_bucket; /**< Max number of bytes I can read this second. */
int global_write_bucket; /**< Max number of bytes I can write this second. */
/** Max number of relayed (bandwidth class 1) bytes I can read this second. */
int global_relayed_read_bucket;
/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
int global_relayed_write_bucket;
/** What was the read bucket before the last call to prepare_for_pool?
* (used to determine how many bytes we've read). */
static int stats_prev_global_read_bucket;
/** What was the write bucket before the last call to prepare_for_pool?
* (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket;
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read/written since we started the process? */
static uint64_t stats_n_bytes_read = 0;
static uint64_t stats_n_bytes_written = 0;

View File

@ -804,7 +804,7 @@ typedef struct or_connection_t {
int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */
struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */
* identity digest as this one. */
/** Linked list of bridged dirserver connections that can't write until
* this connection's outbuf is less full. */
struct dir_connection_t *blocked_dir_connections;
@ -1697,6 +1697,10 @@ typedef struct {
* to use in a second? */
uint64_t MaxAdvertisedBandwidth; /**< How much bandwidth are we willing to
* tell people we have? */
uint64_t RelayBandwidthRate; /**< How much bandwidth, on average, are we
* willing to use for all relayed conns? */
uint64_t RelayBandwidthBurst; /**< How much bandwidth, at maximum, will we
* use in a second for all relayed conns? */
int NumCpus; /**< How many CPUs should we try to use? */
int RunTesting; /**< If true, create testing circuits to measure how well the
* other ORs are running. */