From 967503c12c46f1c75209622ebddd15242e8af79a Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 14 Mar 2013 12:13:45 -0400 Subject: [PATCH 1/6] Implement a placeholder mechanism in the channel,id->circ map We'll use this to help fix bug 7912, by providing a way to mark that a circuit ID can't get reused while a DESTROY is queued but not sent. --- src/or/circuitlist.c | 81 ++++++++++++++++++++++++++++++++++++++++---- src/or/circuitlist.h | 2 ++ 2 files changed, 76 insertions(+), 7 deletions(-) diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index 1903fbe2eb..9bdf2ad477 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -207,6 +207,61 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction, } } +/** Mark that circuit id id shouldn't be used on channel chan, + * even if there is no circuit on the channel. We use this to keep the + * circuit id from getting re-used while we have queued but not yet sent + * a destroy cell. */ +void +channel_mark_circid_unusable(channel_t *chan, circid_t id) +{ + chan_circid_circuit_map_t search; + chan_circid_circuit_map_t *ent; + + /* See if there's an entry there. That wouldn't be good. */ + memset(&search, 0, sizeof(search)); + search.chan = chan; + search.circ_id = id; + ent = HT_FIND(chan_circid_map, &chan_circid_map, &search); + + if (ent && ent->circuit) { + /* we have a problem. */ + log_warn(LD_BUG, "Tried to mark %u unusable on %p, but there was already " + "a circuit there.", (unsigned)id, chan); + } else if (ent) { + /* It's already marked. */ + } else { + ent = tor_malloc_zero(sizeof(chan_circid_circuit_map_t)); + ent->chan = chan; + ent->circ_id = id; + /* leave circuit at NULL */ + HT_INSERT(chan_circid_map, &chan_circid_map, ent); + } +} + +/** Mark that a circuit id id can be used again on chan. + * We use this to re-enable the circuit ID after we've sent a destroy cell. 
+ */ +void +channel_mark_circid_usable(channel_t *chan, circid_t id) +{ + chan_circid_circuit_map_t search; + chan_circid_circuit_map_t *ent; + + /* See if there's an entry there. That wouldn't be good. */ + memset(&search, 0, sizeof(search)); + search.chan = chan; + search.circ_id = id; + ent = HT_REMOVE(chan_circid_map, &chan_circid_map, &search); + if (ent && ent->circuit) { + log_warn(LD_BUG, "Tried to mark %u usable on %p, but there was already " + "a circuit there.", (unsigned)id, chan); + return; + } + if (_last_circid_chan_ent == ent) + _last_circid_chan_ent = NULL; + tor_free(ent); +} + /** Set the p_conn field of a circuit circ, along * with the corresponding circuit ID, and add the circuit as appropriate * to the (chan,id)-\>circuit map. */ @@ -928,9 +983,13 @@ circuit_get_by_global_id(uint32_t id) * - circ-\>n_circ_id or circ-\>p_circ_id is equal to circ_id, and * - circ is attached to chan, either as p_chan or n_chan. * Return NULL if no such circuit exists. + * + * If found_entry_out is provided, set it to true if we have a + * placeholder entry for circid/chan, and leave it unset otherwise. */ static INLINE circuit_t * -circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan) +circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan, + int *found_entry_out) { chan_circid_circuit_map_t search; chan_circid_circuit_map_t *found; @@ -951,15 +1010,21 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan) " circ_id %u, channel ID " U64_FORMAT " (%p)", found->circuit, (unsigned)circ_id, U64_PRINTF_ARG(chan->global_identifier), chan); + if (found_entry_out) + *found_entry_out = 1; return found->circuit; } log_debug(LD_CIRC, - "circuit_get_by_circid_channel_impl() found nothing for" + "circuit_get_by_circid_channel_impl() found %s for" " circ_id %u, channel ID " U64_FORMAT " (%p)", + found ? 
"placeholder" : "nothing", (unsigned)circ_id, U64_PRINTF_ARG(chan->global_identifier), chan); + if (found_entry_out) + *found_entry_out = found ? 1 : 0; + return NULL; /* The rest of this checks for bugs. Disabled by default. */ /* We comment it out because coverity complains otherwise. @@ -993,7 +1058,7 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan) circuit_t * circuit_get_by_circid_channel(circid_t circ_id, channel_t *chan) { - circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan); + circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan, NULL); if (!circ || circ->marked_for_close) return NULL; else @@ -1009,7 +1074,7 @@ circuit_t * circuit_get_by_circid_channel_even_if_marked(circid_t circ_id, channel_t *chan) { - return circuit_get_by_circid_channel_impl(circ_id, chan); + return circuit_get_by_circid_channel_impl(circ_id, chan, NULL); } /** Return true iff the circuit ID circ_id is currently used by a @@ -1017,7 +1082,9 @@ circuit_get_by_circid_channel_even_if_marked(circid_t circ_id, int circuit_id_in_use_on_channel(circid_t circ_id, channel_t *chan) { - return circuit_get_by_circid_channel_impl(circ_id, chan) != NULL; + int found = 0; + return circuit_get_by_circid_channel_impl(circ_id, chan, &found) != NULL + || found; } /** Return the circuit that a given edge connection is using. */ @@ -1585,7 +1652,7 @@ assert_circuit_ok(const circuit_t *c) /* We use the _impl variant here to make sure we don't fail on marked * circuits, which would not be returned by the regular function. 
*/ circuit_t *c2 = circuit_get_by_circid_channel_impl(c->n_circ_id, - c->n_chan); + c->n_chan, NULL); tor_assert(c == c2); } } @@ -1593,7 +1660,7 @@ assert_circuit_ok(const circuit_t *c) if (or_circ->p_circ_id) { /* ibid */ circuit_t *c2 = circuit_get_by_circid_channel_impl(or_circ->p_circ_id, - or_circ->p_chan); + or_circ->p_chan, NULL); tor_assert(c == c2); } } diff --git a/src/or/circuitlist.h b/src/or/circuitlist.h index d67f80b065..434d2a8699 100644 --- a/src/or/circuitlist.h +++ b/src/or/circuitlist.h @@ -23,6 +23,8 @@ void circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id, channel_t *chan); void circuit_set_n_circid_chan(circuit_t *circ, circid_t id, channel_t *chan); +void channel_mark_circid_unusable(channel_t *chan, circid_t id); +void channel_mark_circid_usable(channel_t *chan, circid_t id); void circuit_set_state(circuit_t *circ, uint8_t state); void circuit_close_all_marked(void); int32_t circuit_initial_package_window(void); From 801eea03ad71dafd31cc6bfa06fa5e421aa95cd6 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 15 Mar 2013 10:45:48 -0400 Subject: [PATCH 2/6] Code to track on a circuit whether it has a "pending" delete cell This will be used in a fix for bug7912. --- src/or/circuitlist.c | 66 +++++++++++++++++++++++++++++++++++++++++--- src/or/circuitlist.h | 3 ++ src/or/or.h | 7 +++++ 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index 9bdf2ad477..3e8caa8252 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -262,18 +262,68 @@ channel_mark_circid_usable(channel_t *chan, circid_t id) tor_free(ent); } +/** Called to indicate that a DESTROY is pending on chan with + * circuit ID id, but hasn't been sent yet. 
*/ +void +channel_note_destroy_pending(channel_t *chan, circid_t id) +{ + circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan); + if (circ) { + if (circ->n_chan == chan && circ->n_circ_id == id) { + circ->n_delete_pending = 1; + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + if (orcirc->p_chan == chan && orcirc->p_circ_id == id) { + circ->p_delete_pending = 1; + } + } + return; + } + channel_mark_circid_unusable(chan, id); +} + +/** Called to indicate that a DESTROY is no longer pending on chan with + * circuit ID id -- typically, because it has been sent. */ +void +channel_note_destroy_not_pending(channel_t *chan, circid_t id) +{ + circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan); + if (circ) { + if (circ->n_chan == chan && circ->n_circ_id == id) { + circ->n_delete_pending = 0; + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + if (orcirc->p_chan == chan && orcirc->p_circ_id == id) { + circ->p_delete_pending = 0; + } + } + /* XXXX this shouldn't happen; log a bug here. */ + return; + } + channel_mark_circid_usable(chan, id); +} + /** Set the p_conn field of a circuit circ, along * with the corresponding circuit ID, and add the circuit as appropriate * to the (chan,id)-\>circuit map. 
*/ void -circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id, +circuit_set_p_circid_chan(or_circuit_t *or_circ, circid_t id, channel_t *chan) { - circuit_set_circid_chan_helper(TO_CIRCUIT(circ), CELL_DIRECTION_IN, - id, chan); + circuit_t *circ = TO_CIRCUIT(or_circ); + channel_t *old_chan = or_circ->p_chan; + circid_t old_id = or_circ->p_circ_id; + + circuit_set_circid_chan_helper(circ, CELL_DIRECTION_IN, id, chan); if (chan) - tor_assert(bool_eq(circ->p_chan_cells.n, circ->next_active_on_p_chan)); + tor_assert(bool_eq(or_circ->p_chan_cells.n, + or_circ->next_active_on_p_chan)); + + if (circ->p_delete_pending && old_chan) { + channel_mark_circid_unusable(old_chan, old_id); + circ->p_delete_pending = 0; + } } /** Set the n_conn field of a circuit circ, along @@ -283,10 +333,18 @@ void circuit_set_n_circid_chan(circuit_t *circ, circid_t id, channel_t *chan) { + channel_t *old_chan = circ->n_chan; + circid_t old_id = circ->n_circ_id; + circuit_set_circid_chan_helper(circ, CELL_DIRECTION_OUT, id, chan); if (chan) tor_assert(bool_eq(circ->n_chan_cells.n, circ->next_active_on_n_chan)); + + if (circ->n_delete_pending && old_chan) { + channel_mark_circid_unusable(old_chan, old_id); + circ->n_delete_pending = 1; + } } /** Change the state of circ to state, adding it to or removing diff --git a/src/or/circuitlist.h b/src/or/circuitlist.h index 434d2a8699..94887d5faf 100644 --- a/src/or/circuitlist.h +++ b/src/or/circuitlist.h @@ -64,5 +64,8 @@ void assert_cpath_layer_ok(const crypt_path_t *cp); void assert_circuit_ok(const circuit_t *c); void circuit_free_all(void); +void channel_note_destroy_pending(channel_t *chan, circid_t id); +void channel_note_destroy_not_pending(channel_t *chan, circid_t id); + #endif diff --git a/src/or/or.h b/src/or/or.h index 4e19140b4b..529e455677 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -2779,6 +2779,13 @@ typedef struct circuit_t { * allowing n_streams to add any more cells. (OR circuit only.) 
*/ unsigned int streams_blocked_on_p_chan : 1; + /** True iff we have queued a delete backwards on this circuit, but not put + * it on the output buffer. */ + unsigned int p_delete_pending : 1; + /** True iff we have queued a delete forwards on this circuit, but not put + * it on the output buffer. */ + unsigned int n_delete_pending : 1; + uint8_t state; /**< Current status of this circuit. */ uint8_t purpose; /**< Why are we creating this circuit? */ From 43d53e6d86acaf7555c31730a8230fa0cdf31306 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 21 Mar 2013 14:51:27 -0400 Subject: [PATCH 3/6] Implementation of a fix for bug 7912 I added the code to pass a destroy cell to a queueing function rather than writing it immediately, and the code to remember that we shouldn't reuse the circuit id until the destroy is actually sent, and the code to release the circuit id once the destroy has been sent... and then I finished by hooking destroy_cell_queue into the rest of Tor. --- src/or/channel.c | 59 +++++++++++++++++++++++++++++------ src/or/circuitlist.c | 2 +- src/or/circuitmux.c | 73 +++++++++++++++++++++++++++++++++++++++++--- src/or/circuitmux.h | 7 ++++- src/or/relay.c | 19 +++++++++--- src/or/relay.h | 2 +- 6 files changed, 141 insertions(+), 21 deletions(-) diff --git a/src/or/channel.c b/src/or/channel.c index 4e9086f2e6..e327bda518 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -122,6 +122,8 @@ static cell_queue_entry_t * cell_queue_entry_new_fixed(cell_t *cell); static cell_queue_entry_t * cell_queue_entry_new_var(var_cell_t *var_cell); +static int is_destroy_cell(channel_t *chan, + const cell_queue_entry_t *q, circid_t *circid_out); /* Functions to maintain the digest map */ static void channel_add_to_digest_map(channel_t *chan); @@ -1685,6 +1687,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) chan->timestamp_last_added_nonpadding = approx_time(); } + { + circid_t circ_id; + if (is_destroy_cell(chan, q, &circ_id)) { 
+ channel_note_destroy_not_pending(chan, circ_id); + } + } + /* Can we send it right out? If so, try */ if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && chan->state == CHANNEL_STATE_OPEN) { @@ -2607,6 +2616,43 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) } } +/** DOCDOC */ +static int +is_destroy_cell(channel_t *chan, + const cell_queue_entry_t *q, circid_t *circid_out) +{ + *circid_out = 0; + switch (q->type) { + case CELL_QUEUE_FIXED: + if (q->u.fixed.cell->command == CELL_DESTROY) { + *circid_out = q->u.fixed.cell->circ_id; + return 1; + } + break; + case CELL_QUEUE_VAR: + if (q->u.var.var_cell->command == CELL_DESTROY) { + *circid_out = q->u.var.var_cell->circ_id; + return 1; + } + break; + case CELL_QUEUE_PACKED: + if (chan->wide_circ_ids) { + if (q->u.packed.packed_cell->body[4] == CELL_DESTROY) { + *circid_out = ntohl(get_uint32(q->u.packed.packed_cell->body)); + return 1; + } + } else { + if (q->u.packed.packed_cell->body[2] == CELL_DESTROY) { + *circid_out = ntohs(get_uint16(q->u.packed.packed_cell->body)); + return 1; + } + } + break; + } + return 0; +} + + /** * Send destroy cell on a channel * @@ -2618,25 +2664,20 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) int channel_send_destroy(circid_t circ_id, channel_t *chan, int reason) { - cell_t cell; - tor_assert(chan); /* Check to make sure we can send on this channel first */ if (!(chan->state == CHANNEL_STATE_CLOSING || chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR)) { - memset(&cell, 0, sizeof(cell_t)); - cell.circ_id = circ_id; - cell.command = CELL_DESTROY; - cell.payload[0] = (uint8_t) reason; + chan->state == CHANNEL_STATE_ERROR) && + chan->cmux) { + channel_note_destroy_pending(chan, circ_id); + circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason); log_debug(LD_OR, "Sending destroy (circID %u) on channel %p " "(global ID " U64_FORMAT ")", (unsigned)circ_id, chan, U64_PRINTF_ARG(chan->global_identifier)); - - 
channel_write_cell(chan, &cell); } else { log_warn(LD_BUG, "Someone called channel_send_destroy() for circID %u " diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index 3e8caa8252..deb45b7b60 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -343,7 +343,7 @@ circuit_set_n_circid_chan(circuit_t *circ, circid_t id, if (circ->n_delete_pending && old_chan) { channel_mark_circid_unusable(old_chan, old_id); - circ->n_delete_pending = 1; + circ->n_delete_pending = 0; } } diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c index 545cfd0650..198e518bd4 100644 --- a/src/or/circuitmux.c +++ b/src/or/circuitmux.c @@ -10,6 +10,7 @@ #include "channel.h" #include "circuitlist.h" #include "circuitmux.h" +#include "relay.h" /* * Private typedefs for circuitmux.c @@ -115,6 +116,18 @@ struct circuitmux_s { */ struct circuit_t *active_circuits_head, *active_circuits_tail; + /** List of queued destroy cells */ + cell_queue_t destroy_cell_queue; + /** Boolean: True iff the last call to circuitmux_get_first_active_circuit + * returned the destroy queue. Used to force alternation between + * destroy/non-destroy cells. + * + * XXXX There is no reason to think that alternating is a particularly good + * approach -- it's just designed to prevent destroys from starving other + * cells completely. + */ + unsigned int last_cell_was_destroy : 1; + /* * Circuitmux policy; if this is non-NULL, it can override the built- * in round-robin active circuits behavior.
This is how EWMA works in @@ -508,6 +521,8 @@ circuitmux_free(circuitmux_t *cmux) tor_free(cmux->chanid_circid_map); } + cell_queue_clear(&cmux->destroy_cell_queue); + tor_free(cmux); } @@ -816,7 +831,7 @@ circuitmux_num_cells(circuitmux_t *cmux) { tor_assert(cmux); - return cmux->n_cells; + return cmux->n_cells + cmux->destroy_cell_queue.n; } /** @@ -1368,16 +1383,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ, /** * Pick a circuit to send from, using the active circuits list or a * circuitmux policy if one is available. This is called from channel.c. + * + * If we would rather send a destroy cell, return NULL and set + * *destroy_queue_out to the destroy queue. + * + * If we have nothing to send, set *destroy_queue_out to NULL and + * return NULL. */ circuit_t * -circuitmux_get_first_active_circuit(circuitmux_t *cmux) +circuitmux_get_first_active_circuit(circuitmux_t *cmux, + cell_queue_t **destroy_queue_out) { circuit_t *circ = NULL; tor_assert(cmux); + tor_assert(destroy_queue_out); - if (cmux->n_active_circuits > 0) { + *destroy_queue_out = NULL; + + if (cmux->destroy_cell_queue.n && + (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) { + /* We have destroy cells to send, and either we just sent a relay cell, + * or we have no relay cells to send. */ + + /* XXXX We should let the cmux policy have some say in this eventually. */ + /* XXXX Alternating is not a terribly brilliant approach here. */ + *destroy_queue_out = &cmux->destroy_cell_queue; + + cmux->last_cell_was_destroy = 1; + } else if (cmux->n_active_circuits > 0) { /* We also must have a cell available for this to be the case */ tor_assert(cmux->n_cells > 0); /* Do we have a policy-provided circuit selector? 
*/ @@ -1389,7 +1424,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux) tor_assert(cmux->active_circuits_head); circ = cmux->active_circuits_head; } - } else tor_assert(cmux->n_cells == 0); + cmux->last_cell_was_destroy = 0; + } else { + tor_assert(cmux->n_cells == 0); + tor_assert(cmux->destroy_cell_queue.n == 0); + } return circ; } @@ -1743,3 +1782,29 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux) } } +/*DOCDOC */ +void +circuitmux_append_destroy_cell(channel_t *chan, + circuitmux_t *cmux, + circid_t circ_id, + uint8_t reason) +{ + cell_t cell; + memset(&cell, 0, sizeof(cell_t)); + cell.circ_id = circ_id; + cell.command = CELL_DESTROY; + cell.payload[0] = (uint8_t) reason; + + cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell, + chan->wide_circ_ids, 0); + + /* XXXX Duplicate code from append_cell_to_circuit_queue */ + if (!channel_has_queued_writes(chan)) { + /* There is no data at all waiting to be sent on the outbuf. Add a + * cell, so that we can notice when it gets flushed, flushed_some can + * get called, and we can start putting more data onto the buffer then. 
+ */ + log_debug(LD_GENERAL, "Primed a buffer."); + channel_flush_from_first_active_circuit(chan, 1); + } +} diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h index 25644ffab7..da62196b21 100644 --- a/src/or/circuitmux.h +++ b/src/or/circuitmux.h @@ -120,7 +120,8 @@ unsigned int circuitmux_num_circuits(circuitmux_t *cmux); unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux); /* Channel interface */ -circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux); +circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux, + cell_queue_t **destroy_queue_out); void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ, unsigned int n_cells); @@ -132,5 +133,9 @@ void circuitmux_clear_num_cells(circuitmux_t *cmux, circuit_t *circ); void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ, unsigned int n_cells); +void circuitmux_append_destroy_cell(channel_t *chan, + circuitmux_t *cmux, circid_t circ_id, + uint8_t reason); + #endif /* TOR_CIRCUITMUX_H */ diff --git a/src/or/relay.c b/src/or/relay.c index 0ca3e56fd5..ec860269a6 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -2140,11 +2140,11 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell) /** Append a newly allocated copy of cell to the end of queue */ void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell, - int wide_circ_ids) + int wide_circ_ids, int use_stats) { packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids); /* Remember the time when this cell was put in the queue. 
*/ - if (get_options()->CellStatistics) { + if (get_options()->CellStatistics && use_stats) { struct timeval now; uint32_t added; insertion_time_queue_t *it_queue = queue->insertion_times; @@ -2339,7 +2339,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) { circuitmux_t *cmux = NULL; int n_flushed = 0; - cell_queue_t *queue; + cell_queue_t *queue, *destroy_queue=NULL; circuit_t *circ; or_circuit_t *or_circ; int streams_blocked; @@ -2352,7 +2352,16 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) /* Main loop: pick a circuit, send a cell, update the cmux */ while (n_flushed < max) { - circ = circuitmux_get_first_active_circuit(cmux); + circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue); + if (destroy_queue) { + /* this code is duplicated from some of the logic below. Ugly! XXXX */ + tor_assert(destroy_queue->n > 0); + cell = cell_queue_pop(destroy_queue); + channel_write_packed_cell(chan, cell); + cell = NULL; + ++n_flushed; + continue; + } /* If it returns NULL, no cells left to send */ if (!circ) break; assert_cmux_ok_paranoid(chan); @@ -2474,7 +2483,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, streams_blocked = circ->streams_blocked_on_p_chan; } - cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids); + cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids, 1); /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. 
*/ diff --git a/src/or/relay.h b/src/or/relay.h index 7e59838f95..c4cb935bcc 100644 --- a/src/or/relay.h +++ b/src/or/relay.h @@ -47,7 +47,7 @@ void packed_cell_free(packed_cell_t *cell); void cell_queue_clear(cell_queue_t *queue); void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell); void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell, - int wide_circ_ids); + int wide_circ_ids, int use_stats); void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, From 16f9861b22751bc90666fe1836b8cf740630447a Mon Sep 17 00:00:00 2001 From: Andrea Shepard Date: Wed, 12 Jun 2013 22:22:21 -0700 Subject: [PATCH 4/6] Add destroy balance tracking and logging to circuitmux --- src/or/channel.c | 1 - src/or/circuitlist.c | 5 ++-- src/or/circuitmux.c | 55 ++++++++++++++++++++++++++++++++++++++++++++ src/or/circuitmux.h | 1 + src/or/relay.c | 2 ++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/or/channel.c b/src/or/channel.c index e327bda518..33a32102ee 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -2652,7 +2652,6 @@ is_destroy_cell(channel_t *chan, return 0; } - /** * Send destroy cell on a channel * diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index deb45b7b60..e50ab603e9 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -1717,8 +1717,9 @@ assert_circuit_ok(const circuit_t *c) if (or_circ && or_circ->p_chan) { if (or_circ->p_circ_id) { /* ibid */ - circuit_t *c2 = circuit_get_by_circid_channel_impl(or_circ->p_circ_id, - or_circ->p_chan, NULL); + circuit_t *c2 = + circuit_get_by_circid_channel_impl(or_circ->p_circ_id, + or_circ->p_chan, NULL); tor_assert(c == c2); } } diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c index 198e518bd4..a6256f8049 100644 --- a/src/or/circuitmux.c +++ b/src/or/circuitmux.c @@ -127,6 +127,10 @@ struct circuitmux_s { * cells completely. 
*/ unsigned int last_cell_was_destroy : 1; + /** Destroy counter: increment this when a destroy gets queued, decrement + * when we unqueue it, so we can test to make sure they don't starve. + */ + int64_t destroy_ctr; /* * Circuitmux policy; if this is non-NULL, it can override the built- @@ -206,6 +210,11 @@ static void circuitmux_assert_okay_pass_one(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux); +/* Static global variables */ + +/** Count the destroy balance to debug destroy queue logic */ +static int64_t global_destroy_ctr = 0; + /* Function definitions */ /** @@ -521,6 +530,25 @@ circuitmux_free(circuitmux_t *cmux) tor_free(cmux->chanid_circid_map); } + /* + * We're throwing away some destroys; log the counter and + * adjust the global counter by the queue size. + */ + if (cmux->destroy_cell_queue.n > 0) { + cmux->destroy_ctr -= cmux->destroy_cell_queue.n; + global_destroy_ctr -= cmux->destroy_cell_queue.n; + log_debug(LD_CIRC, + "Freeing cmux at %p with %u queued destroys; the last cmux " + "destroy balance was %ld, global is %ld\n", + cmux, cmux->destroy_cell_queue.n, + cmux->destroy_ctr, global_destroy_ctr); + } else { + log_debug(LD_CIRC, + "Freeing cmux at %p with no queued destroys, the cmux destroy " + "balance was %ld, global is %ld\n", + cmux, cmux->destroy_ctr, global_destroy_ctr); + } + cell_queue_clear(&cmux->destroy_cell_queue); tor_free(cmux); @@ -1502,6 +1530,24 @@ circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ, circuitmux_assert_okay_paranoid(cmux); } +/** + * Notify the circuitmux that a destroy was sent, so we can update + * the counter. 
+ */ + +void +circuitmux_notify_xmit_destroy(circuitmux_t *cmux) +{ + tor_assert(cmux); + + --(cmux->destroy_ctr); + --(global_destroy_ctr); + log_debug(LD_CIRC, + "Cmux at %p sent a destroy, cmux counter is now %ld, " + "global counter is now %ld\n", + cmux, cmux->destroy_ctr, global_destroy_ctr); +} + /* * Circuitmux consistency checking assertions */ @@ -1798,6 +1844,14 @@ circuitmux_append_destroy_cell(channel_t *chan, cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell, chan->wide_circ_ids, 0); + /* Destroy entering the queue, update counters */ + ++(cmux->destroy_ctr); + ++global_destroy_ctr; + log_debug(LD_CIRC, + "Cmux at %p queued a destroy for circ %u, " + "cmux counter is now %ld, global counter is now %ld\n", + cmux, circ_id, cmux->destroy_ctr, global_destroy_ctr); + /* XXXX Duplicate code from append_cell_to_circuit_queue */ if (!channel_has_queued_writes(chan)) { /* There is no data at all waiting to be sent on the outbuf. Add a @@ -1808,3 +1862,4 @@ circuitmux_append_destroy_cell(channel_t *chan, channel_flush_from_first_active_circuit(chan, 1); } } + diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h index da62196b21..9ff29de70a 100644 --- a/src/or/circuitmux.h +++ b/src/or/circuitmux.h @@ -124,6 +124,7 @@ circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux, cell_queue_t **destroy_queue_out); void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ, unsigned int n_cells); +void circuitmux_notify_xmit_destroy(circuitmux_t *cmux); /* Circuit interface */ void circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ, diff --git a/src/or/relay.c b/src/or/relay.c index ec860269a6..46bfc442b8 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -2358,6 +2358,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max) tor_assert(destroy_queue->n > 0); cell = cell_queue_pop(destroy_queue); channel_write_packed_cell(chan, cell); + /* Update the cmux destroy counter */ + 
circuitmux_notify_xmit_destroy(cmux); cell = NULL; ++n_flushed; continue; From 9b754d1213640882a543f70bb0d6041dfb7a3869 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 13 Jun 2013 10:20:30 -0400 Subject: [PATCH 5/6] Add a changes file for bug 7912 I'm calling it a bugfix on 0.2.0.1-alpha, since that's where cell queues were first introduced. --- changes/bug7912 | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 changes/bug7912 diff --git a/changes/bug7912 b/changes/bug7912 new file mode 100644 index 0000000000..48c65d2241 --- /dev/null +++ b/changes/bug7912 @@ -0,0 +1,8 @@ + o Major bugfixes: + - Instead of writing destroy cells directly to outgoing connection + buffers, queue them and intersperse them with other outgoing cells. + This can prevent a set of resource starvation conditions where too + many pending destroy cells prevent data cells from actually getting + delivered. Reported by "oftc_must_be_destroyed". Fixes bug 7912; + bugfix on 0.2.0.1-alpha. + From e61df2ec651345f1c46777105bbae69916402ecd Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 13 Jun 2013 10:30:34 -0400 Subject: [PATCH 6/6] Fix compile warnings wrt printf formatting of int64_t --- src/or/circuitmux.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c index a6256f8049..f579c3ead1 100644 --- a/src/or/circuitmux.c +++ b/src/or/circuitmux.c @@ -539,14 +539,17 @@ circuitmux_free(circuitmux_t *cmux) global_destroy_ctr -= cmux->destroy_cell_queue.n; log_debug(LD_CIRC, "Freeing cmux at %p with %u queued destroys; the last cmux " - "destroy balance was %ld, global is %ld\n", + "destroy balance was "I64_FORMAT", global is "I64_FORMAT, cmux, cmux->destroy_cell_queue.n, - cmux->destroy_ctr, global_destroy_ctr); + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); } else { log_debug(LD_CIRC, "Freeing cmux at %p with no queued destroys, the cmux destroy " - "balance
was %ld, global is %ld\n", - cmux, cmux->destroy_ctr, global_destroy_ctr); + "balance was "I64_FORMAT", global is "I64_FORMAT, + cmux, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); } cell_queue_clear(&cmux->destroy_cell_queue); @@ -1543,9 +1546,11 @@ circuitmux_notify_xmit_destroy(circuitmux_t *cmux) --(cmux->destroy_ctr); --(global_destroy_ctr); log_debug(LD_CIRC, - "Cmux at %p sent a destroy, cmux counter is now %ld, " - "global counter is now %ld\n", - cmux, cmux->destroy_ctr, global_destroy_ctr); + "Cmux at %p sent a destroy, cmux counter is now "I64_FORMAT", " + "global counter is now "I64_FORMAT, + cmux, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); } /* @@ -1848,9 +1853,11 @@ circuitmux_append_destroy_cell(channel_t *chan, ++(cmux->destroy_ctr); ++global_destroy_ctr; log_debug(LD_CIRC, - "Cmux at %p queued a destroy for circ %u, " - "cmux counter is now %ld, global counter is now %ld\n", - cmux, circ_id, cmux->destroy_ctr, global_destroy_ctr); + "Cmux at %p queued a destroy for circ %u, cmux counter is now " + I64_FORMAT", global counter is now "I64_FORMAT, + cmux, circ_id, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); /* XXXX Duplicate code from append_cell_to_circuit_queue */ if (!channel_has_queued_writes(chan)) {