diff --git a/src/or/connection.c b/src/or/connection.c index b11d82dbcd..56643fd972 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -8,8 +8,6 @@ extern or_options_t options; /* command-line and config-file options */ -extern int global_read_bucket; - char *conn_type_to_string[] = { "", /* 0 */ "OP listener", /* 1 */ @@ -72,6 +70,7 @@ char *conn_state_to_string[][_CONN_TYPE_MAX+1] = { static int connection_init_accepted_conn(connection_t *conn); static int connection_handle_listener_read(connection_t *conn, int new_type); +static int connection_receiver_bucket_should_increase(connection_t *conn); /**************************************************************/ @@ -444,6 +443,121 @@ int retry_all_connections(void) { return 0; } +extern int global_read_bucket; + +/* how many bytes at most can we read onto this connection? */ +int connection_bucket_read_limit(connection_t *conn) { + int at_most; + + if(options.LinkPadding) { + at_most = global_read_bucket; + } else { + /* do a rudimentary round-robin so one circuit can't hog a connection */ + if(connection_speaks_cells(conn)) { + at_most = 32*(CELL_NETWORK_SIZE); + } else { + at_most = 32*(RELAY_PAYLOAD_SIZE); + } + + if(at_most > global_read_bucket) + at_most = global_read_bucket; + } + + if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) + if(at_most > conn->receiver_bucket) + at_most = conn->receiver_bucket; + + return at_most; +} + +/* we just read num_read onto conn. Decrement buckets appropriately. */ +void connection_bucket_decrement(connection_t *conn, int num_read) { + global_read_bucket -= num_read; assert(global_read_bucket >= 0); + if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + conn->receiver_bucket -= num_read; assert(conn->receiver_bucket >= 0); + } + if(global_read_bucket == 0) { + log_fn(LOG_DEBUG,"global bucket exhausted. Pausing."); + conn->wants_to_read = 1; + connection_stop_reading(conn); + return; + } + if(connection_speaks_cells(conn) && + conn->state == OR_CONN_STATE_OPEN && + conn->receiver_bucket == 0) { + log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing."); + conn->wants_to_read = 1; + connection_stop_reading(conn); + } +} + +/* keep a timeval to know when time has passed enough to refill buckets */ +static struct timeval current_time; + +void connection_bucket_init() { + tor_gettimeofday(¤t_time); + global_read_bucket = options.BandwidthBurst; /* start it at max traffic */ +} + +/* some time has passed; increment buckets appropriately. */ +void connection_bucket_refill(struct timeval *now) { + int i, n; + connection_t *conn; + connection_t **carray; + + if(now->tv_sec <= current_time.tv_sec) + return; /* wait until the second has rolled over */ + + current_time.tv_sec = now->tv_sec; /* update current_time */ + /* (ignore usecs for now) */ + + /* refill the global bucket */ + if(global_read_bucket < options.BandwidthBurst) { + global_read_bucket += options.BandwidthRate; + log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket); + } + + /* refill the per-connection buckets */ + get_connection_array(&carray,&n); + for(i=0;ireceiver_bucket += conn->bandwidth; + //log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket); + } + + if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */ + && global_read_bucket > 0 /* and we're allowed to read */ + && (!connection_speaks_cells(conn) || + conn->state != OR_CONN_STATE_OPEN || + conn->receiver_bucket > 0)) { + /* and either a non-cell conn or a cell conn with non-empty bucket */ + conn->wants_to_read = 0; + connection_start_reading(conn); + if(conn->wants_to_write == 1) { + conn->wants_to_write = 0; + connection_start_writing(conn); + } + } + } +} + +static int connection_receiver_bucket_should_increase(connection_t *conn) { + assert(conn); + + if(!connection_speaks_cells(conn)) + return 0; /* edge connections don't use receiver_buckets */ + if(conn->state != OR_CONN_STATE_OPEN) + return 0; /* only open connections play the rate limiting game */ + + assert(conn->bandwidth > 0); + if(conn->receiver_bucket > 9*conn->bandwidth) + return 0; + + return 1; +} + int connection_handle_read(connection_t *conn) { conn->timestamp_lastread = time(NULL); @@ -482,27 +596,14 @@ int connection_read_to_buf(connection_t *conn) { int result; int at_most; - if(options.LinkPadding) { - at_most = global_read_bucket; - } else { - /* do a rudimentary round-robin so one connection can't hog a thickpipe */ - if(connection_speaks_cells(conn)) { - at_most = 32*(CELL_NETWORK_SIZE); - } else { - at_most = 32*(RELAY_PAYLOAD_SIZE); - } - - if(at_most > global_read_bucket) - at_most = global_read_bucket; - } + /* how many bytes are we allowed to read? */ + at_most = connection_bucket_read_limit(conn); if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) { if(conn->state == OR_CONN_STATE_HANDSHAKING) return connection_tls_continue_handshake(conn); /* else open, or closing */ - if(at_most > conn->receiver_bucket) - at_most = conn->receiver_bucket; result = read_to_buf_tls(conn->tls, at_most, conn->inbuf); switch(result) { @@ -527,22 +628,7 @@ int connection_read_to_buf(connection_t *conn) { return -1; } - global_read_bucket -= result; assert(global_read_bucket >= 0); - if(global_read_bucket == 0) { - log_fn(LOG_DEBUG,"global bucket exhausted. Pausing."); - conn->wants_to_read = 1; - connection_stop_reading(conn); - return 0; - } - if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0); - if(conn->receiver_bucket == 0) { - log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing."); - conn->wants_to_read = 1; - connection_stop_reading(conn); - return 0; - } - } + connection_bucket_decrement(conn, result); return 0; } @@ -754,21 +840,6 @@ connection_t *connection_get_by_type_state_lastwritten(int type, int state) { return best; } -int connection_receiver_bucket_should_increase(connection_t *conn) { - assert(conn); - - if(!connection_speaks_cells(conn)) - return 0; /* edge connections don't use receiver_buckets */ - if(conn->state != OR_CONN_STATE_OPEN) - return 0; /* only open connections play the rate limiting game */ - - assert(conn->bandwidth > 0); - if(conn->receiver_bucket > 9*conn->bandwidth) - return 0; - - return 1; -} - int connection_is_listener(connection_t *conn) { if(conn->type == CONN_TYPE_OR_LISTENER || conn->type == CONN_TYPE_AP_LISTENER || diff --git a/src/or/main.c b/src/or/main.c index 52c51a2f17..e3c5a5a8c6 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -11,7 +11,6 @@ static int init_from_config(int argc, char **argv); /********* START VARIABLES **********/ -extern char *conn_type_to_string[]; extern char *conn_state_to_string[][_CONN_TYPE_MAX+1]; or_options_t options; /* command-line and config-file options */ @@ -281,23 +280,6 @@ static void run_connection_housekeeping(int i, time_t now) { cell_t cell; connection_t *conn = connection_array[i]; - if(connection_receiver_bucket_should_increase(conn)) { - conn->receiver_bucket += conn->bandwidth; - // log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket); - } - - if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */ - && global_read_bucket > 0 /* and we're allowed to read */ - && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) { - /* and either a non-cell conn or a cell conn with non-empty bucket */ - conn->wants_to_read = 0; - connection_start_reading(conn); - if(conn->wants_to_write == 1) { - conn->wants_to_write = 0; - connection_start_writing(conn); - } - } - /* check connections to see whether we should send a keepalive, expire, or wait */ if(!connection_speaks_cells(conn)) return; @@ -397,16 +379,6 @@ static void run_scheduled_events(time_t now) { } } - /* 4. Every second, we check how much bandwidth we've consumed and - * increment global_read_bucket. - */ - stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket; - if(global_read_bucket < options.BandwidthBurst) { - global_read_bucket += options.BandwidthRate; - log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket); - } - stats_prev_global_read_bucket = global_read_bucket; - /* 5. We do housekeeping for each connection... */ for(i=0;i current_second) { /* the second has rolled over. check more stuff. */ ++stats_n_seconds_reading; @@ -486,7 +464,7 @@ static int init_from_config(int argc, char **argv) { log_fn(LOG_DEBUG, "Successfully opened DebugLogFile '%s'.", options.DebugLogFile); } - global_read_bucket = options.BandwidthBurst; /* start it at max traffic */ + connection_bucket_init(); stats_prev_global_read_bucket = global_read_bucket; if(options.RunAsDaemon) { diff --git a/src/or/or.h b/src/or/or.h index 5fd5b9222f..e25a0692c3 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -664,8 +664,8 @@ int getconfig(int argc, char **argv, or_options_t *options); /********************************* connection.c ***************************/ -#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? "Unknown" : \ - conn_type_to_string[(t)]) +#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? \ + "Unknown" : conn_type_to_string[(t)]) extern char *conn_type_to_string[]; @@ -711,13 +711,11 @@ connection_t *connection_get_by_type(int type); connection_t *connection_get_by_type_state(int type, int state); connection_t *connection_get_by_type_state_lastwritten(int type, int state); -int connection_receiver_bucket_should_increase(connection_t *conn); - #define connection_speaks_cells(conn) ((conn)->type == CONN_TYPE_OR) #define connection_has_pending_tls_data(conn) \ ((conn)->type == CONN_TYPE_OR && \ (conn)->state == OR_CONN_STATE_OPEN && \ - tor_tls_get_pending_bytes(conn->tls)) + tor_tls_get_pending_bytes((conn)->tls)) int connection_is_listener(connection_t *conn); int connection_state_is_open(connection_t *conn);