Refactor stream blocking due to channel cell queues

Streams can get blocked on a circuit in two ways:
  1. When the circuit package window is full
  2. When the channel's cell queue is too high

Conflux needs to decouple stream blocking from both of these conditions,
because streams can continue on another circuit, even if the primary circuit
is blocked for either of these cases.

However, both conflux and congestion control need to know if the channel's
cell queue hit the highwatermark and is still draining, because this condition
is used by those components, independent of stream state.

Therefore, this commit renames the 'streams_blocked_on_chan' variable to
signify that it refers to the cell queue state, and also refactors the actual
stream blocking bits out, so they can be handled separately if conflux is
present.
This commit is contained in:
Mike Perry 2023-01-20 19:14:33 +00:00
parent a4ee0c29ee
commit 21c861bfa3
8 changed files with 60 additions and 51 deletions

View File

@ -497,7 +497,7 @@ connection_watch_events(connection_t *conn, watchable_events_t events)
/** Return true iff <b>conn</b> is listening for read events. */
int
connection_is_reading(connection_t *conn)
connection_is_reading(const connection_t *conn)
{
tor_assert(conn);

View File

@ -38,7 +38,7 @@ typedef enum watchable_events {
WRITE_EVENT=0x04 /**< We want to know when a connection is writable */
} watchable_events_t;
void connection_watch_events(connection_t *conn, watchable_events_t events);
int connection_is_reading(connection_t *conn);
int connection_is_reading(const connection_t *conn);
MOCK_DECL(void,connection_stop_reading,(connection_t *conn));
MOCK_DECL(void,connection_start_reading,(connection_t *conn));

View File

@ -88,11 +88,11 @@ struct circuit_t {
extend_info_t *n_hop;
/** True iff we are waiting for n_chan_cells to become less full before
* allowing p_streams to add any more cells. (Origin circuit only.) */
unsigned int streams_blocked_on_n_chan : 1;
* allowing any more cells on this circuit. (Origin circuit only.) */
unsigned int circuit_blocked_on_n_chan : 1;
/** True iff we are waiting for p_chan_cells to become less full before
* allowing n_streams to add any more cells. (OR circuit only.) */
unsigned int streams_blocked_on_p_chan : 1;
* allowing any more cells on this circuit. (OR circuit only.) */
unsigned int circuit_blocked_on_p_chan : 1;
/** True iff we have queued a delete backwards on this circuit, but not put
* it on the output buffer. */

View File

@ -63,6 +63,7 @@
#include "lib/math/fp.h"
#include "lib/time/tvdiff.h"
#include "lib/trace/events.h"
#include "src/core/mainloop/mainloop.h"
#include "core/or/cpath_build_state_st.h"
#include "feature/dircommon/dir_connection_st.h"
@ -938,7 +939,7 @@ circuit_log_ancient_one_hop_circuits(int age)
c->marked_for_close,
c->hold_open_until_flushed ? "" : "not ",
conn->edge_has_sent_end ? "" : "not ",
conn->edge_blocked_on_circ ? "Blocked" : "Not blocked");
connection_is_reading(c) ? "Not blocked" : "Blocked");
if (! c->linked_conn)
continue;

View File

@ -954,11 +954,11 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
if (CIRCUIT_IS_ORIGIN(circ)) {
/* origin circs use n_chan */
chan_q = circ->n_chan_cells.n;
blocked_on_chan = circ->streams_blocked_on_n_chan;
blocked_on_chan = circ->circuit_blocked_on_n_chan;
} else {
/* Both onion services and exits use or_circuit and p_chan */
chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n;
blocked_on_chan = circ->streams_blocked_on_p_chan;
blocked_on_chan = circ->circuit_blocked_on_p_chan;
}
/* If we have no EWMA RTT, it is because monotime has been stalled

View File

@ -66,9 +66,6 @@ struct edge_connection_t {
* connections. Set once we've set the stream end,
* and check in connection_about_to_close_connection().
*/
/** True iff we've blocked reading until the circuit has fewer queued
* cells. */
unsigned int edge_blocked_on_circ:1;
/** Unique ID for directory requests; this used to be in connection_t, but
* that's going away and being used on channels instead. We still tag

View File

@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
static void set_block_state_for_streams(edge_connection_t *stream_list,
int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
chan->num_p_circuits = 0;
}
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
/**
* Called when a circuit becomes blocked or unblocked due to the channel
* cell queue.
*
* Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate.
*
* If <b>stream_id</b> is nonzero, block only the edge connection whose
* stream_id matches it.
*
* Returns the number of streams whose status we changed.
*/
static int
set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
int block, streamid_t stream_id)
static void
set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
{
edge_connection_t *edge = NULL;
int n = 0;
if (circ->n_chan == chan) {
circ->streams_blocked_on_n_chan = block;
circ->circuit_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
circ->streams_blocked_on_p_chan = block;
circ->circuit_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
for (; edge; edge = edge->next_stream) {
set_block_state_for_streams(edge, block, 0);
}
/**
* Helper function to block or unblock streams in a stream list.
*
* If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams
* in the stream list. If it is non-zero, only apply to that specific stream.
*/
static void
set_block_state_for_streams(edge_connection_t *stream_list, int block,
streamid_t stream_id)
{
for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
if (edge->edge_blocked_on_circ != block) {
++n;
edge->edge_blocked_on_circ = block;
}
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
connection_start_reading(conn);
}
}
return n;
}
/** Extract the command from a packed cell. */
@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
destroy_cell_queue_t *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
int streams_blocked;
int circ_blocked;
packed_cell_t *cell;
/* Get the cmux */
@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
if (circ->n_chan == chan) {
queue = &circ->n_chan_cells;
streams_blocked = circ->streams_blocked_on_n_chan;
circ_blocked = circ->circuit_blocked_on_n_chan;
} else {
or_circ = TO_OR_CIRCUIT(circ);
tor_assert(or_circ->p_chan == chan);
queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
streams_blocked = circ->streams_blocked_on_p_chan;
circ_blocked = circ->circuit_blocked_on_p_chan;
}
/* Circuitmux told us this was active, so it should have cells.
@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
if (streams_blocked && queue->n <= cell_queue_lowwatermark())
set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
if (circ_blocked && queue->n <= cell_queue_lowwatermark())
set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
}
@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streamid_t fromstream)
{
or_circuit_t *orcirc = NULL;
edge_connection_t *stream_list = NULL;
cell_queue_t *queue;
int32_t max_queue_size;
int streams_blocked;
int circ_blocked;
int exitward;
if (circ->marked_for_close)
return;
@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
exitward = (direction == CELL_DIRECTION_OUT);
if (exitward) {
queue = &circ->n_chan_cells;
streams_blocked = circ->streams_blocked_on_n_chan;
circ_blocked = circ->circuit_blocked_on_n_chan;
max_queue_size = max_circuit_cell_queue_size_out;
if (CIRCUIT_IS_ORIGIN(circ))
stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_chan_cells;
streams_blocked = circ->streams_blocked_on_p_chan;
circ_blocked = circ->circuit_blocked_on_p_chan;
max_queue_size = max_circuit_cell_queue_size;
stream_list = TO_OR_CIRCUIT(circ)->n_streams;
}
if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
return;
}
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
if (!streams_blocked && queue->n >= cell_queue_highwatermark())
set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
/* If we have too many cells on the circuit, note that it should
* be blocked from new cells. */
if (!circ_blocked && queue->n >= cell_queue_highwatermark())
set_circuit_blocked_on_chan(circ, chan, 1);
if (streams_blocked && fromstream) {
/* This edge connection is apparently not blocked; block it. */
set_streams_blocked_on_circ(circ, chan, 1, fromstream);
if (circ_blocked && fromstream) {
/* This edge connection is apparently not blocked; this can happen for
* new streams on a blocked circuit, for their CONNECTED response.
* block it now. */
set_block_state_for_streams(stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);
@ -3508,8 +3519,8 @@ static int
circuit_queue_streams_are_blocked(circuit_t *circ)
{
if (CIRCUIT_IS_ORIGIN(circ)) {
return circ->streams_blocked_on_n_chan;
return circ->circuit_blocked_on_n_chan;
} else {
return circ->streams_blocked_on_p_chan;
return circ->circuit_blocked_on_p_chan;
}
}

View File

@ -41,8 +41,8 @@ new_fake_orcirc(channel_t *nchan, channel_t *pchan)
cell_queue_init(&(circ->n_chan_cells));
circ->n_hop = NULL;
circ->streams_blocked_on_n_chan = 0;
circ->streams_blocked_on_p_chan = 0;
circ->circuit_blocked_on_n_chan = 0;
circ->circuit_blocked_on_p_chan = 0;
circ->n_delete_pending = 0;
circ->p_delete_pending = 0;
circ->received_destroy = 0;