Use a non-stupid data structure in the scheduler

This commit is contained in:
Andrea Shepard 2013-12-12 04:22:53 -08:00
parent 3530825c53
commit ed1927d6bf
2 changed files with 139 additions and 58 deletions

View File

@ -80,6 +80,9 @@ struct channel_s {
SCHED_CHAN_PENDING SCHED_CHAN_PENDING
} scheduler_state; } scheduler_state;
/** Heap index for use by the scheduler */
int sched_heap_idx;
/** Timestamps for both cell channels and listeners */ /** Timestamps for both cell channels and listeners */
time_t timestamp_created; /* Channel created */ time_t timestamp_created; /* Channel created */
time_t timestamp_active; /* Any activity */ time_t timestamp_active; /* Any activity */

View File

@ -23,6 +23,14 @@
#define SCHED_Q_LOW_WATER 16384 #define SCHED_Q_LOW_WATER 16384
#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER) #define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
/*
* Maximum cells to flush in a single call to channel_flush_some_cells();
* setting this low means more calls, but too high and we could overshoot
* SCHED_Q_HIGH_WATER.
*/
#define SCHED_MAX_FLUSH_CELLS 16
/* /*
* Write scheduling works by keeping track of which channels can * Write scheduling works by keeping track of which channels can
* accept cells, and have cells to write. From the scheduler's perspective, * accept cells, and have cells to write. From the scheduler's perspective,
@ -100,7 +108,7 @@
* is reserved for our use. * is reserved for our use.
*/ */
/* List of channels that can write and have cells (pending work) */ /* Pqueue of channels that can write and have cells (pending work) */
static smartlist_t *channels_pending = NULL; static smartlist_t *channels_pending = NULL;
/* /*
@ -125,7 +133,7 @@ static time_t queue_heuristic_timestamp = 0;
/* Scheduler static function declarations */ /* Scheduler static function declarations */
static int scheduler_compare_channels(const void **c1_v, const void **c2_v); static int scheduler_compare_channels(const void *c1_v, const void *c2_v);
static void scheduler_evt_callback(evutil_socket_t fd, static void scheduler_evt_callback(evutil_socket_t fd,
short events, void *arg); short events, void *arg);
static int scheduler_more_work(void); static int scheduler_more_work(void);
@ -162,7 +170,7 @@ scheduler_free_all(void)
*/ */
static int static int
scheduler_compare_channels(const void **c1_v, const void **c2_v) scheduler_compare_channels(const void *c1_v, const void *c2_v)
{ {
channel_t *c1 = NULL, *c2 = NULL; channel_t *c1 = NULL, *c2 = NULL;
/* These are a workaround for -Wbad-function-cast throwing a fit */ /* These are a workaround for -Wbad-function-cast throwing a fit */
@ -172,8 +180,8 @@ scheduler_compare_channels(const void **c1_v, const void **c2_v)
tor_assert(c1_v); tor_assert(c1_v);
tor_assert(c2_v); tor_assert(c2_v);
c1 = (channel_t *)(*c1_v); c1 = (channel_t *)(c1_v);
c2 = (channel_t *)(*c2_v); c2 = (channel_t *)(c2_v);
tor_assert(c1); tor_assert(c1);
tor_assert(c2); tor_assert(c2);
@ -241,7 +249,10 @@ scheduler_channel_doesnt_want_writes(channel_t *chan)
* the other lists. It can't write any more, so it goes to * the other lists. It can't write any more, so it goes to
* channels_waiting_to_write. * channels_waiting_to_write.
*/ */
smartlist_remove(channels_pending, chan); smartlist_pqueue_remove(channels_pending,
scheduler_compare_channels,
STRUCT_OFFSET(channel_t, sched_heap_idx),
chan);
chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
log_debug(LD_SCHED, log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from pending " "Channel " U64_FORMAT " at %p went from pending "
@ -280,7 +291,10 @@ scheduler_channel_has_waiting_cells(channel_t *chan)
* channels_pending. * channels_pending.
*/ */
chan->scheduler_state = SCHED_CHAN_PENDING; chan->scheduler_state = SCHED_CHAN_PENDING;
smartlist_add(channels_pending, chan); smartlist_pqueue_add(channels_pending,
scheduler_compare_channels,
STRUCT_OFFSET(channel_t, sched_heap_idx),
chan);
log_debug(LD_SCHED, log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from waiting_for_cells " "Channel " U64_FORMAT " at %p went from waiting_for_cells "
"to pending", "to pending",
@ -353,7 +367,10 @@ scheduler_release_channel(channel_t *chan)
tor_assert(channels_pending); tor_assert(channels_pending);
if (chan->scheduler_state == SCHED_CHAN_PENDING) { if (chan->scheduler_state == SCHED_CHAN_PENDING) {
smartlist_remove(channels_pending, chan); smartlist_pqueue_remove(channels_pending,
scheduler_compare_channels,
STRUCT_OFFSET(channel_t, sched_heap_idx),
chan);
} }
chan->scheduler_state = SCHED_CHAN_IDLE; chan->scheduler_state = SCHED_CHAN_IDLE;
@ -364,10 +381,11 @@ scheduler_release_channel(channel_t *chan)
void void
scheduler_run(void) scheduler_run(void)
{ {
smartlist_t *tmp = NULL;
int n_cells, n_chans_before, n_chans_after; int n_cells, n_chans_before, n_chans_after;
uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
ssize_t flushed, flushed_this_time; ssize_t flushed, flushed_this_time;
smartlist_t *to_readd = NULL;
channel_t *chan = NULL;
log_debug(LD_SCHED, "We have a chance to run the scheduler"); log_debug(LD_SCHED, "We have a chance to run the scheduler");
@ -375,61 +393,118 @@ scheduler_run(void)
n_chans_before = smartlist_len(channels_pending); n_chans_before = smartlist_len(channels_pending);
q_len_before = channel_get_global_queue_estimate(); q_len_before = channel_get_global_queue_estimate();
q_heur_before = scheduler_get_queue_heuristic(); q_heur_before = scheduler_get_queue_heuristic();
tmp = channels_pending;
channels_pending = smartlist_new();
/* while (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER &&
* UGLY HACK: sort the list on each invocation smartlist_len(channels_pending) > 0) {
* /* Pop off a channel */
* TODO smarter data structures chan = smartlist_pqueue_pop(channels_pending,
*/ scheduler_compare_channels,
smartlist_sort(tmp, scheduler_compare_channels); STRUCT_OFFSET(channel_t, sched_heap_idx));
tor_assert(chan);
SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) { /* Figure out how many cells we can write */
if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) { n_cells = channel_num_cells_writeable(chan);
n_cells = channel_num_cells_writeable(chan); if (n_cells > 0) {
if (n_cells > 0) { log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"%d cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
flushed = 0;
while (flushed < n_cells &&
scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
flushed_this_time =
channel_flush_some_cells(chan,
MIN(SCHED_MAX_FLUSH_CELLS,
n_cells - flushed));
if (flushed_this_time <= 0) break;
flushed += flushed_this_time;
}
if (flushed < n_cells) {
/* We ran out of cells to flush */
chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
log_debug(LD_SCHED, log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with " "Channel " U64_FORMAT " at %p "
"%d cells writeable", "entered waiting_for_cells from pending",
U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); U64_PRINTF_ARG(chan->global_identifier),
flushed = 0;
while (flushed < n_cells) {
flushed_this_time =
channel_flush_some_cells(chan, n_cells - flushed);
if (flushed_this_time <= 0) break;
flushed += flushed_this_time;
}
if (flushed < n_cells) {
/* We ran out of cells to flush */
chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
} else {
/* TODO get this right */
}
log_debug(LD_SCHED,
"Scheduler flushed %d cells onto pending channel "
U64_FORMAT " at %p",
(int)flushed, U64_PRINTF_ARG(chan->global_identifier),
chan); chan);
} else { } else {
log_info(LD_SCHED, /* The channel may still have some cells */
"Scheduler saw pending channel " U64_FORMAT " at %p with " if (channel_more_to_flush(chan)) {
"no cells writeable", /* The channel goes to either pending or waiting_to_write */
U64_PRINTF_ARG(chan->global_identifier), chan); if (channel_num_cells_writeable(chan) > 0) {
/* Put it back to WAITING_TO_WRITE */ /* Add it back to pending later */
chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; if (!to_readd) to_readd = smartlist_new();
smartlist_add(to_readd, chan);
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p "
"is still pending",
U64_PRINTF_ARG(chan->global_identifier),
chan);
} else {
/* It's waiting to be able to write more */
chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p "
"entered waiting_to_write from pending",
U64_PRINTF_ARG(chan->global_identifier),
chan);
}
} else {
/* No cells left; it can go to idle or waiting_for_cells */
if (channel_num_cells_writeable(chan) > 0) {
/*
* It can still accept writes, so it goes to
* waiting_for_cells
*/
chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p "
"entered waiting_for_cells from pending",
U64_PRINTF_ARG(chan->global_identifier),
chan);
} else {
/*
* We exactly filled up the output queue with all available
* cells; go to idle.
*/
chan->scheduler_state = SCHED_CHAN_IDLE;
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p "
"become idle from pending",
U64_PRINTF_ARG(chan->global_identifier),
chan);
}
}
} }
} else {
/* Not getting it this round; put it back on the list */
smartlist_add(channels_pending, chan);
/* It stays in SCHED_CHAN_PENDING */
}
} SMARTLIST_FOREACH_END(chan);
smartlist_free(tmp); log_debug(LD_SCHED,
"Scheduler flushed %d cells onto pending channel "
U64_FORMAT " at %p",
(int)flushed, U64_PRINTF_ARG(chan->global_identifier),
chan);
} else {
log_info(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"no cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan);
/* Put it back to WAITING_TO_WRITE */
chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
}
}
/* Re-add any channels we need to */
if (to_readd) {
SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) {
chan->scheduler_state = SCHED_CHAN_PENDING;
smartlist_pqueue_add(channels_pending,
scheduler_compare_channels,
STRUCT_OFFSET(channel_t, sched_heap_idx),
chan);
} SMARTLIST_FOREACH_END(chan);
smartlist_free(to_readd);
}
n_chans_after = smartlist_len(channels_pending); n_chans_after = smartlist_len(channels_pending);
q_len_after = channel_get_global_queue_estimate(); q_len_after = channel_get_global_queue_estimate();
@ -473,7 +548,10 @@ scheduler_channel_wants_writes(channel_t *chan)
/* /*
* It can write now, so it goes to channels_pending. * It can write now, so it goes to channels_pending.
*/ */
smartlist_add(channels_pending, chan); smartlist_pqueue_add(channels_pending,
scheduler_compare_channels,
STRUCT_OFFSET(channel_t, sched_heap_idx),
chan);
chan->scheduler_state = SCHED_CHAN_PENDING; chan->scheduler_state = SCHED_CHAN_PENDING;
log_debug(LD_SCHED, log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from waiting_to_write " "Channel " U64_FORMAT " at %p went from waiting_to_write "