add separate per-conn write limiting

This commit is contained in:
Roger Dingledine 2009-12-29 22:25:02 -05:00
parent 9a07f30b90
commit f255272f45
4 changed files with 43 additions and 19 deletions

View File

@ -8,6 +8,9 @@ Changes in version 0.2.2.7-alpha - 2009-12-??
use it. You can override this default by using the new use it. You can override this default by using the new
"CircuitPriorityHalflife" config option. Design and code by Ian "CircuitPriorityHalflife" config option. Design and code by Ian
Goldberg, Can Tang, and Chris Alexander. Goldberg, Can Tang, and Chris Alexander.
- Add separate per-conn write limiting to go with the per-conn read
limiting. We added a global write limit in Tor 0.1.2.5-alpha,
but never per-conn write limits.
- New consensus params "bwconnrate" and "bwconnburst" to let us - New consensus params "bwconnrate" and "bwconnburst" to let us
rate-limit client connections as they enter the network. It's rate-limit client connections as they enter the network. It's
controlled in the consensus so we can turn it on and off for controlled in the consensus so we can turn it on and off for

View File

@ -21,7 +21,8 @@ static void connection_init(time_t now, connection_t *conn, int type,
static int connection_init_accepted_conn(connection_t *conn, static int connection_init_accepted_conn(connection_t *conn,
uint8_t listener_type); uint8_t listener_type);
static int connection_handle_listener_read(connection_t *conn, int new_type); static int connection_handle_listener_read(connection_t *conn, int new_type);
static int connection_read_bucket_should_increase(or_connection_t *conn); static int connection_bucket_should_increase(int bucket,
or_connection_t *conn);
static int connection_finished_flushing(connection_t *conn); static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn); static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn); static int connection_finished_connecting(connection_t *conn);
@ -1973,6 +1974,7 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int base = connection_speaks_cells(conn) ? int base = connection_speaks_cells(conn) ?
CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR; int priority = conn->type != CONN_TYPE_DIR;
int conn_bucket = (int)conn->outbuf_flushlen;
int global_bucket = global_write_bucket; int global_bucket = global_write_bucket;
if (!connection_is_rate_limited(conn)) { if (!connection_is_rate_limited(conn)) {
@ -1980,12 +1982,22 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
return conn->outbuf_flushlen; return conn->outbuf_flushlen;
} }
if (connection_speaks_cells(conn)) {
/* use the per-conn write limit if it's lower, but if it's less
* than zero just use zero */
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
if (or_conn->write_bucket < conn_bucket)
conn_bucket = or_conn->write_bucket >= 0 ?
or_conn->write_bucket : 0;
}
if (connection_counts_as_relayed_traffic(conn, now) && if (connection_counts_as_relayed_traffic(conn, now) &&
global_relayed_write_bucket <= global_write_bucket) global_relayed_write_bucket <= global_write_bucket)
global_bucket = global_relayed_write_bucket; global_bucket = global_relayed_write_bucket;
return connection_bucket_round_robin(base, priority, global_bucket, return connection_bucket_round_robin(base, priority,
conn->outbuf_flushlen); global_bucket, conn_bucket);
} }
/** Return 1 if the global write buckets are low enough that we /** Return 1 if the global write buckets are low enough that we
@ -2039,8 +2051,8 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0; return 0;
} }
/** We just read num_read and wrote num_written onto conn. /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
* Decrement buckets appropriately. */ * onto <b>conn</b>. Decrement buckets appropriately. */
static void static void
connection_buckets_decrement(connection_t *conn, time_t now, connection_buckets_decrement(connection_t *conn, time_t now,
size_t num_read, size_t num_written) size_t num_read, size_t num_written)
@ -2075,8 +2087,10 @@ connection_buckets_decrement(connection_t *conn, time_t now,
} }
global_read_bucket -= (int)num_read; global_read_bucket -= (int)num_read;
global_write_bucket -= (int)num_written; global_write_bucket -= (int)num_written;
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
TO_OR_CONN(conn)->read_bucket -= (int)num_read; TO_OR_CONN(conn)->read_bucket -= (int)num_read;
TO_OR_CONN(conn)->write_bucket -= (int)num_written;
}
} }
/** If we have exhausted our global buckets, or the buckets for conn, /** If we have exhausted our global buckets, or the buckets for conn,
@ -2115,12 +2129,10 @@ connection_consider_empty_write_buckets(connection_t *conn)
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) && } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
global_relayed_write_bucket <= 0) { global_relayed_write_bucket <= 0) {
reason = "global relayed write bucket exhausted. Pausing."; reason = "global relayed write bucket exhausted. Pausing.";
#if 0
} else if (connection_speaks_cells(conn) && } else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN && conn->state == OR_CONN_STATE_OPEN &&
TO_OR_CONN(conn)->write_bucket <= 0) { TO_OR_CONN(conn)->write_bucket <= 0) {
reason = "connection write bucket exhausted. Pausing."; reason = "connection write bucket exhausted. Pausing.";
#endif
} else } else
return; /* all good, no need to stop it */ return; /* all good, no need to stop it */
@ -2216,14 +2228,19 @@ connection_bucket_refill(int seconds_elapsed, time_t now)
{ {
if (connection_speaks_cells(conn)) { if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn); or_connection_t *or_conn = TO_OR_CONN(conn);
if (connection_read_bucket_should_increase(or_conn)) { if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->read_bucket, connection_bucket_refill_helper(&or_conn->read_bucket,
or_conn->bandwidthrate, or_conn->bandwidthrate,
or_conn->bandwidthburst, or_conn->bandwidthburst,
seconds_elapsed, seconds_elapsed,
"or_conn->read_bucket"); "or_conn->read_bucket");
//log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, }
// conn->read_bucket); if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
connection_bucket_refill_helper(&or_conn->write_bucket,
or_conn->bandwidthrate,
or_conn->bandwidthburst,
seconds_elapsed,
"or_conn->write_bucket");
} }
} }
@ -2244,8 +2261,10 @@ connection_bucket_refill(int seconds_elapsed, time_t now)
if (conn->write_blocked_on_bw == 1 if (conn->write_blocked_on_bw == 1
&& global_write_bucket > 0 /* and we're allowed to write */ && global_write_bucket > 0 /* and we're allowed to write */
&& (!connection_counts_as_relayed_traffic(conn, now) || && (!connection_counts_as_relayed_traffic(conn, now) ||
global_relayed_write_bucket > 0)) { global_relayed_write_bucket > 0) /* even if we're relayed traffic */
/* even if we're relayed traffic */ && (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
TO_OR_CONN(conn)->write_bucket > 0)) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for write", conn->s)); "waking up conn (fd %d) for write", conn->s));
conn->write_blocked_on_bw = 0; conn->write_blocked_on_bw = 0;
@ -2254,17 +2273,17 @@ connection_bucket_refill(int seconds_elapsed, time_t now)
}); });
} }
/** Is the receiver bucket for connection <b>conn</b> low enough that we /** Is the <b>bucket</b> for connection <b>conn</b> low enough that we
* should add another pile of tokens to it? * should add another pile of tokens to it?
*/ */
static int static int
connection_read_bucket_should_increase(or_connection_t *conn) connection_bucket_should_increase(int bucket, or_connection_t *conn)
{ {
tor_assert(conn); tor_assert(conn);
if (conn->_base.state != OR_CONN_STATE_OPEN) if (conn->_base.state != OR_CONN_STATE_OPEN)
return 0; /* only open connections play the rate limiting game */ return 0; /* only open connections play the rate limiting game */
if (conn->read_bucket >= conn->bandwidthburst) if (bucket >= conn->bandwidthburst)
return 0; return 0;
return 1; return 1;

View File

@ -352,12 +352,13 @@ connection_or_init_conn_from_address(or_connection_t *conn,
* give it full bandwidth. */ * give it full bandwidth. */
conn->bandwidthrate = (int)options->BandwidthRate; conn->bandwidthrate = (int)options->BandwidthRate;
conn->read_bucket = conn->bandwidthburst = (int)options->BandwidthBurst; conn->read_bucket = conn->bandwidthburst = (int)options->BandwidthBurst;
conn->write_bucket = conn->bandwidthburst = (int)options->BandwidthBurst;
} else { /* Not a recognized relay. Squeeze it down based on the } else { /* Not a recognized relay. Squeeze it down based on the
* suggested bandwidth parameters in the consensus. */ * suggested bandwidth parameters in the consensus. */
conn->bandwidthrate = conn->bandwidthrate =
(int)networkstatus_get_param(NULL, "bwconnrate", (int)networkstatus_get_param(NULL, "bwconnrate",
(int)options->BandwidthRate); (int)options->BandwidthRate);
conn->read_bucket = conn->bandwidthburst = conn->read_bucket = conn->write_bucket = conn->bandwidthburst =
(int)networkstatus_get_param(NULL, "bwconnburst", (int)networkstatus_get_param(NULL, "bwconnburst",
(int)options->BandwidthBurst); (int)options->BandwidthBurst);
} }

View File

@ -928,7 +928,7 @@ typedef struct connection_t {
* again once the bandwidth throttler allows it? */ * again once the bandwidth throttler allows it? */
unsigned int write_blocked_on_bw:1; /**< Boolean: should we start writing unsigned int write_blocked_on_bw:1; /**< Boolean: should we start writing
* again once the bandwidth throttler allows * again once the bandwidth throttler allows
* reads? */ * writes? */
unsigned int hold_open_until_flushed:1; /**< Despite this connection's being unsigned int hold_open_until_flushed:1; /**< Despite this connection's being
* marked for close, do we flush it * marked for close, do we flush it
* before closing it? */ * before closing it? */
@ -1062,12 +1062,13 @@ typedef struct or_connection_t {
time_t timestamp_last_added_nonpadding; /** When did we last add a time_t timestamp_last_added_nonpadding; /** When did we last add a
* non-padding cell to the outbuf? */ * non-padding cell to the outbuf? */
/* bandwidth* and read_bucket only used by ORs in OPEN state: */ /* bandwidth* and *_bucket only used by ORs in OPEN state: */
int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
int read_bucket; /**< When this hits 0, stop receiving. Every second we int read_bucket; /**< When this hits 0, stop receiving. Every second we
* add 'bandwidthrate' to this, capping it at * add 'bandwidthrate' to this, capping it at
* bandwidthburst. (OPEN ORs only) */ * bandwidthburst. (OPEN ORs only) */
int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
int n_circuits; /**< How many circuits use this connection as p_conn or int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */ * n_conn ? */