Schedule according to a queue size heuristic

This commit is contained in:
Andrea Shepard 2013-11-14 04:45:47 -08:00
parent 4f567c8cc8
commit 1275002a46
3 changed files with 172 additions and 25 deletions

View File

@ -4563,6 +4563,8 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
/* Tell the scheduler we're increasing the queue size */
scheduler_adjust_queue_size(chan, 1, adj);
}
} else if (queued < chan->bytes_queued_for_xmit) {
adj = chan->bytes_queued_for_xmit - queued;
@ -4585,6 +4587,8 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
/* Tell the scheduler we're decreasing the queue size */
scheduler_adjust_queue_size(chan, -1, adj);
}
}
}

View File

@ -7,7 +7,10 @@
**/
#include "or.h"
#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
#include "channel.h"
#include "compat_libevent.h"
#include "scheduler.h"
@ -17,6 +20,9 @@
#include <event.h>
#endif
#define SCHED_Q_LOW_WATER 16384
#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
/*
* Write scheduling works by keeping track of lists of channels that can
* accept cells, and have cells to write. From the scheduler's perspective,
@ -118,6 +124,19 @@ static smartlist_t *channels_pending = NULL;
static struct event *run_sched_ev = NULL;
/*
* Queue heuristic; this is not the queue size, but an 'effective queuesize'
* that ages out contributions from stalled channels.
*/
static uint64_t queue_heuristic = 0;
/*
* Timestamp for last queue heuristic update
*/
static time_t queue_heuristic_timestamp = 0;
/* Scheduler static function declarations */
static void scheduler_evt_callback(evutil_socket_t fd,
@ -127,6 +146,8 @@ static void scheduler_retrigger(void);
#if 0
static void scheduler_trigger(void);
#endif
static uint64_t scheduler_get_queue_heuristic(void);
static void scheduler_update_queue_heuristic(time_t now);
/* Scheduler function implementations */
@ -281,6 +302,8 @@ scheduler_init(void)
channels_waiting_for_cells = smartlist_new();
channels_waiting_to_write = smartlist_new();
channels_pending = smartlist_new();
queue_heuristic = 0;
queue_heuristic_timestamp = approx_time();
}
/** Check if there's more scheduling work */
@ -290,7 +313,8 @@ scheduler_more_work(void)
{
tor_assert(channels_pending);
return (smartlist_len(channels_pending) > 0) ? 1 : 0;
return ((scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) &&
((smartlist_len(channels_pending) > 0))) ? 1 : 0;
}
/** Retrigger the scheduler in a way safe to use from the callback */
@ -324,39 +348,70 @@ void
scheduler_run(void)
{
smartlist_t *tmp = NULL;
int n_cells;
int n_cells, n_chans_before, n_chans_after;
uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
ssize_t flushed, flushed_this_time;
log_debug(LD_SCHED, "We have a chance to run the scheduler");
tmp = channels_pending;
channels_pending = smartlist_new();
if (scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) {
n_chans_before = smartlist_len(channels_pending);
q_len_before = channel_get_global_queue_estimate();
q_heur_before = scheduler_get_queue_heuristic();
tmp = channels_pending;
channels_pending = smartlist_new();
/* For now, just run the old scheduler on all the chans in the list */
/*
* For now, just run the old scheduler on all the chans in the list, until
* we hit the high-water mark. TODO real channel priority API
*/
SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
n_cells = channel_num_cells_writeable(chan);
if (n_cells > 0) {
log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"%d cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
n_cells = channel_num_cells_writeable(chan);
if (n_cells > 0) {
log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"%d cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
flushed = 0;
while (flushed < n_cells) {
flushed_this_time = channel_flush_some_cells(chan, n_cells - flushed);
if (flushed_this_time <= 0) break;
flushed += flushed_this_time;
flushed = 0;
while (flushed < n_cells) {
flushed_this_time =
channel_flush_some_cells(chan, n_cells - flushed);
if (flushed_this_time <= 0) break;
flushed += flushed_this_time;
}
log_debug(LD_SCHED,
"Scheduler flushed %d cells onto pending channel "
U64_FORMAT " at %p",
flushed, U64_PRINTF_ARG(chan->global_identifier), chan);
} else {
log_info(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"no cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan);
}
} else {
/* Not getting it this round; put it back on the list */
smartlist_add(channels_pending, chan);
}
} else {
log_info(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"no cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan);
}
} SMARTLIST_FOREACH_END(chan);
} SMARTLIST_FOREACH_END(chan);
smartlist_free(tmp);
smartlist_free(tmp);
n_chans_after = smartlist_len(channels_pending);
q_len_after = channel_get_global_queue_estimate();
q_heur_after = scheduler_get_queue_heuristic();
log_debug(LD_SCHED,
"Scheduler handled %d of %d pending channels, queue size from "
U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
U64_FORMAT " to " U64_FORMAT,
n_chans_before - n_chans_after, n_chans_before,
U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
}
}
/** Trigger the scheduling event so we run the scheduler later */
@ -420,3 +475,88 @@ scheduler_channel_wants_writes(channel_t *chan)
if (became_pending) scheduler_retrigger();
}
/**
 * Notify the scheduler of a queue size adjustment, to recalculate the
 * queue heuristic.
 *
 * chan is the channel whose queue changed (used only for logging here),
 * dir is non-negative for an increase and negative for a decrease, and
 * adj is the magnitude of the change in bytes.
 */

void
scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj)
{
  time_t now = approx_time();
  /*
   * Plain char may be unsigned, in which case a caller passing -1 hands
   * us 255 and (dir >= 0) is always true, silently turning every queue
   * decrease into an increase.  Normalize through signed char so the
   * sign survives regardless of the platform's char signedness.
   */
  int direction = (signed char) dir;

  log_debug(LD_SCHED,
            "Queue size adjustment by %s" U64_FORMAT " for channel "
            U64_FORMAT,
            (direction >= 0) ? "+" : "-",
            U64_PRINTF_ARG(adj),
            U64_PRINTF_ARG(chan->global_identifier));

  /* Get the queue heuristic up to date */
  scheduler_update_queue_heuristic(now);

  /* Adjust as appropriate */
  if (direction >= 0) {
    /* Increasing it */
    queue_heuristic += adj;
  } else {
    /* Decreasing it; clamp at zero rather than underflowing the counter */
    if (queue_heuristic > adj) queue_heuristic -= adj;
    else queue_heuristic = 0;
  }

  log_debug(LD_SCHED,
            "Queue heuristic is now " U64_FORMAT,
            U64_PRINTF_ARG(queue_heuristic));
}
/**
 * Query the current value of the queue heuristic, first aging it out to
 * the present so stalled channels' contributions have decayed.
 */

static uint64_t
scheduler_get_queue_heuristic(void)
{
  /* Bring the heuristic up to date before reporting it */
  scheduler_update_queue_heuristic(approx_time());

  return queue_heuristic;
}
/**
 * Adjust the queue heuristic value to the present time
 *
 * The accumulated heuristic is halved once per elapsed second since the
 * last update; after 64 or more seconds it is zeroed outright.  Does
 * nothing if no time has passed or the clock appears to have gone
 * backward.
 */

static void
scheduler_update_queue_heuristic(time_t now)
{
  time_t elapsed;

  if (queue_heuristic_timestamp == 0) {
    /*
     * Nothing we can sensibly do; must not have been initted properly.
     * Oh well.
     */
    queue_heuristic_timestamp = now;
    return;
  }

  if (queue_heuristic_timestamp >= now) {
    /* No update needed, or time went backward */
    return;
  }

  elapsed = now - queue_heuristic_timestamp;

  /*
   * This is a simple exponential age-out; the other proposed alternative
   * was a linear age-out using the bandwidth history in rephist.c; I'm
   * going with this out of concern that if an adversary can jam the
   * scheduler long enough, it would cause the bandwidth to drop to
   * zero and render the aging mechanism ineffective thereafter.
   */
  if (elapsed >= 0 && elapsed < 64) {
    /* Shift count is bounded below the width of uint64_t, so no UB */
    queue_heuristic >>= elapsed;
  } else {
    queue_heuristic = 0;
  }

  queue_heuristic_timestamp = now;

  log_debug(LD_SCHED,
            "Queue heuristic is now " U64_FORMAT,
            U64_PRINTF_ARG(queue_heuristic));
}

View File

@ -27,5 +27,8 @@ void scheduler_channel_wants_writes(channel_t *chan);
/* Notify the scheduler of a channel being closed */
void scheduler_release_channel(channel_t *chan);
/* Notify scheduler of queue size adjustments */
void scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj);
#endif /* !defined(TOR_SCHEDULER_H) */