Provide generic mechanism for scheduler to query writeable cells on a channel

This commit is contained in:
Andrea Shepard 2013-10-29 02:13:53 -07:00
parent 472b62bfe4
commit 2efbab2aaf
6 changed files with 95 additions and 12 deletions

View File

@ -3826,6 +3826,40 @@ channel_mark_outgoing(channel_t *chan)
chan->is_incoming = 0;
}
/************************
* Flow control queries *
***********************/
/*
* Estimate the number of writeable cells
*
* Ask the lower layer for an estimate of how many cells it can accept, and
* then subtract the length of our outgoing_queue, if any, to produce an
* estimate of the number of cells this channel can accept for writes.
*/
int
channel_num_cells_writeable(channel_t *chan)
{
int result;
tor_assert(chan);
tor_assert(chan->num_cells_writeable);
if (chan->state == CHANNEL_STATE_OPEN) {
/* Query lower layer */
result = chan->num_cells_writeable(chan);
/* Subtract cell queue length, if any */
result -= chan_cell_queue_len(&chan->outgoing_queue);
if (result < 0) result = 0;
} else {
/* No cells are writeable in any other state */
result = 0;
}
return result;
}
/*********************
* Timestamp updates *
********************/

View File

@ -110,7 +110,9 @@ struct channel_s {
int (*matches_extend_info)(channel_t *, extend_info_t *);
/** Check if this channel matches a target address when extending */
int (*matches_target)(channel_t *, const tor_addr_t *);
/** Write a cell to an open channel */
/* Ask the lower layer how many cells can be written */
int (*num_cells_writeable)(channel_t *);
/* Write a cell to an open channel */
int (*write_cell)(channel_t *, cell_t *);
/** Write a packed cell to an open channel */
int (*write_packed_cell)(channel_t *, packed_cell_t *);
@ -465,6 +467,9 @@ void channel_listener_dump_statistics(channel_listener_t *chan_l,
void channel_listener_dump_transport_statistics(channel_listener_t *chan_l,
int severity);
/* Flow control queries */
int channel_num_cells_writeable(channel_t *chan);
/* Timestamp queries */
time_t channel_when_created(channel_t *chan);
time_t channel_when_last_active(channel_t *chan);

View File

@ -68,6 +68,7 @@ channel_tls_matches_extend_info_method(channel_t *chan,
extend_info_t *extend_info);
static int channel_tls_matches_target_method(channel_t *chan,
const tor_addr_t *target);
static int channel_tls_num_cells_writeable_method(channel_t *chan);
static int channel_tls_write_cell_method(channel_t *chan,
cell_t *cell);
static int channel_tls_write_packed_cell_method(channel_t *chan,
@ -124,6 +125,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
chan->is_canonical = channel_tls_is_canonical_method;
chan->matches_extend_info = channel_tls_matches_extend_info_method;
chan->matches_target = channel_tls_matches_target_method;
chan->num_cells_writeable = channel_tls_num_cells_writeable_method;
chan->write_cell = channel_tls_write_cell_method;
chan->write_packed_cell = channel_tls_write_packed_cell_method;
chan->write_var_cell = channel_tls_write_var_cell_method;
@ -673,6 +675,34 @@ channel_tls_matches_target_method(channel_t *chan,
return tor_addr_eq(&(tlschan->conn->real_addr), target);
}
/**
* Tell the upper layer how many cells we can accept to write
*
* This implements the num_cells_writeable method for channel_tls_t; it
* returns an estimate of the number of cells we can accept with
* channel_tls_write_*_cell().
*/
static int
channel_tls_num_cells_writeable_method(channel_t *chan)
{
size_t outbuf_len;
int n;
channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
size_t cell_network_size;
tor_assert(tlschan);
tor_assert(tlschan->conn);
cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids);
outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn));
/* Get the number of cells */
n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size);
if (n < 0) n = 0;
return n;
}
/**
* Write a cell to a channel_tls_t
*

View File

@ -576,14 +576,6 @@ connection_or_process_inbuf(or_connection_t *conn)
return ret;
}
/** When adding cells to an OR connection's outbuf, keep adding until the
* outbuf is at least this long, or we run out of cells. */
#define OR_CONN_HIGHWATER (32*1024)
/** Add cells to an OR connection's outbuf whenever the outbuf's data length
* drops below this size. */
#define OR_CONN_LOWWATER (16*1024)
/** Called whenever we have flushed some data on an or_conn: add more data
* from active circuits. */
int

View File

@ -1426,6 +1426,18 @@ typedef struct or_handshake_state_t {
/** Length of Extended ORPort connection identifier. */
#define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */
/*
* OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so
* channeltls.c can see them too.
*/
/** When adding cells to an OR connection's outbuf, keep adding until the
* outbuf is at least this long, or we run out of cells. */
#define OR_CONN_HIGHWATER (32*1024)
/** Add cells to an OR connection's outbuf whenever the outbuf's data length
* drops below this size. */
#define OR_CONN_LOWWATER (16*1024)
/** Subtype of connection_t for an "OR connection" -- that is, one that speaks
* cells over TLS. */

View File

@ -324,6 +324,7 @@ void
scheduler_run(void)
{
smartlist_t *tmp = NULL;
int n_cells;
log_debug(LD_SCHED, "We have a chance to run the scheduler");
@ -337,9 +338,18 @@ scheduler_run(void)
channels_pending = smartlist_new();
SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p",
U64_PRINTF_ARG(chan->global_identifier), chan);
n_cells = channel_num_cells_writeable(chan);
if (n_cells > 0) {
log_debug(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"%d cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
} else {
log_info(LD_SCHED,
"Scheduler saw pending channel " U64_FORMAT " at %p with "
"no cells writeable",
U64_PRINTF_ARG(chan->global_identifier), chan);
}
} SMARTLIST_FOREACH_END(chan);
smartlist_free(tmp);