diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index eb8c90f460..0dff873cf6 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -386,8 +386,10 @@ init_circuit_base(circuit_t *circ) circ->deliver_window = CIRCWINDOW_START; /* Initialize the cell_ewma_t structure */ - circ->n_cell_ewma.last_cell_time = circ->highres_created; + circ->n_cell_ewma.last_adjusted_tick = cell_ewma_get_tick(); circ->n_cell_ewma.cell_count = 0.0; + circ->n_cell_ewma.heap_index = -1; + circ->n_cell_ewma.is_for_p_conn = 0; circuit_add(circ); } @@ -438,11 +440,13 @@ or_circuit_new(circid_t p_circ_id, or_connection_t *p_conn) /* Initialize the cell_ewma_t structure */ - /* Fetch the timeval that init_circuit_base filled in. */ - circ->p_cell_ewma.last_cell_time = TO_CIRCUIT(circ)->highres_created; - /* Initialize the cell counts to 0 */ circ->p_cell_ewma.cell_count = 0.0; + circ->p_cell_ewma.last_adjusted_tick = cell_ewma_get_tick(); + circ->p_cell_ewma.is_for_p_conn = 1; + + /* It's not in any heap yet. */ + circ->p_cell_ewma.heap_index = -1; return circ; } diff --git a/src/or/config.c b/src/or/config.c index fe5fe9f7ee..a22ec4b13d 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1379,6 +1379,9 @@ options_act(or_options_t *old_options) if (accounting_is_enabled(options)) configure_accounting(time(NULL)); + /* Change the cell EWMA settings */ + cell_ewma_set_scale_factor(options); + /* Check for transitions that need action. */ if (old_options) { if (options->UseEntryGuards && !old_options->UseEntryGuards) { diff --git a/src/or/connection.c b/src/or/connection.c index 409884b920..a95850b9e5 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -180,6 +180,9 @@ or_connection_new(int socket_family) or_conn->timestamp_last_added_nonpadding = time(NULL); or_conn->next_circ_id = crypto_rand_int(1<<15); + or_conn->active_circuit_pqueue = smartlist_create(); + or_conn->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick(); + return or_conn; } @@ -375,6 +378,7 @@ _connection_free(connection_t *conn) or_conn->tls = NULL; or_handshake_state_free(or_conn->handshake_state); or_conn->handshake_state = NULL; + smartlist_free(or_conn->active_circuit_pqueue); tor_free(or_conn->nickname); } if (CONN_IS_EDGE(conn)) { diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 0c32eeff0d..55e7956011 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -80,7 +80,6 @@ connection_or_clear_identity_map(void) } }); - digestmap_free(orconn_identity_map, NULL); orconn_identity_map = NULL; } diff --git a/src/or/or.h b/src/or/or.h index 003d0ebdad..8be84910f1 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1075,6 +1075,17 @@ typedef struct or_connection_t { * free up on this connection's outbuf. Every time we pull cells from a * circuit, we advance this pointer to the next circuit in the ring. */ struct circuit_t *active_circuits; + /** Priority queue of cell_ewma_t for circuits with queued cells waiting for + * room to free up on this connection's outbuf. Kept in heap order + * according to EWMA. + * + * This is redundant with active_circuits; if we ever decide only to use the + * cell_ewma algorithm for choosing circuits, we can remove active_circuits. + */ + smartlist_t *active_circuit_pqueue; + /** The tick on which the ciell_ewma_t's in active_circuit_pqueue last had + * their ewma values rescaled. */ + unsigned active_circuit_pqueue_last_recalibrated; struct or_connection_t *next_with_same_id; /**< Next connection with same * identity digest as this one. */ } or_connection_t; @@ -1994,16 +2005,25 @@ typedef struct { /** * The cell_ewma_t structure keeps track of how many cells a circuit has - * transferred recently. It keeps an EWMA (exponentially weighted - * moving average) of the number of cells flushed in - * connection_or_flush_from_first_active_circuit(). + * transferred recently. It keeps an EWMA (exponentially weighted moving + * average) of the number of cells flushed from the circuit queue onto a + * connection in connection_or_flush_from_first_active_circuit(). */ - typedef struct { - /** The last time a cell was flushed from this circuit. */ - struct timeval last_cell_time; - /** The EWMA of the cell count. */ - double cell_count; + /** The last 'tick' at which we recalibrated cell_count. + * + * A cell sent at exactly the start of this tick has weight 1.0. Cells sent + * since the start of this tick have weight greater than 1.0; ones sent + * earlier have less weight. */ + unsigned last_adjusted_tick; + /** The EWMA of the cell count. */ + double cell_count; + /** True iff this is a the cell count for a circuit's previous + * connection. */ + unsigned int is_for_p_conn : 1; + /** The position of the circuit within the or connection's priority + * queue. */ + int heap_index; } cell_ewma_t; #define ORIGIN_CIRCUIT_MAGIC 0x35315243u @@ -2097,7 +2117,8 @@ typedef struct circuit_t { uint64_t dirreq_id; /** The EWMA count for the number of cells flushed from the - * n_conn_cells queue. */ + * n_conn_cells queue. Used to determine which circuit to flush from next. + */ cell_ewma_t n_cell_ewma; } circuit_t; @@ -4469,6 +4490,8 @@ int append_address_to_payload(char *payload_out, const tor_addr_t *addr); const char *decode_address_from_payload(tor_addr_t *addr_out, const char *payload, int payload_len); +unsigned cell_ewma_get_tick(void); +void cell_ewma_set_scale_factor(or_options_t *options); /********************************* rephist.c ***************************/ diff --git a/src/or/relay.c b/src/or/relay.c index 9b7396afac..19bb6871ce 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -1752,6 +1752,194 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) } } +/** Helper for sorting cell_ewma_t values in their priority queue. */ +static int +compare_cell_ewma_counts(const void *p1, const void *p2) +{ + const cell_ewma_t *e1=p1, *e2=p2; + if (e1->cell_count < e2->cell_count) + return -1; + else if (e1->cell_count > e2->cell_count) + return 1; + else + return 0; +} + +/** Given a cell_ewma_t, return a pointer to the circuit containing it. */ +static circuit_t * +cell_ewma_to_circuit(cell_ewma_t *ewma) +{ + if (ewma->is_for_p_conn) { + /* This is an or_circuit_t's p_cell_ewma. */ + or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma); + return TO_CIRCUIT(orcirc); + } else { + /* This is some circuit's n_cell_ewma. */ + return SUBTYPE_P(ewma, circuit_t, n_cell_ewma); + } +} + +/* ==== Functions for scaling cell_ewma_t ==== + + When choosing which cells to relay first, we favor circuits that have been + quiet recently. This gives better latency on connections that aren't + pushing lots of data, and makes the network feel more interactive. + + Conceptually, we take an exponentially weighted mean average of the number + of cells a circuit has sent, and allow active circuits (those with cells to + relay) to send cells in order of their exponentially-weighted mean average + (EWMA) cell count. [That is, a cell sent N seconds ago 'counts' F^N times + as much as a cell sent now, for 0tv_sec / EWMA_TICK_LEN; + /* rem */ + double rem = (now->tv_sec % EWMA_TICK_LEN) + + ((double)(now->tv_usec)) / 1.0e6; + *remainder_out = rem / EWMA_TICK_LEN; + return res; +} + +/** Compute and return the current cell_ewma tick. */ +unsigned +cell_ewma_get_tick(void) +{ + return ((unsigned)approx_time() / EWMA_TICK_LEN); +} + +/** The per-tick scale factor to be used when computing cell-count EWMA + * values. (A cell sent N ticks before the start of the current tick + * has value ewma_scale_factor ** N.) + * + * If ewma_scale_factor is <= 0, the EWMA algorithm is disabled. + */ +static double ewma_scale_factor = EWMA_DEFAULT_SCALE_FACTOR; + +/** Adjust the global cell scale factor based on options */ +void +cell_ewma_set_scale_factor(or_options_t *options) +{ + double f; + if (options->EWMAInterval > 0.0001) { + f = pow(options->EWMASignificance, + EWMA_TICK_LEN / options->EWMAInterval); + } else { + f = EWMA_DEFAULT_SCALE_FACTOR; + } + + ewma_scale_factor = f; +} + +/** Return the multiplier necessary to convert the value of a cell sent in + * 'from_tick' to one sent in 'to_tick'. */ +static INLINE double +get_scale_factor(unsigned from_tick, unsigned to_tick) +{ + /* This math can wrap around, but that's okay: unsigned overflow is + well-defined */ + int diff = (int)(to_tick - from_tick); + return pow(ewma_scale_factor, diff); +} + +/** Adjust the cell count of ewma so that it is scaled with respect to + * cur_tick */ +static void +scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick) +{ + double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick); + ewma->cell_count *= factor; + ewma->last_adjusted_tick = cur_tick; +} + +/** Adjust the cell count of every active circuit on conn so + * that they are scaled with respect to cur_tick */ +static void +scale_active_circuits(or_connection_t *conn, unsigned cur_tick) +{ + + double factor = get_scale_factor( + conn->active_circuit_pqueue_last_recalibrated, + cur_tick); + /** Ordinarily it isn't okay to change the value of an element in a heap, + * but it's okay here, since we are preserving the order. */ + SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, { + tor_assert(e->last_adjusted_tick == + conn->active_circuit_pqueue_last_recalibrated); + e->cell_count *= factor; + e->last_adjusted_tick = cur_tick; + }); + conn->active_circuit_pqueue_last_recalibrated = cur_tick; +} + +/** Rescale ewma to the same scale as conn, and add it to + * conn's priority queue of active circuits */ +static void +add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index == -1); + scale_single_cell_ewma(ewma, + conn->active_circuit_pqueue_last_recalibrated); + + smartlist_pqueue_add(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove ewma from conn's priority queue of active circuits */ +static void +remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index != -1); + smartlist_pqueue_remove(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove and return the first cell_ewma_t from conn's priority queue of + * active circuits. Requires that the priority queue is nonempty. */ +static cell_ewma_t * +pop_first_cell_ewma_from_conn(or_connection_t *conn) +{ + return smartlist_pqueue_pop(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index)); +} + /** Add circ to the list of circuits with pending cells on * conn. No effect if circ is already linked. */ void @@ -1765,6 +1953,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + if (! conn->active_circuits) { conn->active_circuits = circ; *prevp = *nextp = circ; @@ -1776,6 +1966,15 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) *prev_circ_on_conn_p(head, conn) = circ; *prevp = old_tail; } + + if (circ->n_conn == conn) { + add_cell_ewma_to_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } @@ -1793,6 +1992,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + tor_assert(next && prev); tor_assert(*prev_circ_on_conn_p(next, conn) == circ); tor_assert(*next_circ_on_conn_p(prev, conn) == circ); @@ -1806,6 +2007,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) conn->active_circuits = next; } *prevp = *nextp = NULL; + + if (circ->n_conn == conn) { + remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } @@ -1825,6 +2035,10 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) cur = next; } while (cur != head); orconn->active_circuits = NULL; + + SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e, + e->heap_index = -1); + smartlist_clear(orconn->active_circuit_pqueue); } /** Block (if block is true) or unblock (if block is false) @@ -1884,92 +2098,27 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, /* The EWMA cell counter for the circuit we're flushing. */ cell_ewma_t *cell_ewma = NULL; - - /* The global cell EWMA parameters. The algorithm is parameterized by - * two values (type double): - * - * "significance" (between 0 and 1) and "interval" - * - * The cell count is weighted so that the most recent "interval" - * seconds will account for "significance" of the weight. - * - * If "interval" is set to 0, it disables the algorithm, and the old - * algorithm (round-robin) is used. - * - * These parameters should really be set by the consensus, but can be - * overridden by the torrc (in which case the options values will be - * >= 0.0). - */ - static double cell_ewma_significance = 0.9; - static double cell_ewma_interval = 10.0; - - double significance_override = get_options()->EWMASignificance; - double interval_override = get_options()->EWMAInterval; - if (significance_override >= 0.0) { - cell_ewma_significance = significance_override; - } - if (interval_override >= 0.0) { - cell_ewma_interval = interval_override; - } + double ewma_increment = -1; circ = conn->active_circuits; if (!circ) return 0; assert_active_circuits_ok_paranoid(conn); /* See if we're doing the ewma circuit selection algorithm. */ - if (cell_ewma_interval > 0.0) { - /* Is there another circuit we might like better? */ - circuit_t *circ_iter, *circ_start; - circuit_t *circ_min_cell_count = NULL; - double min_cell_count = 0.0; + if (ewma_scale_factor > 0.0) { + unsigned tick; + double fractional_tick; tor_gettimeofday_cached(&now_hires); + tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); - /* Start with circ, and go around the circular linked list */ - circ_start = circ_iter = circ; - do { - double delta_t; - - /* Find the appropriate EWMA cell counter to use. */ - if (circ_iter->n_conn == conn) { - cell_ewma = &(circ_iter->n_cell_ewma); - } else { - cell_ewma = &(TO_OR_CIRCUIT(circ_iter)->p_cell_ewma); - } - - /* Update the EWMA cell counter to account for the passage of time. */ - delta_t = (double)(now_hires.tv_sec - - cell_ewma->last_cell_time.tv_sec); - delta_t += ((double)(now_hires.tv_usec - - cell_ewma->last_cell_time.tv_usec)) / 1000000.0; - - if (delta_t > 0.0) { - cell_ewma->cell_count *= - pow(cell_ewma_significance, delta_t / cell_ewma_interval); - //printf("cc: %f ", cell_ewma->cell_count); - } - cell_ewma->last_cell_time = now_hires; - - /* Now keep track of the lowest cell count we've seen. */ - if (circ_min_cell_count == NULL || - cell_ewma->cell_count < min_cell_count) { - min_cell_count = cell_ewma->cell_count; - circ_min_cell_count = circ_iter; - } - - circ_iter = *next_circ_on_conn_p(circ_iter, conn); - } while (circ_iter != circ_start); - - /* OK, we've gone all the way around. Let's use the circ with the - * lowest (recent) cell count. */ - circ = circ_min_cell_count; - - /* Now set the appropriate EWMA cell counter to use below to add the - * cells we actually send. */ - if (circ_min_cell_count->n_conn == conn) { - cell_ewma = &(circ_min_cell_count->n_cell_ewma); - } else { - cell_ewma = &(TO_OR_CIRCUIT(circ_min_cell_count)->p_cell_ewma); + if (tick != conn->active_circuit_pqueue_last_recalibrated) { + scale_active_circuits(conn, tick); } + + ewma_increment = pow(ewma_scale_factor, -fractional_tick); + + cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0); + circ = cell_ewma_to_circuit(cell_ewma); } if (circ->n_conn == conn) { @@ -2028,7 +2177,14 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, packed_cell_free_unchecked(cell); ++n_flushed; if (cell_ewma) { - cell_ewma->cell_count += 1.0; + cell_ewma_t *tmp; + cell_ewma->cell_count += ewma_increment; + /* We pop and re-add the cell_ewma_t here, not above, since we need to + * re-add it immediately to keep the priority queue consistent with + * the linked-list implementation */ + tmp = pop_first_cell_ewma_from_conn(conn); + tor_assert(tmp == cell_ewma); + add_cell_ewma_to_conn(conn, cell_ewma); } if (circ != conn->active_circuits) { /* If this happens, the current circuit just got made inactive by @@ -2049,7 +2205,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ - /* Did we just ran out of cells on this queue? */ + /* Did we just ran out of cells on this circuit's queue? */ if (queue->n == 0) { log_debug(LD_GENERAL, "Made a circuit inactive."); make_circuit_inactive_on_conn(circ, conn); @@ -2172,16 +2328,31 @@ assert_active_circuits_ok(or_connection_t *orconn) { circuit_t *head = orconn->active_circuits; circuit_t *cur = head; + int n = 0; if (! head) return; do { circuit_t *next = *next_circ_on_conn_p(cur, orconn); circuit_t *prev = *prev_circ_on_conn_p(cur, orconn); + cell_ewma_t *ewma; tor_assert(next); tor_assert(prev); tor_assert(*next_circ_on_conn_p(prev, orconn) == cur); tor_assert(*prev_circ_on_conn_p(next, orconn) == cur); + if (orconn == cur->n_conn) { + ewma = &cur->n_cell_ewma; + tor_assert(!ewma->is_for_p_conn); + } else { + ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma; + tor_assert(ewma->is_for_p_conn); + } + tor_assert(ewma->heap_index != -1); + tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue, + ewma->heap_index)); + n++; cur = next; } while (cur != head); + + tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); }