diff --git a/src/or/channel.c b/src/or/channel.c index 0caebfb55c..f729a17be9 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -124,6 +124,13 @@ static uint64_t n_channel_bytes_passed_to_lower_layer = 0; static uint64_t n_channel_bytes_in_queues = 0; +/* + * Current total estimated queue size *including lower layer queues and + * transmit overhead* + */ + +static uint64_t estimated_total_queue_size = 0; + /* Digest->channel map * * Similar to the one used in connection_or.c, this maps from the identity @@ -1840,6 +1847,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) n_channel_bytes_queued += cell_bytes; n_channel_bytes_in_queues += cell_bytes; channel_assert_counter_consistency(); + /* Update channel queue size */ + chan->bytes_in_queue += cell_bytes; /* Try to process the queue? */ if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); } @@ -1878,6 +1887,9 @@ channel_write_cell(channel_t *chan, cell_t *cell) q.type = CELL_QUEUE_FIXED; q.u.fixed.cell = cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1913,6 +1925,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) q.type = CELL_QUEUE_PACKED; q.u.packed.packed_cell = packed_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1949,6 +1964,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) q.type = CELL_QUEUE_VAR; q.u.var.var_cell = var_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -2056,6 +2074,29 @@ channel_change_state(channel_t *chan, channel_state_t to_state) scheduler_channel_doesnt_want_writes(chan); } + /* + * If we're closing, this channel no longer counts toward the global + * estimated queue size; if we're open, it now does. + */ + if ((to_state == CHANNEL_STATE_CLOSING || + to_state == CHANNEL_STATE_CLOSED || + to_state == CHANNEL_STATE_ERROR) && + (from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size -= chan->bytes_in_queue; + } + + /* + * If we're opening, this channel now does count toward the global + * estimated queue size. + */ + if ((to_state == CHANNEL_STATE_OPEN || + to_state == CHANNEL_STATE_MAINT) && + !(from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size += chan->bytes_in_queue; + } + /* Tell circuits if we opened and stuff */ if (to_state == CHANNEL_STATE_OPEN) { channel_do_open_actions(chan); @@ -2350,6 +2391,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, n_channel_bytes_passed_to_lower_layer += cell_size; n_channel_bytes_in_queues -= cell_size; channel_assert_counter_consistency(); + /* Update the channel's queue size too */ + chan->bytes_in_queue -= cell_size; } /* No cell removed from list, so we can't go on any further */ else break; @@ -2362,6 +2405,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, channel_timestamp_drained(chan); } + /* Update the estimate queue size */ + channel_update_xmit_queue_size(chan); + return flushed; } @@ -4421,3 +4467,83 @@ channel_set_circid_type(channel_t *chan, } } +/** + * Update the estimated number of bytes queued to transmit for this channel, + * and notify the scheduler. The estimate includes both the channel queue and + * the queue size reported by the lower layer, and an overhead estimate + * optionally provided by the lower layer. + */ + +void +channel_update_xmit_queue_size(channel_t *chan) +{ + uint64_t queued, adj; + double overhead; + + tor_assert(chan); + tor_assert(chan->num_bytes_queued); + + /* + * First, get the number of bytes we have queued without factoring in + * lower-layer overhead. + */ + queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; + /* Next, adjust by the overhead factor, if any is available */ + if (chan->get_overhead_estimate) { + overhead = chan->get_overhead_estimate(chan); + if (overhead >= 1.0f) { + queued *= overhead; + } else { + /* Ignore silly overhead factors */ + log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); + } + } + + /* Now, compare to the previous estimate */ + if (queued > chan->bytes_queued_for_xmit) { + adj = queued - chan->bytes_queued_for_xmit; + log_debug(LD_CHANNEL, + "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size += adj; + log_debug(LD_CHANNEL, + "Increasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + } + } else if (queued < chan->bytes_queued_for_xmit) { + adj = chan->bytes_queued_for_xmit - queued; + log_debug(LD_CHANNEL, + "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size -= adj; + log_debug(LD_CHANNEL, + "Decreasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + } + } +} + diff --git a/src/or/channel.h b/src/or/channel.h index f35cb27aa6..388c729953 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -79,6 +79,11 @@ struct channel_s { /* Methods implemented by the lower layer */ /** + * Ask the lower layer for an estimate of the average overhead for + * transmissions on this channel. + */ + double (*get_overhead_estimate)(channel_t *); + /* * Ask the underlying transport what the remote endpoint address is, in * a tor_addr_t. This is optional and subclasses may leave this NULL. * If they implement it, they should write the address out to the @@ -110,6 +115,8 @@ struct channel_s { int (*matches_extend_info)(channel_t *, extend_info_t *); /** Check if this channel matches a target address when extending */ int (*matches_target)(channel_t *, const tor_addr_t *); + /* Ask the lower layer how many bytes it has queued but not yet sent */ + size_t (*num_bytes_queued)(channel_t *); /* Ask the lower layer how many cells can be written */ int (*num_cells_writeable)(channel_t *); /* Write a cell to an open channel */ @@ -202,6 +209,14 @@ struct channel_s { /** Channel counters for cell channels */ uint64_t n_cells_recved, n_bytes_recved; uint64_t n_cells_xmitted, n_bytes_xmitted; + + /** Our current contribution to the scheduler's total xmit queue */ + uint64_t bytes_queued_for_xmit; + + /** Number of bytes in this channel's cell queue; does not include + * lower-layer queueing. + */ + uint64_t bytes_in_queue; }; struct channel_listener_s { @@ -460,6 +475,7 @@ unsigned int channel_num_circuits(channel_t *chan); void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd, int consider_identity); void channel_timestamp_client(channel_t *chan); +void channel_update_xmit_queue_size(channel_t *chan); const char * channel_listener_describe_transport(channel_listener_t *chan_l); void channel_listener_dump_statistics(channel_listener_t *chan_l, diff --git a/src/or/channeltls.c b/src/or/channeltls.c index 7df2d35d93..a8222dea35 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -55,6 +55,7 @@ static void channel_tls_common_init(channel_tls_t *tlschan); static void channel_tls_close_method(channel_t *chan); static const char * channel_tls_describe_transport_method(channel_t *chan); static void channel_tls_free_method(channel_t *chan); +static double channel_tls_get_overhead_estimate_method(channel_t *chan); static int channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out); static int @@ -69,6 +70,7 @@ channel_tls_matches_extend_info_method(channel_t *chan, static int channel_tls_matches_target_method(channel_t *chan, const tor_addr_t *target); static int channel_tls_num_cells_writeable_method(channel_t *chan); +static size_t channel_tls_num_bytes_queued_method(channel_t *chan); static int channel_tls_write_cell_method(channel_t *chan, cell_t *cell); static int channel_tls_write_packed_cell_method(channel_t *chan, @@ -118,6 +120,7 @@ channel_tls_common_init(channel_tls_t *tlschan) chan->close = channel_tls_close_method; chan->describe_transport = channel_tls_describe_transport_method; chan->free = channel_tls_free_method; + chan->get_overhead_estimate = channel_tls_get_overhead_estimate_method; chan->get_remote_addr = channel_tls_get_remote_addr_method; chan->get_remote_descr = channel_tls_get_remote_descr_method; chan->get_transport_name = channel_tls_get_transport_name_method; @@ -125,6 +128,7 @@ channel_tls_common_init(channel_tls_t *tlschan) chan->is_canonical = channel_tls_is_canonical_method; chan->matches_extend_info = channel_tls_matches_extend_info_method; chan->matches_target = channel_tls_matches_target_method; + chan->num_bytes_queued = channel_tls_num_bytes_queued_method; chan->num_cells_writeable = channel_tls_num_cells_writeable_method; chan->write_cell = channel_tls_write_cell_method; chan->write_packed_cell = channel_tls_write_packed_cell_method; @@ -437,6 +441,40 @@ channel_tls_free_method(channel_t *chan) } } +/** + * Get an estimate of the average TLS overhead for the upper layer + */ + +static double +channel_tls_get_overhead_estimate_method(channel_t *chan) +{ + double overhead = 1.0f; + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + /* Just return 1.0f if we don't have sensible data */ + if (tlschan->conn->bytes_xmitted > 0 && + tlschan->conn->bytes_xmitted_by_tls >= + tlschan->conn->bytes_xmitted) { + overhead = ((double)(tlschan->conn->bytes_xmitted_by_tls)) / + ((double)(tlschan->conn->bytes_xmitted)); + + /* + * Never estimate more than 2.0; otherwise we get silly large estimates + * at the very start of a new TLS connection. + */ + if (overhead > 2.0f) overhead = 2.0f; + } + + log_debug(LD_CHANNEL, + "Estimated overhead ratio for TLS chan " U64_FORMAT " is %f", + U64_PRINTF_ARG(chan->global_identifier), overhead); + + return overhead; +} + /** * Get the remote address of a channel_tls_t * @@ -675,6 +713,22 @@ channel_tls_matches_target_method(channel_t *chan, return tor_addr_eq(&(tlschan->conn->real_addr), target); } +/** + * Tell the upper layer how many bytes we have queued and not yet + * sent. + */ + +static size_t +channel_tls_num_bytes_queued_method(channel_t *chan) +{ + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + return connection_get_outbuf_len(TO_CONN(tlschan->conn)); +} + /** * Tell the upper layer how many cells we can accept to write * diff --git a/src/or/connection.c b/src/or/connection.c index 4a3bd2cf03..525f4b5e85 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3839,6 +3839,8 @@ connection_handle_write_impl(connection_t *conn, int force) tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written); log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written", result, (long)n_read, (long)n_written); + or_conn->bytes_xmitted += result; + or_conn->bytes_xmitted_by_tls += n_written; /* So we notice bytes were written even on error */ /* XXXX024 This cast is safe since we can never write INT_MAX bytes in a * single set of TLS operations. But it looks kinda ugly. If we refactor diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 6b276cc3a1..445335b43a 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -583,6 +583,9 @@ connection_or_flushed_some(or_connection_t *conn) { size_t datalen; + /* The channel will want to update its estimated queue size */ + channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan)); + /* If we're under the low water mark, add cells until we're just over the * high water mark. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); diff --git a/src/or/or.h b/src/or/or.h index bdcb29ea11..320931d471 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1529,6 +1529,12 @@ typedef struct or_connection_t { /** Last emptied write token bucket in msec since midnight; only used if * TB_EMPTY events are enabled. */ uint32_t write_emptied_time; + + /* + * Count the number of bytes flushed out on this orconn, and the number of + * bytes TLS actually sent - used for overhead estimation for scheduling. + */ + uint64_t bytes_xmitted, bytes_xmitted_by_tls; } or_connection_t; /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)