diff --git a/src/or/channel.c b/src/or/channel.c index 334f843ebd..8241556b57 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -19,6 +19,7 @@ #include "circuitbuild.h" #include "circuitlist.h" #include "connection_or.h" /* For var_cell_free() */ +#include "circuitmux.h" #include "geoip.h" #include "nodelist.h" #include "relay.h" @@ -813,9 +814,10 @@ channel_free(channel_t *chan) channel_clear_remote_end(chan); - if (chan->active_circuit_pqueue) { - smartlist_free(chan->active_circuit_pqueue); - chan->active_circuit_pqueue = NULL; + if (chan->cmux) { + circuitmux_detach_all_circuits(chan->cmux); + circuitmux_free(chan->cmux); + chan->cmux = NULL; } /* We're in CLOSED or ERROR, so the cell queue is already empty */ @@ -866,7 +868,6 @@ channel_force_free(channel_t *chan) if (chan->free) chan->free(chan); channel_clear_remote_end(chan); - smartlist_free(chan->active_circuit_pqueue); /* We might still have a cell queue; kill it */ if (chan->incoming_queue) { @@ -2031,12 +2032,13 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells) (unlimited ? -1 : num_cells - flushed)); if (!unlimited && num_cells <= flushed) goto done; - if (chan->active_circuits) { + if (circuitmux_num_cells(chan->cmux) > 0) { /* Try to get more cells from any active circuits */ - num_cells_from_circs = - channel_flush_from_first_active_circuit(chan, - (unlimited ? MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED : - (num_cells - flushed))); + num_cells_from_circs = channel_flush_from_first_active_circuit( + chan, + (unlimited ? + MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED : + (num_cells - flushed))); /* If it claims we got some, process the queue again */ if (num_cells_from_circs > 0) { @@ -2227,7 +2229,7 @@ channel_more_to_flush(channel_t *chan) smartlist_len(chan->incoming_queue) > 0) return 1; /* Check if any circuits would like to queue some */ - if (chan->active_circuits) return 1; + if (circuitmux_num_cells(chan->cmux) > 0) return 1; /* Else no */ return 0; @@ -2935,8 +2937,8 @@ channel_is_better(time_t now, channel_t *a, channel_t *b, * one that has no circuits is in its grace period. */ - a_has_circs = (a->n_circuits > 0); - b_has_circs = (b->n_circuits > 0); + a_has_circs = (channel_num_circuits(a) > 0); + b_has_circs = (channel_num_circuits(b) > 0); a_grace = (forgive_new_connections && (now < channel_when_created(a) + NEW_CHAN_GRACE_PERIOD)); b_grace = (forgive_new_connections && @@ -3223,9 +3225,10 @@ channel_dump_statistics(channel_t *chan, int severity) " * Channel " U64_FORMAT " has %d active circuits out of" " %d in total", U64_PRINTF_ARG(chan->global_identifier), - (chan->active_circuit_pqueue != NULL) ? - smartlist_len(chan->active_circuit_pqueue) : 0, - chan->n_circuits); + (chan->cmux != NULL) ? + circuitmux_num_active_circuits(chan->cmux) : 0, + (chan->cmux != NULL) ? + circuitmux_num_circuits(chan->cmux) : 0); /* Describe timestamps */ log(severity, LD_GENERAL, @@ -4007,6 +4010,22 @@ channel_matches_target_addr_for_extend(channel_t *chan, return chan->matches_target(chan, target); } +/** + * Return the total number of circuits used by a channel + * + * @param chan Channel to query + * @return Number of circuits using this as n_chan or p_chan + */ + +unsigned int +channel_num_circuits(channel_t *chan) +{ + tor_assert(chan); + + return chan->num_n_circuits + + chan->num_p_circuits; +} + /** * Set up circuit ID generation * diff --git a/src/or/channel.h b/src/or/channel.h index 27fba8fc00..4d3db41cef 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -10,6 +10,7 @@ #define _TOR_CHANNEL_H #include "or.h" +#include "circuitmux.h" /* Channel handler function pointer typedefs */ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *); @@ -99,7 +100,7 @@ struct channel_s { int (*matches_target)(channel_t *, const tor_addr_t *); /* Write a cell to an open channel */ int (*write_cell)(channel_t *, cell_t *); - /* Write a packed cell to an open channel */ + /* Write a packed cell to an open channel */ int (*write_packed_cell)(channel_t *, packed_cell_t *); /* Write a variable-length cell to an open channel */ int (*write_var_cell)(channel_t *, var_cell_t *); @@ -124,29 +125,8 @@ struct channel_s { /* List of queued outgoing cells */ smartlist_t *outgoing_queue; - /* Circuit stuff for use by relay.c */ - - /* - * Double-linked ring of circuits with queued cells waiting for room to - * free up on this connection's outbuf. Every time we pull cells from - * a circuit, we advance this pointer to the next circuit in the ring. - */ - struct circuit_t *active_circuits; - /* - * Priority queue of cell_ewma_t for circuits with queued cells waiting - * for room to free up on this connection's outbuf. Kept in heap order - * according to EWMA. - * - * This is redundant with active_circuits; if we ever decide only to use - * the cell_ewma algorithm for choosing circuits, we can remove - * active_circuits. - */ - smartlist_t *active_circuit_pqueue; - /* - * The tick on which the cell_ewma_ts in active_circuit_pqueue last had - * their ewma values rescaled. - */ - unsigned active_circuit_pqueue_last_recalibrated; + /* Circuit mux for circuits sending on this channel */ + circuitmux_t *cmux; /* Circuit ID generation stuff for use by circuitbuild.c */ @@ -161,8 +141,8 @@ struct channel_s { */ circid_t next_circ_id; - /* How many circuits use this connection as p_chan or n_chan? */ - int n_circuits; + /* For how many circuits are we n_chan? What about p_chan? */ + unsigned int num_n_circuits, num_p_circuits; /* * True iff this channel shouldn't get any new circs attached to it, @@ -456,6 +436,7 @@ void channel_mark_client(channel_t *chan); int channel_matches_extend_info(channel_t *chan, extend_info_t *extend_info); int channel_matches_target_addr_for_extend(channel_t *chan, const tor_addr_t *target); +unsigned int channel_num_circuits(channel_t *chan); void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd); void channel_timestamp_client(channel_t *chan); diff --git a/src/or/channeltls.c b/src/or/channeltls.c index 5d6a7a912f..036d14f3e4 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -16,6 +16,7 @@ #include "or.h" #include "channel.h" #include "channeltls.h" +#include "circuitmux.h" #include "config.h" #include "connection.h" #include "connection_or.h" @@ -127,8 +128,11 @@ channel_tls_connect(const tor_addr_t *addr, uint16_t port, if (is_local_addr(addr)) channel_mark_local(chan); channel_mark_outgoing(chan); - chan->active_circuit_pqueue = smartlist_new(); - chan->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick(); + chan->cmux = circuitmux_alloc(); + /* TODO get rid of this and set policy once we have them + chan->cmux->active_circuit_pqueue_last_recalibrated = + cell_ewma_get_tick(); + */ /* Set up or_connection stuff */ tlschan->conn = connection_or_connect(addr, port, id_digest, tlschan); @@ -146,7 +150,7 @@ channel_tls_connect(const tor_addr_t *addr, uint16_t port, goto done; err: - smartlist_free(chan->active_circuit_pqueue); + circuitmux_free(chan->cmux); tor_free(tlschan); chan = NULL; @@ -260,8 +264,11 @@ channel_tls_handle_incoming(or_connection_t *orconn) if (is_local_addr(&(TO_CONN(orconn)->addr))) channel_mark_local(chan); channel_mark_incoming(chan); - chan->active_circuit_pqueue = smartlist_new(); - chan->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick(); + chan->cmux = circuitmux_alloc(); + /* TODO set cmux policy + chan->active_circuit_pqueue_last_recalibrated = + cell_ewma_get_tick(); + */ /* If we got one, we should register it */ if (chan) channel_register(chan); diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index cf6020de06..bec3dc8175 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -134,10 +134,20 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction, found = HT_REMOVE(chan_circid_map, &chan_circid_map, &search); if (found) { tor_free(found); - --old_chan->n_circuits; + if (direction == CELL_DIRECTION_OUT) { + /* One fewer circuits use old_chan as n_chan */ + --(old_chan->num_n_circuits); + } else { + /* One fewer circuits use old_chan as p_chan */ + --(old_chan->num_p_circuits); + } + } + + /* If we're changing channels, detach the circuit */ + if (old_chan != chan) { + tor_assert(old_chan->cmux); + circuitmux_detach_circuit(old_chan->cmux, circ); } - if (was_active && old_chan != chan) - make_circuit_inactive_on_chan(circ, old_chan); } /* Change the values only after we have possibly made the circuit inactive @@ -161,10 +171,26 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction, found->circuit = circ; HT_INSERT(chan_circid_map, &chan_circid_map, found); } - if (make_active && old_chan != chan) - make_circuit_active_on_chan(circ,chan); - ++chan->n_circuits; + /* Attach to the circuitmux if we're changing channels */ + if (old_chan != chan) { + tor_assert(chan->cmux); + circuitmux_attach_circuit(chan->cmux, circ, direction); + } + + /* + * This is a no-op if we have no cells, but if we do it marks us active to + * the circuitmux + */ + if (make_active && old_chan != chan) + update_circuit_on_cmux(circ, direction); + + /* Adjust circuit counts on new channel */ + if (direction == CELL_DIRECTION_OUT) { + ++chan->num_n_circuits; + } else { + ++chan->num_p_circuits; + } } /** Set the p_conn field of a circuit circ, along @@ -994,7 +1020,7 @@ circuit_unlink_all_from_channel(channel_t *chan, int reason) { circuit_t *circ; - channel_unlink_all_active_circs(chan); + channel_unlink_all_circuits(chan); for (circ = global_circuitlist; circ; circ = circ->next) { int mark = 0; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index bf69711691..f143e9b76b 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -336,7 +336,7 @@ connection_or_get_num_circuits(or_connection_t *conn) tor_assert(conn); if (conn->chan) { - return TLS_CHAN_TO_BASE(conn->chan)->n_circuits; + return channel_num_circuits(TLS_CHAN_TO_BASE(conn->chan)); } else return 0; } diff --git a/src/or/or.h b/src/or/or.h index 87ee7bb7f4..a9b036171a 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -2634,8 +2634,6 @@ typedef struct circuit_t { uint32_t magic; /**< For memory and type debugging: must equal * ORIGIN_CIRCUIT_MAGIC or OR_CIRCUIT_MAGIC. */ - /** Queue of cells waiting to be transmitted on n_conn. */ - cell_queue_t n_chan_cells; /** The channel that is next in this circuit. */ channel_t *n_chan; @@ -2643,13 +2641,36 @@ typedef struct circuit_t { * The circuit_id used in the next (forward) hop of this circuit; * this is unique to n_chan, but this ordered pair is globally * unique: +<<<<<<< HEAD * +======= + * +>>>>>>> f1e8169... Use circuitmux_t in channels and when relaying cells * (n_chan->global_identifier, n_circ_id) */ circid_t n_circ_id; - /** The hop to which we want to extend this circuit. Should be NULL if - * the circuit has attached to a connection. */ + /** + * Circuit mux associated with n_chan to which this circuit is attached; + * NULL if we have no n_chan. + */ + circuitmux_t *mux; + + /** Queue of cells waiting to be transmitted on n_chan */ + cell_queue_t n_chan_cells; + + /** + * The hop to which we want to extend this circuit. Should be NULL if + * the circuit has attached to a connection. + * + * TODO: + * - If this is NULL, we have extended. Is it true that if this is + * NULL then n_chan is not NULL? + * - If n_chan is NULL, then what is n_circ_id? + * - It doesn't matter, because we'll only ever attach to a circuitmux_t + * when n_chan is not NULL, and that's what needs to use a unique ID + * for circuits. + */ extend_info_t *n_hop; /** True iff we are waiting for n_chan_cells to become less full before @@ -2701,6 +2722,15 @@ typedef struct circuit_t { const char *marked_for_close_file; /**< For debugging: in which file was this * circuit marked for close? */ + /** Unique ID for measuring tunneled network status requests. */ + uint64_t dirreq_id; + + /** TODO is this *all* circuits or all circuits on n_chan? */ + struct circuit_t *next; /**< Next circuit in linked list of all circuits. */ + + /** TODO all this from here on down should go away in favor of + * circuitmux_t. + */ /** Next circuit in the doubly-linked ring of circuits waiting to add * cells to n_conn. NULL if we have no cells pending, or if we're not * linked to an OR connection. */ @@ -2709,10 +2739,6 @@ typedef struct circuit_t { * cells to n_conn. NULL if we have no cells pending, or if we're not * linked to an OR connection. */ struct circuit_t *prev_active_on_n_chan; - struct circuit_t *next; /**< Next circuit in linked list of all circuits. */ - - /** Unique ID for measuring tunneled network status requests. */ - uint64_t dirreq_id; /** The EWMA count for the number of cells flushed from the * n_chan_cells queue. Used to determine which circuit to flush from next. diff --git a/src/or/relay.c b/src/or/relay.c index 60f696cd47..f0792437c3 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -1779,10 +1779,10 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint) } #ifdef ACTIVE_CIRCUITS_PARANOIA -#define assert_active_circuits_ok_paranoid(conn) \ - assert_active_circuits_ok(conn) +#define assert_cmux_ok_paranoid(chan) \ + assert_cmux_okay(chan) #else -#define assert_active_circuits_ok_paranoid(conn) +#define assert_cmux_ok_paranoid(chan) #endif /** The total number of cells we have allocated from the memory pool. */ @@ -2004,6 +2004,7 @@ prev_circ_on_chan_p(circuit_t *circ, channel_t *chan) } } +#if 0 /** Helper for sorting cell_ewma_t values in their priority queue. */ static int compare_cell_ewma_counts(const void *p1, const void *p2) @@ -2240,122 +2241,61 @@ pop_first_cell_ewma_from_chan(channel_t *chan) compare_cell_ewma_counts, STRUCT_OFFSET(cell_ewma_t, heap_index)); } +#endif -/** Add circ to the list of circuits with pending cells on - * chan. No effect if circ is already linked. */ +/** + * Update the number of cells available on the circuit's n_chan or p_chan's + * circuit mux. + */ void -make_circuit_active_on_chan(circuit_t *circ, channel_t *chan) +update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction) { - circuit_t **nextp = NULL, **prevp = NULL; + channel_t *chan = NULL; + or_circuit_t *or_circ = NULL; + circuitmux_t *cmux = NULL; - tor_assert(chan); tor_assert(circ); - nextp = next_circ_on_chan_p(circ, chan); - prevp = prev_circ_on_chan_p(circ, chan); - - if (*nextp && *prevp) { - /* Already active. */ - return; - } - - assert_active_circuits_ok_paranoid(chan); - - if (!(chan->active_circuits)) { - chan->active_circuits = circ; - *prevp = *nextp = circ; + /* Okay, get the channel */ + if (direction == CELL_DIRECTION_OUT) { + chan = circ->n_chan; } else { - circuit_t *head = chan->active_circuits; - circuit_t *old_tail = *prev_circ_on_chan_p(head, chan); - *next_circ_on_chan_p(old_tail, chan) = circ; - *nextp = head; - *prev_circ_on_chan_p(head, chan) = circ; - *prevp = old_tail; + or_circ = TO_OR_CIRCUIT(circ); + chan = or_circ->p_chan; } - if (circ->n_chan == chan) { - add_cell_ewma_to_chan(chan, &circ->n_cell_ewma); - } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(chan == orcirc->p_chan); - add_cell_ewma_to_chan(chan, &orcirc->p_cell_ewma); - } - - assert_active_circuits_ok_paranoid(chan); -} - -/** Remove circ from the list of circuits with pending cells on - * chan. No effect if circ is already unlinked. */ -void -make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan) -{ - circuit_t **nextp = NULL, **prevp = NULL; - circuit_t *next = NULL, *prev = NULL; - tor_assert(chan); - tor_assert(circ); + tor_assert(chan->cmux); - nextp = next_circ_on_chan_p(circ, chan); - prevp = prev_circ_on_chan_p(circ, chan); - next = *nextp; - prev = *prevp; + /* Now get the cmux */ + cmux = chan->cmux; - if (!next && !prev) { - /* Already inactive. */ - return; - } + /* Cmux sanity check */ + tor_assert(circuitmux_is_circuit_attached(cmux, circ)); + tor_assert(circuitmux_attached_circuit_direction(cmux, circ) == direction); - assert_active_circuits_ok_paranoid(chan); + assert_cmux_ok_paranoid(chan); - tor_assert(next && prev); - tor_assert(*prev_circ_on_chan_p(next, chan) == circ); - tor_assert(*next_circ_on_chan_p(prev, chan) == circ); - - if (next == circ) { - chan->active_circuits = NULL; + /* Update the number of cells we have for the circuit mux */ + if (direction == CELL_DIRECTION_OUT) { + circuitmux_set_num_cells(cmux, circ, circ->n_chan_cells.n); } else { - *prev_circ_on_chan_p(next, chan) = prev; - *next_circ_on_chan_p(prev, chan) = next; - if (chan->active_circuits == circ) - chan->active_circuits = next; - } - *prevp = *nextp = NULL; - - if (circ->n_chan == chan) { - remove_cell_ewma_from_chan(chan, &circ->n_cell_ewma); - } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(chan == orcirc->p_chan); - remove_cell_ewma_from_chan(chan, &orcirc->p_cell_ewma); + circuitmux_set_num_cells(cmux, circ, or_circ->p_chan_cells.n); } - assert_active_circuits_ok_paranoid(chan); + assert_cmux_ok_paranoid(chan); } -/** Remove all circuits from the list of circuits with pending cells on - * chan. */ +/** Remove all circuits from the cmux on chan. */ void -channel_unlink_all_active_circs(channel_t *chan) +channel_unlink_all_circuits(channel_t *chan) { - circuit_t *head = NULL, *cur = NULL; - tor_assert(chan); + tor_assert(chan->cmux); - cur = head = chan->active_circuits; - if (! head) - return; - do { - circuit_t *next = *next_circ_on_chan_p(cur, chan); - *prev_circ_on_chan_p(cur, chan) = NULL; - *next_circ_on_chan_p(cur, chan) = NULL; - cur = next; - } while (cur != head); - chan->active_circuits = NULL; - - SMARTLIST_FOREACH(chan->active_circuit_pqueue, - cell_ewma_t *, e, - e->heap_index = -1); - smartlist_clear(chan->active_circuit_pqueue); + circuitmux_detach_all_circuits(chan->cmux); + chan->num_n_circuits = 0; + chan->num_p_circuits = 0; } /** Block (if block is true) or unblock (if block is false) @@ -2419,53 +2359,71 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, int channel_flush_from_first_active_circuit(channel_t *chan, int max) { - int n_flushed; + circuitmux_t *cmux = NULL; + int n_flushed = 0; cell_queue_t *queue; circuit_t *circ; + or_circuit_t *or_circ; int streams_blocked; + packed_cell_t *cell; +#if 0 /* The current (hi-res) time */ struct timeval now_hires; /* The EWMA cell counter for the circuit we're flushing. */ cell_ewma_t *cell_ewma = NULL; double ewma_increment = -1; +#endif + /* Get the cmux */ tor_assert(chan); + tor_assert(chan->cmux); + cmux = chan->cmux; - circ = chan->active_circuits; - if (!circ) return 0; - assert_active_circuits_ok_paranoid(chan); + /* Main loop: pick a circuit, send a cell, update the cmux */ + while (n_flushed < max) { + circ = circuitmux_get_first_active_circuit(cmux); + /* If it returns NULL, no cells left to send */ + if (!circ) break; + assert_cmux_ok_paranoid(chan); - /* See if we're doing the ewma circuit selection algorithm. */ - if (ewma_enabled) { - unsigned tick; - double fractional_tick; - tor_gettimeofday_cached(&now_hires); - tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); +#if 0 + /* This will go in circuitmux_get_first_active_circuit() */ + /* See if we're doing the ewma circuit selection algorithm. */ + if (ewma_enabled) { + unsigned tick; + double fractional_tick; + tor_gettimeofday_cached(&now_hires); + tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); - if (tick != chan->active_circuit_pqueue_last_recalibrated) { - scale_active_circuits(chan, tick); + if (tick != chan->active_circuit_pqueue_last_recalibrated) { + scale_active_circuits(chan, tick); + } + + ewma_increment = pow(ewma_scale_factor, -fractional_tick); + + cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0); + circ = cell_ewma_to_circuit(cell_ewma); + } +#endif + + if (circ->n_chan == chan) { + queue = &circ->n_chan_cells; + streams_blocked = circ->streams_blocked_on_n_chan; + } else { + or_circ = TO_OR_CIRCUIT(circ); + tor_assert(or_circ->p_chan == chan); + queue = &TO_OR_CIRCUIT(circ)->p_chan_cells; + streams_blocked = circ->streams_blocked_on_p_chan; } - ewma_increment = pow(ewma_scale_factor, -fractional_tick); - - cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0); - circ = cell_ewma_to_circuit(cell_ewma); - } - - if (circ->n_chan == chan) { - queue = &circ->n_chan_cells; - streams_blocked = circ->streams_blocked_on_n_chan; - } else { - queue = &TO_OR_CIRCUIT(circ)->p_chan_cells; - streams_blocked = circ->streams_blocked_on_p_chan; - } - tor_assert(*next_circ_on_chan_p(circ, chan)); - - for (n_flushed = 0; n_flushed < max && queue->head; ) { - packed_cell_t *cell = cell_queue_pop(queue); - tor_assert(*next_circ_on_chan_p(circ, chan)); + /* + * Get just one cell here; once we've sent it, that can change the circuit + * selection, so we have to loop around for another even if this circuit + * has more than one. + */ + cell = cell_queue_pop(queue); /* Calculate the exact time that this cell has spent in the queue. */ if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) { @@ -2481,8 +2439,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) "Looks like the CellStatistics option was " "recently enabled."); } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); insertion_time_elem_t *elem = it_queue->first; + or_circ = TO_OR_CIRCUIT(circ); cell_waiting_time = (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L - elem->insertion_time * 10L) % @@ -2495,8 +2453,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) it_queue->last = NULL; mp_pool_release(elem); } - orcirc->total_cell_waiting_time += cell_waiting_time; - orcirc->processed_cells++; + or_circ->total_cell_waiting_time += cell_waiting_time; + or_circ->processed_cells++; } } @@ -2507,14 +2465,34 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) DIRREQ_TUNNELED, DIRREQ_CIRC_QUEUE_FLUSHED); + /* Now send the cell */ channel_write_packed_cell(chan, cell); + cell = NULL; + /* * Don't packed_cell_free_unchecked(cell) here because the channel will * do so when it gets out of the channel queue (probably already did, in * which case that was an immediate double-free bug). */ + /* Update the counter */ ++n_flushed; + + /* + * Now update the cmux; tell it we've just sent a cell, and how many + * we have left. + */ + circuitmux_notify_xmit_cells(cmux, circ, 1); + circuitmux_set_num_cells(cmux, circ, queue->n); + if (queue->n == 0) + log_debug(LD_GENERAL, "Made a circuit inactive."); + + /* Is the cell queue low enough to unblock all the streams that are waiting + * to write to this circuit? */ + if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) + set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ + +#if 0 if (cell_ewma) { cell_ewma_t *tmp; cell_ewma->cell_count += ewma_increment; @@ -2534,22 +2512,13 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) assert_active_circuits_ok_paranoid(chan); goto done; } - } - tor_assert(*next_circ_on_chan_p(circ, chan)); - assert_active_circuits_ok_paranoid(chan); - chan->active_circuits = *next_circ_on_chan_p(circ, chan); +#endif - /* Is the cell queue low enough to unblock all the streams that are waiting - * to write to this circuit? */ - if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) - set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ - - /* Did we just run out of cells on this circuit's queue? */ - if (queue->n == 0) { - log_debug(LD_GENERAL, "Made a circuit inactive."); - make_circuit_inactive_on_chan(circ, chan); + /* If n_flushed < max still, loop around and pick another circuit */ } - done: + + /* Okay, we're done sending now */ + assert_cmux_ok_paranoid(chan); return n_flushed; } @@ -2587,11 +2556,11 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, set_streams_blocked_on_circ(circ, chan, 1, fromstream); } + update_circuit_on_cmux(circ, direction); if (queue->n == 1) { - /* This was the first cell added to the queue. We need to make this + /* This was the first cell added to the queue. We just made this * circuit active. */ log_debug(LD_GENERAL, "Made a circuit active."); - make_circuit_active_on_chan(circ, chan); } if (!channel_has_queued_writes(chan)) { @@ -2669,20 +2638,37 @@ void circuit_clear_cell_queue(circuit_t *circ, channel_t *chan) { cell_queue_t *queue; + cell_direction_t direction; + if (circ->n_chan == chan) { queue = &circ->n_chan_cells; + direction = CELL_DIRECTION_OUT; } else { or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); tor_assert(orcirc->p_chan == chan); queue = &orcirc->p_chan_cells; + direction = CELL_DIRECTION_IN; } - if (queue->n) - make_circuit_inactive_on_chan(circ, chan); - + /* Clear the queue */ cell_queue_clear(queue); + + /* Update the cell counter in the cmux */ + update_circuit_on_cmux(circ, direction); } +/** Fail with an assert if the circuit mux on chan is corrupt + */ +void +assert_circuit_mux_okay(channel_t *chan) +{ + tor_assert(chan); + tor_assert(chan->cmux); + + circuitmux_assert_okay(chan->cmux); +} + +#if 0 /** Fail with an assert if the active circuits ring on orconn is * corrupt. */ void @@ -2721,6 +2707,7 @@ assert_active_circuits_ok(channel_t *chan) tor_assert(n == smartlist_len(chan->active_circuit_pqueue)); } +#endif /** Return 1 if we shouldn't restart reading on this circuit, even if * we get a SENDME. Else return 0. diff --git a/src/or/relay.h b/src/or/relay.h index 7f96d59d15..ef5074bbe4 100644 --- a/src/or/relay.h +++ b/src/or/relay.h @@ -51,11 +51,10 @@ void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell); void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, streamid_t fromstream); -void channel_unlink_all_active_circs(channel_t *chan); +void channel_unlink_all_circuits(channel_t *chan); int channel_flush_from_first_active_circuit(channel_t *chan, int max); -void assert_active_circuits_ok(channel_t *chan); -void make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan); -void make_circuit_active_on_chan(circuit_t *circ, channel_t *chan); +void assert_circuit_mux_okay(channel_t *chan); +void update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction); int append_address_to_payload(uint8_t *payload_out, const tor_addr_t *addr); const uint8_t *decode_address_from_payload(tor_addr_t *addr_out,