Merge branch 'ewma'

Nick Mathewson 2009-12-18 22:33:02 -05:00
commit 05a2473b7f
12 changed files with 641 additions and 52 deletions

View File

@ -1,4 +1,14 @@
Changes in version 0.2.2.7-alpha - 2009-??-??
o Major features:
- When choosing which cells to relay first, we can now favor circuits
that have been quiet recently, so as to get lower latency for
low-volume circuits. By default, relays enable or disable this
feature based on a setting in the consensus. Preliminary testing
suggests that this should make the network feel faster as more relays
use it. You can override this default by using the new
"CircuitPriorityHalflife" config option. Design and code by Ian
Goldberg, Can Tang, and Chris Alexander.
o Minor features:
- New config option "CircuitStreamTimeout" to override our internal
timeout schedule for how many seconds until we detach a stream from

View File

@ -424,6 +424,19 @@ ORPort. (Default: 1)
\fBPreferTunneledDirConns \fR\fB0\fR|\fB1\fP
If non-zero, we will avoid directory servers that don't support tunneled
directory connections, when possible. (Default: 1)
.LP
.TP
\fBCircuitPriorityHalflife \fR\fBNUM\fP
If this value is set, we override the default algorithm for choosing which
circuit's cells to deliver or relay first. When the value is 0, we
round-robin between the active circuits on a connection, delivering one cell
from each in turn. When the value is positive, we prefer delivering cells
from whichever circuit has the lowest weighted cell count, where cells are
weighted exponentially according to the supplied CircuitPriorityHalflife
value (in seconds). If this option is not set at all, we use the behavior
recommended in the current consensus networkstatus.
This is an advanced option; you generally shouldn't have to mess with it.
(Default: not set.)
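For illustration (values assumed, not part of this patch): putting "CircuitPriorityHalflife 30" in torrc gives cells a 30-second half-life, "CircuitPriorityHalflife 0" forces the old round-robin behavior, and leaving the option unset defers to the consensus.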
.SH CLIENT OPTIONS
.PP

View File

@ -605,6 +605,38 @@ smartlist_uniq_strings(smartlist_t *sl)
/* Heap-based priority queue implementation for O(lg N) insert and remove.
* Recall that the heap property is that, for every index I, h[I] <
* h[LEFT_CHILD[I]] and h[I] < h[RIGHT_CHILD[I]].
*
* For us to remove items other than the topmost item, each item must store
* its own index within the heap. When calling the pqueue functions, tell
* them about the offset of the field that stores the index within the item.
*
* Example:
*
* typedef struct timer_t {
* struct timeval tv;
* int heap_index;
* } timer_t;
*
* static int compare(const void *p1, const void *p2) {
* const timer_t *t1 = p1, *t2 = p2;
* if (t1->tv.tv_sec < t2->tv.tv_sec) {
* return -1;
* } else if (t1->tv.tv_sec > t2->tv.tv_sec) {
* return 1;
* } else {
* return t1->tv.tv_usec - t2->tv.tv_usec;
* }
* }
*
* void timer_heap_insert(smartlist_t *heap, timer_t *timer) {
* smartlist_pqueue_add(heap, compare, STRUCT_OFFSET(timer_t, heap_index),
* timer);
* }
*
* timer_t *timer_heap_pop(smartlist_t *heap) {
* return smartlist_pqueue_pop(heap, compare,
* STRUCT_OFFSET(timer_t, heap_index));
* }
*/
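The example above stops at pop; since this change also adds smartlist_pqueue_remove(), a matching removal helper in the same style might look like the sketch below (timer_heap_remove is hypothetical, not part of the patch):

static void
timer_heap_remove(smartlist_t *heap, timer_t *timer)
{
  /* The timer must already be in the heap, so that its heap_index field
   * holds its current position. */
  smartlist_pqueue_remove(heap, compare,
                          STRUCT_OFFSET(timer_t, heap_index),
                          timer);
}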
/* For a 1-indexed array, we would use LEFT_CHILD[x] = 2*x and RIGHT_CHILD[x]
@ -616,12 +648,22 @@ smartlist_uniq_strings(smartlist_t *sl)
#define RIGHT_CHILD(i) ( 2*(i) + 2 )
#define PARENT(i) ( ((i)-1) / 2 )
#define IDXP(p) ((int*)STRUCT_VAR_P(p, idx_field_offset))
#define UPDATE_IDX(i) do { \
void *updated = sl->list[i]; \
*IDXP(updated) = i; \
} while (0)
#define IDX_OF_ITEM(p) (*IDXP(p))
/** Helper. <b>sl</b> may have at most one violation of the heap property:
* the item at <b>idx</b> may be greater than one or both of its children.
* Restore the heap property. */
static INLINE void
smartlist_heapify(smartlist_t *sl,
int (*compare)(const void *a, const void *b),
int idx_field_offset,
int idx)
{
while (1) {
@ -644,21 +686,28 @@ smartlist_heapify(smartlist_t *sl,
void *tmp = sl->list[idx];
sl->list[idx] = sl->list[best_idx];
sl->list[best_idx] = tmp;
UPDATE_IDX(idx);
UPDATE_IDX(best_idx);
idx = best_idx;
}
}
}
/** Insert <b>item</b> into the heap stored in <b>sl</b>, where order
* is determined by <b>compare</b>. */
/** Insert <b>item</b> into the heap stored in <b>sl</b>, where order is
* determined by <b>compare</b> and the offset of the item in the heap is
* stored in an int-typed field at position <b>idx_field_offset</b> within
* item.
*/
void
smartlist_pqueue_add(smartlist_t *sl,
int (*compare)(const void *a, const void *b),
int idx_field_offset,
void *item)
{
int idx;
smartlist_add(sl,item);
UPDATE_IDX(sl->num_used-1);
for (idx = sl->num_used - 1; idx; ) {
int parent = PARENT(idx);
@ -666,6 +715,8 @@ smartlist_pqueue_add(smartlist_t *sl,
void *tmp = sl->list[parent];
sl->list[parent] = sl->list[idx];
sl->list[idx] = tmp;
UPDATE_IDX(parent);
UPDATE_IDX(idx);
idx = parent;
} else {
return;
@ -674,32 +725,63 @@ smartlist_pqueue_add(smartlist_t *sl,
}
/** Remove and return the top-priority item from the heap stored in <b>sl</b>,
* where order is determined by <b>compare</b>. <b>sl</b> must not be
* empty. */
* where order is determined by <b>compare</b> and the item's position is
* stored at position <b>idx_field_offset</b> within the item. <b>sl</b> must
* not be empty. */
void *
smartlist_pqueue_pop(smartlist_t *sl,
int (*compare)(const void *a, const void *b))
int (*compare)(const void *a, const void *b),
int idx_field_offset)
{
void *top;
tor_assert(sl->num_used);
top = sl->list[0];
*IDXP(top)=-1;
if (--sl->num_used) {
sl->list[0] = sl->list[sl->num_used];
smartlist_heapify(sl, compare, 0);
UPDATE_IDX(0);
smartlist_heapify(sl, compare, idx_field_offset, 0);
}
return top;
}
/** Remove the item <b>item</b> from the heap stored in <b>sl</b>,
* where order is determined by <b>compare</b> and the item's position is
* stored at position <b>idx_field_offset</b> within the item. <b>sl</b> must
* not be empty. */
void
smartlist_pqueue_remove(smartlist_t *sl,
int (*compare)(const void *a, const void *b),
int idx_field_offset,
void *item)
{
int idx = IDX_OF_ITEM(item);
tor_assert(idx >= 0);
tor_assert(sl->list[idx] == item);
--sl->num_used;
*IDXP(item) = -1;
if (idx == sl->num_used) {
return;
} else {
sl->list[idx] = sl->list[sl->num_used];
UPDATE_IDX(idx);
smartlist_heapify(sl, compare, idx_field_offset, idx);
}
}
/** Assert that the heap property is correctly maintained by the heap stored
* in <b>sl</b>, where order is determined by <b>compare</b>. */
void
smartlist_pqueue_assert_ok(smartlist_t *sl,
int (*compare)(const void *a, const void *b))
int (*compare)(const void *a, const void *b),
int idx_field_offset)
{
int i;
for (i = sl->num_used - 1; i > 0; --i) {
tor_assert(compare(sl->list[PARENT(i)], sl->list[i]) <= 0);
for (i = sl->num_used - 1; i >= 0; --i) {
if (i>0)
tor_assert(compare(sl->list[PARENT(i)], sl->list[i]) <= 0);
tor_assert(IDX_OF_ITEM(sl->list[i]) == i);
}
}

View File

@ -118,11 +118,18 @@ int smartlist_bsearch_idx(const smartlist_t *sl, const void *key,
void smartlist_pqueue_add(smartlist_t *sl,
int (*compare)(const void *a, const void *b),
int idx_field_offset,
void *item);
void *smartlist_pqueue_pop(smartlist_t *sl,
int (*compare)(const void *a, const void *b));
int (*compare)(const void *a, const void *b),
int idx_field_offset);
void smartlist_pqueue_remove(smartlist_t *sl,
int (*compare)(const void *a, const void *b),
int idx_field_offset,
void *item);
void smartlist_pqueue_assert_ok(smartlist_t *sl,
int (*compare)(const void *a, const void *b));
int (*compare)(const void *a, const void *b),
int idx_field_offset);
#define SPLIT_SKIP_SPACE 0x01
#define SPLIT_IGNORE_BLANK 0x02

View File

@ -385,6 +385,12 @@ init_circuit_base(circuit_t *circ)
circ->package_window = circuit_initial_package_window();
circ->deliver_window = CIRCWINDOW_START;
/* Initialize the cell_ewma_t structure */
circ->n_cell_ewma.last_adjusted_tick = cell_ewma_get_tick();
circ->n_cell_ewma.cell_count = 0.0;
circ->n_cell_ewma.heap_index = -1;
circ->n_cell_ewma.is_for_p_conn = 0;
circuit_add(circ);
}
@ -432,6 +438,16 @@ or_circuit_new(circid_t p_circ_id, or_connection_t *p_conn)
init_circuit_base(TO_CIRCUIT(circ));
/* Initialize the cell_ewma_t structure */
/* Initialize the cell counts to 0 */
circ->p_cell_ewma.cell_count = 0.0;
circ->p_cell_ewma.last_adjusted_tick = cell_ewma_get_tick();
circ->p_cell_ewma.is_for_p_conn = 1;
/* It's not in any heap yet. */
circ->p_cell_ewma.heap_index = -1;
return circ;
}

View File

@ -167,6 +167,7 @@ static config_var_t _option_vars[] = {
V(CircuitBuildTimeout, INTERVAL, "0"),
V(CircuitIdleTimeout, INTERVAL, "1 hour"),
V(CircuitStreamTimeout, INTERVAL, "0"),
V(CircuitPriorityHalflife, DOUBLE, "-100.0"), /*negative:'Use default'*/
V(ClientDNSRejectInternalAddresses, BOOL,"1"),
V(ClientOnly, BOOL, "0"),
V(ConsensusParams, STRING, NULL),
@ -212,6 +213,7 @@ static config_var_t _option_vars[] = {
V(ExitPolicyRejectPrivate, BOOL, "1"),
V(ExitPortStatistics, BOOL, "0"),
V(ExtraInfoStatistics, BOOL, "0"),
V(FallbackNetworkstatusFile, FILENAME,
SHARE_DATADIR PATH_SEPARATOR "tor" PATH_SEPARATOR "fallback-consensus"),
V(FascistFirewall, BOOL, "0"),
@ -355,6 +357,7 @@ static config_var_t _option_vars[] = {
VAR("__HashedControlSessionPassword", LINELIST, HashedControlSessionPassword,
NULL),
V(MinUptimeHidServDirectoryV2, INTERVAL, "24 hours"),
{ NULL, CONFIG_TYPE_OBSOLETE, 0, NULL }
};
@ -1406,6 +1409,9 @@ options_act(or_options_t *old_options)
if (accounting_is_enabled(options))
configure_accounting(time(NULL));
/* Change the cell EWMA settings */
cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus());
/* Check for transitions that need action. */
if (old_options) {
if (options->UseEntryGuards && !old_options->UseEntryGuards) {

View File

@ -180,6 +180,9 @@ or_connection_new(int socket_family)
or_conn->timestamp_last_added_nonpadding = time(NULL);
or_conn->next_circ_id = crypto_rand_int(1<<15);
or_conn->active_circuit_pqueue = smartlist_create();
or_conn->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick();
return or_conn;
}
@ -375,6 +378,7 @@ _connection_free(connection_t *conn)
or_conn->tls = NULL;
or_handshake_state_free(or_conn->handshake_state);
or_conn->handshake_state = NULL;
smartlist_free(or_conn->active_circuit_pqueue);
tor_free(or_conn->nickname);
}
if (CONN_IS_EDGE(conn)) {
@ -2278,8 +2282,8 @@ connection_read_bucket_should_increase(or_connection_t *conn)
* Mark the connection and return -1 if you want to close it, else
* return 0.
*/
int
connection_handle_read(connection_t *conn)
static int
connection_handle_read_impl(connection_t *conn)
{
int max_to_read=-1, try_to_read;
size_t before, n_read = 0;
@ -2374,6 +2378,16 @@ loop_again:
return 0;
}
int
connection_handle_read(connection_t *conn)
{
int res;
tor_gettimeofday_cache_clear();
res = connection_handle_read_impl(conn);
return res;
}
/** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf,
* either directly or via TLS. Reduce the token buckets by the number of bytes
* read.
@ -2575,8 +2589,8 @@ connection_outbuf_too_full(connection_t *conn)
* Mark the connection and return -1 if you want to close it, else
* return 0.
*/
int
connection_handle_write(connection_t *conn, int force)
static int
connection_handle_write_impl(connection_t *conn, int force)
{
int e;
socklen_t len=(socklen_t)sizeof(e);
@ -2743,6 +2757,15 @@ connection_handle_write(connection_t *conn, int force)
return 0;
}
int
connection_handle_write(connection_t *conn, int force)
{
int res;
tor_gettimeofday_cache_clear();
res = connection_handle_write_impl(conn, force);
return res;
}
/** OpenSSL TLS record size is 16383; this is close. The goal here is to
* push data out as soon as we know there's enough for a TLS record, so
* during periods of high load we won't read entire megabytes from

View File

@ -128,6 +128,8 @@ typedef struct cached_resolve_t {
uint32_t ttl; /**< What TTL did the nameserver tell us? */
/** Connections that want to know when we get an answer for this resolve. */
pending_connection_t *pending_connections;
/** Position of this element in the heap. */
int minheap_idx;
} cached_resolve_t;
static void purge_expired_resolves(time_t now);
@ -344,6 +346,7 @@ set_expiry(cached_resolve_t *resolve, time_t expires)
resolve->expire = expires;
smartlist_pqueue_add(cached_resolve_pqueue,
_compare_cached_resolves_by_expiry,
STRUCT_OFFSET(cached_resolve_t, minheap_idx),
resolve);
}
@ -389,7 +392,8 @@ purge_expired_resolves(time_t now)
if (resolve->expire > now)
break;
smartlist_pqueue_pop(cached_resolve_pqueue,
_compare_cached_resolves_by_expiry);
_compare_cached_resolves_by_expiry,
STRUCT_OFFSET(cached_resolve_t, minheap_idx));
if (resolve->state == CACHE_STATE_PENDING) {
log_debug(LD_EXIT,
@ -751,6 +755,7 @@ dns_resolve_impl(edge_connection_t *exitconn, int is_resolve,
resolve = tor_malloc_zero(sizeof(cached_resolve_t));
resolve->magic = CACHED_RESOLVE_MAGIC;
resolve->state = CACHE_STATE_PENDING;
resolve->minheap_idx = -1;
resolve->is_reverse = is_reverse;
strlcpy(resolve->address, exitconn->_base.address, sizeof(resolve->address));
@ -1736,7 +1741,8 @@ _assert_cache_ok(void)
return;
smartlist_pqueue_assert_ok(cached_resolve_pqueue,
_compare_cached_resolves_by_expiry);
_compare_cached_resolves_by_expiry,
STRUCT_OFFSET(cached_resolve_t, minheap_idx));
SMARTLIST_FOREACH(cached_resolve_pqueue, cached_resolve_t *, res,
{

View File

@ -1679,6 +1679,7 @@ networkstatus_set_current_consensus(const char *consensus,
update_consensus_networkstatus_fetch_time(now);
dirvote_recalculate_timing(get_options(), now);
routerstatus_list_update_named_server_map();
cell_ewma_set_scale_factor(get_options(), current_consensus);
}
if (!from_cache) {

View File

@ -1075,6 +1075,17 @@ typedef struct or_connection_t {
* free up on this connection's outbuf. Every time we pull cells from a
* circuit, we advance this pointer to the next circuit in the ring. */
struct circuit_t *active_circuits;
/** Priority queue of cell_ewma_t for circuits with queued cells waiting for
* room to free up on this connection's outbuf. Kept in heap order
* according to EWMA.
*
* This is redundant with active_circuits; if we ever decide only to use the
* cell_ewma algorithm for choosing circuits, we can remove active_circuits.
*/
smartlist_t *active_circuit_pqueue;
/** The tick on which the cell_ewma_ts in active_circuit_pqueue last had
* their ewma values rescaled. */
unsigned active_circuit_pqueue_last_recalibrated;
struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */
} or_connection_t;
@ -1992,6 +2003,29 @@ typedef struct {
time_t expiry_time;
} cpath_build_state_t;
/**
* The cell_ewma_t structure keeps track of how many cells a circuit has
* transferred recently. It keeps an EWMA (exponentially weighted moving
* average) of the number of cells flushed from the circuit queue onto a
* connection in connection_or_flush_from_first_active_circuit().
*/
typedef struct {
/** The last 'tick' at which we recalibrated cell_count.
*
* A cell sent at exactly the start of this tick has weight 1.0. Cells sent
* since the start of this tick have weight greater than 1.0; ones sent
* earlier have less weight. */
unsigned last_adjusted_tick;
/** The EWMA of the cell count. */
double cell_count;
/** True iff this is the cell count for a circuit's previous
* connection. */
unsigned int is_for_p_conn : 1;
/** The position of the circuit within the or connection's priority
* queue. */
int heap_index;
} cell_ewma_t;
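As a rough sketch of how these two fields interact (hypothetical helper, not part of the patch; the real logic lives in scale_single_cell_ewma() and connection_or_flush_from_first_active_circuit() in relay.c, and the sketch assumes <math.h> for pow()):

static void
example_note_cell_flushed(cell_ewma_t *ewma, unsigned cur_tick,
                          double fraction_of_tick, double scale_factor)
{
  /* Re-express the old count relative to cur_tick: each elapsed tick
   * multiplies it by scale_factor (which is < 1.0). */
  ewma->cell_count *= pow(scale_factor,
                          (int)(cur_tick - ewma->last_adjusted_tick));
  ewma->last_adjusted_tick = cur_tick;
  /* Count the newly flushed cell: a cell sent partway into the current
   * tick is worth a little more than 1.0. */
  ewma->cell_count += pow(scale_factor, -fraction_of_tick);
}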
#define ORIGIN_CIRCUIT_MAGIC 0x35315243u
#define OR_CIRCUIT_MAGIC 0x98ABC04Fu
@ -2081,6 +2115,11 @@ typedef struct circuit_t {
/** Unique ID for measuring tunneled network status requests. */
uint64_t dirreq_id;
/** The EWMA count for the number of cells flushed from the
* n_conn_cells queue. Used to determine which circuit to flush from next.
*/
cell_ewma_t n_cell_ewma;
} circuit_t;
/** Largest number of relay_early cells that we can send on a given
@ -2212,6 +2251,10 @@ typedef struct or_circuit_t {
* exit-ward queues of this circuit; reset every time when writing
* buffer stats to disk. */
uint64_t total_cell_waiting_time;
/** The EWMA count for the number of cells flushed from the
* p_conn_cells queue. */
cell_ewma_t p_cell_ewma;
} or_circuit_t;
/** Convert a circuit subtype to a circuit_t.*/
@ -2743,6 +2786,21 @@ typedef struct {
* to make this false. */
int ReloadTorrcOnSIGHUP;
/** The main parameter for picking circuits within a connection.
*
* If this value is positive, when picking a cell to relay on a connection,
* we always relay from the circuit whose weighted cell count is lowest.
* Cells are weighted exponentially such that if one cell is sent
* 'CircuitPriorityHalflife' seconds before another, it counts for half as
* much.
*
* If this value is zero, we're disabling the cell-EWMA algorithm.
*
* If this value is negative, we're using the default approach
* according to either Tor or a parameter set in the consensus.
*/
double CircuitPriorityHalflife;
} or_options_t;
/** Persistent state for an onion router, as saved to disk. */
@ -4453,6 +4511,9 @@ int append_address_to_payload(char *payload_out, const tor_addr_t *addr);
const char *decode_address_from_payload(tor_addr_t *addr_out,
const char *payload,
int payload_len);
unsigned cell_ewma_get_tick(void);
void cell_ewma_set_scale_factor(or_options_t *options,
networkstatus_t *consensus);
/********************************* rephist.c ***************************/
@ -5135,5 +5196,7 @@ int rend_parse_introduction_points(rend_service_descriptor_t *parsed,
size_t intro_points_encoded_size);
int rend_parse_client_keys(strmap_t *parsed_clients, const char *str);
void tor_gettimeofday_cache_clear(void);
#endif

View File

@ -10,6 +10,7 @@
* receiving from circuits, plus queuing on circuits.
**/
#include <math.h>
#include "or.h"
#include "mempool.h"
@ -35,6 +36,26 @@ circuit_resume_edge_reading_helper(edge_connection_t *conn,
static int
circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
/** Cache the current hi-res time; the cache gets reset when libevent
* calls us. */
static struct timeval cached_time_hires = {0, 0};
static void
tor_gettimeofday_cached(struct timeval *tv)
{
if (cached_time_hires.tv_sec == 0) {
tor_gettimeofday(&cached_time_hires);
}
*tv = cached_time_hires;
}
void
tor_gettimeofday_cache_clear(void)
{
cached_time_hires.tv_sec = 0;
}
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
*/
@ -1633,7 +1654,7 @@ cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
insertion_time_queue_t *it_queue = queue->insertion_times;
if (!it_pool)
it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024);
tor_gettimeofday(&now);
tor_gettimeofday_cached(&now);
#define SECONDS_IN_A_DAY 86400L
added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L)
+ ((uint32_t)now.tv_usec / (uint32_t)10000L));
@ -1731,6 +1752,224 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
}
}
/** Helper for sorting cell_ewma_t values in their priority queue. */
static int
compare_cell_ewma_counts(const void *p1, const void *p2)
{
const cell_ewma_t *e1=p1, *e2=p2;
if (e1->cell_count < e2->cell_count)
return -1;
else if (e1->cell_count > e2->cell_count)
return 1;
else
return 0;
}
/** Given a cell_ewma_t, return a pointer to the circuit containing it. */
static circuit_t *
cell_ewma_to_circuit(cell_ewma_t *ewma)
{
if (ewma->is_for_p_conn) {
/* This is an or_circuit_t's p_cell_ewma. */
or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
return TO_CIRCUIT(orcirc);
} else {
/* This is some circuit's n_cell_ewma. */
return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
}
}
/* ==== Functions for scaling cell_ewma_t ====
When choosing which cells to relay first, we favor circuits that have been
quiet recently. This gives better latency on connections that aren't
pushing lots of data, and makes the network feel more interactive.
Conceptually, we take an exponentially weighted moving average of the number
of cells a circuit has sent, and allow active circuits (those with cells to
relay) to send cells in reverse order of their exponentially weighted moving
average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
circuit that has sent the fewest cells]
If 'double' had infinite precision, we could do this simply by counting a
cell sent at startup as having weight 1.0, and a cell sent N seconds later
as having weight F^-N. This way, we would never need to re-scale
any already-sent cells.
To prevent double from overflowing, we could count a cell sent now as
having weight 1.0 and a cell sent N seconds ago as having weight F^N.
This, however, would mean we'd need to re-scale *ALL* old circuits every
time we wanted to send a cell.
So as a compromise, we divide time into 'ticks' (currently, 10-second
increments) and say that a cell sent at the start of a current tick is
worth 1.0, a cell sent N seconds before the start of the current tick is
worth F^N, and a cell sent N seconds after the start of the current tick is
worth F^-N. This way we don't overflow, and we don't need to constantly
rescale.
*/
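A worked example (numbers assumed, not from the patch): with the 10-second ticks described above and a configured half-life of 30 seconds, the per-tick factor is 0.5^(10/30), roughly 0.794, so a cell flushed one tick ago counts as about 0.79 cells and one flushed three ticks (30 seconds) ago counts as about 0.5, i.e. half as much. The computation mirrors cell_ewma_set_scale_factor() below, sketched in C (relay.c already includes <math.h> in this patch):

static double
example_per_tick_factor(void)
{
  const double halflife_seconds = 30.0;              /* assumed configured value */
  const double halflife_in_ticks = halflife_seconds / 10.0;  /* EWMA_TICK_LEN */
  /* exp(ln(1/2) / halflife_in_ticks) == 0.5^(1/3) ~= 0.7937 */
  return exp(-0.69314718055994529 / halflife_in_ticks);
}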
/** How long does a tick last (seconds)? */
#define EWMA_TICK_LEN 10
/** The default halflife, in seconds, to use if it hasn't been overridden by a
* consensus or a configuration setting. Zero means "disabled". */
#define EWMA_DEFAULT_HALFLIFE 0.0
/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
* and the fraction of the tick that has elapsed between the start of the tick
* and <b>now</b>. Return the former and store the latter in
* *<b>remainder_out</b>.
*
* These tick values are not meant to be shared between Tor instances, or used
* for other purposes. */
static unsigned
cell_ewma_tick_from_timeval(const struct timeval *now,
double *remainder_out)
{
unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN);
/* How far into the current tick are we, in seconds? */
double rem = (now->tv_sec % EWMA_TICK_LEN) +
((double)(now->tv_usec)) / 1.0e6;
*remainder_out = rem / EWMA_TICK_LEN;
return res;
}
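A quick worked example (timestamp assumed, not from the patch): for now = {tv_sec = 1261185605, tv_usec = 250000} and the 10-second EWMA_TICK_LEN above, this returns tick 126118560 and stores (5 + 0.25) / 10 = 0.525 in *remainder_out; connection_or_flush_from_first_active_circuit() then weights a cell flushed at that instant by pow(ewma_scale_factor, -0.525), slightly more than one cell.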
/** Compute and return the current cell_ewma tick. */
unsigned
cell_ewma_get_tick(void)
{
return ((unsigned)approx_time() / EWMA_TICK_LEN);
}
/** The per-tick scale factor to be used when computing cell-count EWMA
* values. (A cell sent N ticks before the start of the current tick
* has value ewma_scale_factor ** N.)
*/
static double ewma_scale_factor = 0.1;
static int ewma_enabled = 0;
#define EPSILON 0.00001
#define LOG_ONEHALF -0.69314718055994529
/** Adjust the global cell scale factor based on <b>options</b> and, if needed,
* the current <b>consensus</b>. */
void
cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus)
{
int32_t halflife_ms;
double halflife;
const char *source;
if (options && options->CircuitPriorityHalflife >= -EPSILON) {
halflife = options->CircuitPriorityHalflife;
source = "CircuitPriorityHalflife in configuration";
} else if (consensus &&
(halflife_ms = networkstatus_get_param(
consensus, "CircPriorityHalflifeMsec", -1) >= 0)) {
halflife = ((double)halflife_ms)/1000.0;
source = "CircPriorityHalflifeMsec in consensus";
} else {
halflife = EWMA_DEFAULT_HALFLIFE;
source = "Default value";
}
if (halflife <= EPSILON) {
/* The cell EWMA algorithm is disabled. */
ewma_scale_factor = 0.1;
ewma_enabled = 0;
log_info(LD_OR,
"Disabled cell_ewma algorithm because of value in %s",
source);
} else {
/* convert halflife into halflife-per-tick. */
halflife /= EWMA_TICK_LEN;
/* compute per-tick scale factor. */
ewma_scale_factor = exp( LOG_ONEHALF / halflife );
ewma_enabled = 1;
log_info(LD_OR,
"Enabled cell_ewma algorithm because of value in %s; "
"scale factor is %lf per %d seconds",
source, ewma_scale_factor, EWMA_TICK_LEN);
}
}
/** Return the multiplier necessary to convert the value of a cell sent in
* 'from_tick' to one sent in 'to_tick'. */
static INLINE double
get_scale_factor(unsigned from_tick, unsigned to_tick)
{
/* This math can wrap around, but that's okay: unsigned overflow is
well-defined */
int diff = (int)(to_tick - from_tick);
return pow(ewma_scale_factor, diff);
}
/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
* <b>cur_tick</b> */
static void
scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
{
double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
ewma->cell_count *= factor;
ewma->last_adjusted_tick = cur_tick;
}
/** Adjust the cell count of every active circuit on <b>conn</b> so
* that they are scaled with respect to <b>cur_tick</b> */
static void
scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
{
double factor = get_scale_factor(
conn->active_circuit_pqueue_last_recalibrated,
cur_tick);
/** Ordinarily it isn't okay to change the value of an element in a heap,
* but it's okay here, since we are preserving the order. */
SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
tor_assert(e->last_adjusted_tick ==
conn->active_circuit_pqueue_last_recalibrated);
e->cell_count *= factor;
e->last_adjusted_tick = cur_tick;
});
conn->active_circuit_pqueue_last_recalibrated = cur_tick;
}
/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
* <b>conn</b>'s priority queue of active circuits */
static void
add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
{
tor_assert(ewma->heap_index == -1);
scale_single_cell_ewma(ewma,
conn->active_circuit_pqueue_last_recalibrated);
smartlist_pqueue_add(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma);
}
/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
static void
remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
{
tor_assert(ewma->heap_index != -1);
smartlist_pqueue_remove(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma);
}
/** Remove and return the first cell_ewma_t from conn's priority queue of
* active circuits. Requires that the priority queue is nonempty. */
static cell_ewma_t *
pop_first_cell_ewma_from_conn(or_connection_t *conn)
{
return smartlist_pqueue_pop(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index));
}
/** Add <b>circ</b> to the list of circuits with pending cells on
* <b>conn</b>. No effect if <b>circ</b> is already linked. */
void
@ -1744,6 +1983,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
assert_active_circuits_ok_paranoid(conn);
if (! conn->active_circuits) {
conn->active_circuits = circ;
*prevp = *nextp = circ;
@ -1755,6 +1996,15 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
*prev_circ_on_conn_p(head, conn) = circ;
*prevp = old_tail;
}
if (circ->n_conn == conn) {
add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
}
assert_active_circuits_ok_paranoid(conn);
}
@ -1772,6 +2022,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
assert_active_circuits_ok_paranoid(conn);
tor_assert(next && prev);
tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
@ -1785,6 +2037,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
conn->active_circuits = next;
}
*prevp = *nextp = NULL;
if (circ->n_conn == conn) {
remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
}
assert_active_circuits_ok_paranoid(conn);
}
@ -1804,6 +2065,10 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
cur = next;
} while (cur != head);
orconn->active_circuits = NULL;
SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
e->heap_index = -1);
smartlist_clear(orconn->active_circuit_pqueue);
}
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
@ -1857,9 +2122,35 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
cell_queue_t *queue;
circuit_t *circ;
int streams_blocked;
/* The current (hi-res) time */
struct timeval now_hires;
/* The EWMA cell counter for the circuit we're flushing. */
cell_ewma_t *cell_ewma = NULL;
double ewma_increment = -1;
circ = conn->active_circuits;
if (!circ) return 0;
assert_active_circuits_ok_paranoid(conn);
/* See if we're doing the ewma circuit selection algorithm. */
if (ewma_enabled) {
unsigned tick;
double fractional_tick;
tor_gettimeofday_cached(&now_hires);
tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
if (tick != conn->active_circuit_pqueue_last_recalibrated) {
scale_active_circuits(conn, tick);
}
ewma_increment = pow(ewma_scale_factor, -fractional_tick);
cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
circ = cell_ewma_to_circuit(cell_ewma);
}
if (circ->n_conn == conn) {
queue = &circ->n_conn_cells;
streams_blocked = circ->streams_blocked_on_n_conn;
@ -1879,7 +2170,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
uint32_t flushed;
uint32_t cell_waiting_time;
insertion_time_queue_t *it_queue = queue->insertion_times;
tor_gettimeofday(&now);
tor_gettimeofday_cached(&now);
flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L +
(uint32_t)now.tv_usec / (uint32_t)10000L);
if (!it_queue || !it_queue->first) {
@ -1915,6 +2206,16 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
packed_cell_free_unchecked(cell);
++n_flushed;
if (cell_ewma) {
cell_ewma_t *tmp;
cell_ewma->cell_count += ewma_increment;
/* We pop and re-add the cell_ewma_t here, not above, since we need to
* re-add it immediately to keep the priority queue consistent with
* the linked-list implementation */
tmp = pop_first_cell_ewma_from_conn(conn);
tor_assert(tmp == cell_ewma);
add_cell_ewma_to_conn(conn, cell_ewma);
}
if (circ != conn->active_circuits) {
/* If this happens, the current circuit just got made inactive by
* a call in connection_write_to_buf(). That's nothing to worry about:
@ -1934,7 +2235,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
/* Did we just ran out of cells on this queue? */
/* Did we just run out of cells on this circuit's queue? */
if (queue->n == 0) {
log_debug(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn);
@ -2057,16 +2358,31 @@ assert_active_circuits_ok(or_connection_t *orconn)
{
circuit_t *head = orconn->active_circuits;
circuit_t *cur = head;
int n = 0;
if (! head)
return;
do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn);
circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
cell_ewma_t *ewma;
tor_assert(next);
tor_assert(prev);
tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
if (orconn == cur->n_conn) {
ewma = &cur->n_cell_ewma;
tor_assert(!ewma->is_for_p_conn);
} else {
ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
tor_assert(ewma->is_for_p_conn);
}
tor_assert(ewma->heap_index != -1);
tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
ewma->heap_index));
n++;
cur = next;
} while (cur != head);
tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
}

View File

@ -515,11 +515,17 @@ test_container_digestset(void)
smartlist_free(included);
}
/** Helper: return a tristate based on comparing two strings. */
typedef struct pq_entry_t {
const char *val;
int idx;
} pq_entry_t;
/** Helper: return a tristate based on comparing two pq_entry_t values. */
static int
_compare_strings_for_pqueue(const void *s1, const void *s2)
_compare_strings_for_pqueue(const void *p1, const void *p2)
{
return strcmp((const char*)s1, (const char*)s2);
const pq_entry_t *e1=p1, *e2=p2;
return strcmp(e1->val, e2->val);
}
/** Run unit tests for heap-based priority queue functions. */
@ -528,50 +534,90 @@ test_container_pqueue(void)
{
smartlist_t *sl = smartlist_create();
int (*cmp)(const void *, const void*);
#define OK() smartlist_pqueue_assert_ok(sl, cmp)
const int offset = STRUCT_OFFSET(pq_entry_t, idx);
#define ENTRY(s) pq_entry_t s = { #s, -1 }
ENTRY(cows);
ENTRY(zebras);
ENTRY(fish);
ENTRY(frogs);
ENTRY(apples);
ENTRY(squid);
ENTRY(daschunds);
ENTRY(eggplants);
ENTRY(weissbier);
ENTRY(lobsters);
ENTRY(roquefort);
ENTRY(chinchillas);
ENTRY(fireflies);
#define OK() smartlist_pqueue_assert_ok(sl, cmp, offset)
cmp = _compare_strings_for_pqueue;
smartlist_pqueue_add(sl, cmp, (char*)"cows");
smartlist_pqueue_add(sl, cmp, (char*)"zebras");
smartlist_pqueue_add(sl, cmp, (char*)"fish");
smartlist_pqueue_add(sl, cmp, (char*)"frogs");
smartlist_pqueue_add(sl, cmp, (char*)"apples");
smartlist_pqueue_add(sl, cmp, (char*)"squid");
smartlist_pqueue_add(sl, cmp, (char*)"daschunds");
smartlist_pqueue_add(sl, cmp, (char*)"eggplants");
smartlist_pqueue_add(sl, cmp, (char*)"weissbier");
smartlist_pqueue_add(sl, cmp, (char*)"lobsters");
smartlist_pqueue_add(sl, cmp, (char*)"roquefort");
smartlist_pqueue_add(sl, cmp, offset, &cows);
smartlist_pqueue_add(sl, cmp, offset, &zebras);
smartlist_pqueue_add(sl, cmp, offset, &fish);
smartlist_pqueue_add(sl, cmp, offset, &frogs);
smartlist_pqueue_add(sl, cmp, offset, &apples);
smartlist_pqueue_add(sl, cmp, offset, &squid);
smartlist_pqueue_add(sl, cmp, offset, &daschunds);
smartlist_pqueue_add(sl, cmp, offset, &eggplants);
smartlist_pqueue_add(sl, cmp, offset, &weissbier);
smartlist_pqueue_add(sl, cmp, offset, &lobsters);
smartlist_pqueue_add(sl, cmp, offset, &roquefort);
OK();
test_eq(smartlist_len(sl), 11);
test_streq(smartlist_get(sl, 0), "apples");
test_streq(smartlist_pqueue_pop(sl, cmp), "apples");
test_eq_ptr(smartlist_get(sl, 0), &apples);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &apples);
test_eq(smartlist_len(sl), 10);
OK();
test_streq(smartlist_pqueue_pop(sl, cmp), "cows");
test_streq(smartlist_pqueue_pop(sl, cmp), "daschunds");
smartlist_pqueue_add(sl, cmp, (char*)"chinchillas");
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &cows);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &daschunds);
smartlist_pqueue_add(sl, cmp, offset, &chinchillas);
OK();
smartlist_pqueue_add(sl, cmp, (char*)"fireflies");
smartlist_pqueue_add(sl, cmp, offset, &fireflies);
OK();
test_streq(smartlist_pqueue_pop(sl, cmp), "chinchillas");
test_streq(smartlist_pqueue_pop(sl, cmp), "eggplants");
test_streq(smartlist_pqueue_pop(sl, cmp), "fireflies");
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &chinchillas);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &eggplants);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fireflies);
OK();
test_streq(smartlist_pqueue_pop(sl, cmp), "fish");
test_streq(smartlist_pqueue_pop(sl, cmp), "frogs");
test_streq(smartlist_pqueue_pop(sl, cmp), "lobsters");
test_streq(smartlist_pqueue_pop(sl, cmp), "roquefort");
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fish);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &frogs);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &lobsters);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &roquefort);
OK();
test_eq(smartlist_len(sl), 3);
test_streq(smartlist_pqueue_pop(sl, cmp), "squid");
test_streq(smartlist_pqueue_pop(sl, cmp), "weissbier");
test_streq(smartlist_pqueue_pop(sl, cmp), "zebras");
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &squid);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &weissbier);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &zebras);
test_eq(smartlist_len(sl), 0);
OK();
/* Now test remove. */
smartlist_pqueue_add(sl, cmp, offset, &cows);
smartlist_pqueue_add(sl, cmp, offset, &fish);
smartlist_pqueue_add(sl, cmp, offset, &frogs);
smartlist_pqueue_add(sl, cmp, offset, &apples);
smartlist_pqueue_add(sl, cmp, offset, &squid);
smartlist_pqueue_add(sl, cmp, offset, &zebras);
test_eq(smartlist_len(sl), 6);
OK();
smartlist_pqueue_remove(sl, cmp, offset, &zebras);
test_eq(smartlist_len(sl), 5);
OK();
smartlist_pqueue_remove(sl, cmp, offset, &cows);
test_eq(smartlist_len(sl), 4);
OK();
smartlist_pqueue_remove(sl, cmp, offset, &apples);
test_eq(smartlist_len(sl), 3);
OK();
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fish);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &frogs);
test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &squid);
test_eq(smartlist_len(sl), 0);
OK();
#undef OK
done: