Use SIMPLEQ, not smartlist_t, for channel cell queues.

This lets  us use fewer memory allocations, and avoid O(n^2) iterations
This commit is contained in:
Nick Mathewson 2012-10-12 17:58:01 -04:00
parent b555388dac
commit 7c9954a02a
2 changed files with 65 additions and 100 deletions

View File

@ -31,6 +31,7 @@
typedef struct cell_queue_entry_s cell_queue_entry_t; typedef struct cell_queue_entry_s cell_queue_entry_t;
struct cell_queue_entry_s { struct cell_queue_entry_s {
SIMPLEQ_ENTRY(cell_queue_entry_s) next;
enum { enum {
CELL_QUEUE_FIXED, CELL_QUEUE_FIXED,
CELL_QUEUE_VAR, CELL_QUEUE_VAR,
@ -768,6 +769,10 @@ channel_init(channel_t *chan)
/* Init next_circ_id */ /* Init next_circ_id */
chan->next_circ_id = crypto_rand_int(1 << 15); chan->next_circ_id = crypto_rand_int(1 << 15);
/* Initialize queues. */
SIMPLEQ_INIT(&chan->incoming_queue);
SIMPLEQ_INIT(&chan->outgoing_queue);
/* Timestamp it */ /* Timestamp it */
channel_timestamp_created(chan); channel_timestamp_created(chan);
} }
@ -861,6 +866,7 @@ channel_listener_free(channel_listener_t *chan_l)
static void static void
channel_force_free(channel_t *chan) channel_force_free(channel_t *chan)
{ {
cell_queue_entry_t *cell, *cell_tmp;
tor_assert(chan); tor_assert(chan);
/* Call a free method if there is one */ /* Call a free method if there is one */
@ -869,26 +875,16 @@ channel_force_free(channel_t *chan)
channel_clear_remote_end(chan); channel_clear_remote_end(chan);
/* We might still have a cell queue; kill it */ /* We might still have a cell queue; kill it */
if (chan->incoming_queue) { SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) {
SMARTLIST_FOREACH_BEGIN(chan->incoming_queue, cell_queue_entry_free(cell, 0);
cell_queue_entry_t *, q) {
cell_queue_entry_free(q, 0);
} SMARTLIST_FOREACH_END(q);
smartlist_free(chan->incoming_queue);
chan->incoming_queue = NULL;
} }
SIMPLEQ_INIT(&chan->incoming_queue);
/* Outgoing cell queue is similar, but we can have to free packed cells */ /* Outgoing cell queue is similar, but we can have to free packed cells */
if (chan->outgoing_queue) { SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) {
SMARTLIST_FOREACH_BEGIN(chan->outgoing_queue, cell_queue_entry_free(cell, 0);
cell_queue_entry_t *, q) {
cell_queue_entry_free(q, 0);
} SMARTLIST_FOREACH_END(q);
smartlist_free(chan->outgoing_queue);
chan->outgoing_queue = NULL;
} }
SIMPLEQ_INIT(&chan->outgoing_queue);
tor_free(chan); tor_free(chan);
} }
@ -1046,8 +1042,7 @@ channel_set_cell_handlers(channel_t *chan,
chan->var_cell_handler = var_cell_handler; chan->var_cell_handler = var_cell_handler;
/* Re-run the queue if we have one and there's any reason to */ /* Re-run the queue if we have one and there's any reason to */
if (chan->incoming_queue && if (! SIMPLEQ_EMPTY(&chan->incoming_queue) &&
(smartlist_len(chan->incoming_queue) > 0) &&
try_again && try_again &&
(chan->cell_handler || (chan->cell_handler ||
chan->var_cell_handler)) channel_process_cells(chan); chan->var_cell_handler)) channel_process_cells(chan);
@ -1669,9 +1664,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
} }
/* Can we send it right out? If so, try */ /* Can we send it right out? If so, try */
if (!(chan->outgoing_queue && if (SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
(smartlist_len(chan->outgoing_queue) > 0)) && chan->state == CHANNEL_STATE_OPEN) {
chan->state == CHANNEL_STATE_OPEN) {
/* Pick the right write function for this cell type and save the result */ /* Pick the right write function for this cell type and save the result */
switch (q->type) { switch (q->type) {
case CELL_QUEUE_FIXED: case CELL_QUEUE_FIXED:
@ -1707,14 +1701,12 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
if (!sent) { if (!sent) {
/* Not sent, queue it */ /* Not sent, queue it */
if (!(chan->outgoing_queue))
chan->outgoing_queue = smartlist_new();
/* /*
* We have to copy the queue entry passed in, since the caller probably * We have to copy the queue entry passed in, since the caller probably
* used the stack. * used the stack.
*/ */
tmp = cell_queue_entry_dup(q); tmp = cell_queue_entry_dup(q);
smartlist_add(chan->outgoing_queue, tmp); SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
/* Try to process the queue? */ /* Try to process the queue? */
if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
} }
@ -1900,19 +1892,15 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
channel_do_open_actions(chan); channel_do_open_actions(chan);
/* Check for queued cells to process */ /* Check for queued cells to process */
if (chan->incoming_queue && if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
smartlist_len(chan->incoming_queue) > 0)
channel_process_cells(chan); channel_process_cells(chan);
if (chan->outgoing_queue && if (! SIMPLEQ_EMPTY(&chan->outgoing_queue))
smartlist_len(chan->outgoing_queue) > 0)
channel_flush_cells(chan); channel_flush_cells(chan);
} else if (to_state == CHANNEL_STATE_CLOSED || } else if (to_state == CHANNEL_STATE_CLOSED ||
to_state == CHANNEL_STATE_ERROR) { to_state == CHANNEL_STATE_ERROR) {
/* Assert that all queues are empty */ /* Assert that all queues are empty */
tor_assert(!(chan->incoming_queue) || tor_assert(SIMPLEQ_EMPTY(&chan->incoming_queue));
smartlist_len(chan->incoming_queue) == 0); tor_assert(SIMPLEQ_EMPTY(&chan->outgoing_queue));
tor_assert(!(chan->outgoing_queue) ||
smartlist_len(chan->outgoing_queue) == 0);
} }
} }
@ -2086,16 +2074,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
/* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
if (chan->state == CHANNEL_STATE_OPEN) { if (chan->state == CHANNEL_STATE_OPEN) {
while ((unlimited || num_cells > flushed) && while ((unlimited || num_cells > flushed) &&
(chan->outgoing_queue && NULL != (q = SIMPLEQ_FIRST(&chan->outgoing_queue))) {
(smartlist_len(chan->outgoing_queue) > 0))) {
/*
* Ewww, smartlist_del_keeporder() is O(n) in list length; maybe a
* a linked list would make more sense for the queue.
*/
/* Get the head of the queue */ if (1) {
q = smartlist_get(chan->outgoing_queue, 0);
if (q) {
/* /*
* Okay, we have a good queue entry, try to give it to the lower * Okay, we have a good queue entry, try to give it to the lower
* layer. * layer.
@ -2180,26 +2161,17 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
cell_queue_entry_free(q, 0); cell_queue_entry_free(q, 0);
q = NULL; q = NULL;
} }
} else {
/* This shouldn't happen; log and throw it away */
log_info(LD_CHANNEL,
"Saw a NULL entry in the outgoing cell queue on channel %p "
"(global ID " U64_FORMAT "); this is definitely a bug.",
chan, U64_PRINTF_ARG(chan->global_identifier));
/* q is already NULL, so we know to delete that queue entry */
}
/* if q got NULLed out, we used it and should remove the queue entry */ /* if q got NULLed out, we used it and should remove the queue entry */
if (!q) smartlist_del_keeporder(chan->outgoing_queue, 0); if (!q) SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
/* No cell removed from list, so we can't go on any further */ /* No cell removed from list, so we can't go on any further */
else break; else break;
}
} }
} }
/* Did we drain the queue? */ /* Did we drain the queue? */
if (!(chan->outgoing_queue) || if (SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
smartlist_len(chan->outgoing_queue) == 0) {
/* Timestamp it */
channel_timestamp_drained(chan); channel_timestamp_drained(chan);
} }
@ -2233,8 +2205,8 @@ channel_more_to_flush(channel_t *chan)
tor_assert(chan); tor_assert(chan);
/* Check if we have any queued */ /* Check if we have any queued */
if (chan->incoming_queue && if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
smartlist_len(chan->incoming_queue) > 0) return 1; return 1;
/* Check if any circuits would like to queue some */ /* Check if any circuits would like to queue some */
if (circuitmux_num_cells(chan->cmux) > 0) return 1; if (circuitmux_num_cells(chan->cmux) > 0) return 1;
@ -2427,8 +2399,7 @@ channel_listener_queue_incoming(channel_listener_t *listener,
void void
channel_process_cells(channel_t *chan) channel_process_cells(channel_t *chan)
{ {
smartlist_t *queue; cell_queue_entry_t *q;
int putback = 0;
tor_assert(chan); tor_assert(chan);
tor_assert(chan->state == CHANNEL_STATE_CLOSING || tor_assert(chan->state == CHANNEL_STATE_CLOSING ||
chan->state == CHANNEL_STATE_MAINT || chan->state == CHANNEL_STATE_MAINT ||
@ -2442,24 +2413,21 @@ channel_process_cells(channel_t *chan)
if (!(chan->cell_handler || if (!(chan->cell_handler ||
chan->var_cell_handler)) return; chan->var_cell_handler)) return;
/* Nothing we can do if we have no cells */ /* Nothing we can do if we have no cells */
if (!(chan->incoming_queue)) return; if (SIMPLEQ_EMPTY(&chan->incoming_queue)) return;
queue = chan->incoming_queue;
chan->incoming_queue = NULL;
/* /*
* Process cells until we're done or find one we have no current handler * Process cells until we're done or find one we have no current handler
* for. * for.
*/ */
SMARTLIST_FOREACH_BEGIN(queue, cell_queue_entry_t *, q) { while (NULL != (q = SIMPLEQ_FIRST(&chan->incoming_queue))) {
tor_assert(q); tor_assert(q);
tor_assert(q->type == CELL_QUEUE_FIXED || tor_assert(q->type == CELL_QUEUE_FIXED ||
q->type == CELL_QUEUE_VAR); q->type == CELL_QUEUE_VAR);
if (putback) { if (q->type == CELL_QUEUE_FIXED &&
smartlist_add(chan->incoming_queue, q);
} else if (q->type == CELL_QUEUE_FIXED &&
chan->cell_handler) { chan->cell_handler) {
/* Handle a fixed-length cell */ /* Handle a fixed-length cell */
SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
tor_assert(q->u.fixed.cell); tor_assert(q->u.fixed.cell);
log_debug(LD_CHANNEL, log_debug(LD_CHANNEL,
"Processing incoming cell_t %p for channel %p (global ID " "Processing incoming cell_t %p for channel %p (global ID "
@ -2471,6 +2439,7 @@ channel_process_cells(channel_t *chan)
} else if (q->type == CELL_QUEUE_VAR && } else if (q->type == CELL_QUEUE_VAR &&
chan->var_cell_handler) { chan->var_cell_handler) {
/* Handle a variable-length cell */ /* Handle a variable-length cell */
SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
tor_assert(q->u.var.var_cell); tor_assert(q->u.var.var_cell);
log_debug(LD_CHANNEL, log_debug(LD_CHANNEL,
"Processing incoming var_cell_t %p for channel %p (global ID " "Processing incoming var_cell_t %p for channel %p (global ID "
@ -2481,15 +2450,10 @@ channel_process_cells(channel_t *chan)
tor_free(q); tor_free(q);
} else { } else {
/* Can't handle this one */ /* Can't handle this one */
if (!chan->incoming_queue) break;
chan->incoming_queue = smartlist_new();
smartlist_add(chan->incoming_queue, q);
putback = 1;
} }
} SMARTLIST_FOREACH_END(q); }
/* If the list is empty, free it */
smartlist_free(queue);
} }
/** /**
@ -2511,15 +2475,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
/* Do we need to queue it, or can we just call the handler right away? */ /* Do we need to queue it, or can we just call the handler right away? */
if (!(chan->cell_handler)) need_to_queue = 1; if (!(chan->cell_handler)) need_to_queue = 1;
if (chan->incoming_queue && if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
(smartlist_len(chan->incoming_queue) > 0))
need_to_queue = 1; need_to_queue = 1;
/* If we need to queue and have no queue, create one */
if (need_to_queue && !(chan->incoming_queue)) {
chan->incoming_queue = smartlist_new();
}
/* Timestamp for receiving */ /* Timestamp for receiving */
channel_timestamp_recv(chan); channel_timestamp_recv(chan);
@ -2537,14 +2495,13 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
chan->cell_handler(chan, cell); chan->cell_handler(chan, cell);
} else { } else {
/* Otherwise queue it and then process the queue if possible. */ /* Otherwise queue it and then process the queue if possible. */
tor_assert(chan->incoming_queue);
q = cell_queue_entry_new_fixed(cell); q = cell_queue_entry_new_fixed(cell);
log_debug(LD_CHANNEL, log_debug(LD_CHANNEL,
"Queueing incoming cell_t %p for channel %p " "Queueing incoming cell_t %p for channel %p "
"(global ID " U64_FORMAT ")", "(global ID " U64_FORMAT ")",
cell, chan, cell, chan,
U64_PRINTF_ARG(chan->global_identifier)); U64_PRINTF_ARG(chan->global_identifier));
smartlist_add(chan->incoming_queue, q); SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
if (chan->cell_handler || if (chan->cell_handler ||
chan->var_cell_handler) { chan->var_cell_handler) {
channel_process_cells(chan); channel_process_cells(chan);
@ -2571,15 +2528,9 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
/* Do we need to queue it, or can we just call the handler right away? */ /* Do we need to queue it, or can we just call the handler right away? */
if (!(chan->var_cell_handler)) need_to_queue = 1; if (!(chan->var_cell_handler)) need_to_queue = 1;
if (chan->incoming_queue && if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
(smartlist_len(chan->incoming_queue) > 0))
need_to_queue = 1; need_to_queue = 1;
/* If we need to queue and have no queue, create one */
if (need_to_queue && !(chan->incoming_queue)) {
chan->incoming_queue = smartlist_new();
}
/* Timestamp for receiving */ /* Timestamp for receiving */
channel_timestamp_recv(chan); channel_timestamp_recv(chan);
@ -2597,14 +2548,13 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
chan->var_cell_handler(chan, var_cell); chan->var_cell_handler(chan, var_cell);
} else { } else {
/* Otherwise queue it and then process the queue if possible. */ /* Otherwise queue it and then process the queue if possible. */
tor_assert(chan->incoming_queue);
q = cell_queue_entry_new_var(var_cell); q = cell_queue_entry_new_var(var_cell);
log_debug(LD_CHANNEL, log_debug(LD_CHANNEL,
"Queueing incoming var_cell_t %p for channel %p " "Queueing incoming var_cell_t %p for channel %p "
"(global ID " U64_FORMAT ")", "(global ID " U64_FORMAT ")",
var_cell, chan, var_cell, chan,
U64_PRINTF_ARG(chan->global_identifier)); U64_PRINTF_ARG(chan->global_identifier));
smartlist_add(chan->incoming_queue, q); SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
if (chan->cell_handler || if (chan->cell_handler ||
chan->var_cell_handler) { chan->var_cell_handler) {
channel_process_cells(chan); channel_process_cells(chan);
@ -3133,6 +3083,19 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
return chan_l->describe_transport(chan_l); return chan_l->describe_transport(chan_l);
} }
/**
* Return the number of entries in <b>queue</b>
*/
static int
chan_cell_queue_len(const chan_cell_queue_t *queue)
{
int r = 0;
cell_queue_entry_t *cell;
SIMPLEQ_FOREACH(cell, queue, next)
++r;
return r;
}
/** /**
* Dump channel statistics * Dump channel statistics
* *
@ -3246,10 +3209,8 @@ channel_dump_statistics(channel_t *chan, int severity)
" * Channel " U64_FORMAT " has %d queued incoming cells" " * Channel " U64_FORMAT " has %d queued incoming cells"
" and %d queued outgoing cells", " and %d queued outgoing cells",
U64_PRINTF_ARG(chan->global_identifier), U64_PRINTF_ARG(chan->global_identifier),
(chan->incoming_queue != NULL) ? chan_cell_queue_len(&chan->incoming_queue),
smartlist_len(chan->incoming_queue) : 0, chan_cell_queue_len(&chan->outgoing_queue));
(chan->outgoing_queue != NULL) ?
smartlist_len(chan->outgoing_queue) : 0);
/* Describe circuits */ /* Describe circuits */
log(severity, LD_GENERAL, log(severity, LD_GENERAL,
@ -3498,8 +3459,7 @@ channel_has_queued_writes(channel_t *chan)
tor_assert(chan); tor_assert(chan);
tor_assert(chan->has_queued_writes); tor_assert(chan->has_queued_writes);
if (chan->outgoing_queue && if (! SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
smartlist_len(chan->outgoing_queue) > 0) {
has_writes = 1; has_writes = 1;
} else { } else {
/* Check with the lower layer */ /* Check with the lower layer */

View File

@ -10,6 +10,7 @@
#define _TOR_CHANNEL_H #define _TOR_CHANNEL_H
#include "or.h" #include "or.h"
#include "tor_queue.h"
#include "circuitmux.h" #include "circuitmux.h"
/* Channel handler function pointer typedefs */ /* Channel handler function pointer typedefs */
@ -17,6 +18,10 @@ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *);
typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *); typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *);
typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *); typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *);
struct cell_queue_entry_s;
SIMPLEQ_HEAD(chan_cell_queue, cell_queue_entry_s) incoming_queue;
typedef struct chan_cell_queue chan_cell_queue_t;
/* /*
* Channel struct; see the channel_t typedef in or.h. A channel is an * Channel struct; see the channel_t typedef in or.h. A channel is an
* abstract interface for the OR-to-OR connection, similar to connection_or_t, * abstract interface for the OR-to-OR connection, similar to connection_or_t,
@ -120,10 +125,10 @@ struct channel_s {
channel_t *next_with_same_id, *prev_with_same_id; channel_t *next_with_same_id, *prev_with_same_id;
/* List of incoming cells to handle */ /* List of incoming cells to handle */
smartlist_t *incoming_queue; chan_cell_queue_t incoming_queue;
/* List of queued outgoing cells */ /* List of queued outgoing cells */
smartlist_t *outgoing_queue; chan_cell_queue_t outgoing_queue;
/* Circuit mux for circuits sending on this channel */ /* Circuit mux for circuits sending on this channel */
circuitmux_t *cmux; circuitmux_t *cmux;