diff --git a/changes/ticket25760 b/changes/ticket25760
new file mode 100644
index 0000000000..504fd60de6
--- /dev/null
+++ b/changes/ticket25760
@@ -0,0 +1,5 @@
+ o Removed features:
+ - The TestingEnableTbEmptyEvent option has been removed. It was used
+ in testing simulations to measure how often connection buckets were
+ emptied, in order to improve our scheduling, but it has not
+ been actively used in years. Closes ticket 25760.
diff --git a/changes/ticket25766 b/changes/ticket25766
new file mode 100644
index 0000000000..6382b6215e
--- /dev/null
+++ b/changes/ticket25766
@@ -0,0 +1,3 @@
+ o Code simplification and refactoring:
+ - Refactor token-bucket implementations to use a common backend.
+ Closes ticket 25766.
diff --git a/doc/tor.1.txt b/doc/tor.1.txt
index c3493f7181..5c88e86fb3 100644
--- a/doc/tor.1.txt
+++ b/doc/tor.1.txt
@@ -2883,7 +2883,6 @@ The following options are used for running a testing Tor network.
TestingDirConnectionMaxStall 30 seconds
TestingEnableConnBwEvent 1
TestingEnableCellStatsEvent 1
- TestingEnableTbEmptyEvent 1
[[TestingV3AuthInitialVotingInterval]] **TestingV3AuthInitialVotingInterval** __N__ **minutes**|**hours**::
Like V3AuthVotingInterval, but for initial voting interval before the first
@@ -3021,11 +3020,6 @@ The following options are used for running a testing Tor network.
events. Changing this requires that **TestingTorNetwork** is set.
(Default: 0)
-[[TestingEnableTbEmptyEvent]] **TestingEnableTbEmptyEvent** **0**|**1**::
- If this option is set, then Tor controllers may register for TB_EMPTY
- events. Changing this requires that **TestingTorNetwork** is set.
- (Default: 0)
-
[[TestingMinExitFlagThreshold]] **TestingMinExitFlagThreshold** __N__ **KBytes**|**MBytes**|**GBytes**|**TBytes**|**KBits**|**MBits**|**GBits**|**TBits**::
Sets a lower-bound for assigning an exit flag when running as an
authority on a testing network. Overrides the usual default lower bound
diff --git a/src/common/compat_time.c b/src/common/compat_time.c
index 183a60a480..b940447b67 100644
--- a/src/common/compat_time.c
+++ b/src/common/compat_time.c
@@ -830,11 +830,24 @@ monotime_coarse_stamp_units_to_approx_msec(uint64_t units)
return (abstime_diff * mach_time_info.numer) /
(mach_time_info.denom * ONE_MILLION);
}
+uint64_t
+monotime_msec_to_approx_coarse_stamp_units(uint64_t msec)
+{
+ uint64_t abstime_val =
+ (((uint64_t)msec) * ONE_MILLION * mach_time_info.denom) /
+ mach_time_info.numer;
+ return abstime_val >> monotime_shift;
+}
#else
uint64_t
monotime_coarse_stamp_units_to_approx_msec(uint64_t units)
{
return (units * 1000) / STAMP_TICKS_PER_SECOND;
}
+uint64_t
+monotime_msec_to_approx_coarse_stamp_units(uint64_t msec)
+{
+ return (msec * STAMP_TICKS_PER_SECOND) / 1000;
+}
#endif
diff --git a/src/common/compat_time.h b/src/common/compat_time.h
index 6ddd11883d..75b57f6f24 100644
--- a/src/common/compat_time.h
+++ b/src/common/compat_time.h
@@ -150,6 +150,7 @@ uint32_t monotime_coarse_to_stamp(const monotime_coarse_t *t);
* into an approximate number of milliseconds.
*/
uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units);
+uint64_t monotime_msec_to_approx_coarse_stamp_units(uint64_t msec);
uint32_t monotime_coarse_get_stamp(void);
#if defined(MONOTIME_COARSE_TYPE_IS_DIFFERENT)
diff --git a/src/common/include.am b/src/common/include.am
index 73c51ff0b2..87ab9d79e9 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -97,6 +97,7 @@ LIBOR_A_SRC = \
src/common/util_process.c \
src/common/sandbox.c \
src/common/storagedir.c \
+ src/common/token_bucket.c \
src/common/workqueue.c \
$(libor_extra_source) \
$(threads_impl_source) \
@@ -184,6 +185,7 @@ COMMONHEADERS = \
src/common/storagedir.h \
src/common/testsupport.h \
src/common/timers.h \
+ src/common/token_bucket.h \
src/common/torint.h \
src/common/torlog.h \
src/common/tortls.h \
diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c
new file mode 100644
index 0000000000..6af2982147
--- /dev/null
+++ b/src/common/token_bucket.c
@@ -0,0 +1,199 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.c
+ * \brief Functions to use and manipulate token buckets, used for
+ * rate-limiting on connections and globally.
+ *
+ * Tor uses these token buckets to keep track of bandwidth usage, and
+ * sometimes other things too.
+ *
+ * The time units we use internally are based on "timestamp" units -- see
+ * monotime_coarse_to_stamp() for a rationale.
+ *
+ * Token buckets may become negative.
+ **/
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "token_bucket.h"
+#include "util_bug.h"
+
+/** Convert a rate in bytes per second to a rate in bytes per step */
+static uint32_t
+rate_per_sec_to_rate_per_step(uint32_t rate)
+{
+ /*
+ The precise calculation we'd want to do is
+
+ (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize
+ rounding error, we do it this way instead, and divide last.
+ */
+ return (uint32_t)
+ monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000;
+}
+
+/**
+ * Initialize a token bucket in *bucket, set up to allow rate
+ * bytes per second, with a maximum burst of burst bytes. The bucket
+ * is created such that now_ts is the current timestamp. The bucket
+ * starts out full.
+ */
+void
+token_bucket_init(token_bucket_t *bucket,
+ uint32_t rate,
+ uint32_t burst,
+ uint32_t now_ts)
+{
+ memset(bucket, 0, sizeof(token_bucket_t));
+ token_bucket_adjust(bucket, rate, burst);
+ token_bucket_reset(bucket, now_ts);
+}
+
+/**
+ * Change the configured rate (in bytes per second) and burst (in bytes)
+ * for the token bucket in *bucket.
+ */
+void
+token_bucket_adjust(token_bucket_t *bucket,
+ uint32_t rate,
+ uint32_t burst)
+{
+ tor_assert_nonfatal(rate > 0);
+ tor_assert_nonfatal(burst > 0);
+ if (burst > TOKEN_BUCKET_MAX_BURST)
+ burst = TOKEN_BUCKET_MAX_BURST;
+
+ bucket->rate = rate_per_sec_to_rate_per_step(rate);
+ bucket->burst = burst;
+ bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst);
+ bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst);
+}
+
+/**
+ * Reset bucket to be full, as of timestamp now_ts.
+ */
+void
+token_bucket_reset(token_bucket_t *bucket,
+ uint32_t now_ts)
+{
+ bucket->read_bucket = bucket->burst;
+ bucket->write_bucket = bucket->burst;
+ bucket->last_refilled_at_ts = now_ts;
+}
+
+/* Helper: see token_bucket_refill */
+static int
+refill_single_bucket(int32_t *bucketptr,
+ const uint32_t rate,
+ const int32_t burst,
+ const uint32_t elapsed_steps)
+{
+ const int was_empty = (*bucketptr <= 0);
+ /* The casts here prevent an underflow.
+ *
+ * Note that even if the bucket value is negative, subtracting it from
+ * "burst" will still produce a correct result. If this result is
+ * ridiculously high, then the "elapsed_steps > gap / rate" check below
+ * should catch it. */
+ const size_t gap = ((size_t)burst) - ((size_t)*bucketptr);
+
+ if (elapsed_steps > gap / rate) {
+ *bucketptr = burst;
+ } else {
+ *bucketptr += rate * elapsed_steps;
+ }
+
+ return was_empty && *bucketptr > 0;
+}
+
+/**
+ * Refill bucket as appropriate, given that the current timestamp
+ * is now_ts.
+ *
+ * Return a bitmask containing TB_READ iff read bucket was empty and became
+ * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty.
+ */
+int
+token_bucket_refill(token_bucket_t *bucket,
+ uint32_t now_ts)
+{
+ const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts);
+ if (elapsed_ticks > UINT32_MAX-(300*1000)) {
+ /* Either about 48 days have passed since the last refill, or the
+ * monotonic clock has somehow moved backwards. (We're looking at you,
+ * Windows.). We accept up to a 5 minute jump backwards as
+ * "unremarkable".
+ */
+ return 0;
+ }
+ const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP;
+
+ if (!elapsed_steps) {
+ /* Note that if less than one whole step elapsed, we don't advance the
+ * time in last_refilled_at_ts. That's intentional: we want to make sure
+ * that we add some bytes to it eventually. */
+ return 0;
+ }
+
+ int flags = 0;
+ if (refill_single_bucket(&bucket->read_bucket,
+ bucket->rate, bucket->burst, elapsed_steps))
+ flags |= TB_READ;
+ if (refill_single_bucket(&bucket->write_bucket,
+ bucket->rate, bucket->burst, elapsed_steps))
+ flags |= TB_WRITE;
+
+ bucket->last_refilled_at_ts = now_ts;
+ return flags;
+}
+
+static int
+decrement_single_bucket(int32_t *bucketptr,
+ ssize_t n)
+{
+ if (BUG(n < 0))
+ return 0;
+ const int becomes_empty = *bucketptr > 0 && n >= *bucketptr;
+ *bucketptr -= n;
+ return becomes_empty;
+}
+
+/**
+ * Decrement the read token bucket in bucket by n bytes.
+ *
+ * Return true if the bucket was nonempty and became empty; return false
+ * otherwise.
+ */
+int
+token_bucket_dec_read(token_bucket_t *bucket,
+ ssize_t n)
+{
+ return decrement_single_bucket(&bucket->read_bucket, n);
+}
+
+/**
+ * Decrement the write token bucket in bucket by n bytes.
+ *
+ * Return true if the bucket was nonempty and became empty; return false
+ * otherwise.
+ */
+int
+token_bucket_dec_write(token_bucket_t *bucket,
+ ssize_t n)
+{
+ return decrement_single_bucket(&bucket->write_bucket, n);
+}
+
+/**
+ * As token_bucket_dec_read and token_bucket_dec_write, in a single operation.
+ */
+void
+token_bucket_dec(token_bucket_t *bucket,
+ ssize_t n_read, ssize_t n_written)
+{
+ token_bucket_dec_read(bucket, n_read);
+ token_bucket_dec_read(bucket, n_written);
+}
+
diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h
new file mode 100644
index 0000000000..2d1ccd5cf3
--- /dev/null
+++ b/src/common/token_bucket.h
@@ -0,0 +1,75 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.h
+ * \brief Headers for token_bucket.c
+ **/
+
+#ifndef TOR_TOKEN_BUCKET_H
+#define TOR_TOKEN_BUCKET_H
+
+#include "torint.h"
+
+typedef struct token_bucket_t {
+ uint32_t rate;
+ int32_t burst;
+ int32_t read_bucket;
+ int32_t write_bucket;
+ uint32_t last_refilled_at_ts;
+} token_bucket_t;
+
+#define TOKEN_BUCKET_MAX_BURST INT32_MAX
+
+void token_bucket_init(token_bucket_t *bucket,
+ uint32_t rate,
+ uint32_t burst,
+ uint32_t now_ts);
+
+void token_bucket_adjust(token_bucket_t *bucket,
+ uint32_t rate, uint32_t burst);
+
+void token_bucket_reset(token_bucket_t *bucket,
+ uint32_t now_ts);
+
+#define TB_READ 1
+#define TB_WRITE 2
+
+int token_bucket_refill(token_bucket_t *bucket,
+ uint32_t now_ts);
+
+int token_bucket_dec_read(token_bucket_t *bucket,
+ ssize_t n);
+int token_bucket_dec_write(token_bucket_t *bucket,
+ ssize_t n);
+
+void token_bucket_dec(token_bucket_t *bucket,
+ ssize_t n_read, ssize_t n_written);
+
+static inline size_t token_bucket_get_read(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_read(const token_bucket_t *bucket)
+{
+ const ssize_t b = bucket->read_bucket;
+ return b >= 0 ? b : 0;
+}
+
+static inline size_t token_bucket_get_write(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_write(const token_bucket_t *bucket)
+{
+ const ssize_t b = bucket->write_bucket;
+ return b >= 0 ? b : 0;
+}
+
+#ifdef TOKEN_BUCKET_PRIVATE
+
+/* To avoid making the rates too small, we consider units of "steps",
+ * where a "step" is defined as this many timestamp ticks. Keep this
+ * a power of two if you can. */
+#define TICKS_PER_STEP 16
+
+#endif
+
+#endif /* TOR_TOKEN_BUCKET_H */
+
diff --git a/src/or/config.c b/src/or/config.c
index 4064cf64ab..206274cd38 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -337,7 +337,7 @@ static config_var_t option_vars_[] = {
V(DownloadExtraInfo, BOOL, "0"),
V(TestingEnableConnBwEvent, BOOL, "0"),
V(TestingEnableCellStatsEvent, BOOL, "0"),
- V(TestingEnableTbEmptyEvent, BOOL, "0"),
+ OBSOLETE("TestingEnableTbEmptyEvent"),
V(EnforceDistinctSubnets, BOOL, "1"),
V(EntryNodes, ROUTERSET, NULL),
V(EntryStatistics, BOOL, "0"),
@@ -707,7 +707,6 @@ static const config_var_t testing_tor_network_defaults[] = {
V(TestingDirConnectionMaxStall, INTERVAL, "30 seconds"),
V(TestingEnableConnBwEvent, BOOL, "1"),
V(TestingEnableCellStatsEvent, BOOL, "1"),
- V(TestingEnableTbEmptyEvent, BOOL, "1"),
VAR("___UsingTestNetworkDefaults", BOOL, UsingTestNetworkDefaults_, "1"),
V(RendPostPeriod, INTERVAL, "2 minutes"),
@@ -2189,6 +2188,12 @@ options_act(const or_options_t *old_options)
options->PerConnBWBurst != old_options->PerConnBWBurst)
connection_or_update_token_buckets(get_connection_array(), options);
+ if (options->BandwidthRate != old_options->BandwidthRate ||
+ options->BandwidthBurst != old_options->BandwidthBurst ||
+ options->BandwidthRate != old_options->BandwidthRate ||
+ options->RelayBandwidthBurst != old_options->RelayBandwidthBurst)
+ connection_bucket_adjust(options);
+
if (options->MainloopStats != old_options->MainloopStats) {
reset_main_loop_counters();
}
@@ -4459,12 +4464,6 @@ options_validate(or_options_t *old_options, or_options_t *options,
"Tor networks!");
}
- if (options->TestingEnableTbEmptyEvent &&
- !options->TestingTorNetwork && !options->UsingTestNetworkDefaults_) {
- REJECT("TestingEnableTbEmptyEvent may only be changed in testing "
- "Tor networks!");
- }
-
if (options->TestingTorNetwork) {
log_warn(LD_CONFIG, "TestingTorNetwork is set. This will make your node "
"almost unusable in the public Tor network, and is "
diff --git a/src/or/connection.c b/src/or/connection.c
index 5532551cfe..1aad68678e 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -119,8 +119,6 @@ static connection_t *connection_listener_new(
static void connection_init(time_t now, connection_t *conn, int type,
int socket_family);
static int connection_handle_listener_read(connection_t *conn, int new_type);
-static int connection_bucket_should_increase(int bucket,
- or_connection_t *conn);
static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn);
@@ -2848,7 +2846,7 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now)
* non-negative) provides an upper limit for our answer. */
static ssize_t
connection_bucket_round_robin(int base, int priority,
- ssize_t global_bucket, ssize_t conn_bucket)
+ ssize_t global_bucket_val, ssize_t conn_bucket)
{
ssize_t at_most;
ssize_t num_bytes_high = (priority ? 32 : 16) * base;
@@ -2857,15 +2855,15 @@ connection_bucket_round_robin(int base, int priority,
/* Do a rudimentary round-robin so one circuit can't hog a connection.
* Pick at most 32 cells, at least 4 cells if possible, and if we're in
* the middle pick 1/8 of the available bandwidth. */
- at_most = global_bucket / 8;
+ at_most = global_bucket_val / 8;
at_most -= (at_most % base); /* round down */
if (at_most > num_bytes_high) /* 16 KB, or 8 KB for low-priority */
at_most = num_bytes_high;
else if (at_most < num_bytes_low) /* 2 KB, or 1 KB for low-priority */
at_most = num_bytes_low;
- if (at_most > global_bucket)
- at_most = global_bucket;
+ if (at_most > global_bucket_val)
+ at_most = global_bucket_val;
if (conn_bucket >= 0 && at_most > conn_bucket)
at_most = conn_bucket;
@@ -2881,13 +2879,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
{
int base = RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
- int conn_bucket = -1;
- int global_bucket = global_read_bucket;
+ ssize_t conn_bucket = -1;
+ size_t global_bucket_val = token_bucket_get_read(&global_bucket);
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
- conn_bucket = or_conn->read_bucket;
+ conn_bucket = token_bucket_get_read(&or_conn->bucket);
base = get_cell_network_size(or_conn->wide_circ_ids);
}
@@ -2896,12 +2894,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
return conn_bucket>=0 ? conn_bucket : 1<<14;
}
- if (connection_counts_as_relayed_traffic(conn, now) &&
- global_relayed_read_bucket <= global_read_bucket)
- global_bucket = global_relayed_read_bucket;
+ if (connection_counts_as_relayed_traffic(conn, now)) {
+ size_t relayed = token_bucket_get_read(&global_relayed_bucket);
+ global_bucket_val = MIN(global_bucket_val, relayed);
+ }
return connection_bucket_round_robin(base, priority,
- global_bucket, conn_bucket);
+ global_bucket_val, conn_bucket);
}
/** How many bytes at most can we write onto this connection? */
@@ -2910,8 +2909,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
{
int base = RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
- int conn_bucket = (int)conn->outbuf_flushlen;
- int global_bucket = global_write_bucket;
+ size_t conn_bucket = conn->outbuf_flushlen;
+ size_t global_bucket_val = token_bucket_get_write(&global_bucket);
if (!connection_is_rate_limited(conn)) {
/* be willing to write to local conns even if our buckets are empty */
@@ -2919,22 +2918,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
}
if (connection_speaks_cells(conn)) {
- /* use the per-conn write limit if it's lower, but if it's less
- * than zero just use zero */
+ /* use the per-conn write limit if it's lower */
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
- if (or_conn->write_bucket < conn_bucket)
- conn_bucket = or_conn->write_bucket >= 0 ?
- or_conn->write_bucket : 0;
+ conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket));
base = get_cell_network_size(or_conn->wide_circ_ids);
}
- if (connection_counts_as_relayed_traffic(conn, now) &&
- global_relayed_write_bucket <= global_write_bucket)
- global_bucket = global_relayed_write_bucket;
+ if (connection_counts_as_relayed_traffic(conn, now)) {
+ size_t relayed = token_bucket_get_write(&global_relayed_bucket);
+ global_bucket_val = MIN(global_bucket_val, relayed);
+ }
return connection_bucket_round_robin(base, priority,
- global_bucket, conn_bucket);
+ global_bucket_val, conn_bucket);
}
/** Return 1 if the global write buckets are low enough that we
@@ -2959,15 +2956,15 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
- int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
- global_write_bucket : global_relayed_write_bucket;
+ size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket),
+ token_bucket_get_write(&global_relayed_bucket));
if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */
if (!connection_is_rate_limited(conn))
return 0; /* local conns don't get limited */
- if (smaller_bucket < (int)attempt)
+ if (smaller_bucket < attempt)
return 1; /* not enough space no matter the priority */
if (write_buckets_empty_last_second)
@@ -2976,10 +2973,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */
const or_options_t *options = get_options();
- int64_t can_write = (int64_t)smaller_bucket
+ size_t can_write = smaller_bucket
+ 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
options->BandwidthRate);
- if (can_write < 2*(int64_t)attempt)
+ if (can_write < 2*attempt)
return 1;
} else { /* v2 query */
/* no further constraints yet */
@@ -3019,57 +3016,6 @@ record_num_bytes_transferred_impl(connection_t *conn,
rep_hist_note_exit_bytes(conn->port, num_written, num_read);
}
-/** Helper: convert given tvnow time value to milliseconds since
- * midnight. */
-static uint32_t
-msec_since_midnight(const struct timeval *tvnow)
-{
- return (uint32_t)(((tvnow->tv_sec % 86400L) * 1000L) +
- ((uint32_t)tvnow->tv_usec / (uint32_t)1000L));
-}
-
-/** Helper: return the time in milliseconds since last_empty_time
- * when a bucket ran empty that previously had tokens_before tokens
- * now has tokens_after tokens after refilling at timestamp
- * tvnow, capped at milliseconds_elapsed milliseconds since
- * last refilling that bucket. Return 0 if the bucket has not been empty
- * since the last refill or has not been refilled. */
-uint32_t
-bucket_millis_empty(int tokens_before, uint32_t last_empty_time,
- int tokens_after, int milliseconds_elapsed,
- const struct timeval *tvnow)
-{
- uint32_t result = 0, refilled;
- if (tokens_before <= 0 && tokens_after > tokens_before) {
- refilled = msec_since_midnight(tvnow);
- result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) %
- (86400L * 1000L));
- if (result > (uint32_t)milliseconds_elapsed)
- result = (uint32_t)milliseconds_elapsed;
- }
- return result;
-}
-
-/** Check if a bucket which had tokens_before tokens and which got
- * tokens_removed tokens removed at timestamp tvnow has run
- * out of tokens, and if so, note the milliseconds since midnight in
- * timestamp_var for the next TB_EMPTY event. */
-void
-connection_buckets_note_empty_ts(uint32_t *timestamp_var,
- int tokens_before, size_t tokens_removed,
- const struct timeval *tvnow)
-{
- if (tokens_before > 0 && (uint32_t)tokens_before <= tokens_removed)
- *timestamp_var = msec_since_midnight(tvnow);
-}
-
-/** Last time at which the global or relay buckets were emptied in msec
- * since midnight. */
-static uint32_t global_relayed_read_emptied = 0,
- global_relayed_write_emptied = 0,
- global_read_emptied = 0,
- global_write_emptied = 0;
-
/** We just read num_read and wrote num_written bytes
* onto conn. Decrement buckets appropriately. */
static void
@@ -3094,39 +3040,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
- /* If one or more of our token buckets ran dry just now, note the
- * timestamp for TB_EMPTY events. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- struct timeval tvnow;
- tor_gettimeofday_cached(&tvnow);
- if (connection_counts_as_relayed_traffic(conn, now)) {
- connection_buckets_note_empty_ts(&global_relayed_read_emptied,
- global_relayed_read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&global_relayed_write_emptied,
- global_relayed_write_bucket, num_written, &tvnow);
- }
- connection_buckets_note_empty_ts(&global_read_emptied,
- global_read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&global_write_emptied,
- global_write_bucket, num_written, &tvnow);
- if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
- or_connection_t *or_conn = TO_OR_CONN(conn);
- connection_buckets_note_empty_ts(&or_conn->read_emptied_time,
- or_conn->read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&or_conn->write_emptied_time,
- or_conn->write_bucket, num_written, &tvnow);
- }
- }
-
if (connection_counts_as_relayed_traffic(conn, now)) {
- global_relayed_read_bucket -= (int)num_read;
- global_relayed_write_bucket -= (int)num_written;
+ token_bucket_dec(&global_relayed_bucket, num_read, num_written);
}
- global_read_bucket -= (int)num_read;
- global_write_bucket -= (int)num_written;
+ token_bucket_dec(&global_bucket, num_read, num_written);
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
- TO_OR_CONN(conn)->read_bucket -= (int)num_read;
- TO_OR_CONN(conn)->write_bucket -= (int)num_written;
+ or_connection_t *or_conn = TO_OR_CONN(conn);
+ token_bucket_dec(&or_conn->bucket, num_read, num_written);
}
}
@@ -3140,14 +3060,14 @@ connection_consider_empty_read_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */
- if (global_read_bucket <= 0) {
+ if (token_bucket_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
- global_relayed_read_bucket <= 0) {
+ token_bucket_get_read(&global_relayed_bucket) <= 0) {
reason = "global relayed read bucket exhausted. Pausing.";
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
- TO_OR_CONN(conn)->read_bucket <= 0) {
+ token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection read bucket exhausted. Pausing.";
} else
return; /* all good, no need to stop it */
@@ -3167,14 +3087,14 @@ connection_consider_empty_write_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */
- if (global_write_bucket <= 0) {
+ if (token_bucket_get_write(&global_bucket) <= 0) {
reason = "global write bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
- global_relayed_write_bucket <= 0) {
+ token_bucket_get_write(&global_relayed_bucket) <= 0) {
reason = "global relayed write bucket exhausted. Pausing.";
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
- TO_OR_CONN(conn)->write_bucket <= 0) {
+ token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection write bucket exhausted. Pausing.";
} else
return; /* all good, no need to stop it */
@@ -3184,180 +3104,79 @@ connection_consider_empty_write_buckets(connection_t *conn)
connection_stop_writing(conn);
}
-/** Initialize the global read bucket to options-\>BandwidthBurst. */
+/** Initialize the global buckets to the values configured in the
+ * options */
void
connection_bucket_init(void)
{
const or_options_t *options = get_options();
- /* start it at max traffic */
- global_read_bucket = (int)options->BandwidthBurst;
- global_write_bucket = (int)options->BandwidthBurst;
+ const uint32_t now_ts = monotime_coarse_get_stamp();
+ token_bucket_init(&global_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst,
+ now_ts);
if (options->RelayBandwidthRate) {
- global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
- global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
+ token_bucket_init(&global_relayed_bucket,
+ (int32_t)options->RelayBandwidthRate,
+ (int32_t)options->RelayBandwidthBurst,
+ now_ts);
} else {
- global_relayed_read_bucket = (int)options->BandwidthBurst;
- global_relayed_write_bucket = (int)options->BandwidthBurst;
+ token_bucket_init(&global_relayed_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst,
+ now_ts);
}
}
-/** Refill a single bucket called name with bandwidth rate per
- * second rate and bandwidth burst burst, assuming that
- * milliseconds_elapsed milliseconds have passed since the last
- * call. */
-static void
-connection_bucket_refill_helper(int *bucket, int rate, int burst,
- int milliseconds_elapsed,
- const char *name)
+/** Update the global connection bucket settings to a new value. */
+void
+connection_bucket_adjust(const or_options_t *options)
{
- int starting_bucket = *bucket;
- if (starting_bucket < burst && milliseconds_elapsed > 0) {
- int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000;
- if ((burst - starting_bucket) < incr) {
- *bucket = burst; /* We would overflow the bucket; just set it to
- * the maximum. */
- } else {
- *bucket += (int)incr;
- if (*bucket > burst || *bucket < starting_bucket) {
- /* If we overflow the burst, or underflow our starting bucket,
- * cap the bucket value to burst. */
- /* XXXX this might be redundant now, but it doesn't show up
- * in profiles. Remove it after analysis. */
- *bucket = burst;
- }
- }
- log_debug(LD_NET,"%s now %d.", name, *bucket);
+ token_bucket_adjust(&global_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst);
+ if (options->RelayBandwidthRate) {
+ token_bucket_adjust(&global_relayed_bucket,
+ (int32_t)options->RelayBandwidthRate,
+ (int32_t)options->RelayBandwidthBurst);
+ } else {
+ token_bucket_adjust(&global_relayed_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst);
}
}
/** Time has passed; increment buckets appropriately. */
void
-connection_bucket_refill(int milliseconds_elapsed, time_t now)
+connection_bucket_refill(time_t now, uint32_t now_ts)
{
- const or_options_t *options = get_options();
smartlist_t *conns = get_connection_array();
- int bandwidthrate, bandwidthburst, relayrate, relayburst;
-
- int prev_global_read = global_read_bucket;
- int prev_global_write = global_write_bucket;
- int prev_relay_read = global_relayed_read_bucket;
- int prev_relay_write = global_relayed_write_bucket;
- struct timeval tvnow; /*< Only used if TB_EMPTY events are enabled. */
-
- bandwidthrate = (int)options->BandwidthRate;
- bandwidthburst = (int)options->BandwidthBurst;
-
- if (options->RelayBandwidthRate) {
- relayrate = (int)options->RelayBandwidthRate;
- relayburst = (int)options->RelayBandwidthBurst;
- } else {
- relayrate = bandwidthrate;
- relayburst = bandwidthburst;
- }
-
- tor_assert(milliseconds_elapsed >= 0);
write_buckets_empty_last_second =
- global_relayed_write_bucket <= 0 || global_write_bucket <= 0;
+ token_bucket_get_write(&global_bucket) <= 0 ||
+ token_bucket_get_write(&global_relayed_bucket) <= 0;
/* refill the global buckets */
- connection_bucket_refill_helper(&global_read_bucket,
- bandwidthrate, bandwidthburst,
- milliseconds_elapsed,
- "global_read_bucket");
- connection_bucket_refill_helper(&global_write_bucket,
- bandwidthrate, bandwidthburst,
- milliseconds_elapsed,
- "global_write_bucket");
- connection_bucket_refill_helper(&global_relayed_read_bucket,
- relayrate, relayburst,
- milliseconds_elapsed,
- "global_relayed_read_bucket");
- connection_bucket_refill_helper(&global_relayed_write_bucket,
- relayrate, relayburst,
- milliseconds_elapsed,
- "global_relayed_write_bucket");
-
- /* If buckets were empty before and have now been refilled, tell any
- * interested controllers. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- uint32_t global_read_empty_time, global_write_empty_time,
- relay_read_empty_time, relay_write_empty_time;
- tor_gettimeofday_cached(&tvnow);
- global_read_empty_time = bucket_millis_empty(prev_global_read,
- global_read_emptied, global_read_bucket,
- milliseconds_elapsed, &tvnow);
- global_write_empty_time = bucket_millis_empty(prev_global_write,
- global_write_emptied, global_write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty("GLOBAL", global_read_empty_time,
- global_write_empty_time, milliseconds_elapsed);
- relay_read_empty_time = bucket_millis_empty(prev_relay_read,
- global_relayed_read_emptied,
- global_relayed_read_bucket,
- milliseconds_elapsed, &tvnow);
- relay_write_empty_time = bucket_millis_empty(prev_relay_write,
- global_relayed_write_emptied,
- global_relayed_write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty("RELAY", relay_read_empty_time,
- relay_write_empty_time, milliseconds_elapsed);
- }
+ token_bucket_refill(&global_bucket, now_ts);
+ token_bucket_refill(&global_relayed_bucket, now_ts);
/* refill the per-connection buckets */
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
- int orbandwidthrate = or_conn->bandwidthrate;
- int orbandwidthburst = or_conn->bandwidthburst;
- int prev_conn_read = or_conn->read_bucket;
- int prev_conn_write = or_conn->write_bucket;
-
- if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
- connection_bucket_refill_helper(&or_conn->read_bucket,
- orbandwidthrate,
- orbandwidthburst,
- milliseconds_elapsed,
- "or_conn->read_bucket");
- }
- if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
- connection_bucket_refill_helper(&or_conn->write_bucket,
- orbandwidthrate,
- orbandwidthburst,
- milliseconds_elapsed,
- "or_conn->write_bucket");
- }
-
- /* If buckets were empty before and have now been refilled, tell any
- * interested controllers. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- char *bucket;
- uint32_t conn_read_empty_time, conn_write_empty_time;
- tor_asprintf(&bucket, "ORCONN ID="U64_FORMAT,
- U64_PRINTF_ARG(or_conn->base_.global_identifier));
- conn_read_empty_time = bucket_millis_empty(prev_conn_read,
- or_conn->read_emptied_time,
- or_conn->read_bucket,
- milliseconds_elapsed, &tvnow);
- conn_write_empty_time = bucket_millis_empty(prev_conn_write,
- or_conn->write_emptied_time,
- or_conn->write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty(bucket, conn_read_empty_time,
- conn_write_empty_time,
- milliseconds_elapsed);
- tor_free(bucket);
+ if (conn->state == OR_CONN_STATE_OPEN) {
+ token_bucket_refill(&or_conn->bucket, now_ts);
}
}
if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
- && global_read_bucket > 0 /* and we're allowed to read */
+ && token_bucket_get_read(&global_bucket) > 0 /* and we can read */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
- global_relayed_read_bucket > 0) /* even if we're relayed traffic */
+ token_bucket_get_read(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
- TO_OR_CONN(conn)->read_bucket > 0)) {
+ token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
/* and either a non-cell conn or a cell conn with non-empty bucket */
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for read", (int)conn->s));
@@ -3366,12 +3185,12 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now)
}
if (conn->write_blocked_on_bw == 1
- && global_write_bucket > 0 /* and we're allowed to write */
+ && token_bucket_get_write(&global_bucket) > 0 /* and we can write */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
- global_relayed_write_bucket > 0) /* even if it's relayed traffic */
+ token_bucket_get_write(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
- TO_OR_CONN(conn)->write_bucket > 0)) {
+ token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for write", (int)conn->s));
conn->write_blocked_on_bw = 0;
@@ -3380,22 +3199,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now)
} SMARTLIST_FOREACH_END(conn);
}
-/** Is the bucket for connection conn low enough that we
- * should add another pile of tokens to it?
- */
-static int
-connection_bucket_should_increase(int bucket, or_connection_t *conn)
-{
- tor_assert(conn);
-
- if (conn->base_.state != OR_CONN_STATE_OPEN)
- return 0; /* only open connections play the rate limiting game */
- if (bucket >= conn->bandwidthburst)
- return 0;
-
- return 1;
-}
-
/** Read bytes from conn-\>s and process them.
*
* It calls connection_buf_read_from_socket() to bring in any new bytes,
diff --git a/src/or/connection.h b/src/or/connection.h
index 6bc5a7cfd0..cfe31c3727 100644
--- a/src/or/connection.h
+++ b/src/or/connection.h
@@ -122,7 +122,9 @@ void connection_mark_all_noncontrol_connections(void);
ssize_t connection_bucket_write_limit(connection_t *conn, time_t now);
int global_write_bucket_low(connection_t *conn, size_t attempt, int priority);
void connection_bucket_init(void);
-void connection_bucket_refill(int seconds_elapsed, time_t now);
+void connection_bucket_adjust(const or_options_t *options);
+void connection_bucket_refill(time_t now,
+ uint32_t now_ts);
int connection_handle_read(connection_t *conn);
@@ -272,13 +274,6 @@ void connection_check_oos(int n_socks, int failed);
STATIC void connection_free_minimal(connection_t *conn);
/* Used only by connection.c and test*.c */
-uint32_t bucket_millis_empty(int tokens_before, uint32_t last_empty_time,
- int tokens_after, int milliseconds_elapsed,
- const struct timeval *tvnow);
-void connection_buckets_note_empty_ts(uint32_t *timestamp_var,
- int tokens_before,
- size_t tokens_removed,
- const struct timeval *tvnow);
MOCK_DECL(STATIC int,connection_connect_sockaddr,
(connection_t *conn,
const struct sockaddr *sa,
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index 267463312c..3afdfa6b5a 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -793,18 +793,10 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
(int)options->BandwidthBurst, 1, INT32_MAX);
}
- conn->bandwidthrate = rate;
- conn->bandwidthburst = burst;
- if (reset) { /* set up the token buckets to be full */
- conn->read_bucket = conn->write_bucket = burst;
- return;
+ token_bucket_adjust(&conn->bucket, rate, burst);
+ if (reset) {
+ token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp());
}
- /* If the new token bucket is smaller, take out the extra tokens.
- * (If it's larger, don't -- the buckets can grow to reach the cap.) */
- if (conn->read_bucket > burst)
- conn->read_bucket = burst;
- if (conn->write_bucket > burst)
- conn->write_bucket = burst;
}
/** Either our set of relays or our per-conn rate limits have changed.
diff --git a/src/or/control.c b/src/or/control.c
index 5a2fae64e7..0539ddaca3 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -1214,7 +1214,6 @@ static const struct control_event_t control_event_table[] = {
{ EVENT_CONF_CHANGED, "CONF_CHANGED"},
{ EVENT_CONN_BW, "CONN_BW" },
{ EVENT_CELL_STATS, "CELL_STATS" },
- { EVENT_TB_EMPTY, "TB_EMPTY" },
{ EVENT_CIRC_BANDWIDTH_USED, "CIRC_BW" },
{ EVENT_TRANSPORT_LAUNCHED, "TRANSPORT_LAUNCHED" },
{ EVENT_HS_DESC, "HS_DESC" },
@@ -6077,28 +6076,6 @@ control_event_circuit_cell_stats(void)
return 0;
}
-/** Tokens in bucket have been refilled: the read bucket was empty
- * for read_empty_time millis, the write bucket was empty for
- * write_empty_time millis, and buckets were last refilled
- * milliseconds_elapsed millis ago. Only emit TB_EMPTY event if
- * either read or write bucket have been empty before. */
-int
-control_event_tb_empty(const char *bucket, uint32_t read_empty_time,
- uint32_t write_empty_time,
- int milliseconds_elapsed)
-{
- if (get_options()->TestingEnableTbEmptyEvent &&
- EVENT_IS_INTERESTING(EVENT_TB_EMPTY) &&
- (read_empty_time > 0 || write_empty_time > 0)) {
- send_control_event(EVENT_TB_EMPTY,
- "650 TB_EMPTY %s READ=%d WRITTEN=%d "
- "LAST=%d\r\n",
- bucket, read_empty_time, write_empty_time,
- milliseconds_elapsed);
- }
- return 0;
-}
-
/* about 5 minutes worth. */
#define N_BW_EVENTS_TO_CACHE 300
/* Index into cached_bw_events to next write. */
diff --git a/src/or/control.h b/src/or/control.h
index 28ffeaed86..2fd3c553f3 100644
--- a/src/or/control.h
+++ b/src/or/control.h
@@ -59,9 +59,6 @@ int control_event_circ_bandwidth_used(void);
int control_event_conn_bandwidth(connection_t *conn);
int control_event_conn_bandwidth_used(void);
int control_event_circuit_cell_stats(void);
-int control_event_tb_empty(const char *bucket, uint32_t read_empty_time,
- uint32_t write_empty_time,
- int milliseconds_elapsed);
void control_event_logmsg(int severity, uint32_t domain, const char *msg);
int control_event_descriptors_changed(smartlist_t *routers);
int control_event_address_mapped(const char *from, const char *to,
@@ -194,7 +191,7 @@ void control_free_all(void);
#define EVENT_CONF_CHANGED 0x0019
#define EVENT_CONN_BW 0x001A
#define EVENT_CELL_STATS 0x001B
-#define EVENT_TB_EMPTY 0x001C
+/* UNUSED : 0x001C */
#define EVENT_CIRC_BANDWIDTH_USED 0x001D
#define EVENT_TRANSPORT_LAUNCHED 0x0020
#define EVENT_HS_DESC 0x0021
diff --git a/src/or/main.c b/src/or/main.c
index 4cc51de07b..0e67ea6f68 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -152,19 +152,19 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event,
void *arg) ATTR_NORETURN;
/********* START VARIABLES **********/
-int global_read_bucket; /**< Max number of bytes I can read this second. */
-int global_write_bucket; /**< Max number of bytes I can write this second. */
-/** Max number of relayed (bandwidth class 1) bytes I can read this second. */
-int global_relayed_read_bucket;
-/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
-int global_relayed_write_bucket;
-/** What was the read bucket before the last second_elapsed_callback() call?
- * (used to determine how many bytes we've read). */
-static int stats_prev_global_read_bucket;
+/* Token bucket for all traffic. */
+token_bucket_t global_bucket;
+
+/* Token bucket for relayed traffic. */
+token_bucket_t global_relayed_bucket;
+
+/** What was the read/write bucket before the last second_elapsed_callback()
+ * call? (used to determine how many bytes we've read). */
+static size_t stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */
-static int stats_prev_global_write_bucket;
+static size_t stats_prev_global_write_bucket;
/* DOCDOC stats_prev_n_read */
static uint64_t stats_prev_n_read = 0;
@@ -2389,19 +2389,23 @@ refill_callback(periodic_timer_t *timer, void *arg)
refill_timer_current_millisecond.tv_sec);
}
- bytes_written = stats_prev_global_write_bucket - global_write_bucket;
- bytes_read = stats_prev_global_read_bucket - global_read_bucket;
+ bytes_written = stats_prev_global_write_bucket -
+ token_bucket_get_write(&global_bucket);
+ bytes_read = stats_prev_global_read_bucket -
+ token_bucket_get_read(&global_bucket);
stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && milliseconds_elapsed >= 0)
accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over);
- if (milliseconds_elapsed > 0)
- connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec);
+ if (milliseconds_elapsed > 0) {
+ connection_bucket_refill((time_t)now.tv_sec,
+ monotime_coarse_get_stamp());
+ }
- stats_prev_global_read_bucket = global_read_bucket;
- stats_prev_global_write_bucket = global_write_bucket;
+ stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
+ stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
/* remember what time it is, for next time */
refill_timer_current_millisecond = now;
@@ -2605,8 +2609,8 @@ do_main_loop(void)
/* Set up our buckets */
connection_bucket_init();
- stats_prev_global_read_bucket = global_read_bucket;
- stats_prev_global_write_bucket = global_write_bucket;
+ stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
+ stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
/* initialize the bootstrap status events to know we're starting up */
control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
@@ -3501,8 +3505,8 @@ tor_free_all(int postfork)
periodic_timer_free(systemd_watchdog_timer);
#endif
- global_read_bucket = global_write_bucket = 0;
- global_relayed_read_bucket = global_relayed_write_bucket = 0;
+ memset(&global_bucket, 0, sizeof(global_bucket));
+ memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket));
stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0;
stats_prev_n_read = stats_prev_n_written = 0;
stats_n_bytes_read = stats_n_bytes_written = 0;
diff --git a/src/or/main.h b/src/or/main.h
index f01506fcea..9ef5b9472f 100644
--- a/src/or/main.h
+++ b/src/or/main.h
@@ -89,10 +89,8 @@ uint64_t get_main_loop_idle_count(void);
extern time_t time_of_process_start;
extern int quiet_level;
-extern int global_read_bucket;
-extern int global_write_bucket;
-extern int global_relayed_read_bucket;
-extern int global_relayed_write_bucket;
+extern token_bucket_t global_bucket;
+extern token_bucket_t global_relayed_bucket;
#ifdef MAIN_PRIVATE
STATIC void init_connection_lists(void);
diff --git a/src/or/or.h b/src/or/or.h
index b845443947..829b6755d2 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -80,6 +80,7 @@
#include "crypto_curve25519.h"
#include "crypto_ed25519.h"
#include "tor_queue.h"
+#include "token_bucket.h"
#include "util_format.h"
#include "hs_circuitmap.h"
@@ -1660,20 +1661,8 @@ typedef struct or_connection_t {
time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/
- /* bandwidth* and *_bucket only used by ORs in OPEN state: */
- int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
- int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
- int read_bucket; /**< When this hits 0, stop receiving. Every second we
- * add 'bandwidthrate' to this, capping it at
- * bandwidthburst. (OPEN ORs only) */
- int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
-
- /** Last emptied read token bucket in msec since midnight; only used if
- * TB_EMPTY events are enabled. */
- uint32_t read_emptied_time;
- /** Last emptied write token bucket in msec since midnight; only used if
- * TB_EMPTY events are enabled. */
- uint32_t write_emptied_time;
+ token_bucket_t bucket; /**< Used for rate limiting when the connection is
+ * in state CONN_OPEN. */
/*
* Count the number of bytes flushed out on this orconn, and the number of
@@ -4431,9 +4420,6 @@ typedef struct {
/** Enable CELL_STATS events. Only altered on testing networks. */
int TestingEnableCellStatsEvent;
- /** Enable TB_EMPTY events. Only altered on testing networks. */
- int TestingEnableTbEmptyEvent;
-
/** If true, and we have GeoIP data, and we're a bridge, keep a per-country
* count of how many client addresses have contacted us so that we can help
* the bridge authority guess which countries have blocked access to us. */
diff --git a/src/test/include.am b/src/test/include.am
index a663fa5524..474da3f880 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -90,6 +90,7 @@ src_test_test_SOURCES = \
src/test/test_address_set.c \
src/test/test_bridges.c \
src/test/test_buffers.c \
+ src/test/test_bwmgt.c \
src/test/test_cell_formats.c \
src/test/test_cell_queue.c \
src/test/test_channel.c \
diff --git a/src/test/test.c b/src/test/test.c
index f90669b5dd..422e181b94 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -813,6 +813,7 @@ struct testgroup_t testgroups[] = {
{ "address_set/", address_set_tests },
{ "bridges/", bridges_tests },
{ "buffer/", buffer_tests },
+ { "bwmgt/", bwmgt_tests },
{ "cellfmt/", cell_format_tests },
{ "cellqueue/", cell_queue_tests },
{ "channel/", channel_tests },
diff --git a/src/test/test.h b/src/test/test.h
index 34c6e46427..1728831ed0 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -187,6 +187,7 @@ extern struct testcase_t addr_tests[];
extern struct testcase_t address_tests[];
extern struct testcase_t address_set_tests[];
extern struct testcase_t bridges_tests[];
+extern struct testcase_t bwmgt_tests[];
extern struct testcase_t buffer_tests[];
extern struct testcase_t cell_format_tests[];
extern struct testcase_t cell_queue_tests[];
diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c
new file mode 100644
index 0000000000..1a54f44fc4
--- /dev/null
+++ b/src/test/test_bwmgt.c
@@ -0,0 +1,205 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file test_bwmgt.c
+ * \brief tests for bandwidth management / token bucket functions
+ */
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "or.h"
+#include "test.h"
+
+#include "token_bucket.h"
+
+// an imaginary time, in timestamp units. Chosen so it will roll over.
+static const uint32_t START_TS = UINT32_MAX-10;
+static const int32_t KB = 1024;
+
+static void
+test_bwmgt_token_buf_init(void *arg)
+{
+ (void)arg;
+ token_bucket_t b;
+
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+ // Burst is correct
+ tt_uint_op(b.burst, OP_EQ, 64*KB);
+ // Rate is correct, within 1 percent.
+ {
+ uint32_t ticks_per_sec =
+ (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
+ uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP);
+
+ tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
+ tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
+ }
+ // Bucket starts out full:
+ tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS);
+ tt_int_op(b.read_bucket, OP_EQ, 64*KB);
+
+ done:
+ ;
+}
+
+static void
+test_bwmgt_token_buf_adjust(void *arg)
+{
+ (void)arg;
+ token_bucket_t b;
+
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+ uint32_t rate_orig = b.rate;
+ // Increasing burst
+ token_bucket_adjust(&b, 16*KB, 128*KB);
+ tt_uint_op(b.rate, OP_EQ, rate_orig);
+ tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
+ tt_uint_op(b.burst, OP_EQ, 128*KB);
+
+ // Decreasing burst but staying above bucket
+ token_bucket_adjust(&b, 16*KB, 96*KB);
+ tt_uint_op(b.rate, OP_EQ, rate_orig);
+ tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
+ tt_uint_op(b.burst, OP_EQ, 96*KB);
+
+ // Decreasing burst below bucket,
+ token_bucket_adjust(&b, 16*KB, 48*KB);
+ tt_uint_op(b.rate, OP_EQ, rate_orig);
+ tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
+ tt_uint_op(b.burst, OP_EQ, 48*KB);
+
+ // Changing rate.
+ token_bucket_adjust(&b, 32*KB, 48*KB);
+ tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10);
+ tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10);
+ tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
+ tt_uint_op(b.burst, OP_EQ, 48*KB);
+
+ done:
+ ;
+}
+
+static void
+test_bwmgt_token_buf_dec(void *arg)
+{
+ (void)arg;
+ token_bucket_t b;
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+ // full-to-not-full.
+ tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB));
+ tt_int_op(b.read_bucket, OP_EQ, 63*KB);
+
+ // Full to almost-not-full
+ tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1));
+ tt_int_op(b.read_bucket, OP_EQ, 1);
+
+ // almost-not-full to empty.
+ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1));
+ tt_int_op(b.read_bucket, OP_EQ, 0);
+
+ // reset bucket, try full-to-empty
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB));
+ tt_int_op(b.read_bucket, OP_EQ, 0);
+
+ // reset bucket, try underflow.
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1));
+ tt_int_op(b.read_bucket, OP_EQ, -1);
+
+ // A second underflow does not make the bucket empty.
+ tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000));
+ tt_int_op(b.read_bucket, OP_EQ, -1001);
+
+ done:
+ ;
+}
+
+static void
+test_bwmgt_token_buf_refill(void *arg)
+{
+ (void)arg;
+ token_bucket_t b;
+ const uint32_t SEC =
+ (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
+ printf("%d\n", (int)SEC);
+ token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+ /* Make the buffer much emptier, then let one second elapse. */
+ token_bucket_dec_read(&b, 48*KB);
+ tt_int_op(b.read_bucket, OP_EQ, 16*KB);
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC));
+ tt_int_op(b.read_bucket, OP_GT, 32*KB - 300);
+ tt_int_op(b.read_bucket, OP_LT, 32*KB + 300);
+
+ /* Another half second. */
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+ tt_int_op(b.read_bucket, OP_GT, 40*KB - 400);
+ tt_int_op(b.read_bucket, OP_LT, 40*KB + 400);
+ tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2);
+
+ /* No time: nothing happens. */
+ {
+ const uint32_t bucket_orig = b.read_bucket;
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+ tt_int_op(b.read_bucket, OP_EQ, bucket_orig);
+ }
+
+ /* Another 30 seconds: fill the bucket. */
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30));
+ tt_int_op(b.read_bucket, OP_EQ, b.burst);
+ tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30);
+
+ /* Another 30 seconds: nothing happens. */
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60));
+ tt_int_op(b.read_bucket, OP_EQ, b.burst);
+ tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60);
+
+ /* Empty the bucket, let two seconds pass, and make sure that a refill is
+ * noticed. */
+ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst));
+ tt_int_op(0, OP_EQ, b.read_bucket);
+ tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61));
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62));
+ tt_int_op(b.read_bucket, OP_GT, 32*KB-400);
+ tt_int_op(b.read_bucket, OP_LT, 32*KB+400);
+
+ /* Underflow the bucket, make sure we detect when it has tokens again. */
+ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB));
+ tt_int_op(-16*KB, OP_EQ, b.read_bucket);
+ // half a second passes...
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
+ tt_int_op(b.read_bucket, OP_GT, -8*KB-300);
+ tt_int_op(b.read_bucket, OP_LT, -8*KB+300);
+ // a second passes
+ tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65));
+ tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
+ tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
+
+ // We step a second backwards, and nothing happens.
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
+ tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
+ tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
+
+ // A ridiculous amount of time passes.
+ tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX));
+ tt_int_op(b.read_bucket, OP_EQ, b.burst);
+
+ done:
+ ;
+}
+
+#define BWMGT(name) \
+ { #name, test_bwmgt_ ## name , 0, NULL, NULL }
+
+struct testcase_t bwmgt_tests[] = {
+ BWMGT(token_buf_init),
+ BWMGT(token_buf_adjust),
+ BWMGT(token_buf_dec),
+ BWMGT(token_buf_refill),
+ END_OF_TESTCASES
+};
+
diff --git a/src/test/test_controller_events.c b/src/test/test_controller_events.c
index 901ad7ab3d..e81aea8d66 100644
--- a/src/test/test_controller_events.c
+++ b/src/test/test_controller_events.c
@@ -11,79 +11,6 @@
#include "control.h"
#include "test.h"
-static void
-help_test_bucket_note_empty(uint32_t expected_msec_since_midnight,
- int tokens_before, size_t tokens_removed,
- uint32_t msec_since_epoch)
-{
- uint32_t timestamp_var = 0;
- struct timeval tvnow;
- tvnow.tv_sec = msec_since_epoch / 1000;
- tvnow.tv_usec = (msec_since_epoch % 1000) * 1000;
- connection_buckets_note_empty_ts(×tamp_var, tokens_before,
- tokens_removed, &tvnow);
- tt_int_op(expected_msec_since_midnight, OP_EQ, timestamp_var);
-
- done:
- ;
-}
-
-static void
-test_cntev_bucket_note_empty(void *arg)
-{
- (void)arg;
-
- /* Two cases with nothing to note, because bucket was empty before;
- * 86442200 == 1970-01-02 00:00:42.200000 */
- help_test_bucket_note_empty(0, 0, 0, 86442200);
- help_test_bucket_note_empty(0, -100, 100, 86442200);
-
- /* Nothing to note, because bucket has not been emptied. */
- help_test_bucket_note_empty(0, 101, 100, 86442200);
-
- /* Bucket was emptied, note 42200 msec since midnight. */
- help_test_bucket_note_empty(42200, 101, 101, 86442200);
- help_test_bucket_note_empty(42200, 101, 102, 86442200);
-}
-
-static void
-test_cntev_bucket_millis_empty(void *arg)
-{
- struct timeval tvnow;
- (void)arg;
-
- /* 1970-01-02 00:00:42.200000 */
- tvnow.tv_sec = 86400 + 42;
- tvnow.tv_usec = 200000;
-
- /* Bucket has not been refilled. */
- tt_int_op(0, OP_EQ, bucket_millis_empty(0, 42120, 0, 100, &tvnow));
- tt_int_op(0, OP_EQ, bucket_millis_empty(-10, 42120, -10, 100, &tvnow));
-
- /* Bucket was not empty. */
- tt_int_op(0, OP_EQ, bucket_millis_empty(10, 42120, 20, 100, &tvnow));
-
- /* Bucket has been emptied 80 msec ago and has just been refilled. */
- tt_int_op(80, OP_EQ, bucket_millis_empty(-20, 42120, -10, 100, &tvnow));
- tt_int_op(80, OP_EQ, bucket_millis_empty(-10, 42120, 0, 100, &tvnow));
- tt_int_op(80, OP_EQ, bucket_millis_empty(0, 42120, 10, 100, &tvnow));
-
- /* Bucket has been emptied 180 msec ago, last refill was 100 msec ago
- * which was insufficient to make it positive, so cap msec at 100. */
- tt_int_op(100, OP_EQ, bucket_millis_empty(0, 42020, 1, 100, &tvnow));
-
- /* 1970-01-02 00:00:00:050000 */
- tvnow.tv_sec = 86400;
- tvnow.tv_usec = 50000;
-
- /* Last emptied 30 msec before midnight, tvnow is 50 msec after
- * midnight, that's 80 msec in total. */
- tt_int_op(80, OP_EQ, bucket_millis_empty(0, 86400000 - 30, 1, 100, &tvnow));
-
- done:
- ;
-}
-
static void
add_testing_cell_stats_entry(circuit_t *circ, uint8_t command,
unsigned int waiting_time,
@@ -395,8 +322,6 @@ test_cntev_event_mask(void *arg)
{ #name, test_cntev_ ## name, flags, 0, NULL }
struct testcase_t controller_event_tests[] = {
- TEST(bucket_note_empty, TT_FORK),
- TEST(bucket_millis_empty, TT_FORK),
TEST(sum_up_cell_stats, TT_FORK),
TEST(append_cell_stats, TT_FORK),
TEST(format_cell_stats, TT_FORK),
diff --git a/src/test/test_options.c b/src/test/test_options.c
index af349ed015..9974ed2575 100644
--- a/src/test/test_options.c
+++ b/src/test/test_options.c
@@ -4101,16 +4101,6 @@ test_options_validate__testing_options(void *ignored)
tt_assert(!msg);
tor_free(msg);
- free_options_test_data(tdata);
- tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES
- "TestingEnableTbEmptyEvent 1\n"
- );
- ret = options_validate(tdata->old_opt, tdata->opt, tdata->def_opt, 0, &msg);
- tt_int_op(ret, OP_EQ, -1);
- tt_str_op(msg, OP_EQ, "TestingEnableTbEmptyEvent may only be changed "
- "in testing Tor networks!");
- tor_free(msg);
-
free_options_test_data(tdata);
tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES
"TestingEnableTbEmptyEvent 1\n"
diff --git a/src/test/test_util.c b/src/test/test_util.c
index ce8567d9af..3dd2b51a3a 100644
--- a/src/test/test_util.c
+++ b/src/test/test_util.c
@@ -5907,6 +5907,13 @@ test_util_monotonic_time(void *arg)
tt_u64_op(coarse_stamp_diff, OP_GE, 120);
tt_u64_op(coarse_stamp_diff, OP_LE, 1200);
+ {
+ uint64_t units = monotime_msec_to_approx_coarse_stamp_units(5000);
+ uint64_t ms = monotime_coarse_stamp_units_to_approx_msec(units);
+ tt_int_op(ms, OP_GE, 4950);
+ tt_int_op(ms, OP_LT, 5050);
+ }
+
done:
;
}