channel: Remove everything related to queue size

The channel subsystem was doing a whole lot to track and try to predict the
channel queue size but they are gone due to previous commit.

Signed-off-by: David Goulet <dgoulet@torproject.org>
This commit is contained in:
David Goulet 2017-11-20 16:49:51 -05:00
parent 46a0709261
commit e1c29a769c
3 changed files with 0 additions and 464 deletions

View File

@ -112,59 +112,6 @@ HANDLE_IMPL(channel, channel_s,)
/* Counter for ID numbers */
static uint64_t n_channels_allocated = 0;
/*
* Channel global byte/cell counters, for statistics and for scheduler high
* /low-water marks.
*/
/*
* Total number of cells ever given to any channel with the
* channel_write_*_cell() functions.
*/
static uint64_t n_channel_cells_queued = 0;
/*
* Total number of cells ever passed to a channel lower layer with the
* write_*_cell() methods.
*/
static uint64_t n_channel_cells_passed_to_lower_layer = 0;
/*
* Current number of cells in all channel queues; should be
* n_channel_cells_queued - n_channel_cells_passed_to_lower_layer.
*/
static uint64_t n_channel_cells_in_queues = 0;
/*
* Total number of bytes for all cells ever queued to a channel and
* counted in n_channel_cells_queued.
*/
static uint64_t n_channel_bytes_queued = 0;
/*
* Total number of bytes for all cells ever passed to a channel lower layer
* and counted in n_channel_cells_passed_to_lower_layer.
*/
static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
/*
* Current number of bytes in all channel queues; should be
* n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer.
*/
static uint64_t n_channel_bytes_in_queues = 0;
/*
* Current total estimated queue size *including lower layer queues and
* transmit overhead*
*/
STATIC uint64_t estimated_total_queue_size = 0;
/* Digest->channel map
*
@ -204,8 +151,6 @@ HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash,
static int is_destroy_cell(channel_t *chan,
const cell_queue_entry_t *q, circid_t *circid_out);
static void channel_assert_counter_consistency(void);
/* Functions to maintain the digest map */
static void channel_add_to_digest_map(channel_t *chan);
static void channel_remove_from_digest_map(channel_t *chan);
@ -1774,12 +1719,6 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
/* Update the counter */
++(chan->n_cells_xmitted);
chan->n_bytes_xmitted += cell_bytes;
/* Update global counters */
++n_channel_cells_queued;
++n_channel_cells_passed_to_lower_layer;
n_channel_bytes_queued += cell_bytes;
n_channel_bytes_passed_to_lower_layer += cell_bytes;
channel_assert_counter_consistency();
}
}
@ -1816,8 +1755,6 @@ channel_write_cell_generic_(channel_t *chan, const char *cell_type,
cell, chan, U64_PRINTF_ARG(chan->global_identifier));
channel_write_cell_queue_entry(chan, q);
/* Update the queue size estimate */
channel_update_xmit_queue_size(chan);
}
/**
@ -1977,29 +1914,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state)
} else if (to_state == CHANNEL_STATE_MAINT) {
scheduler_channel_doesnt_want_writes(chan);
}
/*
* If we're closing, this channel no longer counts toward the global
* estimated queue size; if we're open, it now does.
*/
if ((to_state == CHANNEL_STATE_CLOSING ||
to_state == CHANNEL_STATE_CLOSED ||
to_state == CHANNEL_STATE_ERROR) &&
(from_state == CHANNEL_STATE_OPEN ||
from_state == CHANNEL_STATE_MAINT)) {
estimated_total_queue_size -= chan->bytes_in_queue;
}
/*
* If we're opening, this channel now does count toward the global
* estimated queue size.
*/
if ((to_state == CHANNEL_STATE_OPEN ||
to_state == CHANNEL_STATE_MAINT) &&
!(from_state == CHANNEL_STATE_OPEN ||
from_state == CHANNEL_STATE_MAINT)) {
estimated_total_queue_size += chan->bytes_in_queue;
}
}
/**
@ -2438,19 +2352,6 @@ packed_cell_is_destroy(channel_t *chan,
return 0;
}
/**
* Assert that the global channel stats counters are internally consistent
*/
static void
channel_assert_counter_consistency(void)
{
tor_assert(n_channel_cells_queued ==
(n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer));
tor_assert(n_channel_bytes_queued ==
(n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer));
}
/* DOCDOC */
static int
is_destroy_cell(channel_t *chan,
@ -2529,19 +2430,6 @@ void
channel_dumpstats(int severity)
{
if (all_channels && smartlist_len(all_channels) > 0) {
tor_log(severity, LD_GENERAL,
"Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, "
"and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower"
" layer.",
U64_PRINTF_ARG(n_channel_bytes_queued),
U64_PRINTF_ARG(n_channel_cells_queued),
U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer),
U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer));
tor_log(severity, LD_GENERAL,
"There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells "
"in channel queues.",
U64_PRINTF_ARG(n_channel_bytes_in_queues),
U64_PRINTF_ARG(n_channel_cells_in_queues));
tor_log(severity, LD_GENERAL,
"Dumping statistics about %d channels:",
smartlist_len(all_channels));
@ -3640,16 +3528,6 @@ channel_mark_outgoing(channel_t *chan)
* Flow control queries *
***********************/
/*
* Get the latest estimate for the total queue size of all open channels
*/
uint64_t
channel_get_global_queue_estimate(void)
{
return estimated_total_queue_size;
}
/*
* Estimate the number of writeable cells
*
@ -4163,83 +4041,3 @@ channel_update_bad_for_new_circs(const char *digest, int force)
}
}
/**
* Update the estimated number of bytes queued to transmit for this channel,
* and notify the scheduler. The estimate includes both the channel queue and
* the queue size reported by the lower layer, and an overhead estimate
* optionally provided by the lower layer.
*/
void
channel_update_xmit_queue_size(channel_t *chan)
{
uint64_t queued, adj;
double overhead;
tor_assert(chan);
tor_assert(chan->num_bytes_queued);
/*
* First, get the number of bytes we have queued without factoring in
* lower-layer overhead.
*/
queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
/* Next, adjust by the overhead factor, if any is available */
if (chan->get_overhead_estimate) {
overhead = chan->get_overhead_estimate(chan);
if (overhead >= 1.0) {
queued = (uint64_t)(queued * overhead);
} else {
/* Ignore silly overhead factors */
log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
}
}
/* Now, compare to the previous estimate */
if (queued > chan->bytes_queued_for_xmit) {
adj = queued - chan->bytes_queued_for_xmit;
log_debug(LD_CHANNEL,
"Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
" from " U64_FORMAT " to " U64_FORMAT,
U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(adj),
U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
U64_PRINTF_ARG(queued));
/* Update the channel's estimate */
chan->bytes_queued_for_xmit = queued;
/* Update the global queue size estimate if appropriate */
if (chan->state == CHANNEL_STATE_OPEN ||
chan->state == CHANNEL_STATE_MAINT) {
estimated_total_queue_size += adj;
log_debug(LD_CHANNEL,
"Increasing global queue size by " U64_FORMAT " for channel "
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
}
} else if (queued < chan->bytes_queued_for_xmit) {
adj = chan->bytes_queued_for_xmit - queued;
log_debug(LD_CHANNEL,
"Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
" from " U64_FORMAT " to " U64_FORMAT,
U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(adj),
U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
U64_PRINTF_ARG(queued));
/* Update the channel's estimate */
chan->bytes_queued_for_xmit = queued;
/* Update the global queue size estimate if appropriate */
if (chan->state == CHANNEL_STATE_OPEN ||
chan->state == CHANNEL_STATE_MAINT) {
estimated_total_queue_size -= adj;
log_debug(LD_CHANNEL,
"Decreasing global queue size by " U64_FORMAT " for channel "
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
}
}
}

View File

@ -592,9 +592,6 @@ connection_or_flushed_some(or_connection_t *conn)
{
size_t datalen;
/* The channel will want to update its estimated queue size */
channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan));
/* If we're under the low water mark, add cells until we're just over the
* high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn));

View File

@ -61,8 +61,6 @@ static void scheduler_channel_doesnt_want_writes_mock(channel_t *ch);
static void test_channel_dumpstats(void *arg);
static void test_channel_incoming(void *arg);
static void test_channel_lifecycle(void *arg);
static void test_channel_multi(void *arg);
static void test_channel_queue_size(void *arg);
static void test_channel_write(void *arg);
static void
@ -917,261 +915,6 @@ test_channel_lifecycle_2(void *arg)
return;
}
static void
test_channel_multi(void *arg)
{
channel_t *ch1 = NULL, *ch2 = NULL;
uint64_t global_queue_estimate;
cell_t *cell = NULL;
(void)arg;
/* Accept cells to lower layer */
test_chan_accept_cells = 1;
/* Use default overhead factor */
test_overhead_estimate = 1.0;
ch1 = new_fake_channel();
tt_assert(ch1);
ch2 = new_fake_channel();
tt_assert(ch2);
/* Initial queue size update */
channel_update_xmit_queue_size(ch1);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Queue some cells, check queue estimates */
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch1, cell);
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch2, cell);
channel_update_xmit_queue_size(ch1);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Stop accepting cells at lower layer */
test_chan_accept_cells = 0;
/* Queue some cells and check queue estimates */
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch1, cell);
channel_update_xmit_queue_size(ch1);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 512);
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch2, cell);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 512);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 1024);
/* Allow cells through again */
test_chan_accept_cells = 1;
/* Update and check queue sizes */
channel_update_xmit_queue_size(ch1);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 512);
/* Update and check queue sizes */
channel_update_xmit_queue_size(ch1);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Now block again */
test_chan_accept_cells = 0;
/* Queue some cells */
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch1, cell);
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
channel_write_cell(ch2, cell);
cell = NULL;
/* Check the estimates */
channel_update_xmit_queue_size(ch1);
channel_update_xmit_queue_size(ch2);
tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512);
tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 512);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 1024);
/* Now close channel 2; it should be subtracted from the global queue */
MOCK(scheduler_release_channel, scheduler_release_channel_mock);
channel_mark_for_close(ch2);
UNMOCK(scheduler_release_channel);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 512);
/*
* Since the fake channels aren't registered, channel_free_all() can't
* see them properly.
*/
MOCK(scheduler_release_channel, scheduler_release_channel_mock);
channel_mark_for_close(ch1);
UNMOCK(scheduler_release_channel);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Now free everything */
MOCK(scheduler_release_channel, scheduler_release_channel_mock);
channel_free_all();
UNMOCK(scheduler_release_channel);
done:
free_fake_channel(ch1);
free_fake_channel(ch2);
return;
}
static void
test_channel_queue_size(void *arg)
{
channel_t *ch = NULL;
cell_t *cell = NULL;
int n, old_count;
uint64_t global_queue_estimate;
(void)arg;
ch = new_fake_channel();
tt_assert(ch);
/* Initial queue size update */
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Test the call-through to our fake lower layer */
n = channel_num_cells_writeable(ch);
/* chan_test_num_cells_writeable() always returns 32 */
tt_int_op(n, OP_EQ, 32);
/*
* Now we queue some cells and check that channel_num_cells_writeable()
* adjusts properly
*/
/* tell it not to accept cells */
test_chan_accept_cells = 0;
/* ...and keep it from trying to flush the queue */
ch->state = CHANNEL_STATE_MAINT;
/* Get a fresh cell */
cell = tor_malloc_zero(sizeof(cell_t));
make_fake_cell(cell);
old_count = test_cells_written;
channel_write_cell(ch, cell);
/* Assert that it got queued, not written through, correctly */
tt_int_op(test_cells_written, OP_EQ, old_count);
/* Now check chan_test_num_cells_writeable() again */
n = channel_num_cells_writeable(ch);
/* Should return 0 since we're in CHANNEL_STATE_MAINT */
tt_int_op(n, OP_EQ, 0);
/* Update queue size estimates */
channel_update_xmit_queue_size(ch);
/* One cell, times an overhead factor of 1.0 */
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512);
/* Try a different overhead factor */
test_overhead_estimate = 0.5;
/* This one should be ignored since it's below 1.0 */
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512);
/* Now try a larger one */
test_overhead_estimate = 2.0;
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 1024);
/* Go back to 1.0 */
test_overhead_estimate = 1.0;
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512);
/* Check the global estimate too */
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 512);
/* Go to open */
old_count = test_cells_written;
channel_change_state_open(ch);
/*
* It should try to write, but we aren't accepting cells right now, so
* it'll requeue
*/
tt_int_op(test_cells_written, OP_EQ, old_count);
/* Check the queue size again */
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 512);
/*
* Now the cell is in the queue, and we're open, so we should get 31
* writeable cells.
*/
n = channel_num_cells_writeable(ch);
tt_int_op(n, OP_EQ, 31);
/* Accept cells again */
test_chan_accept_cells = 1;
/* ...and re-process the queue */
old_count = test_cells_written;
tt_int_op(test_cells_written, OP_EQ, old_count + 1);
/* Should have 32 writeable now */
n = channel_num_cells_writeable(ch);
tt_int_op(n, OP_EQ, 32);
/* Should have queue size estimate of zero */
channel_update_xmit_queue_size(ch);
tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 0);
global_queue_estimate = channel_get_global_queue_estimate();
tt_u64_op(global_queue_estimate, OP_EQ, 0);
/* Okay, now we're done with this one */
MOCK(scheduler_release_channel, scheduler_release_channel_mock);
channel_mark_for_close(ch);
UNMOCK(scheduler_release_channel);
done:
free_fake_channel(ch);
return;
}
static void
test_channel_write(void *arg)
{
@ -1399,8 +1142,6 @@ struct testcase_t channel_tests[] = {
{ "incoming", test_channel_incoming, TT_FORK, NULL, NULL },
{ "lifecycle", test_channel_lifecycle, TT_FORK, NULL, NULL },
{ "lifecycle_2", test_channel_lifecycle_2, TT_FORK, NULL, NULL },
{ "multi", test_channel_multi, TT_FORK, NULL, NULL },
{ "queue_size", test_channel_queue_size, TT_FORK, NULL, NULL },
{ "write", test_channel_write, TT_FORK, NULL, NULL },
{ "id_map", test_channel_id_map, TT_FORK, NULL, NULL },
END_OF_TESTCASES