diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index 3191568ac3..1ac1bd6d3c 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -9,8 +9,9 @@ * Tor uses these token buckets to keep track of bandwidth usage, and * sometimes other things too. * - * The time units we use internally are based on "timestamp" units -- see - * monotime_coarse_to_stamp() for a rationale. + * There are two layers of abstraction here: "raw" token buckets, in which all + * the pieces are decoupled, and "read-write" token buckets, which combine all + * the moving parts into one. * * Token buckets may become negative. **/ @@ -20,6 +21,51 @@ #include "token_bucket.h" #include "util_bug.h" +/** + * Set the rate and burst value in a token_bucket_cfg. + * + * Note that the rate value is in arbitrary units, but those units will + * determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and + * so on. + */ +void +token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + cfg->rate = rate; + cfg->burst = burst; +} + +/** + * Initialize a raw token bucket and its associated timestamp to the "full" + * state, according to cfg. + */ +void +token_bucket_raw_reset(token_bucket_raw_t *bucket, + token_bucket_timestamp_t *stamp, + const token_bucket_cfg_t *cfg, + uint32_t now) +{ + bucket->bucket = cfg->burst; + stamp->last_refilled_at = now; +} + +/** + * Adust a preexisting token bucket to respect the new configuration + * cfg, by decreasing its current level if needed. */ +void +token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg) +{ + bucket->bucket = MIN(bucket->bucket, cfg->burst); +} + /** Convert a rate in bytes per second to a rate in bytes per step */ static uint32_t rate_per_sec_to_rate_per_step(uint32_t rate) @@ -57,18 +103,14 @@ token_bucket_rw_init(token_bucket_rw_t *bucket, */ void token_bucket_rw_adjust(token_bucket_rw_t *bucket, - uint32_t rate, - uint32_t burst) + uint32_t rate, + uint32_t burst) { - tor_assert_nonfatal(rate > 0); - tor_assert_nonfatal(burst > 0); - if (burst > TOKEN_BUCKET_RW_MAX_BURST) - burst = TOKEN_BUCKET_RW_MAX_BURST; - - bucket->rate = rate_per_sec_to_rate_per_step(rate); - bucket->burst = burst; - bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst); - bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst); + token_bucket_cfg_init(&bucket->cfg, + rate_per_sec_to_rate_per_step(rate), + burst); + token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg); + token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg); } /** @@ -76,36 +118,41 @@ token_bucket_rw_adjust(token_bucket_rw_t *bucket, */ void token_bucket_rw_reset(token_bucket_rw_t *bucket, - uint32_t now_ts) + uint32_t now_ts) { - bucket->read_bucket = bucket->burst; - bucket->write_bucket = bucket->burst; - bucket->last_refilled_at_ts = now_ts; + token_bucket_raw_reset(&bucket->read_bucket, &bucket->stamp, + &bucket->cfg, now_ts); + token_bucket_raw_reset(&bucket->write_bucket, &bucket->stamp, + &bucket->cfg, now_ts); } -/* Helper: see token_bucket_rw_refill */ -static int -refill_single_bucket(int32_t *bucketptr, - const uint32_t rate, - const int32_t burst, - const uint32_t elapsed_steps) +/** + * Given an amount of elapsed time units, and a bucket configuration + * cfg, refill the level of bucket accordingly. Note that the + * units of time in elapsed must correspond to those used to set the + * rate in cfg, or the result will be illogical. + */ +int +token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed) { - const int was_empty = (*bucketptr <= 0); + const int was_empty = (bucket->bucket <= 0); /* The casts here prevent an underflow. * * Note that even if the bucket value is negative, subtracting it from * "burst" will still produce a correct result. If this result is - * ridiculously high, then the "elapsed_steps > gap / rate" check below + * ridiculously high, then the "elapsed > gap / rate" check below * should catch it. */ - const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); + const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket); - if (elapsed_steps > gap / rate) { - *bucketptr = burst; + if (elapsed > gap / cfg->rate) { + bucket->bucket = cfg->burst; } else { - *bucketptr += rate * elapsed_steps; + bucket->bucket += cfg->rate * elapsed; } - return was_empty && *bucketptr > 0; + return was_empty && bucket->bucket > 0; } /** @@ -119,7 +166,7 @@ int token_bucket_rw_refill(token_bucket_rw_t *bucket, uint32_t now_ts) { - const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + const uint32_t elapsed_ticks = (now_ts - bucket->stamp.last_refilled_at); if (elapsed_ticks > UINT32_MAX-(300*1000)) { /* Either about 48 days have passed since the last refill, or the * monotonic clock has somehow moved backwards. (We're looking at you, @@ -132,31 +179,35 @@ token_bucket_rw_refill(token_bucket_rw_t *bucket, if (!elapsed_steps) { /* Note that if less than one whole step elapsed, we don't advance the - * time in last_refilled_at_ts. That's intentional: we want to make sure + * time in last_refilled_at. That's intentional: we want to make sure * that we add some bytes to it eventually. */ return 0; } int flags = 0; - if (refill_single_bucket(&bucket->read_bucket, - bucket->rate, bucket->burst, elapsed_steps)) + if (token_bucket_raw_refill_steps(&bucket->read_bucket, + &bucket->cfg, elapsed_steps)) flags |= TB_READ; - if (refill_single_bucket(&bucket->write_bucket, - bucket->rate, bucket->burst, elapsed_steps)) + if (token_bucket_raw_refill_steps(&bucket->write_bucket, + &bucket->cfg, elapsed_steps)) flags |= TB_WRITE; - bucket->last_refilled_at_ts = now_ts; + bucket->stamp.last_refilled_at = now_ts; return flags; } -static int -decrement_single_bucket(int32_t *bucketptr, - ssize_t n) +/** + * Decrement a provided bucket by n units. Note that n + * must be nonnegative. + */ +int +token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n) { if (BUG(n < 0)) return 0; - const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; - *bucketptr -= n; + const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket; + bucket->bucket -= n; return becomes_empty; } @@ -170,7 +221,7 @@ int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, ssize_t n) { - return decrement_single_bucket(&bucket->read_bucket, n); + return token_bucket_raw_dec(&bucket->read_bucket, n); } /** @@ -183,7 +234,7 @@ int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, ssize_t n) { - return decrement_single_bucket(&bucket->write_bucket, n); + return token_bucket_raw_dec(&bucket->write_bucket, n); } /** @@ -192,9 +243,9 @@ token_bucket_rw_dec_write(token_bucket_rw_t *bucket, */ void token_bucket_rw_dec(token_bucket_rw_t *bucket, - ssize_t n_read, ssize_t n_written) + ssize_t n_read, ssize_t n_written) { token_bucket_rw_dec_read(bucket, n_read); - token_bucket_rw_dec_read(bucket, n_written); + token_bucket_rw_dec_write(bucket, n_written); } diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h index f9d04f79a5..2507cd225b 100644 --- a/src/common/token_bucket.h +++ b/src/common/token_bucket.h @@ -11,15 +11,64 @@ #include "torint.h" -typedef struct token_bucket_rw_t { +/** Largest allowable burst value for a token buffer. */ +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +/** A generic token buffer configuration: determines the number of tokens + * added to the bucket in each time unit (the "rate"), and the maximum number + * of tokens in the bucket (the "burst") */ +typedef struct token_bucket_cfg_t { uint32_t rate; int32_t burst; - int32_t read_bucket; - int32_t write_bucket; - uint32_t last_refilled_at_ts; -} token_bucket_rw_t; +} token_bucket_cfg_t; -#define TOKEN_BUCKET_RW_MAX_BURST INT32_MAX +/** A raw token bucket, decoupled from its configuration and timestamp. */ +typedef struct token_bucket_raw_t { + int32_t bucket; +} token_bucket_raw_t; + +/** A timestamp for a token bucket. The units of this timestamp are + * unspecified, but must match with the rate set in the token_bucket_cfg_t. */ +typedef struct token_bucket_timestamp_t { + uint32_t last_refilled_at; +} token_bucket_timestamp_t; + +void token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst); + +void token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg); + +void token_bucket_raw_reset(token_bucket_raw_t *bucket, + token_bucket_timestamp_t *stamp, + const token_bucket_cfg_t *cfg, + uint32_t now); + +int token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n); + +int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed_steps); + +static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket); +/** Return the current number of bytes set in a token bucket. */ +static inline size_t +token_bucket_raw_get(const token_bucket_raw_t *bucket) +{ + return bucket->bucket >= 0 ? bucket->bucket : 0; +} + +/** A convenience type containing all the pieces needed for a coupled + * read-bucket and write-bucket that have the same rate limit, and which use + * "timestamp units" (see compat_time.h) for their time. */ +typedef struct token_bucket_rw_t { + token_bucket_cfg_t cfg; + token_bucket_raw_t read_bucket; + token_bucket_raw_t write_bucket; + token_bucket_timestamp_t stamp; +} token_bucket_rw_t; void token_bucket_rw_init(token_bucket_rw_t *bucket, uint32_t rate, @@ -50,8 +99,7 @@ static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket) { - const ssize_t b = bucket->read_bucket; - return b >= 0 ? b : 0; + return token_bucket_raw_get(&bucket->read_bucket); } static inline size_t token_bucket_rw_get_write( @@ -59,8 +107,7 @@ static inline size_t token_bucket_rw_get_write( static inline size_t token_bucket_rw_get_write(const token_bucket_rw_t *bucket) { - const ssize_t b = bucket->write_bucket; - return b >= 0 ? b : 0; + return token_bucket_raw_get(&bucket->write_bucket); } #ifdef TOKEN_BUCKET_PRIVATE diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c index 9e4748af7f..17914aeb97 100644 --- a/src/test/test_bwmgt.c +++ b/src/test/test_bwmgt.c @@ -25,19 +25,19 @@ test_bwmgt_token_buf_init(void *arg) token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); // Burst is correct - tt_uint_op(b.burst, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 64*KB); // Rate is correct, within 1 percent. { uint32_t ticks_per_sec = (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); - uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); + uint32_t rate_per_sec = (b.cfg.rate * ticks_per_sec / TICKS_PER_STEP); tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); } // Bucket starts out full: - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); - tt_int_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS); + tt_int_op(b.read_bucket.bucket, OP_EQ, 64*KB); done: ; @@ -51,31 +51,31 @@ test_bwmgt_token_buf_adjust(void *arg) token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); - uint32_t rate_orig = b.rate; + uint32_t rate_orig = b.cfg.rate; // Increasing burst token_bucket_rw_adjust(&b, 16*KB, 128*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 64*KB); - tt_uint_op(b.burst, OP_EQ, 128*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 128*KB); // Decreasing burst but staying above bucket token_bucket_rw_adjust(&b, 16*KB, 96*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 64*KB); - tt_uint_op(b.burst, OP_EQ, 96*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 96*KB); // Decreasing burst below bucket, token_bucket_rw_adjust(&b, 16*KB, 48*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 48*KB); - tt_uint_op(b.burst, OP_EQ, 48*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 48*KB); // Changing rate. token_bucket_rw_adjust(&b, 32*KB, 48*KB); - tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); - tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); - tt_uint_op(b.read_bucket, OP_EQ, 48*KB); - tt_uint_op(b.burst, OP_EQ, 48*KB); + tt_uint_op(b.cfg.rate, OP_GE, rate_orig*2 - 10); + tt_uint_op(b.cfg.rate, OP_LE, rate_orig*2 + 10); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 48*KB); done: ; @@ -90,29 +90,29 @@ test_bwmgt_token_buf_dec(void *arg) // full-to-not-full. tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, KB)); - tt_int_op(b.read_bucket, OP_EQ, 63*KB); + tt_int_op(b.read_bucket.bucket, OP_EQ, 63*KB); // Full to almost-not-full tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 63*KB - 1)); - tt_int_op(b.read_bucket, OP_EQ, 1); + tt_int_op(b.read_bucket.bucket, OP_EQ, 1); // almost-not-full to empty. tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 1)); - tt_int_op(b.read_bucket, OP_EQ, 0); + tt_int_op(b.read_bucket.bucket, OP_EQ, 0); // reset bucket, try full-to-empty token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB)); - tt_int_op(b.read_bucket, OP_EQ, 0); + tt_int_op(b.read_bucket.bucket, OP_EQ, 0); // reset bucket, try underflow. token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB + 1)); - tt_int_op(b.read_bucket, OP_EQ, -1); + tt_int_op(b.read_bucket.bucket, OP_EQ, -1); // A second underflow does not make the bucket empty. tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 1000)); - tt_int_op(b.read_bucket, OP_EQ, -1001); + tt_int_op(b.read_bucket.bucket, OP_EQ, -1001); done: ; @@ -125,68 +125,67 @@ test_bwmgt_token_buf_refill(void *arg) token_bucket_rw_t b; const uint32_t SEC = (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); - printf("%d\n", (int)SEC); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); /* Make the buffer much emptier, then let one second elapse. */ token_bucket_rw_dec_read(&b, 48*KB); - tt_int_op(b.read_bucket, OP_EQ, 16*KB); + tt_int_op(b.read_bucket.bucket, OP_EQ, 16*KB); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC)); - tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); - tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); + tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB - 300); + tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB + 300); /* Another half second. */ tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); - tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); - tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); + tt_int_op(b.read_bucket.bucket, OP_GT, 40*KB - 400); + tt_int_op(b.read_bucket.bucket, OP_LT, 40*KB + 400); + tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2); /* No time: nothing happens. */ { - const uint32_t bucket_orig = b.read_bucket; + const uint32_t bucket_orig = b.read_bucket.bucket; tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); - tt_int_op(b.read_bucket, OP_EQ, bucket_orig); + tt_int_op(b.read_bucket.bucket, OP_EQ, bucket_orig); } /* Another 30 seconds: fill the bucket. */ tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*30)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); + tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2 + SEC*30); /* Another 30 seconds: nothing happens. */ tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*60)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); + tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2 + SEC*60); /* Empty the bucket, let two seconds pass, and make sure that a refill is * noticed. */ - tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.burst)); - tt_int_op(0, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.cfg.burst)); + tt_int_op(0, OP_EQ, b.read_bucket.bucket); tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*61)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*62)); - tt_int_op(b.read_bucket, OP_GT, 32*KB-400); - tt_int_op(b.read_bucket, OP_LT, 32*KB+400); + tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB+400); /* Underflow the bucket, make sure we detect when it has tokens again. */ - tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.read_bucket+16*KB)); - tt_int_op(-16*KB, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.read_bucket.bucket+16*KB)); + tt_int_op(-16*KB, OP_EQ, b.read_bucket.bucket); // half a second passes... tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, -8*KB-300); - tt_int_op(b.read_bucket, OP_LT, -8*KB+300); + tt_int_op(b.read_bucket.bucket, OP_GT, -8*KB-300); + tt_int_op(b.read_bucket.bucket, OP_LT, -8*KB+300); // a second passes tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*65)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-400); - tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400); // We step a second backwards, and nothing happens. tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-400); - tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400); // A ridiculous amount of time passes. tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, INT32_MAX)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); done: ;