Fix behavior of adding a cell to a blocked queue.

We frequently add cells to stream-blocked queues for valid reasons
that don't mean we need to block streams.  The most obvious reason
is if the cell arrives over a circuit rather than from an edge: we
don't block circuits, no matter how full queues get.  The next most
obvious reason is that we allow CONNECTED cells from a newly created
stream to get delivered just fine.

This patch changes the behavior so that we only iterate over the
streams on a circuit when the cell in question came from a stream,
and we only block the stream that generated the cell, so that other
streams can still get their CONNECTEDs in.
This commit is contained in:
Nick Mathewson 2010-09-02 15:26:17 -04:00
parent 9456da17db
commit f89323afda
3 changed files with 22 additions and 21 deletions

View File

@ -1752,7 +1752,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type,
cell.circ_id = circ->n_circ_id; cell.circ_id = circ->n_circ_id;
memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN); memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT); append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT, 0);
if (CIRCUIT_IS_ORIGIN(circ)) { if (CIRCUIT_IS_ORIGIN(circ)) {
/* mark it so it gets better rate limiting treatment. */ /* mark it so it gets better rate limiting treatment. */
@ -2329,7 +2329,7 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload,
circ->is_first_hop = (cell_type == CELL_CREATED_FAST); circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
append_cell_to_circuit_queue(TO_CIRCUIT(circ), append_cell_to_circuit_queue(TO_CIRCUIT(circ),
circ->p_conn, &cell, CELL_DIRECTION_IN); circ->p_conn, &cell, CELL_DIRECTION_IN, 0);
log_debug(LD_CIRC,"Finished sending 'created' cell."); log_debug(LD_CIRC,"Finished sending 'created' cell.");
if (!is_local_addr(&circ->p_conn->_base.addr) && if (!is_local_addr(&circ->p_conn->_base.addr) &&

View File

@ -269,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* we might kill the circ before we relay * we might kill the circ before we relay
* the cells. */ * the cells. */
append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction); append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
return 0; return 0;
} }
@ -366,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
static int static int
circuit_package_relay_cell(cell_t *cell, circuit_t *circ, circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction, cell_direction_t cell_direction,
crypt_path_t *layer_hint) crypt_path_t *layer_hint, uint16_t on_stream)
{ {
or_connection_t *conn; /* where to send the cell */ or_connection_t *conn; /* where to send the cell */
@ -410,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
} }
++stats_n_relay_cells_relayed; ++stats_n_relay_cells_relayed;
append_cell_to_circuit_queue(circ, conn, cell, cell_direction); append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
return 0; return 0;
} }
@ -625,7 +625,7 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
} }
} }
if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer) if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, 0)
< 0) { < 0) {
log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing."); log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL); circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
@ -2102,11 +2102,14 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
* every edge connection that is using <b>circ</b> to write to <b>orconn</b>, * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
* and start or stop reading as appropriate. * and start or stop reading as appropriate.
* *
* If <b>stream_id</b> is nonzero, block only the edge connection whose
* stream_id matches it.
*
* Returns the number of streams whose status we changed. * Returns the number of streams whose status we changed.
*/ */
static int static int
set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
int block) int block, uint16_t stream_id)
{ {
edge_connection_t *edge = NULL; edge_connection_t *edge = NULL;
int n = 0; int n = 0;
@ -2122,6 +2125,9 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
for (; edge; edge = edge->next_stream) { for (; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge); connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
if (edge->edge_blocked_on_circ != block) { if (edge->edge_blocked_on_circ != block) {
++n; ++n;
edge->edge_blocked_on_circ = block; edge->edge_blocked_on_circ = block;
@ -2269,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* Is the cell queue low enough to unblock all the streams that are waiting /* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */ * to write to this circuit? */
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
/* Did we just run out of cells on this circuit's queue? */ /* Did we just run out of cells on this circuit's queue? */
if (queue->n == 0) { if (queue->n == 0) {
@ -2286,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
* transmitting in <b>direction</b>. */ * transmitting in <b>direction</b>. */
void void
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
cell_t *cell, cell_direction_t direction) cell_t *cell, cell_direction_t direction,
uint16_t fromstream)
{ {
cell_queue_t *queue; cell_queue_t *queue;
int streams_blocked; int streams_blocked;
@ -2308,18 +2315,11 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
/* If we have too many cells on the circuit, we should stop reading from /* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */ * the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */ set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
if (streams_blocked) { if (streams_blocked && fromstream) {
/* We must have missed a connection! */ /* This edge connection is apparently not blocked; block it. */
int n = set_streams_blocked_on_circ(circ, orconn, 1); set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
if (n) {
log_info(LD_BUG, "Got a cell added to a cell queue when streams were "
"supposed to be blocked; found that %d streams weren't.", n);
} else {
log_info(LD_BUG, "Got a cell added to a cell queue when streams were "
"all blocked. We should figure out why.");
}
} }
if (queue->n == 1) { if (queue->n == 1) {

View File

@ -45,7 +45,8 @@ void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell); void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell);
void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
cell_t *cell, cell_direction_t direction); cell_t *cell, cell_direction_t direction,
uint16_t fromstream);
void connection_or_unlink_all_active_circs(or_connection_t *conn); void connection_or_unlink_all_active_circs(or_connection_t *conn);
int connection_or_flush_from_first_active_circuit(or_connection_t *conn, int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
int max, time_t now); int max, time_t now);