diff --git a/changes/bug1653 b/changes/bug1653
new file mode 100644
index 0000000000..ecb0b70f1d
--- /dev/null
+++ b/changes/bug1653
@@ -0,0 +1,8 @@
+ o Major bugfixes:
+ - When the exit relay gets a circuit-level sendme cell, it started
+ reading on the exit streams, even if had 500 cells queued in our
+ circuit queue already, so our circuit queue just grew and grew
+ in some cases. We fix this by not re-enabling reading on SENDME
+ while the cell queue is blocked. Fixes bug 1653. Bugfix on
+ 0.2.0.1-alpha. Detected by Mashael AlSabah. Original patch by
+ "yetonetime".
diff --git a/changes/detect-full-queues b/changes/detect-full-queues
new file mode 100644
index 0000000000..c00e3ea8cc
--- /dev/null
+++ b/changes/detect-full-queues
@@ -0,0 +1,8 @@
+ o Major bugfixes:
+ - Newly created streams were allowed to read cells onto circuits,
+ even if the circuit's cell queue was blocked and waiting to drain.
+ This created potential unfairness, as older streams would be
+ blocked, but newer streams would gladly fill the queue completely.
+ We add code to detect this situation and prevent any stream from
+ getting more than one free cell. Bugfix on 0.2.0.1-alpha.
+ Possible partial fix for bug 1298.
diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c
index e5e7d22195..5567b246ab 100644
--- a/src/or/circuitbuild.c
+++ b/src/or/circuitbuild.c
@@ -1752,7 +1752,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type,
cell.circ_id = circ->n_circ_id;
memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
- append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT);
+ append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT, 0);
if (CIRCUIT_IS_ORIGIN(circ)) {
/* mark it so it gets better rate limiting treatment. */
@@ -2329,7 +2329,7 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload,
circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
append_cell_to_circuit_queue(TO_CIRCUIT(circ),
- circ->p_conn, &cell, CELL_DIRECTION_IN);
+ circ->p_conn, &cell, CELL_DIRECTION_IN, 0);
log_debug(LD_CIRC,"Finished sending 'created' cell.");
if (!is_local_addr(&circ->p_conn->_base.addr) &&
diff --git a/src/or/relay.c b/src/or/relay.c
index b85cc84c52..d0986c8d4e 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -52,6 +52,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *conn,
crypt_path_t *layer_hint);
static int
circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
+static int circuit_queue_streams_are_blocked(circuit_t *circ);
/** Cache the current hi-res time; the cache gets reset when libevent
* calls us. */
@@ -268,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* we might kill the circ before we relay
* the cells. */
- append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
+ append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
return 0;
}
@@ -365,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
static int
circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction,
- crypt_path_t *layer_hint)
+ crypt_path_t *layer_hint, uint16_t on_stream)
{
or_connection_t *conn; /* where to send the cell */
@@ -409,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
}
++stats_n_relay_cells_relayed;
- append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
+ append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
return 0;
}
@@ -624,8 +625,8 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
}
}
- if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer)
- < 0) {
+ if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer,
+ stream_id) < 0) {
log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
return -1;
@@ -1236,6 +1237,10 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
conn->package_window += STREAMWINDOW_INCREMENT;
log_debug(domain,"stream-level sendme, packagewindow now %d.",
conn->package_window);
+ if (circuit_queue_streams_are_blocked(circ)) {
+ /* Still waiting for queue to flush; don't touch conn */
+ return 0;
+ }
connection_start_reading(TO_CONN(conn));
/* handle whatever might still be on the inbuf */
if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
@@ -1436,7 +1441,10 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn)
static void
circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
{
-
+ if (circuit_queue_streams_are_blocked(circ)) {
+ log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
+ return;
+ }
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ))
@@ -2092,12 +2100,19 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
/** Block (if block is true) or unblock (if block is false)
* every edge connection that is using circ to write to orconn,
- * and start or stop reading as appropriate. */
-static void
+ * and start or stop reading as appropriate.
+ *
+ * If stream_id is nonzero, block only the edge connection whose
+ * stream_id matches it.
+ *
+ * Returns the number of streams whose status we changed.
+ */
+static int
set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
- int block)
+ int block, uint16_t stream_id)
{
edge_connection_t *edge = NULL;
+ int n = 0;
if (circ->n_conn == orconn) {
circ->streams_blocked_on_n_conn = block;
if (CIRCUIT_IS_ORIGIN(circ))
@@ -2110,7 +2125,13 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
for (; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
- edge->edge_blocked_on_circ = block;
+ if (stream_id && edge->stream_id != stream_id)
+ continue;
+
+ if (edge->edge_blocked_on_circ != block) {
+ ++n;
+ edge->edge_blocked_on_circ = block;
+ }
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
@@ -2127,6 +2148,8 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
connection_start_reading(conn);
}
}
+
+ return n;
}
/** Pull as many cells as possible (but no more than max) from the
@@ -2252,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
- set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
+ set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
/* Did we just run out of cells on this circuit's queue? */
if (queue->n == 0) {
@@ -2269,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
* transmitting in direction. */
void
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
- cell_t *cell, cell_direction_t direction)
+ cell_t *cell, cell_direction_t direction,
+ uint16_t fromstream)
{
cell_queue_t *queue;
int streams_blocked;
@@ -2291,7 +2315,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
- set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */
+ set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
+
+ if (streams_blocked && fromstream) {
+ /* This edge connection is apparently not blocked; block it. */
+ set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
+ }
if (queue->n == 1) {
/* This was the first cell added to the queue. We need to make this
@@ -2405,3 +2434,15 @@ assert_active_circuits_ok(or_connection_t *orconn)
tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
}
+/** Return 1 if we shouldn't restart reading on this circuit, even if
+ * we get a SENDME. Else return 0.
+*/
+static int
+circuit_queue_streams_are_blocked(circuit_t *circ)
+{
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ return circ->streams_blocked_on_n_conn;
+ } else {
+ return circ->streams_blocked_on_p_conn;
+ }
+}
diff --git a/src/or/relay.h b/src/or/relay.h
index 73855a52bf..088ef3228a 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -45,7 +45,8 @@ void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell);
void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
- cell_t *cell, cell_direction_t direction);
+ cell_t *cell, cell_direction_t direction,
+ uint16_t fromstream);
void connection_or_unlink_all_active_circs(or_connection_t *conn);
int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
int max, time_t now);