From c6f70e36e0125af0a9205f50fd5df5bfd16be780 Mon Sep 17 00:00:00 2001 From: Roger Dingledine Date: Sat, 5 Jul 2003 07:10:34 +0000 Subject: [PATCH] implemented total read rate limiting svn:r365 --- src/or/buffers.c | 5 ++--- src/or/config.c | 6 +++--- src/or/connection.c | 41 ++++++++++++++++++----------------------- src/or/main.c | 19 ++++++++++++++++++- src/or/or.h | 1 - 5 files changed, 41 insertions(+), 31 deletions(-) diff --git a/src/or/buffers.c b/src/or/buffers.c index 010547f355..4a89b260d4 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -28,7 +28,7 @@ void buf_free(char *buf) { free(buf); } -/* read from socket s, writing onto buf+buf_datalen. If at_most is >= 0 then +/* read from socket s, writing onto buf+buf_datalen. * read at most 'at_most' bytes, and in any case don't read more than will fit based on buflen. * If read() returns 0, set *reached_eof to 1 and return 0. If you want to tear * down the connection return -1, else return the number of bytes read. @@ -41,9 +41,8 @@ int read_to_buf(int s, int at_most, char **buf, int *buflen, int *buf_datalen, i /* this is the point where you would grow the buffer, if you want to */ - if(at_most < 0 || *buflen - *buf_datalen < at_most) + if(at_most > *buflen - *buf_datalen) at_most = *buflen - *buf_datalen; /* take the min of the two */ - /* (note that this only modifies at_most inside this function) */ if(at_most == 0) return 0; /* we shouldn't read anything */ diff --git a/src/or/config.c b/src/or/config.c index b5519c5f81..9a5c645d73 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -188,6 +188,7 @@ void config_assign(or_options_t *options, struct config_line *list) { config_compare(list, "KeepalivePeriod", CONFIG_TYPE_INT, &options->KeepalivePeriod) || config_compare(list, "MaxOnionsPending",CONFIG_TYPE_INT, &options->MaxOnionsPending) || config_compare(list, "NewCircuitPeriod",CONFIG_TYPE_INT, &options->NewCircuitPeriod) || + config_compare(list, "TotalBandwidth", CONFIG_TYPE_INT, &options->TotalBandwidth) || config_compare(list, "OnionRouter", CONFIG_TYPE_BOOL, &options->OnionRouter) || config_compare(list, "Daemon", CONFIG_TYPE_BOOL, &options->Daemon) || @@ -216,18 +217,17 @@ int getconfig(int argc, char **argv, or_options_t *options) { const char *cmd; int result = 0; -/* give reasonable defaults for each option */ +/* give reasonable values for each option. Defaults to zero. */ memset(options,0,sizeof(or_options_t)); - options->Daemon = 0; options->LogLevel = "debug"; options->loglevel = LOG_DEBUG; options->CoinWeight = 0.8; - options->LinkPadding = 0; options->MaxConn = 900; options->DirFetchPeriod = 600; options->KeepalivePeriod = 300; options->MaxOnionsPending = 10; options->NewCircuitPeriod = 60; /* once a minute */ + options->TotalBandwidth = 800000; /* at most 800kB/s total sustained incoming */ // options->ReconnectPeriod = 6001; /* get config lines from /etc/torrc and assign them */ diff --git a/src/or/connection.c b/src/or/connection.c index ce56c2f05d..59608c56be 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -8,6 +8,8 @@ extern or_options_t options; /* command-line and config-file options */ +extern int global_read_bucket; + char *conn_type_to_string[] = { "", /* 0 */ "OP listener", /* 1 */ @@ -265,6 +267,7 @@ int retry_all_connections(uint16_t or_listenport, uint16_t ap_listenport, uint16 int connection_read_to_buf(connection_t *conn) { int read_result; struct timeval now; + int at_most = global_read_bucket; if(connection_speaks_cells(conn)) { assert(conn->receiver_bucket >= 0); @@ -277,21 +280,25 @@ int connection_read_to_buf(connection_t *conn) { conn->timestamp_lastread = now.tv_sec; - read_result = read_to_buf(conn->s, conn->receiver_bucket, &conn->inbuf, &conn->inbuflen, + if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket) + at_most = conn->receiver_bucket; + + read_result = read_to_buf(conn->s, at_most, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen, &conn->inbuf_reached_eof); // log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result); - if(read_result >= 0 && connection_speaks_cells(conn)) { -// log(LOG_DEBUG,"connection_read_to_buf(): Read %d, bucket now %d.",read_result,conn->receiver_bucket); - conn->receiver_bucket -= read_result; - if(conn->receiver_bucket <= 0) { - -// log(LOG_DEBUG,"connection_read_to_buf() stopping reading, receiver bucket full."); + if(read_result >= 0) { + global_read_bucket -= read_result; assert(global_read_bucket >= 0); + if(connection_speaks_cells(conn)) + conn->receiver_bucket -= read_result; + if(conn->receiver_bucket == 0 || global_read_bucket == 0) { + log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket); + conn->wants_to_read = 1; connection_stop_reading(conn); /* If we're not in 'open' state here, then we're never going to finish the * handshake, because we'll never increment the receiver_bucket. But we * can't check for that here, because the buf we just read might have enough - * on it to finish the handshake. So we check for that in check_conn_read(). + * on it to finish the handshake. So we check for that in conn_read(). */ } } @@ -350,26 +357,12 @@ int connection_receiver_bucket_should_increase(connection_t *conn) { if(!connection_speaks_cells(conn)) return 0; /* edge connections don't use receiver_buckets */ - if(conn->receiver_bucket > 10*conn->bandwidth) + if(conn->receiver_bucket > 9*conn->bandwidth) return 0; return 1; } -void connection_increment_receiver_bucket(connection_t *conn) { - assert(conn); - - if(connection_receiver_bucket_should_increase(conn)) { - /* yes, the receiver_bucket can become overfull here. But not by much. */ - conn->receiver_bucket += conn->bandwidth*1.1; -// log(LOG_DEBUG,"connection_increment_receiver_bucket(): Bucket now %d.",conn->receiver_bucket); - if(connection_state_is_open(conn)) { - /* if we're in state 'open', then start reading again */ - connection_start_reading(conn); - } - } -} - int connection_is_listener(connection_t *conn) { if(conn->type == CONN_TYPE_OR_LISTENER || conn->type == CONN_TYPE_AP_LISTENER || @@ -393,6 +386,8 @@ void connection_send_cell(connection_t *conn) { cell_t cell; int bytes_in_full_flushlen; + assert(0); /* this function has decayed. rewrite before using. */ + /* this function only gets called if options.LinkPadding is 1 */ assert(options.LinkPadding == 1); diff --git a/src/or/main.c b/src/or/main.c index f2a69fc471..5c6f3386ee 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -11,6 +11,7 @@ static void dumpstats(void); /* dump stats to stdout */ /********* START VARIABLES **********/ or_options_t options; /* command-line and config-file options */ +int global_read_bucket; /* max number of bytes I can read this second */ static connection_t *connection_array[MAXCONNECTIONS] = { NULL }; @@ -367,10 +368,25 @@ static int prepare_for_poll(int *timeout) { time_to_new_circuit = now.tv_sec + options.NewCircuitPeriod; } + if(global_read_bucket < 9*options.TotalBandwidth) { + global_read_bucket += options.TotalBandwidth; + log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket); + } + /* do housekeeping for each connection */ for(i=0;ireceiver_bucket += tmpconn->bandwidth; +// log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, tmpconn->receiver_bucket); + } + + if(tmpconn->wants_to_read == 1 /* it's marked to turn reading back on now */ + && global_read_bucket > 0 /* and we're allowed to read */ + && tmpconn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */ + tmpconn->wants_to_read = 0; + connection_start_reading(tmpconn); + } /* check connections to see whether we should send a keepalive, expire, or wait */ if(!connection_speaks_cells(tmpconn)) @@ -811,6 +827,7 @@ int tor_main(int argc, char *argv[]) { if(getconfig(argc,argv,&options)) exit(1); log_set_severity(options.loglevel); /* assign logging severity level from options */ + global_read_bucket = options.TotalBandwidth; /* start it at 1 second of traffic */ if(options.Daemon) daemonize(); diff --git a/src/or/or.h b/src/or/or.h index 5ae79b606e..fee368eef0 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -550,7 +550,6 @@ int connection_write_to_buf(char *string, int len, connection_t *conn); void connection_send_cell(connection_t *conn); int connection_receiver_bucket_should_increase(connection_t *conn); -void connection_increment_receiver_bucket (connection_t *conn); void connection_increment_send_timeval(connection_t *conn); void connection_init_timeval(connection_t *conn);