From f89323afdadadb8db7eb48f7cbe75c5f4384dae4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 2 Sep 2010 15:26:17 -0400 Subject: [PATCH] Fix behavior of adding a cell to a blocked queue. We frequently add cells to stream-blocked queues for valid reasons that don't mean we need to block streams. The most obvious reason is if the cell arrives over a circuit rather than from an edge: we don't block circuits, no matter how full queues get. The next most obvious reason is that we allow CONNECTED cells from a newly created stream to get delivered just fine. This patch changes the behavior so that we only iterate over the streams on a circuit when the cell in question came from a stream, and we only block the stream that generated the cell, so that other streams can still get their CONNECTEDs in. --- src/or/circuitbuild.c | 4 ++-- src/or/relay.c | 36 ++++++++++++++++++------------------ src/or/relay.h | 3 ++- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index e5e7d22195..5567b246ab 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -1752,7 +1752,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type, cell.circ_id = circ->n_circ_id; memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN); - append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT); + append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT, 0); if (CIRCUIT_IS_ORIGIN(circ)) { /* mark it so it gets better rate limiting treatment. */ @@ -2329,7 +2329,7 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload, circ->is_first_hop = (cell_type == CELL_CREATED_FAST); append_cell_to_circuit_queue(TO_CIRCUIT(circ), - circ->p_conn, &cell, CELL_DIRECTION_IN); + circ->p_conn, &cell, CELL_DIRECTION_IN, 0); log_debug(LD_CIRC,"Finished sending 'created' cell."); if (!is_local_addr(&circ->p_conn->_base.addr) && diff --git a/src/or/relay.c b/src/or/relay.c index 88106e5d96..794f448523 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -269,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, * we might kill the circ before we relay * the cells. */ - append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0); return 0; } @@ -366,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, static int circuit_package_relay_cell(cell_t *cell, circuit_t *circ, cell_direction_t cell_direction, - crypt_path_t *layer_hint) + crypt_path_t *layer_hint, uint16_t on_stream) { or_connection_t *conn; /* where to send the cell */ @@ -410,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ, } ++stats_n_relay_cells_relayed; - append_cell_to_circuit_queue(circ, conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream); return 0; } @@ -625,7 +625,7 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, } } - if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer) + if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, 0) < 0) { log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing."); circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL); @@ -2102,11 +2102,14 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) * every edge connection that is using circ to write to orconn, * and start or stop reading as appropriate. * + * If stream_id is nonzero, block only the edge connection whose + * stream_id matches it. + * * Returns the number of streams whose status we changed. */ static int set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, - int block) + int block, uint16_t stream_id) { edge_connection_t *edge = NULL; int n = 0; @@ -2122,6 +2125,9 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, for (; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); + if (stream_id && edge->stream_id != stream_id) + continue; + if (edge->edge_blocked_on_circ != block) { ++n; edge->edge_blocked_on_circ = block; @@ -2269,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) - set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ + set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */ /* Did we just run out of cells on this circuit's queue? */ if (queue->n == 0) { @@ -2286,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, * transmitting in direction. */ void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, - cell_t *cell, cell_direction_t direction) + cell_t *cell, cell_direction_t direction, + uint16_t fromstream) { cell_queue_t *queue; int streams_blocked; @@ -2308,18 +2315,11 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) - set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */ + set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */ - if (streams_blocked) { - /* We must have missed a connection! */ - int n = set_streams_blocked_on_circ(circ, orconn, 1); - if (n) { - log_info(LD_BUG, "Got a cell added to a cell queue when streams were " - "supposed to be blocked; found that %d streams weren't.", n); - } else { - log_info(LD_BUG, "Got a cell added to a cell queue when streams were " - "all blocked. We should figure out why."); - } + if (streams_blocked && fromstream) { + /* This edge connection is apparently not blocked; block it. */ + set_streams_blocked_on_circ(circ, orconn, 1, fromstream); } if (queue->n == 1) { diff --git a/src/or/relay.h b/src/or/relay.h index 73855a52bf..088ef3228a 100644 --- a/src/or/relay.h +++ b/src/or/relay.h @@ -45,7 +45,8 @@ void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell); void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell); void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, - cell_t *cell, cell_direction_t direction); + cell_t *cell, cell_direction_t direction, + uint16_t fromstream); void connection_or_unlink_all_active_circs(or_connection_t *conn); int connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, time_t now);