Prop#329 Algs: Conflux multiplexed cell receive handling

This commit is contained in:
Mike Perry 2022-12-14 21:03:52 +00:00
parent a1794ef687
commit 2bd1eca78c
2 changed files with 331 additions and 9 deletions

260
src/core/or/conflux.c Normal file
View File

@ -0,0 +1,260 @@
/* Copyright (c) 2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file conflux.c
* \brief Conflux multipath core algorithms
*/
#define TOR_CONFLUX_PRIVATE
#include "core/or/or.h"
#include "core/or/circuit_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_st.h"
#include "core/or/origin_circuit_st.h"
#include "core/or/circuitlist.h"
#include "core/or/circuituse.h"
#include "core/or/conflux.h"
#include "core/or/conflux_params.h"
#include "core/or/conflux_util.h"
#include "core/or/conflux_st.h"
#include "lib/time/compat_time.h"
#include "app/config/config.h"
#include "trunnel/extension.h"
/** One million microseconds in a second */
#define USEC_PER_SEC 1000000
/**
* Determine if we should multiplex a specific relay command or not.
*
* TODO: Version of this that is the set of forbidden commands
* on linked circuits
*/
bool
conflux_should_multiplex(int relay_command)
{
switch (relay_command) {
/* These are all fine to multiplex, and must be
* so that ordering is preserved */
case RELAY_COMMAND_BEGIN:
case RELAY_COMMAND_DATA:
case RELAY_COMMAND_END:
case RELAY_COMMAND_CONNECTED:
return true;
/* We can't multiplex these because they are
* circuit-specific */
case RELAY_COMMAND_SENDME:
case RELAY_COMMAND_EXTEND:
case RELAY_COMMAND_EXTENDED:
case RELAY_COMMAND_TRUNCATE:
case RELAY_COMMAND_TRUNCATED:
case RELAY_COMMAND_DROP:
return false;
/* We must multiplex RESOLVEs because their ordering
* impacts begin/end. */
case RELAY_COMMAND_RESOLVE:
case RELAY_COMMAND_RESOLVED:
return true;
/* These are all circuit-specific */
case RELAY_COMMAND_BEGIN_DIR:
case RELAY_COMMAND_EXTEND2:
case RELAY_COMMAND_EXTENDED2:
case RELAY_COMMAND_ESTABLISH_INTRO:
case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
case RELAY_COMMAND_INTRODUCE1:
case RELAY_COMMAND_INTRODUCE2:
case RELAY_COMMAND_RENDEZVOUS1:
case RELAY_COMMAND_RENDEZVOUS2:
case RELAY_COMMAND_INTRO_ESTABLISHED:
case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
case RELAY_COMMAND_INTRODUCE_ACK:
case RELAY_COMMAND_PADDING_NEGOTIATE:
case RELAY_COMMAND_PADDING_NEGOTIATED:
return false;
/* These must be multiplexed because their ordering
* relative to BEGIN/END must be preserved */
case RELAY_COMMAND_XOFF:
case RELAY_COMMAND_XON:
return true;
/* These two are not multiplexed, because they must
* be processed immediately to update sequence numbers
* before any other cells are processed on the circuit */
case RELAY_COMMAND_CONFLUX_SWITCH:
case RELAY_COMMAND_CONFLUX_LINK:
case RELAY_COMMAND_CONFLUX_LINKED:
case RELAY_COMMAND_CONFLUX_LINKED_ACK:
return false;
default:
log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d",
relay_command);
return false;
}
}
/** Return the leg for a circuit in a conflux set. Return NULL if not found. */
conflux_leg_t *
conflux_get_leg(conflux_t *cfx, const circuit_t *circ)
{
conflux_leg_t *leg_found = NULL;
// Find the leg that the cell is written on
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
if (leg->circ == circ) {
leg_found = leg;
break;
}
} CONFLUX_FOR_EACH_LEG_END(leg);
return leg_found;
}
/**
* Comparison function for ooo_q pqueue.
*
* Ensures that lower sequence numbers are at the head of the pqueue.
*/
static int
conflux_queue_cmp(const void *a, const void *b)
{
// Compare a and b as conflux_cell_t using the seq field, and return a
// comparison result such that the lowest seq is at the head of the pqueue.
const conflux_cell_t *cell_a = a;
const conflux_cell_t *cell_b = b;
tor_assert(cell_a);
tor_assert(cell_b);
if (cell_a->seq < cell_b->seq) {
return -1;
} else if (cell_a->seq > cell_b->seq) {
return 1;
} else {
return 0;
}
}
/**
* Get the congestion control object for a conflux circuit.
*
* Because conflux can only be negotiated with the last hop, we
* can use the last hop of the cpath to obtain the congestion
* control object for origin circuits. For non-origin circuits,
* we can use the circuit itself.
*/
const congestion_control_t *
circuit_ccontrol(const circuit_t *circ)
{
const congestion_control_t *ccontrol = NULL;
tor_assert(circ);
if (CIRCUIT_IS_ORIGIN(circ)) {
tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath);
tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol;
} else {
ccontrol = circ->ccontrol;
}
/* Conflux circuits always have congestion control*/
tor_assert(ccontrol);
return ccontrol;
}
/**
* Process an incoming relay cell for conflux. Called from
* connection_edge_process_relay_cell().
*
* Returns true if the conflux system now has well-ordered cells to deliver
* to streams, false otherwise.
*/
bool
conflux_process_cell(conflux_t *cfx, circuit_t *in_circ,
crypt_path_t *layer_hint, cell_t *cell)
{
// TODO-329-TUNING: Temporarily validate legs here. We can remove
// this after tuning is complete.
conflux_validate_legs(cfx);
conflux_leg_t *leg = conflux_get_leg(cfx, in_circ);
if (!leg) {
log_warn(LD_BUG, "Got a conflux cell on a circuit without "
"conflux leg. Closing circuit.");
circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
return false;
}
/* We need to make sure this cell came from the expected hop, or
* else it could be a data corruption attack from a middle node. */
if (!conflux_validate_source_hop(in_circ, layer_hint)) {
circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
return false;
}
/* Update the running absolute sequence number */
leg->last_seq_recv++;
/* If this cell is next, fast-path it by processing the cell in-place */
if (leg->last_seq_recv == cfx->last_seq_delivered + 1) {
/* The cell is now ready to be processed, and rest of the queue should
* now be checked for remaining elements */
cfx->last_seq_delivered++;
return true;
} else if (BUG(leg->last_seq_recv <= cfx->last_seq_delivered)) {
log_warn(LD_BUG, "Got a conflux cell with a sequence number "
"less than the last delivered. Closing circuit.");
circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
return false;
} else {
conflux_cell_t *c_cell = tor_malloc_zero(sizeof(conflux_cell_t));
c_cell->seq = leg->last_seq_recv;
memcpy(&c_cell->cell, cell, sizeof(cell_t));
smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp,
offsetof(conflux_cell_t, heap_idx), c_cell);
total_ooo_q_bytes += sizeof(cell_t);
/* This cell should not be processed yet, and the queue is not ready
* to process because the next absolute seqnum has not yet arrived */
return false;
}
}
/**
* Dequeue the top cell from our queue.
*
* Returns the cell as a conflux_cell_t, or NULL if the queue is empty
* or has a hole.
*/
conflux_cell_t *
conflux_dequeue_cell(conflux_t *cfx)
{
conflux_cell_t *top = NULL;
if (smartlist_len(cfx->ooo_q) == 0)
return NULL;
top = smartlist_get(cfx->ooo_q, 0);
/* If the top cell is the next sequence number we need, then
* pop and return it. */
if (top->seq == cfx->last_seq_delivered+1) {
smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp,
offsetof(conflux_cell_t, heap_idx));
total_ooo_q_bytes -= sizeof(cell_t);
cfx->last_seq_delivered++;
return top;
} else {
return NULL;
}
}

View File

@ -99,6 +99,7 @@
#include "core/or/sendme.h" #include "core/or/sendme.h"
#include "core/or/congestion_control_common.h" #include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h" #include "core/or/congestion_control_flow.h"
#include "core/or/conflux.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction, cell_direction_t cell_direction,
@ -116,6 +117,11 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
entry_connection_t *conn, entry_connection_t *conn,
node_t *node, node_t *node,
const tor_addr_t *addr); const tor_addr_t *addr);
static int connection_edge_process_ordered_relay_cell(cell_t *cell,
circuit_t *circ,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
/** Stats: how many relay cells have originated at this hop, or have /** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)? * been relayed onward (not recognized at this hop)?
@ -610,7 +616,7 @@ pad_cell_payload(uint8_t *cell_payload, size_t data_len)
* return 0. * return 0.
*/ */
MOCK_IMPL(int, MOCK_IMPL(int,
relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ, relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *orig_circ,
uint8_t relay_command, const char *payload, uint8_t relay_command, const char *payload,
size_t payload_len, crypt_path_t *cpath_layer, size_t payload_len, crypt_path_t *cpath_layer,
const char *filename, int lineno)) const char *filename, int lineno))
@ -640,6 +646,7 @@ relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ,
rh.stream_id = stream_id; rh.stream_id = stream_id;
rh.length = payload_len; rh.length = payload_len;
relay_header_pack(cell.payload, &rh); relay_header_pack(cell.payload, &rh);
if (payload_len) if (payload_len)
memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len); memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
@ -2051,9 +2058,6 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
static int num_seen=0; static int num_seen=0;
relay_header_t rh; relay_header_t rh;
unsigned domain = layer_hint?LD_APP:LD_EXIT; unsigned domain = layer_hint?LD_APP:LD_EXIT;
int optimistic_data = 0; /* Set to 1 if we receive data on a stream
* that's in the EXIT_CONN_STATE_RESOLVING
* or EXIT_CONN_STATE_CONNECTING states. */
tor_assert(cell); tor_assert(cell);
tor_assert(circ); tor_assert(circ);
@ -2086,8 +2090,66 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
} }
} }
/* Conflux handling: If conflux is disabled, or the relay command is not
* multiplexed across circuits, then process it immediately.
*
* Otherwise, we need to process the relay cell against our conflux
* queues, and if doing so results in ordered cells to deliver, we
* dequeue and process those in-order until there are no more.
*/
if (!circ->conflux || !conflux_should_multiplex(rh.command)) {
return connection_edge_process_ordered_relay_cell(cell, circ, conn,
layer_hint, &rh);
} else {
// If conflux says this cell is in-order, then begin processing
// cells from queue until there are none. Otherwise, we do nothing
// until further cells arrive.
if (conflux_process_cell(circ->conflux, circ, layer_hint, cell)) {
conflux_cell_t *c_cell = NULL;
int ret = 0;
/* First, process this cell */
if ((ret = connection_edge_process_ordered_relay_cell(cell, circ, conn,
layer_hint, &rh)) < 0) {
return ret;
}
/* Now, check queue for more */
while ((c_cell = conflux_dequeue_cell(circ->conflux))) {
relay_header_unpack(&rh, c_cell->cell.payload);
conn = relay_lookup_conn(circ, &c_cell->cell, CELL_DIRECTION_OUT,
layer_hint);
if ((ret = connection_edge_process_ordered_relay_cell(&c_cell->cell,
circ, conn, layer_hint,
&rh)) < 0) {
/* Negative return value is a fatal error. Return early and tear down
* circuit */
tor_free(c_cell);
return ret;
}
tor_free(c_cell);
}
}
}
return 0;
}
/**
* Helper function to process a relay cell that is in the proper order
* for processing right now. */
static int
connection_edge_process_ordered_relay_cell(cell_t *cell, circuit_t *circ,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh)
{
int optimistic_data = 0; /* Set to 1 if we receive data on a stream
* that's in the EXIT_CONN_STATE_RESOLVING
* or EXIT_CONN_STATE_CONNECTING states. */
/* Tell circpad that we've received a recognized cell */ /* Tell circpad that we've received a recognized cell */
circpad_deliver_recognized_relay_cell_events(circ, rh.command, layer_hint); circpad_deliver_recognized_relay_cell_events(circ, rh->command, layer_hint);
/* either conn is NULL, in which case we've got a control cell, or else /* either conn is NULL, in which case we've got a control cell, or else
* conn points to the recognized stream. */ * conn points to the recognized stream. */
@ -2095,22 +2157,22 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
if (conn->base_.type == CONN_TYPE_EXIT && if (conn->base_.type == CONN_TYPE_EXIT &&
(conn->base_.state == EXIT_CONN_STATE_CONNECTING || (conn->base_.state == EXIT_CONN_STATE_CONNECTING ||
conn->base_.state == EXIT_CONN_STATE_RESOLVING) && conn->base_.state == EXIT_CONN_STATE_RESOLVING) &&
rh.command == RELAY_COMMAND_DATA) { rh->command == RELAY_COMMAND_DATA) {
/* Allow DATA cells to be delivered to an exit node in state /* Allow DATA cells to be delivered to an exit node in state
* EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING. * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
* This speeds up HTTP, for example. */ * This speeds up HTTP, for example. */
optimistic_data = 1; optimistic_data = 1;
} else if (rh.stream_id == 0 && rh.command == RELAY_COMMAND_DATA) { } else if (rh->stream_id == 0 && rh->command == RELAY_COMMAND_DATA) {
log_warn(LD_BUG, "Somehow I had a connection that matched a " log_warn(LD_BUG, "Somehow I had a connection that matched a "
"data cell with stream ID 0."); "data cell with stream ID 0.");
} else { } else {
return connection_edge_process_relay_cell_not_open( return connection_edge_process_relay_cell_not_open(
&rh, cell, circ, conn, layer_hint); rh, cell, circ, conn, layer_hint);
} }
} }
return handle_relay_cell_command(cell, circ, conn, layer_hint, return handle_relay_cell_command(cell, circ, conn, layer_hint,
&rh, optimistic_data); rh, optimistic_data);
} }
/** How many relay_data cells have we built, ever? */ /** How many relay_data cells have we built, ever? */