Track total queue size per channel, with overhead estimates, and global queue total

This commit is contained in:
Andrea Shepard 2013-11-05 00:30:02 -08:00
parent 7674308f62
commit 8852a1794c
6 changed files with 207 additions and 0 deletions

View File

@ -124,6 +124,13 @@ static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
static uint64_t n_channel_bytes_in_queues = 0; static uint64_t n_channel_bytes_in_queues = 0;
/*
* Current total estimated queue size *including lower layer queues and
* transmit overhead*
*/
static uint64_t estimated_total_queue_size = 0;
/* Digest->channel map /* Digest->channel map
* *
* Similar to the one used in connection_or.c, this maps from the identity * Similar to the one used in connection_or.c, this maps from the identity
@ -1840,6 +1847,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
n_channel_bytes_queued += cell_bytes; n_channel_bytes_queued += cell_bytes;
n_channel_bytes_in_queues += cell_bytes; n_channel_bytes_in_queues += cell_bytes;
channel_assert_counter_consistency(); channel_assert_counter_consistency();
/* Update channel queue size */
chan->bytes_in_queue += cell_bytes;
/* Try to process the queue? */ /* Try to process the queue? */
if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
} }
@ -1878,6 +1887,9 @@ channel_write_cell(channel_t *chan, cell_t *cell)
q.type = CELL_QUEUE_FIXED; q.type = CELL_QUEUE_FIXED;
q.u.fixed.cell = cell; q.u.fixed.cell = cell;
channel_write_cell_queue_entry(chan, &q); channel_write_cell_queue_entry(chan, &q);
/* Update the queue size estimate */
channel_update_xmit_queue_size(chan);
} }
/** /**
@ -1913,6 +1925,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
q.type = CELL_QUEUE_PACKED; q.type = CELL_QUEUE_PACKED;
q.u.packed.packed_cell = packed_cell; q.u.packed.packed_cell = packed_cell;
channel_write_cell_queue_entry(chan, &q); channel_write_cell_queue_entry(chan, &q);
/* Update the queue size estimate */
channel_update_xmit_queue_size(chan);
} }
/** /**
@ -1949,6 +1964,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
q.type = CELL_QUEUE_VAR; q.type = CELL_QUEUE_VAR;
q.u.var.var_cell = var_cell; q.u.var.var_cell = var_cell;
channel_write_cell_queue_entry(chan, &q); channel_write_cell_queue_entry(chan, &q);
/* Update the queue size estimate */
channel_update_xmit_queue_size(chan);
} }
/** /**
@ -2056,6 +2074,29 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
scheduler_channel_doesnt_want_writes(chan); scheduler_channel_doesnt_want_writes(chan);
} }
/*
* If we're closing, this channel no longer counts toward the global
* estimated queue size; if we're open, it now does.
*/
if ((to_state == CHANNEL_STATE_CLOSING ||
to_state == CHANNEL_STATE_CLOSED ||
to_state == CHANNEL_STATE_ERROR) &&
(from_state == CHANNEL_STATE_OPEN ||
from_state == CHANNEL_STATE_MAINT)) {
estimated_total_queue_size -= chan->bytes_in_queue;
}
/*
* If we're opening, this channel now does count toward the global
* estimated queue size.
*/
if ((to_state == CHANNEL_STATE_OPEN ||
to_state == CHANNEL_STATE_MAINT) &&
!(from_state == CHANNEL_STATE_OPEN ||
from_state == CHANNEL_STATE_MAINT)) {
estimated_total_queue_size += chan->bytes_in_queue;
}
/* Tell circuits if we opened and stuff */ /* Tell circuits if we opened and stuff */
if (to_state == CHANNEL_STATE_OPEN) { if (to_state == CHANNEL_STATE_OPEN) {
channel_do_open_actions(chan); channel_do_open_actions(chan);
@ -2350,6 +2391,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
n_channel_bytes_passed_to_lower_layer += cell_size; n_channel_bytes_passed_to_lower_layer += cell_size;
n_channel_bytes_in_queues -= cell_size; n_channel_bytes_in_queues -= cell_size;
channel_assert_counter_consistency(); channel_assert_counter_consistency();
/* Update the channel's queue size too */
chan->bytes_in_queue -= cell_size;
} }
/* No cell removed from list, so we can't go on any further */ /* No cell removed from list, so we can't go on any further */
else break; else break;
@ -2362,6 +2405,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
channel_timestamp_drained(chan); channel_timestamp_drained(chan);
} }
/* Update the estimate queue size */
channel_update_xmit_queue_size(chan);
return flushed; return flushed;
} }
@ -4421,3 +4467,83 @@ channel_set_circid_type(channel_t *chan,
} }
} }
/**
* Update the estimated number of bytes queued to transmit for this channel,
* and notify the scheduler. The estimate includes both the channel queue and
* the queue size reported by the lower layer, and an overhead estimate
* optionally provided by the lower layer.
*/
void
channel_update_xmit_queue_size(channel_t *chan)
{
uint64_t queued, adj;
double overhead;
tor_assert(chan);
tor_assert(chan->num_bytes_queued);
/*
* First, get the number of bytes we have queued without factoring in
* lower-layer overhead.
*/
queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
/* Next, adjust by the overhead factor, if any is available */
if (chan->get_overhead_estimate) {
overhead = chan->get_overhead_estimate(chan);
if (overhead >= 1.0f) {
queued *= overhead;
} else {
/* Ignore silly overhead factors */
log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
}
}
/* Now, compare to the previous estimate */
if (queued > chan->bytes_queued_for_xmit) {
adj = queued - chan->bytes_queued_for_xmit;
log_debug(LD_CHANNEL,
"Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
" from " U64_FORMAT " to " U64_FORMAT,
U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(adj),
U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
U64_PRINTF_ARG(queued));
/* Update the channel's estimate */
chan->bytes_queued_for_xmit = queued;
/* Update the global queue size estimate if appropriate */
if (chan->state == CHANNEL_STATE_OPEN ||
chan->state == CHANNEL_STATE_MAINT) {
estimated_total_queue_size += adj;
log_debug(LD_CHANNEL,
"Increasing global queue size by " U64_FORMAT " for channel "
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
}
} else if (queued < chan->bytes_queued_for_xmit) {
adj = chan->bytes_queued_for_xmit - queued;
log_debug(LD_CHANNEL,
"Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
" from " U64_FORMAT " to " U64_FORMAT,
U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(adj),
U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
U64_PRINTF_ARG(queued));
/* Update the channel's estimate */
chan->bytes_queued_for_xmit = queued;
/* Update the global queue size estimate if appropriate */
if (chan->state == CHANNEL_STATE_OPEN ||
chan->state == CHANNEL_STATE_MAINT) {
estimated_total_queue_size -= adj;
log_debug(LD_CHANNEL,
"Decreasing global queue size by " U64_FORMAT " for channel "
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
}
}
}

View File

@ -79,6 +79,11 @@ struct channel_s {
/* Methods implemented by the lower layer */ /* Methods implemented by the lower layer */
/** /**
* Ask the lower layer for an estimate of the average overhead for
* transmissions on this channel.
*/
double (*get_overhead_estimate)(channel_t *);
/*
* Ask the underlying transport what the remote endpoint address is, in * Ask the underlying transport what the remote endpoint address is, in
* a tor_addr_t. This is optional and subclasses may leave this NULL. * a tor_addr_t. This is optional and subclasses may leave this NULL.
* If they implement it, they should write the address out to the * If they implement it, they should write the address out to the
@ -110,6 +115,8 @@ struct channel_s {
int (*matches_extend_info)(channel_t *, extend_info_t *); int (*matches_extend_info)(channel_t *, extend_info_t *);
/** Check if this channel matches a target address when extending */ /** Check if this channel matches a target address when extending */
int (*matches_target)(channel_t *, const tor_addr_t *); int (*matches_target)(channel_t *, const tor_addr_t *);
/* Ask the lower layer how many bytes it has queued but not yet sent */
size_t (*num_bytes_queued)(channel_t *);
/* Ask the lower layer how many cells can be written */ /* Ask the lower layer how many cells can be written */
int (*num_cells_writeable)(channel_t *); int (*num_cells_writeable)(channel_t *);
/* Write a cell to an open channel */ /* Write a cell to an open channel */
@ -202,6 +209,14 @@ struct channel_s {
/** Channel counters for cell channels */ /** Channel counters for cell channels */
uint64_t n_cells_recved, n_bytes_recved; uint64_t n_cells_recved, n_bytes_recved;
uint64_t n_cells_xmitted, n_bytes_xmitted; uint64_t n_cells_xmitted, n_bytes_xmitted;
/** Our current contribution to the scheduler's total xmit queue */
uint64_t bytes_queued_for_xmit;
/** Number of bytes in this channel's cell queue; does not include
* lower-layer queueing.
*/
uint64_t bytes_in_queue;
}; };
struct channel_listener_s { struct channel_listener_s {
@ -460,6 +475,7 @@ unsigned int channel_num_circuits(channel_t *chan);
void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd, void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd,
int consider_identity); int consider_identity);
void channel_timestamp_client(channel_t *chan); void channel_timestamp_client(channel_t *chan);
void channel_update_xmit_queue_size(channel_t *chan);
const char * channel_listener_describe_transport(channel_listener_t *chan_l); const char * channel_listener_describe_transport(channel_listener_t *chan_l);
void channel_listener_dump_statistics(channel_listener_t *chan_l, void channel_listener_dump_statistics(channel_listener_t *chan_l,

View File

@ -55,6 +55,7 @@ static void channel_tls_common_init(channel_tls_t *tlschan);
static void channel_tls_close_method(channel_t *chan); static void channel_tls_close_method(channel_t *chan);
static const char * channel_tls_describe_transport_method(channel_t *chan); static const char * channel_tls_describe_transport_method(channel_t *chan);
static void channel_tls_free_method(channel_t *chan); static void channel_tls_free_method(channel_t *chan);
static double channel_tls_get_overhead_estimate_method(channel_t *chan);
static int static int
channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out); channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out);
static int static int
@ -69,6 +70,7 @@ channel_tls_matches_extend_info_method(channel_t *chan,
static int channel_tls_matches_target_method(channel_t *chan, static int channel_tls_matches_target_method(channel_t *chan,
const tor_addr_t *target); const tor_addr_t *target);
static int channel_tls_num_cells_writeable_method(channel_t *chan); static int channel_tls_num_cells_writeable_method(channel_t *chan);
static size_t channel_tls_num_bytes_queued_method(channel_t *chan);
static int channel_tls_write_cell_method(channel_t *chan, static int channel_tls_write_cell_method(channel_t *chan,
cell_t *cell); cell_t *cell);
static int channel_tls_write_packed_cell_method(channel_t *chan, static int channel_tls_write_packed_cell_method(channel_t *chan,
@ -118,6 +120,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
chan->close = channel_tls_close_method; chan->close = channel_tls_close_method;
chan->describe_transport = channel_tls_describe_transport_method; chan->describe_transport = channel_tls_describe_transport_method;
chan->free = channel_tls_free_method; chan->free = channel_tls_free_method;
chan->get_overhead_estimate = channel_tls_get_overhead_estimate_method;
chan->get_remote_addr = channel_tls_get_remote_addr_method; chan->get_remote_addr = channel_tls_get_remote_addr_method;
chan->get_remote_descr = channel_tls_get_remote_descr_method; chan->get_remote_descr = channel_tls_get_remote_descr_method;
chan->get_transport_name = channel_tls_get_transport_name_method; chan->get_transport_name = channel_tls_get_transport_name_method;
@ -125,6 +128,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
chan->is_canonical = channel_tls_is_canonical_method; chan->is_canonical = channel_tls_is_canonical_method;
chan->matches_extend_info = channel_tls_matches_extend_info_method; chan->matches_extend_info = channel_tls_matches_extend_info_method;
chan->matches_target = channel_tls_matches_target_method; chan->matches_target = channel_tls_matches_target_method;
chan->num_bytes_queued = channel_tls_num_bytes_queued_method;
chan->num_cells_writeable = channel_tls_num_cells_writeable_method; chan->num_cells_writeable = channel_tls_num_cells_writeable_method;
chan->write_cell = channel_tls_write_cell_method; chan->write_cell = channel_tls_write_cell_method;
chan->write_packed_cell = channel_tls_write_packed_cell_method; chan->write_packed_cell = channel_tls_write_packed_cell_method;
@ -437,6 +441,40 @@ channel_tls_free_method(channel_t *chan)
} }
} }
/**
* Get an estimate of the average TLS overhead for the upper layer
*/
static double
channel_tls_get_overhead_estimate_method(channel_t *chan)
{
double overhead = 1.0f;
channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
tor_assert(tlschan);
tor_assert(tlschan->conn);
/* Just return 1.0f if we don't have sensible data */
if (tlschan->conn->bytes_xmitted > 0 &&
tlschan->conn->bytes_xmitted_by_tls >=
tlschan->conn->bytes_xmitted) {
overhead = ((double)(tlschan->conn->bytes_xmitted_by_tls)) /
((double)(tlschan->conn->bytes_xmitted));
/*
* Never estimate more than 2.0; otherwise we get silly large estimates
* at the very start of a new TLS connection.
*/
if (overhead > 2.0f) overhead = 2.0f;
}
log_debug(LD_CHANNEL,
"Estimated overhead ratio for TLS chan " U64_FORMAT " is %f",
U64_PRINTF_ARG(chan->global_identifier), overhead);
return overhead;
}
/** /**
* Get the remote address of a channel_tls_t * Get the remote address of a channel_tls_t
* *
@ -675,6 +713,22 @@ channel_tls_matches_target_method(channel_t *chan,
return tor_addr_eq(&(tlschan->conn->real_addr), target); return tor_addr_eq(&(tlschan->conn->real_addr), target);
} }
/**
* Tell the upper layer how many bytes we have queued and not yet
* sent.
*/
static size_t
channel_tls_num_bytes_queued_method(channel_t *chan)
{
channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
tor_assert(tlschan);
tor_assert(tlschan->conn);
return connection_get_outbuf_len(TO_CONN(tlschan->conn));
}
/** /**
* Tell the upper layer how many cells we can accept to write * Tell the upper layer how many cells we can accept to write
* *

View File

@ -3839,6 +3839,8 @@ connection_handle_write_impl(connection_t *conn, int force)
tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written); tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written", log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written",
result, (long)n_read, (long)n_written); result, (long)n_read, (long)n_written);
or_conn->bytes_xmitted += result;
or_conn->bytes_xmitted_by_tls += n_written;
/* So we notice bytes were written even on error */ /* So we notice bytes were written even on error */
/* XXXX024 This cast is safe since we can never write INT_MAX bytes in a /* XXXX024 This cast is safe since we can never write INT_MAX bytes in a
* single set of TLS operations. But it looks kinda ugly. If we refactor * single set of TLS operations. But it looks kinda ugly. If we refactor

View File

@ -583,6 +583,9 @@ connection_or_flushed_some(or_connection_t *conn)
{ {
size_t datalen; size_t datalen;
/* The channel will want to update its estimated queue size */
channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan));
/* If we're under the low water mark, add cells until we're just over the /* If we're under the low water mark, add cells until we're just over the
* high water mark. */ * high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn)); datalen = connection_get_outbuf_len(TO_CONN(conn));

View File

@ -1529,6 +1529,12 @@ typedef struct or_connection_t {
/** Last emptied write token bucket in msec since midnight; only used if /** Last emptied write token bucket in msec since midnight; only used if
* TB_EMPTY events are enabled. */ * TB_EMPTY events are enabled. */
uint32_t write_emptied_time; uint32_t write_emptied_time;
/*
* Count the number of bytes flushed out on this orconn, and the number of
* bytes TLS actually sent - used for overhead estimation for scheduling.
*/
uint64_t bytes_xmitted, bytes_xmitted_by_tls;
} or_connection_t; } or_connection_t;
/** Subtype of connection_t for an "edge connection" -- that is, an entry (ap) /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)