Have the OOM handler also count the age of the data in a stream buffer

This commit is contained in:
Nick Mathewson 2013-11-15 18:38:52 -05:00
parent 9e90707602
commit 91ec6f7269
5 changed files with 108 additions and 15 deletions

View File

@ -68,6 +68,8 @@ typedef struct chunk_t {
size_t datalen; /**< The number of bytes stored in this chunk */ size_t datalen; /**< The number of bytes stored in this chunk */
size_t memlen; /**< The number of usable bytes of storage in <b>mem</b>. */ size_t memlen; /**< The number of usable bytes of storage in <b>mem</b>. */
char *data; /**< A pointer to the first byte of data stored in <b>mem</b>. */ char *data; /**< A pointer to the first byte of data stored in <b>mem</b>. */
uint32_t inserted_time; /**< Timestamp in truncated ms since epoch
* when this chunk was inserted. */
char mem[FLEXIBLE_ARRAY_MEMBER]; /**< The actual memory used for storage in char mem[FLEXIBLE_ARRAY_MEMBER]; /**< The actual memory used for storage in
* this chunk. */ * this chunk. */
} chunk_t; } chunk_t;
@ -139,6 +141,9 @@ static chunk_freelist_t freelists[] = {
* could help with? */ * could help with? */
static uint64_t n_freelist_miss = 0; static uint64_t n_freelist_miss = 0;
/** Running total, in bytes, of the allocation sizes of all chunks that have
 * been allocated and not yet freed.  It is increased when a chunk is
 * allocated and decreased when a chunk is released with tor_free(). */
static size_t total_bytes_allocated_in_chunks = 0;
static void assert_freelist_ok(chunk_freelist_t *fl); static void assert_freelist_ok(chunk_freelist_t *fl);
/** Return the freelist to hold chunks of size <b>alloc</b>, or NULL if /** Return the freelist to hold chunks of size <b>alloc</b>, or NULL if
@ -172,6 +177,8 @@ chunk_free_unchecked(chunk_t *chunk)
} else { } else {
if (freelist) if (freelist)
++freelist->n_free; ++freelist->n_free;
tor_assert(total_bytes_allocated_in_chunks >= alloc);
total_bytes_allocated_in_chunks -= alloc;
tor_free(chunk); tor_free(chunk);
} }
} }
@ -200,6 +207,7 @@ chunk_new_with_alloc_size(size_t alloc)
else else
++n_freelist_miss; ++n_freelist_miss;
ch = tor_malloc(alloc); ch = tor_malloc(alloc);
total_bytes_allocated_in_chunks += alloc;
} }
ch->next = NULL; ch->next = NULL;
ch->datalen = 0; ch->datalen = 0;
@ -211,6 +219,10 @@ chunk_new_with_alloc_size(size_t alloc)
static void static void
chunk_free_unchecked(chunk_t *chunk) chunk_free_unchecked(chunk_t *chunk)
{ {
if (!chunk)
return;
tor_assert(total_bytes_allocated_in_chunks >= CHUNK_ALLOC_SIZE(chunk->memlen));
total_bytes_allocated_in_chunks -= CHUNK_ALLOC_SIZE(chunk->memlen);
tor_free(chunk); tor_free(chunk);
} }
static INLINE chunk_t * static INLINE chunk_t *
@ -221,6 +233,7 @@ chunk_new_with_alloc_size(size_t alloc)
ch->next = NULL; ch->next = NULL;
ch->datalen = 0; ch->datalen = 0;
ch->memlen = CHUNK_SIZE_WITH_ALLOC(alloc); ch->memlen = CHUNK_SIZE_WITH_ALLOC(alloc);
total_bytes_allocated_in_chunks += alloc;
ch->data = &ch->mem[0]; ch->data = &ch->mem[0];
return ch; return ch;
} }
@ -237,6 +250,7 @@ chunk_grow(chunk_t *chunk, size_t sz)
chunk = tor_realloc(chunk, CHUNK_ALLOC_SIZE(sz)); chunk = tor_realloc(chunk, CHUNK_ALLOC_SIZE(sz));
chunk->memlen = sz; chunk->memlen = sz;
chunk->data = chunk->mem + offset; chunk->data = chunk->mem + offset;
total_bytes_allocated_in_chunks += (sz - chunk->memlen);
return chunk; return chunk;
} }
@ -298,6 +312,8 @@ buf_shrink_freelists(int free_all)
*chp = NULL; *chp = NULL;
while (chunk) { while (chunk) {
chunk_t *next = chunk->next; chunk_t *next = chunk->next;
tor_assert(total_bytes_allocated_in_chunks >= CHUNK_ALLOC_SIZE(chunk->memlen));
total_bytes_allocated_in_chunks -= CHUNK_ALLOC_SIZE(chunk->memlen);
tor_free(chunk); tor_free(chunk);
chunk = next; chunk = next;
--n_to_free; --n_to_free;
@ -599,6 +615,7 @@ static chunk_t *
buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped) buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
{ {
chunk_t *chunk; chunk_t *chunk;
struct timeval now;
if (CHUNK_ALLOC_SIZE(capacity) < buf->default_chunk_size) { if (CHUNK_ALLOC_SIZE(capacity) < buf->default_chunk_size) {
chunk = chunk_new_with_alloc_size(buf->default_chunk_size); chunk = chunk_new_with_alloc_size(buf->default_chunk_size);
} else if (capped && CHUNK_ALLOC_SIZE(capacity) > MAX_CHUNK_ALLOC) { } else if (capped && CHUNK_ALLOC_SIZE(capacity) > MAX_CHUNK_ALLOC) {
@ -606,6 +623,10 @@ buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
} else { } else {
chunk = chunk_new_with_alloc_size(preferred_chunk_size(capacity)); chunk = chunk_new_with_alloc_size(preferred_chunk_size(capacity));
} }
tor_gettimeofday_cached(&now);
chunk->inserted_time = (uint32_t)tv_to_msec(&now);
if (buf->tail) { if (buf->tail) {
tor_assert(buf->head); tor_assert(buf->head);
buf->tail->next = chunk; buf->tail->next = chunk;
@ -618,6 +639,26 @@ buf_add_chunk_with_capacity(buf_t *buf, size_t capacity, int capped)
return chunk; return chunk;
} }
/** Return the age of the oldest chunk in the buffer <b>buf</b>, in
* milliseconds. Requires the current time, in truncated milliseconds since
* the epoch, as its input <b>now</b>.
*/
uint32_t
buf_get_oldest_chunk_timestamp(const buf_t *buf, uint32_t now)
{
if (buf->head) {
return now - buf->head->inserted_time;
} else {
return 0;
}
}
/** Return the current value of the file-level counter
 * total_bytes_allocated_in_chunks: the number of bytes allocated for
 * chunks that have not yet been freed. */
size_t
buf_get_total_allocation(void)
{
return total_bytes_allocated_in_chunks;
}
/** Read up to <b>at_most</b> bytes from the socket <b>fd</b> into /** Read up to <b>at_most</b> bytes from the socket <b>fd</b> into
* <b>chunk</b> (which must be on <b>buf</b>). If we get an EOF, set * <b>chunk</b> (which must be on <b>buf</b>). If we get an EOF, set
* *<b>reached_eof</b> to 1. Return -1 on error, 0 on eof or blocking, * *<b>reached_eof</b> to 1. Return -1 on error, 0 on eof or blocking,

View File

@ -25,6 +25,9 @@ size_t buf_datalen(const buf_t *buf);
size_t buf_allocation(const buf_t *buf); size_t buf_allocation(const buf_t *buf);
size_t buf_slack(const buf_t *buf); size_t buf_slack(const buf_t *buf);
uint32_t buf_get_oldest_chunk_timestamp(const buf_t *buf, uint32_t now);
size_t buf_get_total_allocation(void);
int read_to_buf(tor_socket_t s, size_t at_most, buf_t *buf, int *reached_eof, int read_to_buf(tor_socket_t s, size_t at_most, buf_t *buf, int *reached_eof,
int *socket_error); int *socket_error);
int read_to_buf_tls(tor_tls_t *tls, size_t at_most, buf_t *buf); int read_to_buf_tls(tor_tls_t *tls, size_t at_most, buf_t *buf);

View File

@ -1409,20 +1409,63 @@ circuit_max_queued_cell_age(const circuit_t *c, uint32_t now)
return age; return age;
} }
/** Temporary variable for circuits_compare_by_oldest_queued_cell_ This is a /** DOCDOC*/
* kludge to work around the fact that qsort doesn't provide a way for static uint32_t
* comparison functions to take an extra argument. */ circuit_get_streams_max_data_age(const edge_connection_t *stream, uint32_t now)
static uint32_t circcomp_now_tmp; {
uint32_t age = 0, age2;
for (; stream; stream = stream->next_stream) {
const connection_t *conn = TO_CONN(stream);
if (conn->outbuf) {
age2 = buf_get_oldest_chunk_timestamp(conn->outbuf, now);
if (age2 > age)
age = age2;
}
if (conn->inbuf) {
age2 = buf_get_oldest_chunk_timestamp(conn->inbuf, now);
if (age2 > age)
age = age2;
}
}
/** Helper to sort a list of circuit_t by age of oldest cell, in descending return age;
* order. Requires that circcomp_now_tmp is set correctly. */ }
/** DOCDOC
*/
static uint32_t
circuit_max_queued_data_age(const circuit_t *c, uint32_t now)
{
if (CIRCUIT_IS_ORIGIN(c)) {
return circuit_get_streams_max_data_age(
TO_ORIGIN_CIRCUIT((circuit_t*)c)->p_streams, now);
} else {
return circuit_get_streams_max_data_age(
TO_OR_CIRCUIT((circuit_t*)c)->n_streams, now);
}
}
/** Return the age of the oldest item queued on circuit <b>c</b>: the larger
 * of the value from circuit_max_queued_cell_age() and the value from
 * circuit_max_queued_data_age(), both evaluated against the same
 * <b>now</b>.
 */
static uint32_t
circuit_max_queued_item_age(const circuit_t *c, uint32_t now)
{
  const uint32_t oldest_cell = circuit_max_queued_cell_age(c, now);
  const uint32_t oldest_data = circuit_max_queued_data_age(c, now);

  return (oldest_cell > oldest_data) ? oldest_cell : oldest_data;
}
/** Helper to sort a list of circuit_t by age of oldest item, in descending
* order. */
static int static int
circuits_compare_by_oldest_queued_cell_(const void **a_, const void **b_) circuits_compare_by_oldest_queued_item_(const void **a_, const void **b_)
{ {
const circuit_t *a = *a_; const circuit_t *a = *a_;
const circuit_t *b = *b_; const circuit_t *b = *b_;
uint32_t age_a = circuit_max_queued_cell_age(a, circcomp_now_tmp); uint32_t age_a = a->age_tmp;
uint32_t age_b = circuit_max_queued_cell_age(b, circcomp_now_tmp); uint32_t age_b = b->age_tmp;
if (age_a < age_b) if (age_a < age_b)
return 1; return 1;
@ -1446,6 +1489,7 @@ circuits_handle_oom(size_t current_allocation)
size_t n_cells_removed=0, n_cells_to_remove; size_t n_cells_removed=0, n_cells_to_remove;
int n_circuits_killed=0; int n_circuits_killed=0;
struct timeval now; struct timeval now;
uint32_t now_ms;
log_notice(LD_GENERAL, "We're low on memory. Killing circuits with " log_notice(LD_GENERAL, "We're low on memory. Killing circuits with "
"over-long queues. (This behavior is controlled by " "over-long queues. (This behavior is controlled by "
"MaxMemInCellQueues.)"); "MaxMemInCellQueues.)");
@ -1460,18 +1504,19 @@ circuits_handle_oom(size_t current_allocation)
n_cells_to_remove = CEIL_DIV(mem_to_recover, packed_cell_mem_cost()); n_cells_to_remove = CEIL_DIV(mem_to_recover, packed_cell_mem_cost());
} }
tor_gettimeofday_cached(&now);
now_ms = (uint32_t)tv_to_msec(&now);
/* This algorithm itself assumes that you've got enough memory slack /* This algorithm itself assumes that you've got enough memory slack
* to actually run it. */ * to actually run it. */
for (circ = global_circuitlist; circ; circ = circ->next) for (circ = global_circuitlist; circ; circ = circ->next) {
circ->age_tmp = circuit_max_queued_item_age(circ, now_ms);
smartlist_add(circlist, circ); smartlist_add(circlist, circ);
}
/* Set circcomp_now_tmp so that the sort can work. */
tor_gettimeofday_cached(&now);
circcomp_now_tmp = (uint32_t)tv_to_msec(&now);
/* This is O(n log n); there are faster algorithms we could use instead. /* This is O(n log n); there are faster algorithms we could use instead.
* Let's hope this doesn't happen enough to be in the critical path. */ * Let's hope this doesn't happen enough to be in the critical path. */
smartlist_sort(circlist, circuits_compare_by_oldest_queued_cell_); smartlist_sort(circlist, circuits_compare_by_oldest_queued_item_);
/* Okay, now the worst circuits are at the front of the list. Let's mark /* Okay, now the worst circuits are at the front of the list. Let's mark
* them, and reclaim their storage aggressively. */ * them, and reclaim their storage aggressively. */

View File

@ -2564,6 +2564,9 @@ typedef struct circuit_t {
* more. */ * more. */
int deliver_window; int deliver_window;
/** Temporary field used during circuits_handle_oom. */
uint32_t age_tmp;
/** For storage while n_conn is pending /** For storage while n_conn is pending
* (state CIRCUIT_STATE_OR_WAIT). When defined, it is always * (state CIRCUIT_STATE_OR_WAIT). When defined, it is always
* length ONIONSKIN_CHALLENGE_LEN. */ * length ONIONSKIN_CHALLENGE_LEN. */

View File

@ -1998,6 +1998,7 @@ static int
cell_queues_check_size(void) cell_queues_check_size(void)
{ {
size_t alloc = total_cells_allocated * packed_cell_mem_cost(); size_t alloc = total_cells_allocated * packed_cell_mem_cost();
alloc += buf_get_total_allocation();
if (alloc >= get_options()->MaxMemInCellQueues) { if (alloc >= get_options()->MaxMemInCellQueues) {
circuits_handle_oom(alloc); circuits_handle_oom(alloc);
return 1; return 1;