Optimize cell-ewma circuit priority algorithm.

There are two big changes here:
  - We store active circuits in a priority queue for each or_conn,
    rather than doing a linear search over all the active circuits
    before we send each cell.
  - Rather than multiplying every circuit's cell-ewma by a decay
    factor every time we send a cell (thus normalizing the value of a
    current cell to 1.0 and a past cell to alpha^t), we instead
    only scale down the cell-ewma every tick (ten seconds atm),
    normalizing so that a cell sent at the start of the tick has
    value 1.0).
This commit is contained in:
Nick Mathewson 2009-12-12 00:49:48 -05:00
parent c43fee131d
commit 06e8370c33
6 changed files with 297 additions and 93 deletions

View File

@ -386,8 +386,10 @@ init_circuit_base(circuit_t *circ)
circ->deliver_window = CIRCWINDOW_START;
/* Initialize the cell_ewma_t structure */
circ->n_cell_ewma.last_cell_time = circ->highres_created;
circ->n_cell_ewma.last_adjusted_tick = cell_ewma_get_tick();
circ->n_cell_ewma.cell_count = 0.0;
circ->n_cell_ewma.heap_index = -1;
circ->n_cell_ewma.is_for_p_conn = 0;
circuit_add(circ);
}
@ -438,11 +440,13 @@ or_circuit_new(circid_t p_circ_id, or_connection_t *p_conn)
/* Initialize the cell_ewma_t structure */
/* Fetch the timeval that init_circuit_base filled in. */
circ->p_cell_ewma.last_cell_time = TO_CIRCUIT(circ)->highres_created;
/* Initialize the cell counts to 0 */
circ->p_cell_ewma.cell_count = 0.0;
circ->p_cell_ewma.last_adjusted_tick = cell_ewma_get_tick();
circ->p_cell_ewma.is_for_p_conn = 1;
/* It's not in any heap yet. */
circ->p_cell_ewma.heap_index = -1;
return circ;
}

View File

@ -1379,6 +1379,9 @@ options_act(or_options_t *old_options)
if (accounting_is_enabled(options))
configure_accounting(time(NULL));
/* Change the cell EWMA settings */
cell_ewma_set_scale_factor(options);
/* Check for transitions that need action. */
if (old_options) {
if (options->UseEntryGuards && !old_options->UseEntryGuards) {

View File

@ -180,6 +180,9 @@ or_connection_new(int socket_family)
or_conn->timestamp_last_added_nonpadding = time(NULL);
or_conn->next_circ_id = crypto_rand_int(1<<15);
or_conn->active_circuit_pqueue = smartlist_create();
or_conn->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick();
return or_conn;
}
@ -375,6 +378,7 @@ _connection_free(connection_t *conn)
or_conn->tls = NULL;
or_handshake_state_free(or_conn->handshake_state);
or_conn->handshake_state = NULL;
smartlist_free(or_conn->active_circuit_pqueue);
tor_free(or_conn->nickname);
}
if (CONN_IS_EDGE(conn)) {

View File

@ -80,7 +80,6 @@ connection_or_clear_identity_map(void)
}
});
digestmap_free(orconn_identity_map, NULL);
orconn_identity_map = NULL;
}

View File

@ -1075,6 +1075,17 @@ typedef struct or_connection_t {
* free up on this connection's outbuf. Every time we pull cells from a
* circuit, we advance this pointer to the next circuit in the ring. */
struct circuit_t *active_circuits;
/** Priority queue of cell_ewma_t for circuits with queued cells waiting for
* room to free up on this connection's outbuf. Kept in heap order
* according to EWMA.
*
* This is redundant with active_circuits; if we ever decide only to use the
* cell_ewma algorithm for choosing circuits, we can remove active_circuits.
*/
smartlist_t *active_circuit_pqueue;
/** The tick on which the ciell_ewma_t's in active_circuit_pqueue last had
* their ewma values rescaled. */
unsigned active_circuit_pqueue_last_recalibrated;
struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */
} or_connection_t;
@ -1994,16 +2005,25 @@ typedef struct {
/**
* The cell_ewma_t structure keeps track of how many cells a circuit has
* transferred recently. It keeps an EWMA (exponentially weighted
* moving average) of the number of cells flushed in
* connection_or_flush_from_first_active_circuit().
* transferred recently. It keeps an EWMA (exponentially weighted moving
* average) of the number of cells flushed from the circuit queue onto a
* connection in connection_or_flush_from_first_active_circuit().
*/
typedef struct {
/** The last time a cell was flushed from this circuit. */
struct timeval last_cell_time;
/** The EWMA of the cell count. */
double cell_count;
/** The last 'tick' at which we recalibrated cell_count.
*
* A cell sent at exactly the start of this tick has weight 1.0. Cells sent
* since the start of this tick have weight greater than 1.0; ones sent
* earlier have less weight. */
unsigned last_adjusted_tick;
/** The EWMA of the cell count. */
double cell_count;
/** True iff this is a the cell count for a circuit's previous
* connection. */
unsigned int is_for_p_conn : 1;
/** The position of the circuit within the or connection's priority
* queue. */
int heap_index;
} cell_ewma_t;
#define ORIGIN_CIRCUIT_MAGIC 0x35315243u
@ -2097,7 +2117,8 @@ typedef struct circuit_t {
uint64_t dirreq_id;
/** The EWMA count for the number of cells flushed from the
* n_conn_cells queue. */
* n_conn_cells queue. Used to determine which circuit to flush from next.
*/
cell_ewma_t n_cell_ewma;
} circuit_t;
@ -4469,6 +4490,8 @@ int append_address_to_payload(char *payload_out, const tor_addr_t *addr);
const char *decode_address_from_payload(tor_addr_t *addr_out,
const char *payload,
int payload_len);
unsigned cell_ewma_get_tick(void);
void cell_ewma_set_scale_factor(or_options_t *options);
/********************************* rephist.c ***************************/

View File

@ -1752,6 +1752,194 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
}
}
/** Helper for sorting cell_ewma_t values in their priority queue. */
static int
compare_cell_ewma_counts(const void *p1, const void *p2)
{
const cell_ewma_t *e1=p1, *e2=p2;
if (e1->cell_count < e2->cell_count)
return -1;
else if (e1->cell_count > e2->cell_count)
return 1;
else
return 0;
}
/** Given a cell_ewma_t, return a pointer to the circuit containing it. */
static circuit_t *
cell_ewma_to_circuit(cell_ewma_t *ewma)
{
if (ewma->is_for_p_conn) {
/* This is an or_circuit_t's p_cell_ewma. */
or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
return TO_CIRCUIT(orcirc);
} else {
/* This is some circuit's n_cell_ewma. */
return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
}
}
/* ==== Functions for scaling cell_ewma_t ====
When choosing which cells to relay first, we favor circuits that have been
quiet recently. This gives better latency on connections that aren't
pushing lots of data, and makes the network feel more interactive.
Conceptually, we take an exponentially weighted mean average of the number
of cells a circuit has sent, and allow active circuits (those with cells to
relay) to send cells in order of their exponentially-weighted mean average
(EWMA) cell count. [That is, a cell sent N seconds ago 'counts' F^N times
as much as a cell sent now, for 0<F<1.0.]
If 'double' had infinite precision, we could do this simply by counting a
cell sent at startup as having weight 1.0, and a cell sent N seconds later
as having weight F^-N. This way, we would never need to re-scale
any already-sent cells.
To prevent double from overflowing, we could count a cell sent now as
having weight 1.0 and a cell sent N seconds ago as having weight F^N.
This, however, would mean we'd need to re-scale *ALL* old circuits every
time we wanted to send a cell.
So as a compromise, we divide time into 'ticks' (currently, 10-second
increments) and say that a cell sent at the start of a current tick is
worth 1.0, a cell sent N seconds before the start of the current tick is
worth F^N, and a cell sent N seconds after the start of the current tick is
worth F^-N. This way we don't overflow, and we don't need to constantly
rescale.
*/
/** How long does a tick last (seconds)? */
#define EWMA_TICK_LEN 10
/** The default per-tick scale factor, if it hasn't been overridden by a
* consensus or a configuration setting. */
#define EWMA_DEFAULT_SCALE_FACTOR 0.9
/** Given a timeval 'now', compute the cell_ewma tick in which it occurs
* and the fraction of the tick that has elapsed before
*
* These tick values are not meant to be shared between Tor instances, or used
* for other purposes. */
static unsigned
cell_ewma_tick_from_timeval(const struct timeval *now,
double *remainder_out)
{
unsigned res = now->tv_sec / EWMA_TICK_LEN;
/* rem */
double rem = (now->tv_sec % EWMA_TICK_LEN) +
((double)(now->tv_usec)) / 1.0e6;
*remainder_out = rem / EWMA_TICK_LEN;
return res;
}
/** Compute and return the current cell_ewma tick. */
unsigned
cell_ewma_get_tick(void)
{
return ((unsigned)approx_time() / EWMA_TICK_LEN);
}
/** The per-tick scale factor to be used when computing cell-count EWMA
* values. (A cell sent N ticks before the start of the current tick
* has value ewma_scale_factor ** N.)
*
* If ewma_scale_factor is <= 0, the EWMA algorithm is disabled.
*/
static double ewma_scale_factor = EWMA_DEFAULT_SCALE_FACTOR;
/** Adjust the global cell scale factor based on <b>options</b> */
void
cell_ewma_set_scale_factor(or_options_t *options)
{
double f;
if (options->EWMAInterval > 0.0001) {
f = pow(options->EWMASignificance,
EWMA_TICK_LEN / options->EWMAInterval);
} else {
f = EWMA_DEFAULT_SCALE_FACTOR;
}
ewma_scale_factor = f;
}
/** Return the multiplier necessary to convert the value of a cell sent in
* 'from_tick' to one sent in 'to_tick'. */
static INLINE double
get_scale_factor(unsigned from_tick, unsigned to_tick)
{
/* This math can wrap around, but that's okay: unsigned overflow is
well-defined */
int diff = (int)(to_tick - from_tick);
return pow(ewma_scale_factor, diff);
}
/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
* <b>cur_tick</b> */
static void
scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
{
double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
ewma->cell_count *= factor;
ewma->last_adjusted_tick = cur_tick;
}
/** Adjust the cell count of every active circuit on <b>conn</b> so
* that they are scaled with respect to <b>cur_tick</b> */
static void
scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
{
double factor = get_scale_factor(
conn->active_circuit_pqueue_last_recalibrated,
cur_tick);
/** Ordinarily it isn't okay to change the value of an element in a heap,
* but it's okay here, since we are preserving the order. */
SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
tor_assert(e->last_adjusted_tick ==
conn->active_circuit_pqueue_last_recalibrated);
e->cell_count *= factor;
e->last_adjusted_tick = cur_tick;
});
conn->active_circuit_pqueue_last_recalibrated = cur_tick;
}
/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
* <b>conn</b>'s priority queue of active circuits */
static void
add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
{
tor_assert(ewma->heap_index == -1);
scale_single_cell_ewma(ewma,
conn->active_circuit_pqueue_last_recalibrated);
smartlist_pqueue_add(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma);
}
/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
static void
remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
{
tor_assert(ewma->heap_index != -1);
smartlist_pqueue_remove(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma);
}
/** Remove and return the first cell_ewma_t from conn's priority queue of
* active circuits. Requires that the priority queue is nonempty. */
static cell_ewma_t *
pop_first_cell_ewma_from_conn(or_connection_t *conn)
{
return smartlist_pqueue_pop(conn->active_circuit_pqueue,
compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index));
}
/** Add <b>circ</b> to the list of circuits with pending cells on
* <b>conn</b>. No effect if <b>circ</b> is already linked. */
void
@ -1765,6 +1953,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
assert_active_circuits_ok_paranoid(conn);
if (! conn->active_circuits) {
conn->active_circuits = circ;
*prevp = *nextp = circ;
@ -1776,6 +1966,15 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
*prev_circ_on_conn_p(head, conn) = circ;
*prevp = old_tail;
}
if (circ->n_conn == conn) {
add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
}
assert_active_circuits_ok_paranoid(conn);
}
@ -1793,6 +1992,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
assert_active_circuits_ok_paranoid(conn);
tor_assert(next && prev);
tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
@ -1806,6 +2007,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
conn->active_circuits = next;
}
*prevp = *nextp = NULL;
if (circ->n_conn == conn) {
remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
}
assert_active_circuits_ok_paranoid(conn);
}
@ -1825,6 +2035,10 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
cur = next;
} while (cur != head);
orconn->active_circuits = NULL;
SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
e->heap_index = -1);
smartlist_clear(orconn->active_circuit_pqueue);
}
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
@ -1884,92 +2098,27 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* The EWMA cell counter for the circuit we're flushing. */
cell_ewma_t *cell_ewma = NULL;
/* The global cell EWMA parameters. The algorithm is parameterized by
* two values (type double):
*
* "significance" (between 0 and 1) and "interval"
*
* The cell count is weighted so that the most recent "interval"
* seconds will account for "significance" of the weight.
*
* If "interval" is set to 0, it disables the algorithm, and the old
* algorithm (round-robin) is used.
*
* These parameters should really be set by the consensus, but can be
* overridden by the torrc (in which case the options values will be
* >= 0.0).
*/
static double cell_ewma_significance = 0.9;
static double cell_ewma_interval = 10.0;
double significance_override = get_options()->EWMASignificance;
double interval_override = get_options()->EWMAInterval;
if (significance_override >= 0.0) {
cell_ewma_significance = significance_override;
}
if (interval_override >= 0.0) {
cell_ewma_interval = interval_override;
}
double ewma_increment = -1;
circ = conn->active_circuits;
if (!circ) return 0;
assert_active_circuits_ok_paranoid(conn);
/* See if we're doing the ewma circuit selection algorithm. */
if (cell_ewma_interval > 0.0) {
/* Is there another circuit we might like better? */
circuit_t *circ_iter, *circ_start;
circuit_t *circ_min_cell_count = NULL;
double min_cell_count = 0.0;
if (ewma_scale_factor > 0.0) {
unsigned tick;
double fractional_tick;
tor_gettimeofday_cached(&now_hires);
tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
/* Start with circ, and go around the circular linked list */
circ_start = circ_iter = circ;
do {
double delta_t;
/* Find the appropriate EWMA cell counter to use. */
if (circ_iter->n_conn == conn) {
cell_ewma = &(circ_iter->n_cell_ewma);
} else {
cell_ewma = &(TO_OR_CIRCUIT(circ_iter)->p_cell_ewma);
}
/* Update the EWMA cell counter to account for the passage of time. */
delta_t = (double)(now_hires.tv_sec -
cell_ewma->last_cell_time.tv_sec);
delta_t += ((double)(now_hires.tv_usec -
cell_ewma->last_cell_time.tv_usec)) / 1000000.0;
if (delta_t > 0.0) {
cell_ewma->cell_count *=
pow(cell_ewma_significance, delta_t / cell_ewma_interval);
//printf("cc: %f ", cell_ewma->cell_count);
}
cell_ewma->last_cell_time = now_hires;
/* Now keep track of the lowest cell count we've seen. */
if (circ_min_cell_count == NULL ||
cell_ewma->cell_count < min_cell_count) {
min_cell_count = cell_ewma->cell_count;
circ_min_cell_count = circ_iter;
}
circ_iter = *next_circ_on_conn_p(circ_iter, conn);
} while (circ_iter != circ_start);
/* OK, we've gone all the way around. Let's use the circ with the
* lowest (recent) cell count. */
circ = circ_min_cell_count;
/* Now set the appropriate EWMA cell counter to use below to add the
* cells we actually send. */
if (circ_min_cell_count->n_conn == conn) {
cell_ewma = &(circ_min_cell_count->n_cell_ewma);
} else {
cell_ewma = &(TO_OR_CIRCUIT(circ_min_cell_count)->p_cell_ewma);
if (tick != conn->active_circuit_pqueue_last_recalibrated) {
scale_active_circuits(conn, tick);
}
ewma_increment = pow(ewma_scale_factor, -fractional_tick);
cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
circ = cell_ewma_to_circuit(cell_ewma);
}
if (circ->n_conn == conn) {
@ -2028,7 +2177,14 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
packed_cell_free_unchecked(cell);
++n_flushed;
if (cell_ewma) {
cell_ewma->cell_count += 1.0;
cell_ewma_t *tmp;
cell_ewma->cell_count += ewma_increment;
/* We pop and re-add the cell_ewma_t here, not above, since we need to
* re-add it immediately to keep the priority queue consistent with
* the linked-list implementation */
tmp = pop_first_cell_ewma_from_conn(conn);
tor_assert(tmp == cell_ewma);
add_cell_ewma_to_conn(conn, cell_ewma);
}
if (circ != conn->active_circuits) {
/* If this happens, the current circuit just got made inactive by
@ -2049,7 +2205,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
/* Did we just ran out of cells on this queue? */
/* Did we just ran out of cells on this circuit's queue? */
if (queue->n == 0) {
log_debug(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn);
@ -2172,16 +2328,31 @@ assert_active_circuits_ok(or_connection_t *orconn)
{
circuit_t *head = orconn->active_circuits;
circuit_t *cur = head;
int n = 0;
if (! head)
return;
do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn);
circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
cell_ewma_t *ewma;
tor_assert(next);
tor_assert(prev);
tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
if (orconn == cur->n_conn) {
ewma = &cur->n_cell_ewma;
tor_assert(!ewma->is_for_p_conn);
} else {
ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
tor_assert(ewma->is_for_p_conn);
}
tor_assert(ewma->heap_index != -1);
tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
ewma->heap_index));
n++;
cur = next;
} while (cur != head);
tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
}