Add a new token-bucket backend abstraction, with tests

This differs from our previous token bucket abstraction in a few
ways:

  1) It is an abstraction, and not a collection of fields.
  2) It is meant to be used with monotonic timestamps, which should
     produce better results than calling gettimeofday over and over.
This commit is contained in:
Nick Mathewson 2018-04-10 11:23:14 -04:00
parent d8ef9a2d1e
commit c376200f6a
7 changed files with 456 additions and 0 deletions

View File

@ -97,6 +97,7 @@ LIBOR_A_SRC = \
src/common/util_process.c \ src/common/util_process.c \
src/common/sandbox.c \ src/common/sandbox.c \
src/common/storagedir.c \ src/common/storagedir.c \
src/common/token_bucket.c \
src/common/workqueue.c \ src/common/workqueue.c \
$(libor_extra_source) \ $(libor_extra_source) \
$(threads_impl_source) \ $(threads_impl_source) \
@ -184,6 +185,7 @@ COMMONHEADERS = \
src/common/storagedir.h \ src/common/storagedir.h \
src/common/testsupport.h \ src/common/testsupport.h \
src/common/timers.h \ src/common/timers.h \
src/common/token_bucket.h \
src/common/torint.h \ src/common/torint.h \
src/common/torlog.h \ src/common/torlog.h \
src/common/tortls.h \ src/common/tortls.h \

180
src/common/token_bucket.c Normal file
View File

@ -0,0 +1,180 @@
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file token_bucket.c
* \brief Functions to use and manipulate token buckets, used for
* rate-limiting on connections and globally.
*
* Tor uses these token buckets to keep track of bandwidth usage, and
* sometimes other things too.
*
* The time units we use internally are based on "timestamp" units -- see
* monotime_coarse_to_stamp() for a rationale.
*
* Token buckets may become negative.
**/
#define TOKEN_BUCKET_PRIVATE
#include "token_bucket.h"
#include "util_bug.h"
/** Convert a rate in bytes per second to a rate in bytes per step */
static uint32_t
rate_per_sec_to_rate_per_step(uint32_t rate)
{
/*
The precise calculation we'd want to do is
(rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize
rounding error, we do it this way instead, and divide last.
*/
return (uint32_t)
monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000;
}
/**
* Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b>
* bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket
* is created such that <b>now_ts</b> is the current timestamp. The bucket
* starts out full.
*/
void
token_bucket_init(token_bucket_t *bucket,
uint32_t rate,
uint32_t burst,
uint32_t now_ts)
{
memset(bucket, 0, sizeof(token_bucket_t));
token_bucket_adjust(bucket, rate, burst);
token_bucket_reset(bucket, now_ts);
}
/**
* Change the configured rate (in bytes per second) and burst (in bytes)
* for the token bucket in *<b>bucket</b>.
*/
void
token_bucket_adjust(token_bucket_t *bucket,
uint32_t rate,
uint32_t burst)
{
tor_assert_nonfatal(rate > 0);
tor_assert_nonfatal(burst > 0);
if (burst > TOKEN_BUCKET_MAX_BURST)
burst = TOKEN_BUCKET_MAX_BURST;
bucket->rate = rate_per_sec_to_rate_per_step(rate);
bucket->burst = burst;
bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst);
bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst);
}
/**
* Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>.
*/
void
token_bucket_reset(token_bucket_t *bucket,
uint32_t now_ts)
{
bucket->read_bucket = bucket->burst;
bucket->write_bucket = bucket->burst;
bucket->last_refilled_at_ts = now_ts;
}
/* Helper: see token_bucket_refill */
static int
refill_single_bucket(int32_t *bucketptr,
const uint32_t rate,
const int32_t burst,
const uint32_t elapsed_steps)
{
const int was_empty = (*bucketptr <= 0);
/* The casts here prevent an underflow.
*
* Note that even if the bucket value is negative, subtracting it from
* "burst" will still produce a correct result. If this result is
* ridiculously high, then the "elapsed_steps > gap / rate" check below
* should catch it. */
const size_t gap = ((size_t)burst) - ((size_t)*bucketptr);
if (elapsed_steps > gap / rate) {
*bucketptr = burst;
} else {
*bucketptr += rate * elapsed_steps;
}
return was_empty && *bucketptr > 0;
}
/**
* Refill <b>bucket</b> as appropriate, given that the current timestamp
* is <b>now_ts</b>.
*
* Return a bitmask containing TB_READ iff read bucket was empty and became
* nonempty, and TB_WRITE iff the write bucket was empty and became nonempty.
*/
int
token_bucket_refill(token_bucket_t *bucket,
uint32_t now_ts)
{
const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts);
const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP;
if (!elapsed_steps) {
/* Note that if less than one whole step elapsed, we don't advance the
* time in last_refilled_at_ts. That's intentional: we want to make sure
* that we add some bytes to it eventually. */
return 0;
}
int flags = 0;
if (refill_single_bucket(&bucket->read_bucket,
bucket->rate, bucket->burst, elapsed_steps))
flags |= TB_READ;
if (refill_single_bucket(&bucket->write_bucket,
bucket->rate, bucket->burst, elapsed_steps))
flags |= TB_WRITE;
bucket->last_refilled_at_ts = now_ts;
return flags;
}
static int
decrement_single_bucket(int32_t *bucketptr,
ssize_t n)
{
if (BUG(n < 0))
return 0;
const int becomes_empty = *bucketptr > 0 && n >= *bucketptr;
*bucketptr -= n;
return becomes_empty;
}
/**
* Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes.
*
* Return true if the bucket was nonempty and became empty; return false
* otherwise.
*/
int
token_bucket_dec_read(token_bucket_t *bucket,
ssize_t n)
{
return decrement_single_bucket(&bucket->read_bucket, n);
}
/**
* Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes.
*
* Return true if the bucket was nonempty and became empty; return false
* otherwise.
*/
int
token_bucket_dec_write(token_bucket_t *bucket,
ssize_t n)
{
return decrement_single_bucket(&bucket->write_bucket, n);
}

72
src/common/token_bucket.h Normal file
View File

@ -0,0 +1,72 @@
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file token_bucket.h
* \brief Headers for token_bucket.c
**/
#ifndef TOR_TOKEN_BUCKET_H
#define TOR_TOKEN_BUCKET_H
#include "torint.h"
typedef struct token_bucket_t {
uint32_t rate;
int32_t burst;
int32_t read_bucket;
int32_t write_bucket;
uint32_t last_refilled_at_ts;
} token_bucket_t;
#define TOKEN_BUCKET_MAX_BURST INT32_MAX
void token_bucket_init(token_bucket_t *bucket,
uint32_t rate,
uint32_t burst,
uint32_t now_ts);
void token_bucket_adjust(token_bucket_t *bucket,
uint32_t rate, uint32_t burst);
void token_bucket_reset(token_bucket_t *bucket,
uint32_t now_ts);
#define TB_READ 1
#define TB_WRITE 2
int token_bucket_refill(token_bucket_t *bucket,
uint32_t now_ts);
int token_bucket_dec_read(token_bucket_t *bucket,
ssize_t n);
int token_bucket_dec_write(token_bucket_t *bucket,
ssize_t n);
static inline size_t token_bucket_get_read(const token_bucket_t *bucket);
static inline size_t
token_bucket_get_read(const token_bucket_t *bucket)
{
const ssize_t b = bucket->read_bucket;
return b >= 0 ? b : 0;
}
static inline size_t token_bucket_get_write(const token_bucket_t *bucket);
static inline size_t
token_bucket_get_write(const token_bucket_t *bucket)
{
const ssize_t b = bucket->write_bucket;
return b >= 0 ? b : 0;
}
#ifdef TOKEN_BUCKET_PRIVATE
/* To avoid making the rates too small, we consider units of "steps",
* where a "step" is defined as this many timestamp ticks. Keep this
* a power of two if you can. */
#define TICKS_PER_STEP 16
#endif
#endif /* TOR_TOKEN_BUCKET_H */

View File

@ -89,6 +89,7 @@ src_test_test_SOURCES = \
src/test/test_address.c \ src/test/test_address.c \
src/test/test_address_set.c \ src/test/test_address_set.c \
src/test/test_buffers.c \ src/test/test_buffers.c \
src/test/test_bwmgt.c \
src/test/test_cell_formats.c \ src/test/test_cell_formats.c \
src/test/test_cell_queue.c \ src/test/test_cell_queue.c \
src/test/test_channel.c \ src/test/test_channel.c \

View File

@ -812,6 +812,7 @@ struct testgroup_t testgroups[] = {
{ "address/", address_tests }, { "address/", address_tests },
{ "address_set/", address_set_tests }, { "address_set/", address_set_tests },
{ "buffer/", buffer_tests }, { "buffer/", buffer_tests },
{ "bwmgt/", bwmgt_tests },
{ "cellfmt/", cell_format_tests }, { "cellfmt/", cell_format_tests },
{ "cellqueue/", cell_queue_tests }, { "cellqueue/", cell_queue_tests },
{ "channel/", channel_tests }, { "channel/", channel_tests },

View File

@ -178,6 +178,7 @@ extern struct testcase_t accounting_tests[];
extern struct testcase_t addr_tests[]; extern struct testcase_t addr_tests[];
extern struct testcase_t address_tests[]; extern struct testcase_t address_tests[];
extern struct testcase_t address_set_tests[]; extern struct testcase_t address_set_tests[];
extern struct testcase_t bwmgt_tests[];
extern struct testcase_t buffer_tests[]; extern struct testcase_t buffer_tests[];
extern struct testcase_t cell_format_tests[]; extern struct testcase_t cell_format_tests[];
extern struct testcase_t cell_queue_tests[]; extern struct testcase_t cell_queue_tests[];

199
src/test/test_bwmgt.c Normal file
View File

@ -0,0 +1,199 @@
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file test_bwmgt.c
* \brief tests for bandwidth management / token bucket functions
*/
#define TOKEN_BUCKET_PRIVATE
#include "or.h"
#include "test.h"
#include "token_bucket.h"
// an imaginary time, in timestamp units. Chosen so it will roll over.
static const uint32_t START_TS = UINT32_MAX-10;
static const int32_t KB = 1024;
static void
test_bwmgt_token_buf_init(void *arg)
{
(void)arg;
token_bucket_t b;
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
// Burst is correct
tt_uint_op(b.burst, OP_EQ, 64*KB);
// Rate is correct, within 1 percent.
{
uint32_t ticks_per_sec =
(uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP);
tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
}
// Bucket starts out full:
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS);
tt_int_op(b.read_bucket, OP_EQ, 64*KB);
done:
;
}
static void
test_bwmgt_token_buf_adjust(void *arg)
{
(void)arg;
token_bucket_t b;
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
uint32_t rate_orig = b.rate;
// Increasing burst
token_bucket_adjust(&b, 16*KB, 128*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
tt_uint_op(b.burst, OP_EQ, 128*KB);
// Decreasing burst but staying above bucket
token_bucket_adjust(&b, 16*KB, 96*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
tt_uint_op(b.burst, OP_EQ, 96*KB);
// Decreasing burst below bucket,
token_bucket_adjust(&b, 16*KB, 48*KB);
tt_uint_op(b.rate, OP_EQ, rate_orig);
tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
tt_uint_op(b.burst, OP_EQ, 48*KB);
// Changing rate.
token_bucket_adjust(&b, 32*KB, 48*KB);
tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10);
tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10);
tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
tt_uint_op(b.burst, OP_EQ, 48*KB);
done:
;
}
static void
test_bwmgt_token_buf_dec(void *arg)
{
(void)arg;
token_bucket_t b;
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
// full-to-not-full.
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB));
tt_int_op(b.read_bucket, OP_EQ, 63*KB);
// Full to almost-not-full
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1));
tt_int_op(b.read_bucket, OP_EQ, 1);
// almost-not-full to empty.
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1));
tt_int_op(b.read_bucket, OP_EQ, 0);
// reset bucket, try full-to-empty
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB));
tt_int_op(b.read_bucket, OP_EQ, 0);
// reset bucket, try underflow.
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1));
tt_int_op(b.read_bucket, OP_EQ, -1);
// A second underflow does not make the bucket empty.
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000));
tt_int_op(b.read_bucket, OP_EQ, -1001);
done:
;
}
static void
test_bwmgt_token_buf_refill(void *arg)
{
(void)arg;
token_bucket_t b;
const uint32_t SEC =
(uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
/* Make the buffer much emptier, then let one second elapse. */
token_bucket_dec_read(&b, 48*KB);
tt_int_op(b.read_bucket, OP_EQ, 16*KB);
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC));
tt_int_op(b.read_bucket, OP_GT, 32*KB - 300);
tt_int_op(b.read_bucket, OP_LT, 32*KB + 300);
/* Another half second. */
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
tt_int_op(b.read_bucket, OP_GT, 40*KB - 400);
tt_int_op(b.read_bucket, OP_LT, 40*KB + 400);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2);
/* No time: nothing happens. */
{
const uint32_t bucket_orig = b.read_bucket;
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
tt_int_op(b.read_bucket, OP_EQ, bucket_orig);
}
/* Another 30 seconds: fill the bucket. */
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30));
tt_int_op(b.read_bucket, OP_EQ, b.burst);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30);
/* Another 30 seconds: nothing happens. */
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60));
tt_int_op(b.read_bucket, OP_EQ, b.burst);
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60);
/* Empty the bucket, let two seconds pass, and make sure that a refill is
* noticed. */
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst));
tt_int_op(0, OP_EQ, b.read_bucket);
tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61));
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62));
tt_int_op(b.read_bucket, OP_GT, 32*KB-300);
tt_int_op(b.read_bucket, OP_LT, 32*KB+300);
/* Underflow the bucket, make sure we detect when it has tokens again. */
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB));
tt_int_op(-16*KB, OP_EQ, b.read_bucket);
// half a second passes...
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
tt_int_op(b.read_bucket, OP_GT, -8*KB-200);
tt_int_op(b.read_bucket, OP_LT, -8*KB+200);
// a second passes
tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65));
tt_int_op(b.read_bucket, OP_GT, 8*KB-200);
tt_int_op(b.read_bucket, OP_LT, 8*KB+200);
// a ridiculous amount of time passes
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
tt_int_op(b.read_bucket, OP_EQ, b.burst);
done:
;
}
#define BWMGT(name) \
{ #name, test_bwmgt_ ## name , 0, NULL, NULL }
struct testcase_t bwmgt_tests[] = {
BWMGT(token_buf_init),
BWMGT(token_buf_adjust),
BWMGT(token_buf_dec),
BWMGT(token_buf_refill),
END_OF_TESTCASES
};