diff --git a/src/or/connection.c b/src/or/connection.c index 81b499160a..dfcd1ab210 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2541,6 +2541,27 @@ record_num_bytes_transferred(connection_t *conn, #endif #ifndef USE_BUFFEREVENTS +/** Last emptied global or relay buckets in msec since midnight; only used + * in TestingTorNetwork mode. */ +static uint32_t global_relayed_read_emptied = 0, + global_relayed_write_emptied = 0, + global_read_emptied = 0, + global_write_emptied = 0; + +/** Check if a bucket has just run out of tokens, and if so, note the + * timestamp for TB_EMPTY events; only used in TestingTorNetwork mode. */ +static void +connection_buckets_empty_ts(uint32_t *timestamp_var, int tokens_before, + size_t tokens_removed) +{ + if (tokens_before > 0 && tokens_before - (int)tokens_removed <= 0) { + struct timeval tvnow; + tor_gettimeofday_cached(&tvnow); + *timestamp_var = (uint32_t)(((tvnow.tv_sec % 86400L) * 1000L) + + ((uint32_t)tvnow.tv_usec / (uint32_t)1000L)); + } +} + /** We just read num_read and wrote num_written bytes * onto conn. Decrement buckets appropriately. */ static void @@ -2563,6 +2584,28 @@ connection_buckets_decrement(connection_t *conn, time_t now, if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ + /* If one or more of our token buckets ran dry just now, note the + * timestamp for TB_EMPTY events. */ + if (get_options()->TestingTorNetwork) { + if (connection_counts_as_relayed_traffic(conn, now)) { + connection_buckets_empty_ts(&global_relayed_read_emptied, + global_relayed_read_bucket, num_read); + connection_buckets_empty_ts(&global_relayed_write_emptied, + global_relayed_write_bucket, num_written); + } + connection_buckets_empty_ts(&global_read_emptied, global_read_bucket, + num_read); + connection_buckets_empty_ts(&global_write_emptied, global_write_bucket, + num_written); + if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + or_connection_t *or_conn = TO_OR_CONN(conn); + connection_buckets_empty_ts(&or_conn->read_emptied_time, + or_conn->read_bucket, num_read); + connection_buckets_empty_ts(&or_conn->write_emptied_time, + or_conn->write_bucket, num_written); + } + } + if (connection_counts_as_relayed_traffic(conn, now)) { global_relayed_read_bucket -= (int)num_read; global_relayed_write_bucket -= (int)num_written; @@ -2677,6 +2720,11 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) smartlist_t *conns = get_connection_array(); int bandwidthrate, bandwidthburst, relayrate, relayburst; + int prev_global_read = global_read_bucket; + int prev_global_write = global_write_bucket; + int prev_relay_read = global_relayed_read_bucket; + int prev_relay_write = global_relayed_write_bucket; + bandwidthrate = (int)options->BandwidthRate; bandwidthburst = (int)options->BandwidthBurst; @@ -2711,12 +2759,25 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) milliseconds_elapsed, "global_relayed_write_bucket"); + control_event_refill_global(global_read_bucket, prev_global_read, + global_read_emptied, global_write_bucket, + prev_global_write, global_write_emptied, + global_relayed_read_bucket, prev_relay_read, + global_relayed_read_emptied, + global_relayed_write_bucket, prev_relay_write, + global_relayed_write_emptied, + milliseconds_elapsed); + /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); int orbandwidthrate = or_conn->bandwidthrate; int orbandwidthburst = or_conn->bandwidthburst; + + int prev_conn_read = or_conn->read_bucket; + int prev_conn_write = or_conn->write_bucket; + if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { connection_bucket_refill_helper(&or_conn->read_bucket, orbandwidthrate, @@ -2731,6 +2792,9 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) milliseconds_elapsed, "or_conn->write_bucket"); } + + control_event_refill_conn(or_conn, prev_conn_read, prev_conn_write, + (uint32_t)milliseconds_elapsed); } if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ diff --git a/src/or/control.c b/src/or/control.c index 66258775e9..47d08c295e 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -84,7 +84,8 @@ #define EVENT_CONF_CHANGED 0x0019 #define EVENT_CONN_BW 0x001A #define EVENT_CELL_STATS 0x001B -#define EVENT_MAX_ 0x001B +#define EVENT_TB_EMPTY 0x001C +#define EVENT_MAX_ 0x001C /* If EVENT_MAX_ ever hits 0x0020, we need to make the mask wider. */ /** Bitfield: The bit 1<<e is set if any open control @@ -962,6 +963,7 @@ static const struct control_event_t control_event_table[] = { { EVENT_CONF_CHANGED, "CONF_CHANGED"}, { EVENT_CONN_BW, "CONN_BW" }, { EVENT_CELL_STATS, "CELL_STATS" }, + { EVENT_TB_EMPTY, "TB_EMPTY" }, { 0, NULL }, }; @@ -4119,6 +4121,104 @@ control_event_circuit_cell_stats(void) return 0; } +/** Helper: return the time in millis that a given bucket was empty, + * capped at the time in millis since last refilling that bucket. Return + * 0 if the bucket was not empty during the last refill period. */ +static uint32_t +bucket_millis_empty(int prev_tokens, uint32_t last_empty_time, + uint32_t milliseconds_elapsed) +{ + uint32_t result = 0, refilled; + if (prev_tokens <= 0) { + struct timeval tvnow; + tor_gettimeofday_cached(&tvnow); + refilled = (uint32_t)((tvnow.tv_sec % 86400L) * 1000L + + (uint32_t)tvnow.tv_usec / (uint32_t)1000L); + result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) % + (86400L * 1000L)); + if (result > milliseconds_elapsed) + result = milliseconds_elapsed; + } + return result; +} + +/** Token buckets have been refilled: tell any interested control + * connections how global and relay token buckets have changed. */ +int +control_event_refill_global(int global_read, int prev_global_read, + uint32_t global_read_emptied_time, + int global_write, int prev_global_write, + uint32_t global_write_emptied_time, + int relay_read, int prev_relay_read, + uint32_t relay_read_emptied_time, + int relay_write, int prev_relay_write, + uint32_t relay_write_emptied_time, + uint32_t milliseconds_elapsed) +{ + uint32_t global_read_empty_time, global_write_empty_time, + relay_read_empty_time, relay_write_empty_time; + if (!get_options()->TestingTorNetwork || + !EVENT_IS_INTERESTING(EVENT_TB_EMPTY)) + return 0; + if (prev_global_read == global_read && + prev_global_write == global_write && + prev_relay_read == relay_read && + prev_relay_write == relay_write) + return 0; + if (prev_global_read <= 0 && prev_global_write <= 0) { + global_read_empty_time = bucket_millis_empty(prev_global_read, + global_read_emptied_time, milliseconds_elapsed); + global_write_empty_time = bucket_millis_empty(prev_global_write, + global_write_emptied_time, milliseconds_elapsed); + send_control_event(EVENT_TB_EMPTY, ALL_FORMATS, + "650 TB_EMPTY GLOBAL READ=%d WRITTEN=%d " + "LAST=%d\r\n", + global_read_empty_time, global_write_empty_time, + milliseconds_elapsed); + } + if (prev_relay_read <= 0 && prev_relay_write <= 0) { + relay_read_empty_time = bucket_millis_empty(prev_relay_read, + relay_read_emptied_time, milliseconds_elapsed); + relay_write_empty_time = bucket_millis_empty(prev_relay_write, + relay_write_emptied_time, milliseconds_elapsed); + send_control_event(EVENT_TB_EMPTY, ALL_FORMATS, + "650 TB_EMPTY RELAY READ=%d WRITTEN=%d " + "LAST=%d\r\n", + relay_read_empty_time, relay_write_empty_time, + milliseconds_elapsed); + } + return 0; +} + +/** Token buckets of a connection have been refilled: tell any interested + * control connections how per-connection token buckets have changed. */ +int +control_event_refill_conn(or_connection_t *or_conn, + int prev_read, int prev_write, + uint32_t milliseconds_elapsed) +{ + uint32_t read_bucket_empty_time, write_bucket_empty_time; + if (!get_options()->TestingTorNetwork || + !EVENT_IS_INTERESTING(EVENT_TB_EMPTY)) + return 0; + if (prev_read == or_conn->read_bucket && + prev_write == or_conn->write_bucket) + return 0; + if (prev_read <= 0 || prev_write <= 0) { + read_bucket_empty_time = bucket_millis_empty(prev_read, + or_conn->read_emptied_time, milliseconds_elapsed); + write_bucket_empty_time = bucket_millis_empty(prev_write, + or_conn->write_emptied_time, milliseconds_elapsed); + send_control_event(EVENT_TB_EMPTY, ALL_FORMATS, + "650 TB_EMPTY ORCONN ID="U64_FORMAT" READ=%d " + "WRITTEN=%d LAST=%d\r\n", + U64_PRINTF_ARG(or_conn->base_.global_identifier), + read_bucket_empty_time, write_bucket_empty_time, + milliseconds_elapsed); + } + return 0; +} + /** A second or more has elapsed: tell any interested control * connections how much bandwidth we used. */ int diff --git a/src/or/control.h b/src/or/control.h index 4d950bfc5d..2977159b91 100644 --- a/src/or/control.h +++ b/src/or/control.h @@ -53,6 +53,18 @@ int control_event_stream_bandwidth_used(void); int control_event_conn_bandwidth(connection_t *conn); int control_event_conn_bandwidth_used(void); int control_event_circuit_cell_stats(void); +int control_event_refill_global(int global_read, int prev_global_read, + uint32_t global_read_emptied, + int global_write, int prev_global_write, + uint32_t global_write_emptied, + int relay_read, int prev_relay_read, + uint32_t relay_read_emptied, + int relay_write, int prev_relay_write, + uint32_t relay_write_emptied, + uint32_t milliseconds_elapsed); +int control_event_refill_conn(or_connection_t *or_conn, + int prev_read, int prev_write, + uint32_t milliseconds_elapsed); void control_event_logmsg(int severity, uint32_t domain, const char *msg); int control_event_descriptors_changed(smartlist_t *routers); int control_event_address_mapped(const char *from, const char *to, diff --git a/src/or/or.h b/src/or/or.h index 22e4b96fcc..c2be282911 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1477,6 +1477,12 @@ typedef struct or_connection_t { struct or_connection_t *next_with_same_id; /**< Next connection with same * identity digest as this one. */ + /** Last emptied read token bucket in msec since midnight; only used in + * TestingTorNetwork mode. */ + uint32_t read_emptied_time; + /** Last emptied write token bucket in msec since midnight; only used in + * TestingTorNetwork mode. */ + uint32_t write_emptied_time; } or_connection_t; /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)