Prop#324: Hook up flow control

This commit is contained in:
Mike Perry 2021-08-10 21:35:46 +00:00 committed by David Goulet
parent a89a71cd7b
commit 0422eb26a7
8 changed files with 99 additions and 8 deletions

View File

@ -27,6 +27,7 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/circuitlist.h"
#include "core/or/command.h"
#include "core/or/connection_or.h"
@ -630,6 +631,7 @@ tor_init(int argc, char *argv[])
* until we get a consensus */
channelpadding_new_consensus_params(NULL);
circpad_new_consensus_params(NULL);
flow_control_new_consensus_params(NULL);
/* Initialize circuit padding to defaults+torrc until we get a consensus */
circpad_machines_init();

View File

@ -147,6 +147,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
#include "core/or/congestion_control_flow.h"
/**
* On Windows and Linux we cannot reliably bind() a socket to an
* address and port if: 1) There's already a socket bound to wildcard
@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
!dont_stop_writing) { /* it's done flushing */
if (connection_finished_flushing(conn) < 0) {
/* already marked */
return -1;
goto err;
}
return 0;
goto done;
}
/* Call even if result is 0, since the global write bucket may
@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_read > 0 && connection_is_reading(conn))
connection_consider_empty_read_buckets(conn);
done:
/* If this is an edge connection with congestion control, check to see
* if it is time to send an xon */
if (conn_uses_flow_control(conn)) {
flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
}
return 0;
err:
return -1;
}
/* DOCDOC connection_handle_write */

View File

@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
if (connection_should_read_from_linked_conn(conn))
connection_start_reading_from_linked_conn(conn);
} else {
if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
/* We should not get called here if we're waiting for an XON, but
* belt-and-suspenders */
log_notice(LD_NET,
"Request to start reading on an edgeconn blocked with XOFF");
return;
}
if (event_add(conn->read_event, NULL))
log_warn(LD_NET, "Error from libevent setting read event state for %d "
"to watched: %s",

View File

@ -210,6 +210,9 @@ struct curve25519_public_key_t;
#define RELAY_COMMAND_PADDING_NEGOTIATE 41
#define RELAY_COMMAND_PADDING_NEGOTIATED 42
#define RELAY_COMMAND_XOFF 43
#define RELAY_COMMAND_XON 44
/* Reasons why an OR connection is closed. */
#define END_OR_CONN_REASON_DONE 1
#define END_OR_CONN_REASON_REFUSED 2 /* connection refused */

View File

@ -98,6 +98,7 @@
#include "core/or/socks_request_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@ -1739,6 +1740,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
sendme_connection_edge_consider_sending(conn);
}
return 0;
case RELAY_COMMAND_XOFF:
if (!conn) {
if (CIRCUIT_IS_ORIGIN(circ)) {
origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
connection_half_edge_is_valid_data(ocirc->half_streams,
rh->stream_id)) {
circuit_read_valid_data(ocirc, rh->length);
}
}
return 0;
}
if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
if (CIRCUIT_IS_ORIGIN(circ)) {
circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
}
}
return 0;
case RELAY_COMMAND_XON:
if (!conn) {
if (CIRCUIT_IS_ORIGIN(circ)) {
origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
connection_half_edge_is_valid_data(ocirc->half_streams,
rh->stream_id)) {
circuit_read_valid_data(ocirc, rh->length);
}
}
return 0;
}
if (circuit_process_stream_xon(conn, layer_hint, cell)) {
if (CIRCUIT_IS_ORIGIN(circ)) {
circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
}
}
return 0;
case RELAY_COMMAND_END:
reason = rh->length > 0 ?
@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
}
/* Handle the stream-level SENDME package window. */
if (sendme_note_stream_data_packaged(conn) < 0) {
if (sendme_note_stream_data_packaged(conn, length) < 0) {
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
circuit_consider_stop_edge_reading(circ, cpath_layer);
@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
if (conn->base_.marked_for_close || conn->package_window <= 0)
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0)
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));

View File

@ -22,6 +22,7 @@
#include "core/or/relay.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "feature/nodelist/networkstatus.h"
#include "lib/ctime/di_ops.h"
#include "trunnel/sendme_cell.h"
@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
/* If we use flow control, we do not send stream sendmes */
if (edge_uses_flow_control(conn))
goto end;
/* Don't send it if we still have data to deliver. */
if (connection_outbuf_too_full(TO_CONN(conn))) {
goto end;
@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
tor_assert(conn);
tor_assert(circ);
if (edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Congestion control got stream sendme");
return -END_CIRC_REASON_TORPROTOCOL;
}
/* Don't allow the other endpoint to request more than our maximum (i.e.
* initial) stream SENDME window worth of data. Well-behaved stock clients
* will not request more than this max (as per the check in the while loop
@ -603,7 +614,12 @@ int
sendme_stream_data_received(edge_connection_t *conn)
{
tor_assert(conn);
return --conn->deliver_window;
if (edge_uses_flow_control(conn)) {
return flow_control_decide_xoff(conn);
} else {
return --conn->deliver_window;
}
}
/* Called when a relay DATA cell is packaged on the given circuit. If
@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
/* Called when a relay DATA cell is packaged for the given edge connection
* conn. Update the package window and return its new value. */
int
sendme_note_stream_data_packaged(edge_connection_t *conn)
sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
{
tor_assert(conn);
if (edge_uses_flow_control(conn)) {
flow_control_note_sent_data(conn, len);
if (conn->xoff_received)
return -1;
else
return 1;
}
--conn->package_window;
log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
return conn->package_window;

View File

@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
/* Update package window functions. */
int sendme_note_circuit_data_packaged(circuit_t *circ,
crypt_path_t *layer_hint);
int sendme_note_stream_data_packaged(edge_connection_t *conn);
int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
/* Record cell digest on circuit. */
void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);

View File

@ -45,6 +45,7 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/circuitmux.h"
#include "core/or/circuitmux_ewma.h"
#include "core/or/circuitstats.h"
@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void)
channelpadding_new_consensus_params(c);
circpad_new_consensus_params(c);
router_new_consensus_params(c);
flow_control_new_consensus_params(c);
/* Maintenance of our L2 guard list */
maintain_layer2_guards();