diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c index a1ea32220a..e1c9786b2e 100644 --- a/src/core/mainloop/mainloop.c +++ b/src/core/mainloop/mainloop.c @@ -497,7 +497,7 @@ connection_watch_events(connection_t *conn, watchable_events_t events) /** Return true iff conn is listening for read events. */ int -connection_is_reading(connection_t *conn) +connection_is_reading(const connection_t *conn) { tor_assert(conn); diff --git a/src/core/mainloop/mainloop.h b/src/core/mainloop/mainloop.h index 98d0b3a058..64782c1318 100644 --- a/src/core/mainloop/mainloop.h +++ b/src/core/mainloop/mainloop.h @@ -38,7 +38,7 @@ typedef enum watchable_events { WRITE_EVENT=0x04 /**< We want to know when a connection is writable */ } watchable_events_t; void connection_watch_events(connection_t *conn, watchable_events_t events); -int connection_is_reading(connection_t *conn); +int connection_is_reading(const connection_t *conn); MOCK_DECL(void,connection_stop_reading,(connection_t *conn)); MOCK_DECL(void,connection_start_reading,(connection_t *conn)); diff --git a/src/core/or/circuit_st.h b/src/core/or/circuit_st.h index 7f39c9337e..1afb4d4426 100644 --- a/src/core/or/circuit_st.h +++ b/src/core/or/circuit_st.h @@ -88,11 +88,11 @@ struct circuit_t { extend_info_t *n_hop; /** True iff we are waiting for n_chan_cells to become less full before - * allowing p_streams to add any more cells. (Origin circuit only.) */ - unsigned int streams_blocked_on_n_chan : 1; + * allowing any more cells on this circuit. (Origin circuit only.) */ + unsigned int circuit_blocked_on_n_chan : 1; /** True iff we are waiting for p_chan_cells to become less full before - * allowing n_streams to add any more cells. (OR circuit only.) */ - unsigned int streams_blocked_on_p_chan : 1; + * allowing any more cells on this circuit. (OR circuit only.) */ + unsigned int circuit_blocked_on_p_chan : 1; /** True iff we have queued a delete backwards on this circuit, but not put * it on the output buffer. */ diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c index 9110252976..25401aea55 100644 --- a/src/core/or/circuituse.c +++ b/src/core/or/circuituse.c @@ -63,6 +63,7 @@ #include "lib/math/fp.h" #include "lib/time/tvdiff.h" #include "lib/trace/events.h" +#include "src/core/mainloop/mainloop.h" #include "core/or/cpath_build_state_st.h" #include "feature/dircommon/dir_connection_st.h" @@ -938,7 +939,7 @@ circuit_log_ancient_one_hop_circuits(int age) c->marked_for_close, c->hold_open_until_flushed ? "" : "not ", conn->edge_has_sent_end ? "" : "not ", - conn->edge_blocked_on_circ ? "Blocked" : "Not blocked"); + connection_is_reading(c) ? "Not blocked" : "Blocked"); if (! c->linked_conn) continue; diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index 920b57cf00..c7c950d0c8 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -954,11 +954,11 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc, if (CIRCUIT_IS_ORIGIN(circ)) { /* origin circs use n_chan */ chan_q = circ->n_chan_cells.n; - blocked_on_chan = circ->streams_blocked_on_n_chan; + blocked_on_chan = circ->circuit_blocked_on_n_chan; } else { /* Both onion services and exits use or_circuit and p_chan */ chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n; - blocked_on_chan = circ->streams_blocked_on_p_chan; + blocked_on_chan = circ->circuit_blocked_on_p_chan; } /* If we have no EWMA RTT, it is because monotime has been stalled diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h index 942991f139..22f9040d15 100644 --- a/src/core/or/edge_connection_st.h +++ b/src/core/or/edge_connection_st.h @@ -66,9 +66,6 @@ struct edge_connection_t { * connections. Set once we've set the stream end, * and check in connection_about_to_close_connection(). */ - /** True iff we've blocked reading until the circuit has fewer queued - * cells. */ - unsigned int edge_blocked_on_circ:1; /** Unique ID for directory requests; this used to be in connection_t, but * that's going away and being used on channels instead. We still tag diff --git a/src/core/or/relay.c b/src/core/or/relay.c index 38fb560e34..827f0c3e46 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell, edge_connection_t *conn, crypt_path_t *layer_hint, relay_header_t *rh); +static void set_block_state_for_streams(edge_connection_t *stream_list, + int block, streamid_t stream_id); /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? @@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out) chan->num_p_circuits = 0; } -/** Block (if block is true) or unblock (if block is false) +/** + * Called when a circuit becomes blocked or unblocked due to the channel + * cell queue. + * + * Block (if block is true) or unblock (if block is false) * every edge connection that is using circ to write to chan, * and start or stop reading as appropriate. - * - * If stream_id is nonzero, block only the edge connection whose - * stream_id matches it. - * - * Returns the number of streams whose status we changed. */ -static int -set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, - int block, streamid_t stream_id) +static void +set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) { edge_connection_t *edge = NULL; - int n = 0; if (circ->n_chan == chan) { - circ->streams_blocked_on_n_chan = block; + circ->circuit_blocked_on_n_chan = block; if (CIRCUIT_IS_ORIGIN(circ)) edge = TO_ORIGIN_CIRCUIT(circ)->p_streams; } else { - circ->streams_blocked_on_p_chan = block; + circ->circuit_blocked_on_p_chan = block; tor_assert(!CIRCUIT_IS_ORIGIN(circ)); edge = TO_OR_CIRCUIT(circ)->n_streams; } - for (; edge; edge = edge->next_stream) { + set_block_state_for_streams(edge, block, 0); +} + +/** + * Helper function to block or unblock streams in a stream list. + * + * If stream_id is 0, apply the block state to all streams + * in the stream list. If it is non-zero, only apply to that specific stream. + */ +static void +set_block_state_for_streams(edge_connection_t *stream_list, int block, + streamid_t stream_id) +{ + for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); if (stream_id && edge->stream_id != stream_id) continue; - if (edge->edge_blocked_on_circ != block) { - ++n; - edge->edge_blocked_on_circ = block; - } - if (!conn->read_event) { /* This connection is a placeholder for something; probably a DNS * request. It can't actually stop or start reading.*/ @@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, connection_start_reading(conn); } } - - return n; } /** Extract the command from a packed cell. */ @@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) destroy_cell_queue_t *destroy_queue=NULL; circuit_t *circ; or_circuit_t *or_circ; - int streams_blocked; + int circ_blocked; packed_cell_t *cell; /* Get the cmux */ @@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) if (circ->n_chan == chan) { queue = &circ->n_chan_cells; - streams_blocked = circ->streams_blocked_on_n_chan; + circ_blocked = circ->circuit_blocked_on_n_chan; } else { or_circ = TO_OR_CIRCUIT(circ); tor_assert(or_circ->p_chan == chan); queue = &TO_OR_CIRCUIT(circ)->p_chan_cells; - streams_blocked = circ->streams_blocked_on_p_chan; + circ_blocked = circ->circuit_blocked_on_p_chan; } /* Circuitmux told us this was active, so it should have cells. @@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ - if (streams_blocked && queue->n <= cell_queue_lowwatermark()) - set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ + if (circ_blocked && queue->n <= cell_queue_lowwatermark()) + set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */ /* If n_flushed < max still, loop around and pick another circuit */ } @@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, streamid_t fromstream) { or_circuit_t *orcirc = NULL; + edge_connection_t *stream_list = NULL; cell_queue_t *queue; int32_t max_queue_size; - int streams_blocked; + int circ_blocked; int exitward; if (circ->marked_for_close) return; @@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, exitward = (direction == CELL_DIRECTION_OUT); if (exitward) { queue = &circ->n_chan_cells; - streams_blocked = circ->streams_blocked_on_n_chan; + circ_blocked = circ->circuit_blocked_on_n_chan; max_queue_size = max_circuit_cell_queue_size_out; + if (CIRCUIT_IS_ORIGIN(circ)) + stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams; } else { orcirc = TO_OR_CIRCUIT(circ); queue = &orcirc->p_chan_cells; - streams_blocked = circ->streams_blocked_on_p_chan; + circ_blocked = circ->circuit_blocked_on_p_chan; max_queue_size = max_circuit_cell_queue_size; + stream_list = TO_OR_CIRCUIT(circ)->n_streams; } if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) { @@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, return; } - /* If we have too many cells on the circuit, we should stop reading from - * the edge streams for a while. */ - if (!streams_blocked && queue->n >= cell_queue_highwatermark()) - set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ + /* If we have too many cells on the circuit, note that it should + * be blocked from new cells. */ + if (!circ_blocked && queue->n >= cell_queue_highwatermark()) + set_circuit_blocked_on_chan(circ, chan, 1); - if (streams_blocked && fromstream) { - /* This edge connection is apparently not blocked; block it. */ - set_streams_blocked_on_circ(circ, chan, 1, fromstream); + if (circ_blocked && fromstream) { + /* This edge connection is apparently not blocked; this can happen for + * new streams on a blocked circuit, for their CONNECTED response. + * block it now. */ + set_block_state_for_streams(stream_list, 1, fromstream); } update_circuit_on_cmux(circ, direction); @@ -3508,8 +3519,8 @@ static int circuit_queue_streams_are_blocked(circuit_t *circ) { if (CIRCUIT_IS_ORIGIN(circ)) { - return circ->streams_blocked_on_n_chan; + return circ->circuit_blocked_on_n_chan; } else { - return circ->streams_blocked_on_p_chan; + return circ->circuit_blocked_on_p_chan; } } diff --git a/src/test/fakecircs.c b/src/test/fakecircs.c index cca3b43483..caeacd84ef 100644 --- a/src/test/fakecircs.c +++ b/src/test/fakecircs.c @@ -41,8 +41,8 @@ new_fake_orcirc(channel_t *nchan, channel_t *pchan) cell_queue_init(&(circ->n_chan_cells)); circ->n_hop = NULL; - circ->streams_blocked_on_n_chan = 0; - circ->streams_blocked_on_p_chan = 0; + circ->circuit_blocked_on_n_chan = 0; + circ->circuit_blocked_on_p_chan = 0; circ->n_delete_pending = 0; circ->p_delete_pending = 0; circ->received_destroy = 0;