From b4f743562f609792619199007c2d091968f72a71 Mon Sep 17 00:00:00 2001 From: Roger Dingledine Date: Tue, 20 Mar 2007 02:55:31 +0000 Subject: [PATCH] Add a separate set of token buckets for relayed traffic. Right now that's just defined as answers to directory requests. svn:r9881 --- doc/tor.1.in | 17 ++- src/or/config.c | 15 +++ src/or/connection.c | 252 ++++++++++++++++++++++++++++---------------- src/or/main.c | 6 ++ src/or/or.h | 6 +- 5 files changed, 203 insertions(+), 93 deletions(-) diff --git a/doc/tor.1.in b/doc/tor.1.in index eb05dd7448..6a7e0da7a4 100644 --- a/doc/tor.1.in +++ b/doc/tor.1.in @@ -65,8 +65,7 @@ bandwidth usage to that same value. (Default: 3 MB) .TP \fBBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP Limit the maximum token bucket size (also known as the burst) to the -given number of bytes in each direction. This value should be at least -twice your BandwidthRate. (Default: 6 MB) +given number of bytes in each direction. (Default: 6 MB) .LP .TP \fBMaxAdvertisedBandwidth \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP @@ -77,6 +76,20 @@ advertised bandwidth rate) can thus reduce the CPU demands on their server without impacting network performance. .LP .TP +\fBRelayBandwidthRate \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP +If defined, a separate token bucket limits the average incoming bandwidth +usage for _relayed traffic_ on this node to the specified number of +bytes per second, and the average outgoing bandwidth usage to that same +value. Relayed traffic is currently defined as answers to directory +requests, but that may change. (Default: 0) +.LP +.TP +\fBRelayBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP +Limit the maximum token bucket size (also known as the burst) for +_relayed traffic_ to the +given number of bytes in each direction. (Default: 0) +.LP +.TP \fBConnLimit \fR\fINUM\fP The minimum number of file descriptors that must be available to the Tor process before it will start. Tor will ask the OS for as diff --git a/src/or/config.c b/src/or/config.c index 658e26aca9..33065427c6 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -216,6 +216,8 @@ static config_var_t _option_vars[] = { VAR("RecommendedClientVersions", LINELIST, RecommendedClientVersions, NULL), VAR("RecommendedServerVersions", LINELIST, RecommendedServerVersions, NULL), VAR("RedirectExit", LINELIST, RedirectExit, NULL), + VAR("RelayBandwidthBurst", MEMUNIT, RelayBandwidthBurst, "0"), + VAR("RelayBandwidthRate", MEMUNIT, RelayBandwidthRate, "0"), VAR("RendExcludeNodes", STRING, RendExcludeNodes, NULL), VAR("RendNodes", STRING, RendNodes, NULL), VAR("RendPostPeriod", INTERVAL, RendPostPeriod, "1 hour"), @@ -2666,6 +2668,19 @@ options_validate(or_options_t *old_options, or_options_t *options, *msg = tor_strdup(r >= 0 ? buf : "internal error"); return -1; } + if (options->RelayBandwidthRate > options->RelayBandwidthBurst) + REJECT("RelayBandwidthBurst must be at least equal " + "to RelayBandwidthRate."); + if (options->RelayBandwidthRate && + options->RelayBandwidthRate < ROUTER_REQUIRED_MIN_BANDWIDTH) { + r = tor_snprintf(buf, sizeof(buf), + "RelayBandwidthRate is set to %d bytes/second. " + "For servers, it must be at least %d.", + (int)options->RelayBandwidthRate, + ROUTER_REQUIRED_MIN_BANDWIDTH); + *msg = tor_strdup(r >= 0 ? buf : "internal error"); + return -1; + } } if (options->BandwidthRate > options->BandwidthBurst) diff --git a/src/or/connection.c b/src/or/connection.c index c28fedecfc..41104ba11d 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -1105,11 +1105,28 @@ connection_is_rate_limited(connection_t *conn) } extern int global_read_bucket, global_write_bucket; +extern int global_relayed_read_bucket, global_relayed_write_bucket; -/** Did our global write bucket run dry last second? If so, we are - * likely to run dry again this second, so be stingy with the tokens - * we just put in. */ -static int global_write_bucket_empty_last_second = 0; +/** Did our either global write bucket run dry last second? If so, + * we are likely to run dry again this second, so be stingy with the + * tokens we just put in. */ +static int write_buckets_empty_last_second = 0; + +/** Return 1 if conn should use tokens from the "relayed" + * bandwidth rates, else 0. Currently, only OR conns with bandwidth + * class 1, and directory conns that are serving data out, count. + */ +static int +connection_counts_as_relayed_traffic(connection_t *conn) +{ +#if 0 + if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bandwidth_class) + return 1; +#endif + if (conn->type == CONN_TYPE_DIR && DIR_CONN_IS_SERVER(conn)) + return 1; + return 0; +} /** Helper function to decide how many bytes out of global_bucket * we're willing to use for this transaction. base is the size @@ -1153,16 +1170,25 @@ connection_bucket_read_limit(connection_t *conn) CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; int conn_bucket = -1; - if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + int global_bucket = global_read_bucket; + + if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); - conn_bucket = or_conn->read_bucket; + if (conn->state == OR_CONN_STATE_OPEN) + conn_bucket = or_conn->read_bucket; } + if (!connection_is_rate_limited(conn)) { /* be willing to read on local conns even if our buckets are empty */ return conn_bucket>=0 ? conn_bucket : 1<<14; } + + if (connection_counts_as_relayed_traffic(conn) && + global_relayed_read_bucket <= global_read_bucket) + global_bucket = global_relayed_read_bucket; + return connection_bucket_round_robin(base, priority, - global_read_bucket, conn_bucket); + global_bucket, conn_bucket); } /** How many bytes at most can we write onto this connection? */ @@ -1172,24 +1198,31 @@ connection_bucket_write_limit(connection_t *conn) int base = connection_speaks_cells(conn) ? CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; + int global_bucket = global_write_bucket; if (!connection_is_rate_limited(conn)) { /* be willing to write to local conns even if our buckets are empty */ return conn->outbuf_flushlen; } - return connection_bucket_round_robin(base, priority, global_write_bucket, + + if (connection_counts_as_relayed_traffic(conn) && + global_relayed_write_bucket <= global_write_bucket) + global_bucket = global_relayed_write_bucket; + + return connection_bucket_round_robin(base, priority, global_bucket, conn->outbuf_flushlen); } -/** Return 1 if the global write bucket is low enough that we shouldn't - * send attempt bytes of low-priority directory stuff out to - * conn. Else return 0. +/** Return 1 if the global write buckets are low enough that we + * shouldn't send attempt bytes of low-priority directory stuff + * out to conn. Else return 0. * Priority is 1 for v1 requests (directories and running-routers), * and 2 for v2 requests (statuses and descriptors). But see FFFF in * directory_handle_command_get() for why we don't use priority 2 yet. * * There are a lot of parameters we could use here: + * - global_relayed_write_bucket. Low is bad. * - global_write_bucket. Low is bad. * - bandwidthrate. Low is bad. * - bandwidthburst. Not a big factor? @@ -1203,22 +1236,26 @@ connection_bucket_write_limit(connection_t *conn) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { + int smaller_bucket = global_write_bucket < global_relayed_write_bucket ? + global_write_bucket : global_relayed_write_bucket; if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ if (!connection_is_rate_limited(conn)) return 0; /* local conns don't get limited */ - if (global_write_bucket < (int)attempt) + if (smaller_bucket < (int)attempt) return 1; /* not enough space no matter the priority */ - if (global_write_bucket_empty_last_second) + if (write_buckets_empty_last_second) return 1; /* we're already hitting our limits, no more please */ if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ - int64_t can_write = (int64_t)global_write_bucket - + 2*get_options()->BandwidthRate; + or_options_t *options = get_options(); + int64_t can_write = (int64_t)smaller_bucket + + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate : + options->BandwidthRate); if (can_write < 2*(int64_t)attempt) return 1; } else { /* v2 query */ @@ -1227,14 +1264,28 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) return 0; } -/** We just read num_read onto conn. Decrement buckets appropriately. */ +/** We just read num_read and wrote num_written onto conn. + * Decrement buckets appropriately. */ static void -connection_read_bucket_decrement(connection_t *conn, int num_read) +connection_buckets_decrement(connection_t *conn, time_t now, + int num_read, int num_written) { - global_read_bucket -= num_read; - if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - TO_OR_CONN(conn)->read_bucket -= num_read; + if (!connection_is_rate_limited(conn)) + return; /* local IPs are free */ + + if (num_read > 0) + rep_hist_note_bytes_read(num_read, now); + if (num_written > 0) + rep_hist_note_bytes_written(num_written, now); + + if (connection_counts_as_relayed_traffic(conn)) { + global_relayed_read_bucket -= num_read; + global_relayed_write_bucket -= num_written; } + global_read_bucket -= num_read; + global_write_bucket -= num_written; + if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) + TO_OR_CONN(conn)->read_bucket -= num_read; } /** If we have exhausted our global buckets, or the buckets for conn, @@ -1242,21 +1293,23 @@ connection_read_bucket_decrement(connection_t *conn, int num_read) static void connection_consider_empty_read_buckets(connection_t *conn) { + const char *reason; + if (global_read_bucket <= 0) { - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "global read bucket exhausted. Pausing.")); - conn->wants_to_read = 1; - connection_stop_reading(conn); - return; - } - if (connection_speaks_cells(conn) && - conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->read_bucket <= 0) { - LOG_FN_CONN(conn, - (LOG_DEBUG,LD_NET,"read bucket exhausted. Pausing.")); - conn->wants_to_read = 1; - connection_stop_reading(conn); - } + reason = "global read bucket exhausted. Pausing."; + } else if (connection_counts_as_relayed_traffic(conn) && + global_relayed_read_bucket <= 0) { + reason = "global relayed read bucket exhausted. Pausing."; + } else if (connection_speaks_cells(conn) && + conn->state == OR_CONN_STATE_OPEN && + TO_OR_CONN(conn)->read_bucket <= 0) { + reason = "connection read bucket exhausted. Pausing."; + } else + return; /* all good, no need to stop it */ + + LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); + conn->wants_to_read = 1; + connection_stop_reading(conn); } /** If we have exhausted our global buckets, or the buckets for conn, @@ -1264,26 +1317,28 @@ connection_consider_empty_read_buckets(connection_t *conn) static void connection_consider_empty_write_buckets(connection_t *conn) { + const char *reason; + if (global_write_bucket <= 0) { - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "global write bucket exhausted. Pausing.")); - conn->wants_to_write = 1; - connection_stop_writing(conn); - return; - } + reason = "global write bucket exhausted. Pausing."; + } else if (connection_counts_as_relayed_traffic(conn) && + global_relayed_write_bucket <= 0) { + reason = "global relayed write bucket exhausted. Pausing."; #if 0 - if (connection_speaks_cells(conn) && - conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->write_bucket <= 0) { - LOG_FN_CONN(conn, - (LOG_DEBUG,LD_NET,"write bucket exhausted. Pausing.")); - conn->wants_to_write = 1; - connection_stop_writing(conn); - } + } else if (connection_speaks_cells(conn) && + conn->state == OR_CONN_STATE_OPEN && + TO_OR_CONN(conn)->write_bucket <= 0) { + reason = "connection write bucket exhausted. Pausing."; #endif + } else + return; /* all good, no need to stop it */ + + LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); + conn->wants_to_write = 1; + connection_stop_writing(conn); } -/** Initialize the global read bucket to options->BandwidthBurst. */ +/** Initialize the global read bucket to options-\>BandwidthBurst. */ void connection_bucket_init(void) { @@ -1291,8 +1346,28 @@ connection_bucket_init(void) /* start it at max traffic */ global_read_bucket = (int)options->BandwidthBurst; global_write_bucket = (int)options->BandwidthBurst; + if (options->RelayBandwidthRate) { + global_relayed_read_bucket = (int)options->RelayBandwidthBurst; + global_relayed_write_bucket = (int)options->RelayBandwidthBurst; + } else { + global_relayed_read_bucket = (int)options->BandwidthBurst; + global_relayed_write_bucket = (int)options->BandwidthBurst; + } } +static void +connection_bucket_refill_helper(int *bucket, int rate, int burst, + int seconds_elapsed, const char *name) +{ + if (*bucket < burst) { + *bucket += rate*seconds_elapsed; + if (*bucket > burst) + *bucket = burst; + log(LOG_DEBUG, LD_NET,"%s now %d.", name, *bucket); + } +} + + /** A second has rolled over; increment buckets appropriately. */ void connection_bucket_refill(int seconds_elapsed) @@ -1301,23 +1376,36 @@ connection_bucket_refill(int seconds_elapsed) connection_t *conn; connection_t **carray; or_options_t *options = get_options(); + int relayrate, relayburst; + + if (options->RelayBandwidthRate) { + relayrate = (int)options->RelayBandwidthRate; + relayburst = (int)options->RelayBandwidthBurst; + } else { + relayrate = (int)options->BandwidthRate; + relayburst = (int)options->BandwidthBurst; + } tor_assert(seconds_elapsed >= 0); + write_buckets_empty_last_second = + global_relayed_write_bucket == 0 || global_write_bucket == 0; + /* refill the global buckets */ - if (global_read_bucket < (int)options->BandwidthBurst) { - global_read_bucket += (int)options->BandwidthRate*seconds_elapsed; - if (global_read_bucket > (int)options->BandwidthBurst) - global_read_bucket = (int)options->BandwidthBurst; - log(LOG_DEBUG, LD_NET,"global_read_bucket now %d.", global_read_bucket); - } - if (global_write_bucket < (int)options->BandwidthBurst) { - global_write_bucket_empty_last_second = global_write_bucket == 0; - global_write_bucket += (int)options->BandwidthRate*seconds_elapsed; - if (global_write_bucket > (int)options->BandwidthBurst) - global_write_bucket = (int)options->BandwidthBurst; - log(LOG_DEBUG, LD_NET,"global_write_bucket now %d.", global_write_bucket); - } + connection_bucket_refill_helper(&global_read_bucket, + (int)options->BandwidthRate, + (int)options->BandwidthBurst, + seconds_elapsed, "global_read_bucket"); + connection_bucket_refill_helper(&global_write_bucket, + (int)options->BandwidthRate, + (int)options->BandwidthBurst, + seconds_elapsed, "global_write_bucket"); + connection_bucket_refill_helper(&global_relayed_read_bucket, + relayrate, relayburst, seconds_elapsed, + "global_relayed_read_bucket"); + connection_bucket_refill_helper(&global_relayed_write_bucket, + relayrate, relayburst, seconds_elapsed, + "global_relayed_write_bucket"); /* refill the per-connection buckets */ get_connection_array(&carray,&n); @@ -1337,19 +1425,25 @@ connection_bucket_refill(int seconds_elapsed) if (conn->wants_to_read == 1 /* it's marked to turn reading back on now */ && global_read_bucket > 0 /* and we're allowed to read */ + && (!connection_counts_as_relayed_traffic(conn) || + global_relayed_read_bucket > 0) /* even if we're relayed traffic */ && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || TO_OR_CONN(conn)->read_bucket > 0)) { /* and either a non-cell conn or a cell conn with non-empty bucket */ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for read",conn->s)); + "waking up conn (fd %d) for read", conn->s)); conn->wants_to_read = 0; connection_start_reading(conn); } - if (conn->wants_to_write == 1 && - global_write_bucket > 0) { /* and we're allowed to write */ + + if (conn->wants_to_write == 1 + && global_write_bucket > 0 /* and we're allowed to write */ + && (!connection_counts_as_relayed_traffic(conn) || + global_relayed_write_bucket > 0)) { + /* even if we're relayed traffic */ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for write",conn->s)); + "waking up conn (fd %d) for write", conn->s)); conn->wants_to_write = 0; connection_start_writing(conn); } @@ -1561,18 +1655,7 @@ connection_read_to_buf(connection_t *conn, int *max_to_read) edge_conn->n_read += n_read; } - if (connection_is_rate_limited(conn)) { - /* For non-local IPs, remember if we flushed any bytes over the wire. */ - time_t now = time(NULL); - if (n_read > 0) { - rep_hist_note_bytes_read(n_read, now); - connection_read_bucket_decrement(conn, n_read); - } - if (n_written > 0) { - rep_hist_note_bytes_written(n_written, now); - global_write_bucket -= n_written; - } - } + connection_buckets_decrement(conn, time(NULL), n_read, n_written); if (more_to_read && result == at_most) { bytes_in_buf = buf_capacity(conn->inbuf) - buf_datalen(conn->inbuf); @@ -1762,18 +1845,7 @@ connection_handle_write(connection_t *conn, int force) edge_conn->n_written += n_written; } - if (connection_is_rate_limited(conn)) { - /* For non-local IPs, remember if we flushed any bytes over the wire. */ - time_t now = time(NULL); - if (n_written > 0) { - rep_hist_note_bytes_written(n_written, now); - global_write_bucket -= n_written; - } - if (n_read > 0) { - rep_hist_note_bytes_read(n_read, now); - connection_read_bucket_decrement(conn, n_read); - } - } + connection_buckets_decrement(conn, time(NULL), n_read, n_written); if (result > 0) { /* If we wrote any bytes from our buffer, then call the appropriate diff --git a/src/or/main.c b/src/or/main.c index 314f2bf5b2..24caba0b52 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -34,12 +34,18 @@ static int conn_close_if_marked(int i); int global_read_bucket; /**< Max number of bytes I can read this second. */ int global_write_bucket; /**< Max number of bytes I can write this second. */ +/** Max number of relayed (bandwidth class 1) bytes I can read this second. */ +int global_relayed_read_bucket; +/** Max number of relayed (bandwidth class 1) bytes I can write this second. */ +int global_relayed_write_bucket; + /** What was the read bucket before the last call to prepare_for_pool? * (used to determine how many bytes we've read). */ static int stats_prev_global_read_bucket; /** What was the write bucket before the last call to prepare_for_pool? * (used to determine how many bytes we've written). */ static int stats_prev_global_write_bucket; +/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/ /** How many bytes have we read/written since we started the process? */ static uint64_t stats_n_bytes_read = 0; static uint64_t stats_n_bytes_written = 0; diff --git a/src/or/or.h b/src/or/or.h index ffb173136b..3f76ceaaa6 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -804,7 +804,7 @@ typedef struct or_connection_t { int n_circuits; /**< How many circuits use this connection as p_conn or * n_conn ? */ struct or_connection_t *next_with_same_id; /**< Next connection with same - * identity digest as this one. */ + * identity digest as this one. */ /** Linked list of bridged dirserver connections that can't write until * this connection's outbuf is less full. */ struct dir_connection_t *blocked_dir_connections; @@ -1697,6 +1697,10 @@ typedef struct { * to use in a second? */ uint64_t MaxAdvertisedBandwidth; /**< How much bandwidth are we willing to * tell people we have? */ + uint64_t RelayBandwidthRate; /**< How much bandwidth, on average, are we + * willing to use for all relayed conns? */ + uint64_t RelayBandwidthBurst; /**< How much bandwidth, at maximum, will we + * use in a second for all relayed conns? */ int NumCpus; /**< How many CPUs should we try to use? */ int RunTesting; /**< If true, create testing circuits to measure how well the * other ORs are running. */