Start re-refactoring the token bucket interface.

Begin by creating a lowest-level triple of the types needed to
implement a token bucket: a configuration, a timestamp, and the raw
bucket itself.

Note that for low-level buckets, the units of the timestamp and the
bucket itself are unspecified: each user can use a different type.

(This patch breaks check-spaces; a later patch will fix it)
This commit is contained in:
Nick Mathewson 2018-04-13 11:30:53 -04:00
parent 03b96882de
commit 0b40ed5e70
3 changed files with 203 additions and 106 deletions

View File

@ -9,8 +9,9 @@
* Tor uses these token buckets to keep track of bandwidth usage, and * Tor uses these token buckets to keep track of bandwidth usage, and
* sometimes other things too. * sometimes other things too.
* *
* The time units we use internally are based on "timestamp" units -- see * There are two layers of abstraction here: "raw" token buckets, in which all
* monotime_coarse_to_stamp() for a rationale. * the pieces are decoupled, and "read-write" token buckets, which combine all
* the moving parts into one.
* *
* Token buckets may become negative. * Token buckets may become negative.
**/ **/
@ -20,6 +21,51 @@
#include "token_bucket.h" #include "token_bucket.h"
#include "util_bug.h" #include "util_bug.h"
/**
* Set the <b>rate</b> and <b>burst</b> value in a token_bucket_cfg.
*
* Note that the <b>rate</b> value is in arbitrary units, but those units will
* determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and
* so on.
*/
void
token_bucket_cfg_init(token_bucket_cfg_t *cfg,
uint32_t rate,
uint32_t burst)
{
tor_assert_nonfatal(rate > 0);
tor_assert_nonfatal(burst > 0);
if (burst > TOKEN_BUCKET_MAX_BURST)
burst = TOKEN_BUCKET_MAX_BURST;
cfg->rate = rate;
cfg->burst = burst;
}
/**
* Initialize a raw token bucket and its associated timestamp to the "full"
* state, according to <b>cfg</b>.
*/
void
token_bucket_raw_reset(token_bucket_raw_t *bucket,
token_bucket_timestamp_t *stamp,
const token_bucket_cfg_t *cfg,
uint32_t now)
{
bucket->bucket = cfg->burst;
stamp->last_refilled_at = now;
}
/**
* Adust a preexisting token bucket to respect the new configuration
* <b>cfg</b>, by decreasing its current level if needed. */
void
token_bucket_raw_adjust(token_bucket_raw_t *bucket,
const token_bucket_cfg_t *cfg)
{
bucket->bucket = MIN(bucket->bucket, cfg->burst);
}
/** Convert a rate in bytes per second to a rate in bytes per step */ /** Convert a rate in bytes per second to a rate in bytes per step */
static uint32_t static uint32_t
rate_per_sec_to_rate_per_step(uint32_t rate) rate_per_sec_to_rate_per_step(uint32_t rate)
@ -57,18 +103,14 @@ token_bucket_rw_init(token_bucket_rw_t *bucket,
*/ */
void void
token_bucket_rw_adjust(token_bucket_rw_t *bucket, token_bucket_rw_adjust(token_bucket_rw_t *bucket,
uint32_t rate, uint32_t rate,
uint32_t burst) uint32_t burst)
{ {
tor_assert_nonfatal(rate > 0); token_bucket_cfg_init(&bucket->cfg,
tor_assert_nonfatal(burst > 0); rate_per_sec_to_rate_per_step(rate),
if (burst > TOKEN_BUCKET_RW_MAX_BURST) burst);
burst = TOKEN_BUCKET_RW_MAX_BURST; token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg);
token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg);
bucket->rate = rate_per_sec_to_rate_per_step(rate);
bucket->burst = burst;
bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst);
bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst);
} }
/** /**
@ -76,36 +118,41 @@ token_bucket_rw_adjust(token_bucket_rw_t *bucket,
*/ */
void void
token_bucket_rw_reset(token_bucket_rw_t *bucket, token_bucket_rw_reset(token_bucket_rw_t *bucket,
uint32_t now_ts) uint32_t now_ts)
{ {
bucket->read_bucket = bucket->burst; token_bucket_raw_reset(&bucket->read_bucket, &bucket->stamp,
bucket->write_bucket = bucket->burst; &bucket->cfg, now_ts);
bucket->last_refilled_at_ts = now_ts; token_bucket_raw_reset(&bucket->write_bucket, &bucket->stamp,
&bucket->cfg, now_ts);
} }
/* Helper: see token_bucket_rw_refill */ /**
static int * Given an amount of <b>elapsed</b> time units, and a bucket configuration
refill_single_bucket(int32_t *bucketptr, * <b>cfg</b>, refill the level of <b>bucket</b> accordingly. Note that the
const uint32_t rate, * units of time in <b>elapsed</b> must correspond to those used to set the
const int32_t burst, * rate in <b>cfg</b>, or the result will be illogical.
const uint32_t elapsed_steps) */
int
token_bucket_raw_refill_steps(token_bucket_raw_t *bucket,
const token_bucket_cfg_t *cfg,
const uint32_t elapsed)
{ {
const int was_empty = (*bucketptr <= 0); const int was_empty = (bucket->bucket <= 0);
/* The casts here prevent an underflow. /* The casts here prevent an underflow.
* *
* Note that even if the bucket value is negative, subtracting it from * Note that even if the bucket value is negative, subtracting it from
* "burst" will still produce a correct result. If this result is * "burst" will still produce a correct result. If this result is
* ridiculously high, then the "elapsed_steps > gap / rate" check below * ridiculously high, then the "elapsed > gap / rate" check below
* should catch it. */ * should catch it. */
const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket);
if (elapsed_steps > gap / rate) { if (elapsed > gap / cfg->rate) {
*bucketptr = burst; bucket->bucket = cfg->burst;
} else { } else {
*bucketptr += rate * elapsed_steps; bucket->bucket += cfg->rate * elapsed;
} }
return was_empty && *bucketptr > 0; return was_empty && bucket->bucket > 0;
} }
/** /**
@ -119,7 +166,7 @@ int
token_bucket_rw_refill(token_bucket_rw_t *bucket, token_bucket_rw_refill(token_bucket_rw_t *bucket,
uint32_t now_ts) uint32_t now_ts)
{ {
const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); const uint32_t elapsed_ticks = (now_ts - bucket->stamp.last_refilled_at);
if (elapsed_ticks > UINT32_MAX-(300*1000)) { if (elapsed_ticks > UINT32_MAX-(300*1000)) {
/* Either about 48 days have passed since the last refill, or the /* Either about 48 days have passed since the last refill, or the
* monotonic clock has somehow moved backwards. (We're looking at you, * monotonic clock has somehow moved backwards. (We're looking at you,
@ -132,31 +179,35 @@ token_bucket_rw_refill(token_bucket_rw_t *bucket,
if (!elapsed_steps) { if (!elapsed_steps) {
/* Note that if less than one whole step elapsed, we don't advance the /* Note that if less than one whole step elapsed, we don't advance the
* time in last_refilled_at_ts. That's intentional: we want to make sure * time in last_refilled_at. That's intentional: we want to make sure
* that we add some bytes to it eventually. */ * that we add some bytes to it eventually. */
return 0; return 0;
} }
int flags = 0; int flags = 0;
if (refill_single_bucket(&bucket->read_bucket, if (token_bucket_raw_refill_steps(&bucket->read_bucket,
bucket->rate, bucket->burst, elapsed_steps)) &bucket->cfg, elapsed_steps))
flags |= TB_READ; flags |= TB_READ;
if (refill_single_bucket(&bucket->write_bucket, if (token_bucket_raw_refill_steps(&bucket->write_bucket,
bucket->rate, bucket->burst, elapsed_steps)) &bucket->cfg, elapsed_steps))
flags |= TB_WRITE; flags |= TB_WRITE;
bucket->last_refilled_at_ts = now_ts; bucket->stamp.last_refilled_at = now_ts;
return flags; return flags;
} }
static int /**
decrement_single_bucket(int32_t *bucketptr, * Decrement a provided bucket by <b>n</b> units. Note that <b>n</b>
ssize_t n) * must be nonnegative.
*/
int
token_bucket_raw_dec(token_bucket_raw_t *bucket,
ssize_t n)
{ {
if (BUG(n < 0)) if (BUG(n < 0))
return 0; return 0;
const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket;
*bucketptr -= n; bucket->bucket -= n;
return becomes_empty; return becomes_empty;
} }
@ -170,7 +221,7 @@ int
token_bucket_rw_dec_read(token_bucket_rw_t *bucket, token_bucket_rw_dec_read(token_bucket_rw_t *bucket,
ssize_t n) ssize_t n)
{ {
return decrement_single_bucket(&bucket->read_bucket, n); return token_bucket_raw_dec(&bucket->read_bucket, n);
} }
/** /**
@ -183,7 +234,7 @@ int
token_bucket_rw_dec_write(token_bucket_rw_t *bucket, token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
ssize_t n) ssize_t n)
{ {
return decrement_single_bucket(&bucket->write_bucket, n); return token_bucket_raw_dec(&bucket->write_bucket, n);
} }
/** /**
@ -192,9 +243,9 @@ token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
*/ */
void void
token_bucket_rw_dec(token_bucket_rw_t *bucket, token_bucket_rw_dec(token_bucket_rw_t *bucket,
ssize_t n_read, ssize_t n_written) ssize_t n_read, ssize_t n_written)
{ {
token_bucket_rw_dec_read(bucket, n_read); token_bucket_rw_dec_read(bucket, n_read);
token_bucket_rw_dec_read(bucket, n_written); token_bucket_rw_dec_write(bucket, n_written);
} }

View File

@ -11,15 +11,64 @@
#include "torint.h" #include "torint.h"
typedef struct token_bucket_rw_t { /** Largest allowable burst value for a token buffer. */
#define TOKEN_BUCKET_MAX_BURST INT32_MAX
/** A generic token buffer configuration: determines the number of tokens
* added to the bucket in each time unit (the "rate"), and the maximum number
* of tokens in the bucket (the "burst") */
typedef struct token_bucket_cfg_t {
uint32_t rate; uint32_t rate;
int32_t burst; int32_t burst;
int32_t read_bucket; } token_bucket_cfg_t;
int32_t write_bucket;
uint32_t last_refilled_at_ts;
} token_bucket_rw_t;
#define TOKEN_BUCKET_RW_MAX_BURST INT32_MAX /** A raw token bucket, decoupled from its configuration and timestamp. */
typedef struct token_bucket_raw_t {
int32_t bucket;
} token_bucket_raw_t;
/** A timestamp for a token bucket. The units of this timestamp are
* unspecified, but must match with the rate set in the token_bucket_cfg_t. */
typedef struct token_bucket_timestamp_t {
uint32_t last_refilled_at;
} token_bucket_timestamp_t;
void token_bucket_cfg_init(token_bucket_cfg_t *cfg,
uint32_t rate,
uint32_t burst);
void token_bucket_raw_adjust(token_bucket_raw_t *bucket,
const token_bucket_cfg_t *cfg);
void token_bucket_raw_reset(token_bucket_raw_t *bucket,
token_bucket_timestamp_t *stamp,
const token_bucket_cfg_t *cfg,
uint32_t now);
int token_bucket_raw_dec(token_bucket_raw_t *bucket,
ssize_t n);
int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket,
const token_bucket_cfg_t *cfg,
const uint32_t elapsed_steps);
static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket);
/** Return the current number of bytes set in a token bucket. */
static inline size_t
token_bucket_raw_get(const token_bucket_raw_t *bucket)
{
return bucket->bucket >= 0 ? bucket->bucket : 0;
}
/** A convenience type containing all the pieces needed for a coupled
* read-bucket and write-bucket that have the same rate limit, and which use
* "timestamp units" (see compat_time.h) for their time. */
typedef struct token_bucket_rw_t {
token_bucket_cfg_t cfg;
token_bucket_raw_t read_bucket;
token_bucket_raw_t write_bucket;
token_bucket_timestamp_t stamp;
} token_bucket_rw_t;
void token_bucket_rw_init(token_bucket_rw_t *bucket, void token_bucket_rw_init(token_bucket_rw_t *bucket,
uint32_t rate, uint32_t rate,
@ -50,8 +99,7 @@ static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket);
static inline size_t static inline size_t
token_bucket_rw_get_read(const token_bucket_rw_t *bucket) token_bucket_rw_get_read(const token_bucket_rw_t *bucket)
{ {
const ssize_t b = bucket->read_bucket; return token_bucket_raw_get(&bucket->read_bucket);
return b >= 0 ? b : 0;
} }
static inline size_t token_bucket_rw_get_write( static inline size_t token_bucket_rw_get_write(
@ -59,8 +107,7 @@ static inline size_t token_bucket_rw_get_write(
static inline size_t static inline size_t
token_bucket_rw_get_write(const token_bucket_rw_t *bucket) token_bucket_rw_get_write(const token_bucket_rw_t *bucket)
{ {
const ssize_t b = bucket->write_bucket; return token_bucket_raw_get(&bucket->write_bucket);
return b >= 0 ? b : 0;
} }
#ifdef TOKEN_BUCKET_PRIVATE #ifdef TOKEN_BUCKET_PRIVATE

View File

@ -25,19 +25,19 @@ test_bwmgt_token_buf_init(void *arg)
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
// Burst is correct // Burst is correct
tt_uint_op(b.burst, OP_EQ, 64*KB); tt_uint_op(b.cfg.burst, OP_EQ, 64*KB);
// Rate is correct, within 1 percent. // Rate is correct, within 1 percent.
{ {
uint32_t ticks_per_sec = uint32_t ticks_per_sec =
(uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); uint32_t rate_per_sec = (b.cfg.rate * ticks_per_sec / TICKS_PER_STEP);
tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
} }
// Bucket starts out full: // Bucket starts out full:
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS);
tt_int_op(b.read_bucket, OP_EQ, 64*KB); tt_int_op(b.read_bucket.bucket, OP_EQ, 64*KB);
done: done:
; ;
@ -51,31 +51,31 @@ test_bwmgt_token_buf_adjust(void *arg)
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
uint32_t rate_orig = b.rate; uint32_t rate_orig = b.cfg.rate;
// Increasing burst // Increasing burst
token_bucket_rw_adjust(&b, 16*KB, 128*KB); token_bucket_rw_adjust(&b, 16*KB, 128*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig); tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 64*KB); tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB);
tt_uint_op(b.burst, OP_EQ, 128*KB); tt_uint_op(b.cfg.burst, OP_EQ, 128*KB);
// Decreasing burst but staying above bucket // Decreasing burst but staying above bucket
token_bucket_rw_adjust(&b, 16*KB, 96*KB); token_bucket_rw_adjust(&b, 16*KB, 96*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig); tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 64*KB); tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB);
tt_uint_op(b.burst, OP_EQ, 96*KB); tt_uint_op(b.cfg.burst, OP_EQ, 96*KB);
// Decreasing burst below bucket, // Decreasing burst below bucket,
token_bucket_rw_adjust(&b, 16*KB, 48*KB); token_bucket_rw_adjust(&b, 16*KB, 48*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig); tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 48*KB); tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB);
tt_uint_op(b.burst, OP_EQ, 48*KB); tt_uint_op(b.cfg.burst, OP_EQ, 48*KB);
// Changing rate. // Changing rate.
token_bucket_rw_adjust(&b, 32*KB, 48*KB); token_bucket_rw_adjust(&b, 32*KB, 48*KB);
tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); tt_uint_op(b.cfg.rate, OP_GE, rate_orig*2 - 10);
tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); tt_uint_op(b.cfg.rate, OP_LE, rate_orig*2 + 10);
tt_uint_op(b.read_bucket, OP_EQ, 48*KB); tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB);
tt_uint_op(b.burst, OP_EQ, 48*KB); tt_uint_op(b.cfg.burst, OP_EQ, 48*KB);
done: done:
; ;
@ -90,29 +90,29 @@ test_bwmgt_token_buf_dec(void *arg)
// full-to-not-full. // full-to-not-full.
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, KB)); tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, KB));
tt_int_op(b.read_bucket, OP_EQ, 63*KB); tt_int_op(b.read_bucket.bucket, OP_EQ, 63*KB);
// Full to almost-not-full // Full to almost-not-full
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 63*KB - 1)); tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 63*KB - 1));
tt_int_op(b.read_bucket, OP_EQ, 1); tt_int_op(b.read_bucket.bucket, OP_EQ, 1);
// almost-not-full to empty. // almost-not-full to empty.
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 1)); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 1));
tt_int_op(b.read_bucket, OP_EQ, 0); tt_int_op(b.read_bucket.bucket, OP_EQ, 0);
// reset bucket, try full-to-empty // reset bucket, try full-to-empty
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB)); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB));
tt_int_op(b.read_bucket, OP_EQ, 0); tt_int_op(b.read_bucket.bucket, OP_EQ, 0);
// reset bucket, try underflow. // reset bucket, try underflow.
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB + 1)); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB + 1));
tt_int_op(b.read_bucket, OP_EQ, -1); tt_int_op(b.read_bucket.bucket, OP_EQ, -1);
// A second underflow does not make the bucket empty. // A second underflow does not make the bucket empty.
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 1000)); tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 1000));
tt_int_op(b.read_bucket, OP_EQ, -1001); tt_int_op(b.read_bucket.bucket, OP_EQ, -1001);
done: done:
; ;
@ -125,68 +125,67 @@ test_bwmgt_token_buf_refill(void *arg)
token_bucket_rw_t b; token_bucket_rw_t b;
const uint32_t SEC = const uint32_t SEC =
(uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
printf("%d\n", (int)SEC);
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
/* Make the buffer much emptier, then let one second elapse. */ /* Make the buffer much emptier, then let one second elapse. */
token_bucket_rw_dec_read(&b, 48*KB); token_bucket_rw_dec_read(&b, 48*KB);
tt_int_op(b.read_bucket, OP_EQ, 16*KB); tt_int_op(b.read_bucket.bucket, OP_EQ, 16*KB);
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC));
tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB - 300);
tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB + 300);
/* Another half second. */ /* Another half second. */
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2));
tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); tt_int_op(b.read_bucket.bucket, OP_GT, 40*KB - 400);
tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); tt_int_op(b.read_bucket.bucket, OP_LT, 40*KB + 400);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2);
/* No time: nothing happens. */ /* No time: nothing happens. */
{ {
const uint32_t bucket_orig = b.read_bucket; const uint32_t bucket_orig = b.read_bucket.bucket;
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2));
tt_int_op(b.read_bucket, OP_EQ, bucket_orig); tt_int_op(b.read_bucket.bucket, OP_EQ, bucket_orig);
} }
/* Another 30 seconds: fill the bucket. */ /* Another 30 seconds: fill the bucket. */
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*30)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*30));
tt_int_op(b.read_bucket, OP_EQ, b.burst); tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2 + SEC*30);
/* Another 30 seconds: nothing happens. */ /* Another 30 seconds: nothing happens. */
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*60)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*60));
tt_int_op(b.read_bucket, OP_EQ, b.burst); tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); tt_uint_op(b.stamp.last_refilled_at, OP_EQ, START_TS + SEC*3/2 + SEC*60);
/* Empty the bucket, let two seconds pass, and make sure that a refill is /* Empty the bucket, let two seconds pass, and make sure that a refill is
* noticed. */ * noticed. */
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.burst)); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.cfg.burst));
tt_int_op(0, OP_EQ, b.read_bucket); tt_int_op(0, OP_EQ, b.read_bucket.bucket);
tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*61)); tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*61));
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*62)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*62));
tt_int_op(b.read_bucket, OP_GT, 32*KB-400); tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB-400);
tt_int_op(b.read_bucket, OP_LT, 32*KB+400); tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB+400);
/* Underflow the bucket, make sure we detect when it has tokens again. */ /* Underflow the bucket, make sure we detect when it has tokens again. */
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.read_bucket+16*KB)); tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.read_bucket.bucket+16*KB));
tt_int_op(-16*KB, OP_EQ, b.read_bucket); tt_int_op(-16*KB, OP_EQ, b.read_bucket.bucket);
// half a second passes... // half a second passes...
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64));
tt_int_op(b.read_bucket, OP_GT, -8*KB-300); tt_int_op(b.read_bucket.bucket, OP_GT, -8*KB-300);
tt_int_op(b.read_bucket, OP_LT, -8*KB+300); tt_int_op(b.read_bucket.bucket, OP_LT, -8*KB+300);
// a second passes // a second passes
tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*65)); tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*65));
tt_int_op(b.read_bucket, OP_GT, 8*KB-400); tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400);
tt_int_op(b.read_bucket, OP_LT, 8*KB+400); tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400);
// We step a second backwards, and nothing happens. // We step a second backwards, and nothing happens.
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64));
tt_int_op(b.read_bucket, OP_GT, 8*KB-400); tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400);
tt_int_op(b.read_bucket, OP_LT, 8*KB+400); tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400);
// A ridiculous amount of time passes. // A ridiculous amount of time passes.
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, INT32_MAX)); tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, INT32_MAX));
tt_int_op(b.read_bucket, OP_EQ, b.burst); tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
done: done:
; ;