Merge remote-tracking branch 'public/bug7912_squashed'

This commit is contained in:
Nick Mathewson 2013-06-13 10:31:02 -04:00
commit 4b781e24fb
9 changed files with 362 additions and 32 deletions

8
changes/bug7912 Normal file
View File

@ -0,0 +1,8 @@
o Major bugfixes:
- Instead of writing destroy cells directly to outgoing connection
buffers, queue them and intersperse them with other outgoing cells.
This can prevent a set of resource starvation conditions where too
many pending destroy cells prevent data cells from actually getting
delivered. Reported by "oftc_must_be_destroyed". Fixes bug 7912;
bugfix on 0.2.0.1-alpha.

View File

@ -122,6 +122,8 @@ static cell_queue_entry_t *
cell_queue_entry_new_fixed(cell_t *cell); cell_queue_entry_new_fixed(cell_t *cell);
static cell_queue_entry_t * static cell_queue_entry_t *
cell_queue_entry_new_var(var_cell_t *var_cell); cell_queue_entry_new_var(var_cell_t *var_cell);
static int is_destroy_cell(channel_t *chan,
const cell_queue_entry_t *q, circid_t *circid_out);
/* Functions to maintain the digest map */ /* Functions to maintain the digest map */
static void channel_add_to_digest_map(channel_t *chan); static void channel_add_to_digest_map(channel_t *chan);
@ -1685,6 +1687,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
chan->timestamp_last_added_nonpadding = approx_time(); chan->timestamp_last_added_nonpadding = approx_time();
} }
{
circid_t circ_id;
if (is_destroy_cell(chan, q, &circ_id)) {
channel_note_destroy_not_pending(chan, circ_id);
}
}
/* Can we send it right out? If so, try */ /* Can we send it right out? If so, try */
if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
chan->state == CHANNEL_STATE_OPEN) { chan->state == CHANNEL_STATE_OPEN) {
@ -2607,6 +2616,42 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
} }
} }
/** DOCDOC */
static int
is_destroy_cell(channel_t *chan,
const cell_queue_entry_t *q, circid_t *circid_out)
{
*circid_out = 0;
switch (q->type) {
case CELL_QUEUE_FIXED:
if (q->u.fixed.cell->command == CELL_DESTROY) {
*circid_out = q->u.fixed.cell->circ_id;
return 1;
}
break;
case CELL_QUEUE_VAR:
if (q->u.var.var_cell->command == CELL_DESTROY) {
*circid_out = q->u.var.var_cell->circ_id;
return 1;
}
break;
case CELL_QUEUE_PACKED:
if (chan->wide_circ_ids) {
if (q->u.packed.packed_cell->body[4] == CELL_DESTROY) {
*circid_out = ntohl(get_uint32(q->u.packed.packed_cell->body));
return 1;
}
} else {
if (q->u.packed.packed_cell->body[2] == CELL_DESTROY) {
*circid_out = ntohs(get_uint16(q->u.packed.packed_cell->body));
return 1;
}
}
break;
}
return 0;
}
/** /**
* Send destroy cell on a channel * Send destroy cell on a channel
* *
@ -2618,25 +2663,20 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
int int
channel_send_destroy(circid_t circ_id, channel_t *chan, int reason) channel_send_destroy(circid_t circ_id, channel_t *chan, int reason)
{ {
cell_t cell;
tor_assert(chan); tor_assert(chan);
/* Check to make sure we can send on this channel first */ /* Check to make sure we can send on this channel first */
if (!(chan->state == CHANNEL_STATE_CLOSING || if (!(chan->state == CHANNEL_STATE_CLOSING ||
chan->state == CHANNEL_STATE_CLOSED || chan->state == CHANNEL_STATE_CLOSED ||
chan->state == CHANNEL_STATE_ERROR)) { chan->state == CHANNEL_STATE_ERROR) &&
memset(&cell, 0, sizeof(cell_t)); chan->cmux) {
cell.circ_id = circ_id; channel_note_destroy_pending(chan, circ_id);
cell.command = CELL_DESTROY; circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason);
cell.payload[0] = (uint8_t) reason;
log_debug(LD_OR, log_debug(LD_OR,
"Sending destroy (circID %u) on channel %p " "Sending destroy (circID %u) on channel %p "
"(global ID " U64_FORMAT ")", "(global ID " U64_FORMAT ")",
(unsigned)circ_id, chan, (unsigned)circ_id, chan,
U64_PRINTF_ARG(chan->global_identifier)); U64_PRINTF_ARG(chan->global_identifier));
channel_write_cell(chan, &cell);
} else { } else {
log_warn(LD_BUG, log_warn(LD_BUG,
"Someone called channel_send_destroy() for circID %u " "Someone called channel_send_destroy() for circID %u "

View File

@ -207,18 +207,123 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction,
} }
} }
/** Mark that circuit id <b>id</b> shouldn't be used on channel <b>chan</b>,
* even if there is no circuit on the channel. We use this to keep the
* circuit id from getting re-used while we have queued but not yet sent
* a destroy cell. */
void
channel_mark_circid_unusable(channel_t *chan, circid_t id)
{
chan_circid_circuit_map_t search;
chan_circid_circuit_map_t *ent;
/* See if there's an entry there. That wouldn't be good. */
memset(&search, 0, sizeof(search));
search.chan = chan;
search.circ_id = id;
ent = HT_FIND(chan_circid_map, &chan_circid_map, &search);
if (ent && ent->circuit) {
/* we have a problem. */
log_warn(LD_BUG, "Tried to mark %u unusable on %p, but there was already "
"a circuit there.", (unsigned)id, chan);
} else if (ent) {
/* It's already marked. */
} else {
ent = tor_malloc_zero(sizeof(chan_circid_circuit_map_t));
ent->chan = chan;
ent->circ_id = id;
/* leave circuit at NULL */
HT_INSERT(chan_circid_map, &chan_circid_map, ent);
}
}
/** Mark that a circuit id <b>id</b> can be used again on <b>chan</b>.
* We use this to re-enable the circuit ID after we've sent a destroy cell.
*/
void
channel_mark_circid_usable(channel_t *chan, circid_t id)
{
chan_circid_circuit_map_t search;
chan_circid_circuit_map_t *ent;
/* See if there's an entry there. That wouldn't be good. */
memset(&search, 0, sizeof(search));
search.chan = chan;
search.circ_id = id;
ent = HT_REMOVE(chan_circid_map, &chan_circid_map, &search);
if (ent && ent->circuit) {
log_warn(LD_BUG, "Tried to mark %u usable on %p, but there was already "
"a circuit there.", (unsigned)id, chan);
return;
}
if (_last_circid_chan_ent == ent)
_last_circid_chan_ent = NULL;
tor_free(ent);
}
/** Called to indicate that a DESTROY is pending on <b>chan</b> with
* circuit ID <b>id</b>, but hasn't been sent yet. */
void
channel_note_destroy_pending(channel_t *chan, circid_t id)
{
circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan);
if (circ) {
if (circ->n_chan == chan && circ->n_circ_id == id) {
circ->n_delete_pending = 1;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
if (orcirc->p_chan == chan && orcirc->p_circ_id == id) {
circ->p_delete_pending = 1;
}
}
return;
}
channel_mark_circid_unusable(chan, id);
}
/** Called to indicate that a DESTROY is no longer pending on <b>chan</b> with
* circuit ID <b>id</b> -- typically, because it has been sent. */
void
channel_note_destroy_not_pending(channel_t *chan, circid_t id)
{
circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan);
if (circ) {
if (circ->n_chan == chan && circ->n_circ_id == id) {
circ->n_delete_pending = 0;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
if (orcirc->p_chan == chan && orcirc->p_circ_id == id) {
circ->p_delete_pending = 0;
}
}
/* XXXX this shouldn't happen; log a bug here. */
return;
}
channel_mark_circid_usable(chan, id);
}
/** Set the p_conn field of a circuit <b>circ</b>, along /** Set the p_conn field of a circuit <b>circ</b>, along
* with the corresponding circuit ID, and add the circuit as appropriate * with the corresponding circuit ID, and add the circuit as appropriate
* to the (chan,id)-\>circuit map. */ * to the (chan,id)-\>circuit map. */
void void
circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id, circuit_set_p_circid_chan(or_circuit_t *or_circ, circid_t id,
channel_t *chan) channel_t *chan)
{ {
circuit_set_circid_chan_helper(TO_CIRCUIT(circ), CELL_DIRECTION_IN, circuit_t *circ = TO_CIRCUIT(or_circ);
id, chan); channel_t *old_chan = or_circ->p_chan;
circid_t old_id = or_circ->p_circ_id;
circuit_set_circid_chan_helper(circ, CELL_DIRECTION_IN, id, chan);
if (chan) if (chan)
tor_assert(bool_eq(circ->p_chan_cells.n, circ->next_active_on_p_chan)); tor_assert(bool_eq(or_circ->p_chan_cells.n,
or_circ->next_active_on_p_chan));
if (circ->p_delete_pending && old_chan) {
channel_mark_circid_unusable(old_chan, old_id);
circ->p_delete_pending = 0;
}
} }
/** Set the n_conn field of a circuit <b>circ</b>, along /** Set the n_conn field of a circuit <b>circ</b>, along
@ -228,10 +333,18 @@ void
circuit_set_n_circid_chan(circuit_t *circ, circid_t id, circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
channel_t *chan) channel_t *chan)
{ {
channel_t *old_chan = circ->n_chan;
circid_t old_id = circ->n_circ_id;
circuit_set_circid_chan_helper(circ, CELL_DIRECTION_OUT, id, chan); circuit_set_circid_chan_helper(circ, CELL_DIRECTION_OUT, id, chan);
if (chan) if (chan)
tor_assert(bool_eq(circ->n_chan_cells.n, circ->next_active_on_n_chan)); tor_assert(bool_eq(circ->n_chan_cells.n, circ->next_active_on_n_chan));
if (circ->n_delete_pending && old_chan) {
channel_mark_circid_unusable(old_chan, old_id);
circ->n_delete_pending = 0;
}
} }
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing /** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@ -928,9 +1041,13 @@ circuit_get_by_global_id(uint32_t id)
* - circ-\>n_circ_id or circ-\>p_circ_id is equal to <b>circ_id</b>, and * - circ-\>n_circ_id or circ-\>p_circ_id is equal to <b>circ_id</b>, and
* - circ is attached to <b>chan</b>, either as p_chan or n_chan. * - circ is attached to <b>chan</b>, either as p_chan or n_chan.
* Return NULL if no such circuit exists. * Return NULL if no such circuit exists.
*
* If <b>found_entry_out</b> is provided, set it to true if we have a
* placeholder entry for circid/chan, and leave it unset otherwise.
*/ */
static INLINE circuit_t * static INLINE circuit_t *
circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan) circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan,
int *found_entry_out)
{ {
chan_circid_circuit_map_t search; chan_circid_circuit_map_t search;
chan_circid_circuit_map_t *found; chan_circid_circuit_map_t *found;
@ -951,15 +1068,21 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan)
" circ_id %u, channel ID " U64_FORMAT " (%p)", " circ_id %u, channel ID " U64_FORMAT " (%p)",
found->circuit, (unsigned)circ_id, found->circuit, (unsigned)circ_id,
U64_PRINTF_ARG(chan->global_identifier), chan); U64_PRINTF_ARG(chan->global_identifier), chan);
if (found_entry_out)
*found_entry_out = 1;
return found->circuit; return found->circuit;
} }
log_debug(LD_CIRC, log_debug(LD_CIRC,
"circuit_get_by_circid_channel_impl() found nothing for" "circuit_get_by_circid_channel_impl() found %s for"
" circ_id %u, channel ID " U64_FORMAT " (%p)", " circ_id %u, channel ID " U64_FORMAT " (%p)",
found ? "placeholder" : "nothing",
(unsigned)circ_id, (unsigned)circ_id,
U64_PRINTF_ARG(chan->global_identifier), chan); U64_PRINTF_ARG(chan->global_identifier), chan);
if (found_entry_out)
*found_entry_out = found ? 1 : 0;
return NULL; return NULL;
/* The rest of this checks for bugs. Disabled by default. */ /* The rest of this checks for bugs. Disabled by default. */
/* We comment it out because coverity complains otherwise. /* We comment it out because coverity complains otherwise.
@ -993,7 +1116,7 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan)
circuit_t * circuit_t *
circuit_get_by_circid_channel(circid_t circ_id, channel_t *chan) circuit_get_by_circid_channel(circid_t circ_id, channel_t *chan)
{ {
circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan); circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan, NULL);
if (!circ || circ->marked_for_close) if (!circ || circ->marked_for_close)
return NULL; return NULL;
else else
@ -1009,7 +1132,7 @@ circuit_t *
circuit_get_by_circid_channel_even_if_marked(circid_t circ_id, circuit_get_by_circid_channel_even_if_marked(circid_t circ_id,
channel_t *chan) channel_t *chan)
{ {
return circuit_get_by_circid_channel_impl(circ_id, chan); return circuit_get_by_circid_channel_impl(circ_id, chan, NULL);
} }
/** Return true iff the circuit ID <b>circ_id</b> is currently used by a /** Return true iff the circuit ID <b>circ_id</b> is currently used by a
@ -1017,7 +1140,9 @@ circuit_get_by_circid_channel_even_if_marked(circid_t circ_id,
int int
circuit_id_in_use_on_channel(circid_t circ_id, channel_t *chan) circuit_id_in_use_on_channel(circid_t circ_id, channel_t *chan)
{ {
return circuit_get_by_circid_channel_impl(circ_id, chan) != NULL; int found = 0;
return circuit_get_by_circid_channel_impl(circ_id, chan, &found) != NULL
|| found;
} }
/** Return the circuit that a given edge connection is using. */ /** Return the circuit that a given edge connection is using. */
@ -1585,15 +1710,16 @@ assert_circuit_ok(const circuit_t *c)
/* We use the _impl variant here to make sure we don't fail on marked /* We use the _impl variant here to make sure we don't fail on marked
* circuits, which would not be returned by the regular function. */ * circuits, which would not be returned by the regular function. */
circuit_t *c2 = circuit_get_by_circid_channel_impl(c->n_circ_id, circuit_t *c2 = circuit_get_by_circid_channel_impl(c->n_circ_id,
c->n_chan); c->n_chan, NULL);
tor_assert(c == c2); tor_assert(c == c2);
} }
} }
if (or_circ && or_circ->p_chan) { if (or_circ && or_circ->p_chan) {
if (or_circ->p_circ_id) { if (or_circ->p_circ_id) {
/* ibid */ /* ibid */
circuit_t *c2 = circuit_get_by_circid_channel_impl(or_circ->p_circ_id, circuit_t *c2 =
or_circ->p_chan); circuit_get_by_circid_channel_impl(or_circ->p_circ_id,
or_circ->p_chan, NULL);
tor_assert(c == c2); tor_assert(c == c2);
} }
} }

View File

@ -23,6 +23,8 @@ void circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id,
channel_t *chan); channel_t *chan);
void circuit_set_n_circid_chan(circuit_t *circ, circid_t id, void circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
channel_t *chan); channel_t *chan);
void channel_mark_circid_unusable(channel_t *chan, circid_t id);
void channel_mark_circid_usable(channel_t *chan, circid_t id);
void circuit_set_state(circuit_t *circ, uint8_t state); void circuit_set_state(circuit_t *circ, uint8_t state);
void circuit_close_all_marked(void); void circuit_close_all_marked(void);
int32_t circuit_initial_package_window(void); int32_t circuit_initial_package_window(void);
@ -62,5 +64,8 @@ void assert_cpath_layer_ok(const crypt_path_t *cp);
void assert_circuit_ok(const circuit_t *c); void assert_circuit_ok(const circuit_t *c);
void circuit_free_all(void); void circuit_free_all(void);
void channel_note_destroy_pending(channel_t *chan, circid_t id);
void channel_note_destroy_not_pending(channel_t *chan, circid_t id);
#endif #endif

View File

@ -10,6 +10,7 @@
#include "channel.h" #include "channel.h"
#include "circuitlist.h" #include "circuitlist.h"
#include "circuitmux.h" #include "circuitmux.h"
#include "relay.h"
/* /*
* Private typedefs for circuitmux.c * Private typedefs for circuitmux.c
@ -115,6 +116,22 @@ struct circuitmux_s {
*/ */
struct circuit_t *active_circuits_head, *active_circuits_tail; struct circuit_t *active_circuits_head, *active_circuits_tail;
/** List of queued destroy cells */
cell_queue_t destroy_cell_queue;
/** Boolean: True iff the last cell to circuitmux_get_first_active_circuit
* returned the destroy queue. Used to force alternation between
* destroy/non-destroy cells.
*
* XXXX There is no reason to think that alternating is a particularly good
* approach -- it's just designed to prevent destroys from starving other
* cells completely.
*/
unsigned int last_cell_was_destroy : 1;
/** Destroy counter: increment this when a destroy gets queued, decrement
* when we unqueue it, so we can test to make sure they don't starve.
*/
int64_t destroy_ctr;
/* /*
* Circuitmux policy; if this is non-NULL, it can override the built- * Circuitmux policy; if this is non-NULL, it can override the built-
* in round-robin active circuits behavior. This is how EWMA works in * in round-robin active circuits behavior. This is how EWMA works in
@ -193,6 +210,11 @@ static void circuitmux_assert_okay_pass_one(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux);
/* Static global variables */
/** Count the destroy balance to debug destroy queue logic */
static int64_t global_destroy_ctr = 0;
/* Function definitions */ /* Function definitions */
/** /**
@ -508,6 +530,30 @@ circuitmux_free(circuitmux_t *cmux)
tor_free(cmux->chanid_circid_map); tor_free(cmux->chanid_circid_map);
} }
/*
* We're throwing away some destroys; log the counter and
* adjust the global counter by the queue size.
*/
if (cmux->destroy_cell_queue.n > 0) {
cmux->destroy_ctr -= cmux->destroy_cell_queue.n;
global_destroy_ctr -= cmux->destroy_cell_queue.n;
log_debug(LD_CIRC,
"Freeing cmux at %p with %u queued destroys; the last cmux "
"destroy balance was "I64_FORMAT", global is "I64_FORMAT,
cmux, cmux->destroy_cell_queue.n,
I64_PRINTF_ARG(cmux->destroy_ctr),
I64_PRINTF_ARG(global_destroy_ctr));
} else {
log_debug(LD_CIRC,
"Freeing cmux at %p with no queued destroys, the cmux destroy "
"balance was "I64_FORMAT", global is "I64_FORMAT,
cmux,
I64_PRINTF_ARG(cmux->destroy_ctr),
I64_PRINTF_ARG(global_destroy_ctr));
}
cell_queue_clear(&cmux->destroy_cell_queue);
tor_free(cmux); tor_free(cmux);
} }
@ -816,7 +862,7 @@ circuitmux_num_cells(circuitmux_t *cmux)
{ {
tor_assert(cmux); tor_assert(cmux);
return cmux->n_cells; return cmux->n_cells + cmux->destroy_cell_queue.n;
} }
/** /**
@ -1368,16 +1414,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
/** /**
* Pick a circuit to send from, using the active circuits list or a * Pick a circuit to send from, using the active circuits list or a
* circuitmux policy if one is available. This is called from channel.c. * circuitmux policy if one is available. This is called from channel.c.
*
* If we would rather send a destroy cell, return NULL and set
* *<b>destroy_queue_out</b> to the destroy queue.
*
* If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and
* return NULL.
*/ */
circuit_t * circuit_t *
circuitmux_get_first_active_circuit(circuitmux_t *cmux) circuitmux_get_first_active_circuit(circuitmux_t *cmux,
cell_queue_t **destroy_queue_out)
{ {
circuit_t *circ = NULL; circuit_t *circ = NULL;
tor_assert(cmux); tor_assert(cmux);
tor_assert(destroy_queue_out);
if (cmux->n_active_circuits > 0) { *destroy_queue_out = NULL;
if (cmux->destroy_cell_queue.n &&
(!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) {
/* We have destroy cells to send, and either we just sent a relay cell,
* or we have no relay cells to send. */
/* XXXX We should let the cmux policy have some say in this eventually. */
/* XXXX Alternating is not a terribly brilliant approach here. */
*destroy_queue_out = &cmux->destroy_cell_queue;
cmux->last_cell_was_destroy = 1;
} else if (cmux->n_active_circuits > 0) {
/* We also must have a cell available for this to be the case */ /* We also must have a cell available for this to be the case */
tor_assert(cmux->n_cells > 0); tor_assert(cmux->n_cells > 0);
/* Do we have a policy-provided circuit selector? */ /* Do we have a policy-provided circuit selector? */
@ -1389,7 +1455,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux)
tor_assert(cmux->active_circuits_head); tor_assert(cmux->active_circuits_head);
circ = cmux->active_circuits_head; circ = cmux->active_circuits_head;
} }
} else tor_assert(cmux->n_cells == 0); cmux->last_cell_was_destroy = 0;
} else {
tor_assert(cmux->n_cells == 0);
tor_assert(cmux->destroy_cell_queue.n == 0);
}
return circ; return circ;
} }
@ -1463,6 +1533,26 @@ circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
circuitmux_assert_okay_paranoid(cmux); circuitmux_assert_okay_paranoid(cmux);
} }
/**
* Notify the circuitmux that a destroy was sent, so we can update
* the counter.
*/
void
circuitmux_notify_xmit_destroy(circuitmux_t *cmux)
{
tor_assert(cmux);
--(cmux->destroy_ctr);
--(global_destroy_ctr);
log_debug(LD_CIRC,
"Cmux at %p sent a destroy, cmux counter is now "I64_FORMAT", "
"global counter is now "I64_FORMAT,
cmux,
I64_PRINTF_ARG(cmux->destroy_ctr),
I64_PRINTF_ARG(global_destroy_ctr));
}
/* /*
* Circuitmux consistency checking assertions * Circuitmux consistency checking assertions
*/ */
@ -1743,3 +1833,40 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux)
} }
} }
/*DOCDOC */
void
circuitmux_append_destroy_cell(channel_t *chan,
circuitmux_t *cmux,
circid_t circ_id,
uint8_t reason)
{
cell_t cell;
memset(&cell, 0, sizeof(cell_t));
cell.circ_id = circ_id;
cell.command = CELL_DESTROY;
cell.payload[0] = (uint8_t) reason;
cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell,
chan->wide_circ_ids, 0);
/* Destroy entering the queue, update counters */
++(cmux->destroy_ctr);
++global_destroy_ctr;
log_debug(LD_CIRC,
"Cmux at %p queued a destroy for circ %u, cmux counter is now "
I64_FORMAT", global counter is now "I64_FORMAT,
cmux, circ_id,
I64_PRINTF_ARG(cmux->destroy_ctr),
I64_PRINTF_ARG(global_destroy_ctr));
/* XXXX Duplicate code from append_cell_to_circuit_queue */
if (!channel_has_queued_writes(chan)) {
/* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can
* get called, and we can start putting more data onto the buffer then.
*/
log_debug(LD_GENERAL, "Primed a buffer.");
channel_flush_from_first_active_circuit(chan, 1);
}
}

View File

@ -120,9 +120,11 @@ unsigned int circuitmux_num_circuits(circuitmux_t *cmux);
unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux); unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux);
/* Channel interface */ /* Channel interface */
circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux); circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux,
cell_queue_t **destroy_queue_out);
void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ, void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells); unsigned int n_cells);
void circuitmux_notify_xmit_destroy(circuitmux_t *cmux);
/* Circuit interface */ /* Circuit interface */
void circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ, void circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ,
@ -132,5 +134,9 @@ void circuitmux_clear_num_cells(circuitmux_t *cmux, circuit_t *circ);
void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ, void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells); unsigned int n_cells);
void circuitmux_append_destroy_cell(channel_t *chan,
circuitmux_t *cmux, circid_t circ_id,
uint8_t reason);
#endif /* TOR_CIRCUITMUX_H */ #endif /* TOR_CIRCUITMUX_H */

View File

@ -2777,6 +2777,13 @@ typedef struct circuit_t {
* allowing n_streams to add any more cells. (OR circuit only.) */ * allowing n_streams to add any more cells. (OR circuit only.) */
unsigned int streams_blocked_on_p_chan : 1; unsigned int streams_blocked_on_p_chan : 1;
/** True iff we have queued a delete backwards on this circuit, but not put
* it on the output buffer. */
unsigned int p_delete_pending : 1;
/** True iff we have queued a delete forwards on this circuit, but not put
* it on the output buffer. */
unsigned int n_delete_pending : 1;
uint8_t state; /**< Current status of this circuit. */ uint8_t state; /**< Current status of this circuit. */
uint8_t purpose; /**< Why are we creating this circuit? */ uint8_t purpose; /**< Why are we creating this circuit? */

View File

@ -2148,11 +2148,11 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
/** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */ /** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
void void
cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell, cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
int wide_circ_ids) int wide_circ_ids, int use_stats)
{ {
packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids); packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
/* Remember the time when this cell was put in the queue. */ /* Remember the time when this cell was put in the queue. */
if (get_options()->CellStatistics) { if (get_options()->CellStatistics && use_stats) {
struct timeval now; struct timeval now;
uint32_t added; uint32_t added;
insertion_time_queue_t *it_queue = queue->insertion_times; insertion_time_queue_t *it_queue = queue->insertion_times;
@ -2347,7 +2347,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
{ {
circuitmux_t *cmux = NULL; circuitmux_t *cmux = NULL;
int n_flushed = 0; int n_flushed = 0;
cell_queue_t *queue; cell_queue_t *queue, *destroy_queue=NULL;
circuit_t *circ; circuit_t *circ;
or_circuit_t *or_circ; or_circuit_t *or_circ;
int streams_blocked; int streams_blocked;
@ -2360,7 +2360,18 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
/* Main loop: pick a circuit, send a cell, update the cmux */ /* Main loop: pick a circuit, send a cell, update the cmux */
while (n_flushed < max) { while (n_flushed < max) {
circ = circuitmux_get_first_active_circuit(cmux); circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue);
if (destroy_queue) {
/* this code is duplicated from some of the logic below. Ugly! XXXX */
tor_assert(destroy_queue->n > 0);
cell = cell_queue_pop(destroy_queue);
channel_write_packed_cell(chan, cell);
/* Update the cmux destroy counter */
circuitmux_notify_xmit_destroy(cmux);
cell = NULL;
++n_flushed;
continue;
}
/* If it returns NULL, no cells left to send */ /* If it returns NULL, no cells left to send */
if (!circ) break; if (!circ) break;
assert_cmux_ok_paranoid(chan); assert_cmux_ok_paranoid(chan);
@ -2482,7 +2493,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streams_blocked = circ->streams_blocked_on_p_chan; streams_blocked = circ->streams_blocked_on_p_chan;
} }
cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids); cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids, 1);
/* If we have too many cells on the circuit, we should stop reading from /* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */ * the edge streams for a while. */

View File

@ -53,7 +53,7 @@ void packed_cell_free(packed_cell_t *cell);
void cell_queue_clear(cell_queue_t *queue); void cell_queue_clear(cell_queue_t *queue);
void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell); void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell, void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
int wide_circ_ids); int wide_circ_ids, int use_stats);
void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction, cell_t *cell, cell_direction_t direction,