r12651@Kushana: nickm | 2007-03-24 18:26:42 -0400

Initial version of circuit-based cell queues.  Instead of hammering or_conns with piles of cells, queue cells on their corresponding circuits, and append them to the or_conn as needed.  This seems to work so far, but needs a bit more work.  This will break the memory-use-limitation patch for begin_dir conns: the solution will be a fun but fiddly.


svn:r9904
This commit is contained in:
Nick Mathewson 2007-03-26 14:07:59 +00:00
parent 6e51bdd5e4
commit 38c0bb3a99
8 changed files with 351 additions and 42 deletions

View File

@ -1,4 +1,12 @@
Changes in version 0.2.0.1-alpha - 2007-??-??
o Major features:
- Change the way that Tor buffers data that it is waiting to write.
Instead of queueing data cells in an enormous ring buffer for each
client->OR or OR->OR connection, we now queue cells on a separate
queue for each circuit. This lets us use less slack memory, and
will eventually let us be smarter about prioritizing different kinds
of traffic.
o Security fixes:
- Directory authorities now call routers stable if they have an
uptime of at least 30 days, even if that's not the median uptime
@ -57,8 +65,8 @@ Changes in version 0.2.0.1-alpha - 2007-??-??
to 'getinfo addr-mappings/*'.
o Code simplifications and refactoring
- Stop passing around crypt_path_t pointers that are implicit in other
procedure arguments.
- Stop passing around circuit_t and crypt_path_t pointers that are
implicit in other procedure arguments.
Changes in version 0.1.2.12-rc - 2007-03-16

View File

@ -61,11 +61,21 @@ Things we'd like to do in 0.2.0.x:
_on_ on a socks connection: have edge_connection_t and (say)
dns_request_t both extend an edge_stream_t, and have p_streams and
n_streams both be linked lists of edge_stream_t.
- Make cells get buffered on circuit, not on the or_conn.
- Don't move them into the target conn until there is space on the
. Make cells get buffered on circuit, not on the or_conn.
O Implement cell queues
o Keep doubly-linked list of active circuits on each or_conn.
o Put all relay data on the circuit cell queue, not on the outbuf.
o Don't move them into the target conn until there is space on the
target conn's outbuf.
o When making a circuit active on a connection with an empty buf,
we need to "prime" the buffer, so that we can trigger the "I flushed
some" test.
- Change how directory-bridge-choking works: choke when circuit queue
is full, not when the orconn is "too full".
- Do we switch to arena-allocation for cells?
- Can we stop doing so many memcpys on cells?
- Also, only package data from exitconns when there is space on the
target OR conn's outbuf.
target OR conn's outbuf? or when the circuit is not too full.
- MAYBE kill stalled circuits rather than stalled connections; consider
anonymity implications.
- Move all status info out of routerinfo into local_routerstatus. Make

View File

@ -488,7 +488,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type,
cell.circ_id = circ->n_circ_id;
memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
connection_or_write_cell_to_buf(&cell, circ->n_conn);
append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT);
/* mark it so it gets better rate limiting treatment. */
circ->n_conn->client_used = 1;
@ -650,7 +650,7 @@ circuit_send_next_onion_skin(origin_circuit_t *circ)
return - END_CIRC_REASON_INTERNAL;
}
log_debug(LD_CIRC,"Sending extend relay cell.");
log_info(LD_CIRC,"Sending extend relay cell.");
/* send it to hop->prev, because it will transfer
* it to a create cell and then send to hop */
if (relay_send_command_from_edge(0, TO_CIRCUIT(circ),
@ -988,7 +988,8 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload,
circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
connection_or_write_cell_to_buf(&cell, circ->p_conn);
append_cell_to_circuit_queue(TO_CIRCUIT(circ),
circ->p_conn, &cell, CELL_DIRECTION_IN);
log_debug(LD_CIRC,"Finished sending 'created' cell.");
if (!is_local_IP(circ->p_conn->_base.addr) &&

View File

@ -71,10 +71,12 @@ HT_GENERATE(orconn_circid_map, orconn_circid_circuit_map_t, node,
*/
orconn_circid_circuit_map_t *_last_circid_orconn_ent = NULL;
/** DOCDOC */
static void
circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
or_connection_t *conn,
uint16_t old_id, or_connection_t *old_conn)
uint16_t old_id, or_connection_t *old_conn,
int active)
{
orconn_circid_circuit_map_t search;
orconn_circid_circuit_map_t *found;
@ -96,6 +98,8 @@ circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
tor_free(found);
--old_conn->n_circuits;
}
if (active)
make_circuit_inactive_on_conn(circ,old_conn);
}
if (conn == NULL)
@ -114,6 +118,9 @@ circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
found->circuit = circ;
HT_INSERT(orconn_circid_map, &orconn_circid_circuit_map, found);
}
if (active)
make_circuit_active_on_conn(circ,conn);
++conn->n_circuits;
}
@ -126,16 +133,18 @@ circuit_set_p_circid_orconn(or_circuit_t *circ, uint16_t id,
{
uint16_t old_id;
or_connection_t *old_conn;
int active;
old_id = circ->p_circ_id;
old_conn = circ->p_conn;
circ->p_circ_id = id;
circ->p_conn = conn;
active = circ->p_conn_cells.n > 0;
if (id == old_id && conn == old_conn)
return;
circuit_set_circid_orconn_helper(TO_CIRCUIT(circ), id, conn,
old_id, old_conn);
old_id, old_conn, active);
}
/** Set the n_conn field of a circuit <b>circ</b>, along
@ -147,15 +156,17 @@ circuit_set_n_circid_orconn(circuit_t *circ, uint16_t id,
{
uint16_t old_id;
or_connection_t *old_conn;
int active;
old_id = circ->n_circ_id;
old_conn = circ->n_conn;
circ->n_circ_id = id;
circ->n_conn = conn;
active = circ->n_conn_cells.n > 0;
if (id == old_id && conn == old_conn)
return;
circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn);
circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn, active);
}
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@ -380,12 +391,16 @@ circuit_free(circuit_t *circ)
other->rend_splice = NULL;
}
cell_queue_clear(&ocirc->p_conn_cells);
tor_free(circ->onionskin);
/* remove from map. */
circuit_set_p_circid_orconn(ocirc, 0, NULL);
}
cell_queue_clear(&circ->n_conn_cells);
/* Remove from map. */
circuit_set_n_circid_orconn(circ, 0, NULL);
@ -637,6 +652,9 @@ void
circuit_unlink_all_from_or_conn(or_connection_t *conn, int reason)
{
circuit_t *circ;
connection_or_unlink_all_active_circs(conn);
for (circ = global_circuitlist; circ; circ = circ->next) {
int mark = 0;
if (circ->n_conn == conn) {

View File

@ -305,7 +305,7 @@ static void
command_process_relay_cell(cell_t *cell, or_connection_t *conn)
{
circuit_t *circ;
int reason;
int reason, direction;
circ = circuit_get_by_circid_orconn(cell->circ_id, conn);
@ -323,23 +323,16 @@ command_process_relay_cell(cell_t *cell, or_connection_t *conn)
}
if (!CIRCUIT_IS_ORIGIN(circ) &&
cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id) {
/* it's an outgoing cell */
if ((reason = circuit_receive_relay_cell(cell, circ,
CELL_DIRECTION_OUT)) < 0) {
log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
"(forward) failed. Closing.");
circuit_mark_for_close(circ, -reason);
return;
}
} else { /* it's an ingoing cell */
if ((reason = circuit_receive_relay_cell(cell, circ,
CELL_DIRECTION_IN)) < 0) {
log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
"(backward) failed. Closing.");
circuit_mark_for_close(circ, -reason);
return;
}
cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id)
direction = CELL_DIRECTION_OUT;
else
direction = CELL_DIRECTION_IN;
if ((reason = circuit_receive_relay_cell(cell, circ, direction)) < 0) {
log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
"(%s) failed. Closing.",
direction==CELL_DIRECTION_OUT?"forward":"backward");
circuit_mark_for_close(circ, -reason);
}
}

View File

@ -232,7 +232,8 @@ connection_or_process_inbuf(or_connection_t *conn)
}
}
/** Called whenever we have flushed some data on an or_conn. */
/** Called whenever we have flushed some data on an or_conn: add more data
* from active circuits. */
int
connection_or_flushed_some(or_connection_t *conn)
{
@ -240,6 +241,15 @@ connection_or_flushed_some(or_connection_t *conn)
connection_or_empty_enough_for_dirserv_data(conn)) {
connection_dirserv_stop_blocking_all_on_or_conn(conn);
}
if (buf_datalen(conn->_base.outbuf) < 16*1024) {
int n = (32*1024 - buf_datalen(conn->_base.outbuf)) / CELL_NETWORK_SIZE;
while (conn->active_circuits && n > 0) {
int flushed;
log_info(LD_GENERAL, "Loop, n==%d",n);
flushed = connection_or_flush_from_first_active_circuit(conn, 1);
n -= flushed;
}
}
return 0;
}
@ -784,26 +794,27 @@ connection_or_send_destroy(uint16_t circ_id, or_connection_t *conn, int reason)
cell.command = CELL_DESTROY;
cell.payload[0] = (uint8_t) reason;
log_debug(LD_OR,"Sending destroy (circID %d).", circ_id);
/* XXXX clear the rest of the cell queue on the circuit. {cells} */
connection_or_write_cell_to_buf(&cell, conn);
return 0;
}
/** A high waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. */
#define BUF_FULLNESS_THRESHOLD (128*1024)
#define DIR_BUF_FULLNESS_THRESHOLD (128*1024)
/** A bottom waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. We don't want
* to make this too low, since we already run the risk of starving
* the pending dir connections if the OR conn is frequently busy with
* other things. */
#define BUF_EMPTINESS_THRESHOLD (96*1024)
#define DIR_BUF_EMPTINESS_THRESHOLD (96*1024)
/** Return true iff there is so much data waiting to be flushed on <b>conn</b>
* that we should stop writing directory data to it. */
int
connection_or_too_full_for_dirserv_data(or_connection_t *conn)
{
return buf_datalen(conn->_base.outbuf) > BUF_FULLNESS_THRESHOLD;
return buf_datalen(conn->_base.outbuf) > DIR_BUF_FULLNESS_THRESHOLD;
}
/** Return true iff there is no longer so much data waiting to be flushed on
@ -814,6 +825,6 @@ connection_or_empty_enough_for_dirserv_data(or_connection_t *conn)
/* Note that the threshold to stop writing is a bit higher than the
* threshold to start again: this should (with any luck) keep us from
* flapping about indefinitely. */
return buf_datalen(conn->_base.outbuf) < BUF_EMPTINESS_THRESHOLD;
return buf_datalen(conn->_base.outbuf) < DIR_BUF_EMPTINESS_THRESHOLD;
}

View File

@ -674,14 +674,23 @@ typedef enum {
/** Largest number of bytes that can fit in a relay cell payload. */
#define RELAY_PAYLOAD_SIZE (CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE)
typedef struct cell_t cell_t;
/** Parsed onion routing cell. All communication between nodes
* is via cells. */
typedef struct {
struct cell_t {
struct cell_t *next; /**< Next cell queued on a this circuit. */
uint16_t circ_id; /**< Circuit which received the cell. */
uint8_t command; /**< Type of the cell: one of PADDING, CREATE, RELAY,
* or DESTROY. */
char payload[CELL_PAYLOAD_SIZE]; /**< Cell body. */
} cell_t;
};
/** DOCDOC */
typedef struct cell_queue_t {
cell_t *head;
cell_t *tail;
int n;
} cell_queue_t;
/** Beginning of a RELAY cell payload. */
typedef struct {
@ -806,6 +815,7 @@ typedef struct or_connection_t {
* bandwidthburst. (OPEN ORs only) */
int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */
struct circuit_t *active_circuits; /**< DOCDOC */
struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */
/** Linked list of bridged dirserver connections that can't write until
@ -1374,6 +1384,8 @@ typedef struct circuit_t {
uint32_t magic; /**< For memory and type debugging: must equal
* ORIGIN_CIRCUIT_MAGIC or OR_CIRCUIT_MAGIC. */
/** DOCDOC */
cell_queue_t n_conn_cells;
/** The OR connection that is next in this circuit. */
or_connection_t *n_conn;
/** The identity hash of n_conn. */
@ -1413,6 +1425,8 @@ typedef struct circuit_t {
const char *marked_for_close_file; /**< For debugging: in which file was this
* circuit marked for close? */
struct circuit_t *next_active_on_n_conn; /**< DOCDOC */
struct circuit_t *prev_active_on_n_conn; /**< DOCDOC */
struct circuit_t *next; /**< Next circuit in linked list. */
} circuit_t;
@ -1467,8 +1481,13 @@ typedef struct origin_circuit_t {
typedef struct or_circuit_t {
circuit_t _base;
struct circuit_t *next_active_on_p_conn; /**< DOCDOC */
struct circuit_t *prev_active_on_p_conn; /**< DOCDOC */
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;
/** DOCDOC */
cell_queue_t p_conn_cells;
/** The OR connection that is previous in this circuit. */
or_connection_t *p_conn;
/** Linked list of Exit streams associated with this circuit. */
@ -2631,6 +2650,19 @@ extern uint64_t stats_n_data_bytes_packaged;
extern uint64_t stats_n_data_cells_received;
extern uint64_t stats_n_data_bytes_received;
void cell_queue_clear(cell_queue_t *queue);
void cell_queue_append(cell_queue_t *queue, cell_t *cell);
void cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell);
void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
cell_t *cell, int direction);
void connection_or_unlink_all_active_circs(or_connection_t *conn);
int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
int max);
void assert_active_circuits_ok(or_connection_t *orconn);
void make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn);
void make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn);
/********************************* rephist.c ***************************/
void rep_hist_init(void);

View File

@ -9,7 +9,7 @@ const char relay_c_id[] =
/**
* \file relay.c
* \brief Handle relay cell encryption/decryption, plus packaging and
* receiving from circuits.
* receiving from circuits, plus queueing on circuits.
**/
#include "or.h"
@ -137,7 +137,7 @@ relay_crypt_one_payload(crypto_cipher_env_t *cipher, char *in,
* a conn that the cell is intended for, and deliver it to
* connection_edge.
* - Else connection_or_write_cell_to_buf to the conn on the other
* side of the circuit.
* side of the circuit.DOCDOC
*
* Return -reason on failure.
*/
@ -225,8 +225,12 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, int cell_direction)
}
log_debug(LD_OR,"Passing on unrecognized cell.");
++stats_n_relay_cells_relayed;
connection_or_write_cell_to_buf(cell, or_conn);
++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
* we might kill the circ before we relay
* the cells. */
append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
return 0;
}
@ -318,7 +322,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, int cell_direction,
/** Package a relay cell from an edge:
* - Encrypt it to the right layer
* - connection_or_write_cell_to_buf to the right conn
* - connection_or_write_cell_to_buf to the right conn DOCDOC
*/
static int
circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
@ -364,7 +368,8 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
return -1;
}
++stats_n_relay_cells_relayed;
connection_or_write_cell_to_buf(cell, conn);
append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
return 0;
}
@ -1457,3 +1462,234 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
}
}
/** DOCDOC */
static INLINE void
cell_free(cell_t *cell)
{
tor_free(cell);
}
/** DOCDOC */
static INLINE cell_t *
cell_copy(const cell_t *cell)
{
cell_t *c = tor_malloc(sizeof(cell_t));
memcpy(c, cell, sizeof(cell_t));
c->next = NULL;
return c;
}
/** DOCDOC */
void
cell_queue_append(cell_queue_t *queue, cell_t *cell)
{
if (queue->tail) {
tor_assert(!queue->tail->next);
queue->tail->next = cell;
} else {
queue->head = cell;
}
queue->tail = cell;
cell->next = NULL;
++queue->n;
}
/** DOCDOC */
void
cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell)
{
cell_queue_append(queue, cell_copy(cell));
}
/** DOCDOC */
void
cell_queue_clear(cell_queue_t *queue)
{
cell_t *cell, *next;
cell = queue->head;
while (cell) {
next = cell->next;
cell_free(cell);
cell = next;
}
queue->head = queue->tail = NULL;
queue->n = 0;
}
/** DOCDOC */
static INLINE cell_t *
cell_queue_pop(cell_queue_t *queue)
{
cell_t *cell = queue->head;
if (!cell)
return NULL;
queue->head = cell->next;
if (cell == queue->tail) {
tor_assert(!queue->head);
queue->tail = NULL;
}
--queue->n;
return cell;
}
static INLINE circuit_t **
next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
{
if (conn == circ->n_conn) {
return &circ->next_active_on_n_conn;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
return &orcirc->next_active_on_p_conn;
}
}
/** DOCDOC */
static INLINE circuit_t **
prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
{
if (conn == circ->n_conn) {
return &circ->prev_active_on_n_conn;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn);
return &orcirc->prev_active_on_p_conn;
}
}
/** DOCDOC */
void
make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
{
if (! conn->active_circuits) {
conn->active_circuits = circ;
*prev_circ_on_conn_p(circ, conn) = circ;
*next_circ_on_conn_p(circ, conn) = circ;
} else {
circuit_t *head = conn->active_circuits;
circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
*next_circ_on_conn_p(old_tail, conn) = circ;
*next_circ_on_conn_p(circ, conn) = head;
*prev_circ_on_conn_p(head, conn) = circ;
*prev_circ_on_conn_p(circ, conn) = old_tail;
}
}
/** DOCDOC */
void
make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
{
// XXXX add some assert.
circuit_t *next = *next_circ_on_conn_p(circ, conn);
circuit_t *prev = *next_circ_on_conn_p(circ, conn);
if (next == circ) {
conn->active_circuits = NULL;
} else {
*prev_circ_on_conn_p(next, conn) = prev;
*next_circ_on_conn_p(prev, conn) = next;
if (conn->active_circuits == circ)
conn->active_circuits = next;
}
*prev_circ_on_conn_p(circ, conn) = NULL;
*next_circ_on_conn_p(circ, conn) = NULL;
}
/** DOCDOC */
void
connection_or_unlink_all_active_circs(or_connection_t *orconn)
{
circuit_t *head = orconn->active_circuits;
circuit_t *cur = head;
if (! head)
return;
do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn);
*prev_circ_on_conn_p(cur, orconn) = NULL;
*next_circ_on_conn_p(cur, orconn) = NULL;
cur = next;
} while (cur != head);
orconn->active_circuits = NULL;
}
/** DOCDOC */
int
connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max)
{
int n_flushed;
cell_queue_t *queue;
circuit_t *circ;
circ = conn->active_circuits;
if (!circ) return 0;
if (circ->n_conn == conn)
queue = &circ->n_conn_cells;
else
queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
for (n_flushed = 0; n_flushed < max && queue->head; ++n_flushed) {
cell_t *cell = cell_queue_pop(queue);
connection_or_write_cell_to_buf(cell, conn);
cell_free(cell);
log_info(LD_GENERAL, "flushed a cell; n ==%d", queue->n);
++n_flushed;
}
conn->active_circuits = *next_circ_on_conn_p(circ, conn);
if (queue->n == 0) {
log_info(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn);
}
return n_flushed;
}
/** DOCDOC */
void
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
cell_t *cell, int direction)
{
cell_queue_t *queue;
if (direction == CELL_DIRECTION_OUT) {
queue = &circ->n_conn_cells;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_conn_cells;
}
cell_queue_append_copy(queue, cell);
log_info(LD_GENERAL, "Added a cell; n ==%d", queue->n);
if (queue->n == 1) {
/* This was the first cell added to the queue. We need to make this
* circuit active. */
log_info(LD_GENERAL, "Made a circuit active.");
make_circuit_active_on_conn(circ, orconn);
}
if (! buf_datalen(orconn->_base.outbuf)) {
/* XXXX Should this be a "<16K"? {cells} */
/* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can
* get called, and we can start putting more data onto the buffer then.
*/
log_info(LD_GENERAL, "Primed a buffer.");
connection_or_flush_from_first_active_circuit(orconn, 1);
}
}
void
assert_active_circuits_ok(or_connection_t *orconn)
{
circuit_t *head = orconn->active_circuits;
circuit_t *cur = head;
if (! head)
return;
do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn);
circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
tor_assert(next);
tor_assert(prev);
tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
cur = next;
} while (cur != head);
}