Instead of adding servers and v1 directories to buffers en masse, directory servers add them on the fly as their outbufs are depleted. This will save ram on busy dirservers.

svn:r6641
This commit is contained in:
Nick Mathewson 2006-06-18 07:38:55 +00:00
parent 73ada60d64
commit 1d9923da7c
5 changed files with 286 additions and 86 deletions

View File

@ -31,21 +31,23 @@ Items for 0.1.2.x:
N . Improve memory usage on tight-memory machines.
- Directory-related fixes.
o Remember offset and location of each descriptor in the cache/journal
- When sending a big pile of descs to a client, don't shove them all
o When sending a big pile of descs to a client, don't shove them all
on the buffer at once. Keep a list of the descriptor digests for
the descriptors we still want to send. We might end up truncating
some replies by returning fewer descriptors than were requested (if
somebody requests a desc that we throw away before we deliver it),
but this happens only when somebody wants an obsolete desc, and
clients can already handle truncated replies.
- But what do we do about compression? That's the part that makes
. But what do we do about compression? That's the part that makes
stuff hard.
- Implement compress/decompress-on-the-fly support.
- Use it for returning lists of descriptors and lists of
network status docs.
o Implement compress/decompress-on-the-fly support.
o Use it for returning lists of descriptors.
- Use it for returning lists of network status docs. (This will
take a hybrid approach; let's get the other bits working first.)
o Make clients handle missing Content-Length tags. (Oh, they do.)
o Verify that this has happened for a long time.
- Try a similar trick for spooling out v1 directories.
o Try a similar trick for spooling out v1 directories. These we
_uncompress_ on the fly.
- Look into pulling serverdescs off buffers as they arrive.
- Mmap cache files where possible.
- Mmap cached-routers file; when building it, go oldest-to-newest.
@ -54,7 +56,7 @@ N . Improve memory usage on tight-memory machines.
- Save and mmap v1 directories; store them zipped?
- "bandwidth classes", for incoming vs initiated-here conns.
N - Asynchronous DNS
o Asynchronous DNS
- Security improvements
- Directory guards

View File

@ -20,6 +20,7 @@ static int connection_init_accepted_conn(connection_t *conn);
static int connection_handle_listener_read(connection_t *conn, int new_type);
static int connection_receiver_bucket_should_increase(connection_t *conn);
static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn);
static int connection_reached_eof(connection_t *conn);
static int connection_read_to_buf(connection_t *conn, int *max_to_read);
@ -241,6 +242,14 @@ _connection_free(connection_t *conn)
log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
connection_or_remove_from_identity_map(conn);
}
if (conn->zlib_state)
tor_zlib_free(conn->zlib_state);
if (conn->fingerprint_stack) {
SMARTLIST_FOREACH(conn->fingerprint_stack, char *, cp, tor_free(cp));
smartlist_free(conn->fingerprint_stack);
}
if (conn->cached_dir)
cached_dir_decref(conn->cached_dir);
memset(conn, 0xAA, sizeof(connection_t)); /* poison memory */
tor_free(conn);
@ -1491,9 +1500,13 @@ connection_handle_write(connection_t *conn)
}
}
if (result > 0 && !is_local_IP(conn->addr)) { /* remember it */
rep_hist_note_bytes_written(result, now);
global_write_bucket -= result;
if (result > 0) {
if (!is_local_IP(conn->addr)) { /* remember it */
rep_hist_note_bytes_written(result, time(NULL));
global_write_bucket -= result;
}
if (connection_flushed_some(conn) < 0)
connection_mark_for_close(conn);
}
if (!connection_wants_to_flush(conn)) { /* it's done flushing */
@ -1531,9 +1544,13 @@ _connection_controller_force_write(connection_t *conn)
return;
}
if (result > 0 && !is_local_IP(conn->addr)) { /* remember it */
rep_hist_note_bytes_written(result, time(NULL));
global_write_bucket -= result;
if (result > 0) {
if (!is_local_IP(conn->addr)) { /* remember it */
rep_hist_note_bytes_written(result, time(NULL));
global_write_bucket -= result;
}
if (connection_flushed_some(conn) < 0)
connection_mark_for_close(conn);
}
if (!connection_wants_to_flush(conn)) { /* it's done flushing */
@ -1913,6 +1930,17 @@ connection_process_inbuf(connection_t *conn, int package_partial)
}
}
/** Called whenever we've written data on a connection. */
static int
connection_flushed_some(connection_t *conn)
{
if (conn->type == CONN_TYPE_DIR &&
conn->state == DIR_CONN_STATE_SERVER_WRITING)
return connection_dirserv_flushed_some(conn);
else
return 0;
}
/** We just finished flushing bytes from conn-\>outbuf, and there
* are no more bytes remaining.
*

View File

@ -1374,9 +1374,9 @@ directory_handle_command_get(connection_t *conn, char *headers,
if (!strcmp(url,"/tor/") || !strcmp(url,"/tor/dir.z")) { /* dir fetch */
int deflated = !strcmp(url,"/tor/dir.z");
dlen = dirserv_get_directory(&cp, deflated);
cached_dir_t *d = dirserv_get_directory();
if (dlen == 0) {
if (!d) {
log_notice(LD_DIRSERV,"Client asked for the mirrored directory, but we "
"don't have a good one yet. Sending 503 Dir not available.");
write_http_status_line(conn, 503, "Directory unavailable");
@ -1386,6 +1386,7 @@ directory_handle_command_get(connection_t *conn, char *headers,
tor_free(url);
return 0;
}
dlen = deflated ? d->dir_z_len : d->dir_len;
if (global_write_bucket_empty()) {
log_info(LD_DIRSERV,
@ -1410,7 +1411,14 @@ directory_handle_command_get(connection_t *conn, char *headers,
deflated?"application/octet-stream":"text/plain",
deflated?"deflate":"identity");
connection_write_to_buf(tmp, strlen(tmp), conn);
connection_write_to_buf(cp, dlen, conn);
conn->cached_dir = d;
conn->cached_dir_offset = 0;
if (deflated)
conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD);
++d->refcnt;
/* Prime the connection with some data. */
connection_dirserv_flushed_some(conn);
return 0;
}
@ -1495,11 +1503,11 @@ directory_handle_command_get(connection_t *conn, char *headers,
int deflated = !strcmp(url+url_len-2, ".z");
int res;
const char *msg;
smartlist_t *descs = smartlist_create();
const char *request_type = NULL;
if (deflated)
url[url_len-2] = '\0';
res = dirserv_get_routerdescs(descs, url, &msg);
conn->fingerprint_stack = smartlist_create();
res = dirserv_get_routerdescs(conn->fingerprint_stack, url, &msg);
if (!strcmpstart(url, "/tor/server/fp/"))
request_type = deflated?"/tor/server/fp.z":"/tor/server/fp";
@ -1516,58 +1524,27 @@ directory_handle_command_get(connection_t *conn, char *headers,
if (res < 0)
write_http_status_line(conn, 404, msg);
else {
size_t len = 0;
format_rfc1123_time(date, time(NULL));
SMARTLIST_FOREACH(descs, signed_descriptor_t *, ri,
len += ri->signed_descriptor_len);
if (deflated) {
size_t compressed_len;
char *compressed;
char *inp = tor_malloc(len+smartlist_len(descs)+1);
char *cp = inp;
SMARTLIST_FOREACH(descs, signed_descriptor_t *, ri,
{
const char *body = signed_descriptor_get_body(ri);
memcpy(cp, body, ri->signed_descriptor_len);
cp += ri->signed_descriptor_len;
*cp++ = '\n';
});
*cp = '\0';
/* XXXX This could be way more efficiently handled; let's see if it
* shows up under oprofile. */
if (tor_gzip_compress(&compressed, &compressed_len,
inp, cp-inp, ZLIB_METHOD)<0) {
tor_free(inp);
smartlist_free(descs);
return -1;
}
tor_free(inp);
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD);
/* // note_request(request_type, compressed_len); XXXX */
tor_snprintf(tmp, sizeof(tmp),
"HTTP/1.0 200 OK\r\nDate: %s\r\nContent-Length: %d\r\n"
"HTTP/1.0 200 OK\r\nDate: %s\r\n"
"Content-Type: application/octet-stream\r\n"
"Content-Encoding: deflate\r\n\r\n",
date,
(int)compressed_len);
note_request(request_type, compressed_len);
date);
connection_write_to_buf(tmp, strlen(tmp), conn);
connection_write_to_buf(compressed, compressed_len, conn);
tor_free(compressed);
} else {
tor_snprintf(tmp, sizeof(tmp),
"HTTP/1.0 200 OK\r\nDate: %s\r\nContent-Length: %d\r\n"
"HTTP/1.0 200 OK\r\nDate: %s\r\n"
"Content-Type: text/plain\r\n\r\n",
date,
(int)len);
note_request(request_type, len);
date);
/* note_request(request_type, len); XXXX */
connection_write_to_buf(tmp, strlen(tmp), conn);
SMARTLIST_FOREACH(descs, signed_descriptor_t *, ri,
{
const char *body = signed_descriptor_get_body(ri);
connection_write_to_buf(body, ri->signed_descriptor_len, conn);
});
}
/* Prime the connection with some data. */
connection_dirserv_flushed_some(conn);
}
smartlist_free(descs);
return 0;
}
@ -1690,7 +1667,6 @@ static int
directory_handle_command_post(connection_t *conn, char *headers,
char *body, size_t body_len)
{
const char *cp;
char *origin = NULL;
char *url = NULL;
@ -1718,7 +1694,7 @@ directory_handle_command_post(connection_t *conn, char *headers,
int r = dirserv_add_descriptor(body, &msg);
tor_assert(msg);
if (r > 0)
dirserv_get_directory(&cp, 0); /* rebuild and write to disk */
dirserv_get_directory(); /* rebuild and write to disk */
switch (r) {
case -2:
case -1:

View File

@ -51,6 +51,7 @@ dirserv_get_status_impl(const char *fp, const char *nickname,
const char **msg, int should_log);
static int dirserv_thinks_router_is_reachable(routerinfo_t *router,
time_t now);
static void clear_cached_dir(cached_dir_t *d);
/************** Fingerprint handling code ************/
@ -865,12 +866,12 @@ dirserv_dump_directory_to_string(char **dir_out,
}
/** Most recently generated encoded signed directory. (auth dirservers only.)*/
static cached_dir_t the_directory = { NULL, NULL, 0, 0, 0 };
static cached_dir_t *the_directory = NULL;
/* Used only by non-auth dirservers: The directory and runningrouters we'll
* serve when requested. */
static cached_dir_t cached_directory = { NULL, NULL, 0, 0, 0 };
static cached_dir_t cached_runningrouters = { NULL, NULL, 0, 0, 0 };
static cached_dir_t *cached_directory = NULL;
static cached_dir_t cached_runningrouters = { NULL, NULL, 0, 0, 0, -1 };
/* Used for other dirservers' v2 network statuses. Map from hexdigest to
* cached_dir_t. */
@ -907,6 +908,32 @@ set_cached_dir(cached_dir_t *d, char *directory, time_t when)
}
}
/** DOCDOC */
void
cached_dir_decref(cached_dir_t *d)
{
if (!d || --d->refcnt > 0)
return;
clear_cached_dir(d);
tor_free(d);
}
/** DOCDOC */
static cached_dir_t *
new_cached_dir(char *s, time_t published)
{
cached_dir_t *d = tor_malloc_zero(sizeof(cached_dir_t));
d->refcnt = 1;
d->dir = s;
d->dir_len = strlen(s);
d->published = published;
if (tor_gzip_compress(&(d->dir_z), &(d->dir_z_len), d->dir, d->dir_len,
ZLIB_METHOD)) {
log_warn(LD_BUG, "Error compressing directory");
}
return d;
}
/** Remove all storage held in <b>d</b>, but do not free <b>d</b> itself. */
static void
clear_cached_dir(cached_dir_t *d)
@ -932,9 +959,12 @@ void
dirserv_set_cached_directory(const char *directory, time_t published,
int is_running_routers)
{
cached_dir_t *d;
d = is_running_routers ? &cached_runningrouters : &cached_directory;
set_cached_dir(d, tor_strdup(directory), published);
if (is_running_routers) {
set_cached_dir(&cached_runningrouters, tor_strdup(directory), published);
} else {
cached_dir_decref(cached_directory);
cached_directory = new_cached_dir(tor_strdup(directory), published);
}
}
/** We've just received a v2 network-status for an authoritative directory
@ -1043,7 +1073,8 @@ dirserv_pick_cached_dir_obj(cached_dir_t *cache_src,
* this kind of object.
**/
static size_t
dirserv_get_obj(const char **out, int compress,
dirserv_get_obj(const char **out,
int compress,
cached_dir_t *cache_src,
cached_dir_t *auth_src,
time_t dirty, int (*regenerate)(void),
@ -1065,17 +1096,16 @@ dirserv_get_obj(const char **out, int compress,
}
}
/** Set *<b>directory</b> to the most recently generated encoded signed
* directory, generating a new one as necessary. If not an authoritative
* directory may return 0 if no directory is yet cached.*/
size_t
dirserv_get_directory(const char **directory, int compress)
/** Return the most recently generated encoded signed directory, generating a
* new one as necessary. If not an authoritative directory may return NULL if
* no directory is yet cached.*/
cached_dir_t *
dirserv_get_directory(void)
{
return dirserv_get_obj(directory, compress,
&cached_directory, &the_directory,
the_directory_is_dirty,
dirserv_regenerate_directory,
"server directory", 1);
return dirserv_pick_cached_dir_obj(cached_directory, the_directory,
the_directory_is_dirty,
dirserv_regenerate_directory,
"server directory", 1);
}
/**
@ -1092,23 +1122,24 @@ dirserv_regenerate_directory(void)
tor_free(new_directory);
return -1;
}
set_cached_dir(&the_directory, new_directory, time(NULL));
cached_dir_decref(the_directory);
the_directory = new_cached_dir(new_directory, time(NULL));
log_info(LD_DIRSERV,"New directory (size %d) has been built.",
(int)the_directory.dir_len);
(int)the_directory->dir_len);
log_debug(LD_DIRSERV,"New directory (size %d):\n%s",
(int)the_directory.dir_len, the_directory.dir);
(int)the_directory->dir_len, the_directory->dir);
the_directory_is_dirty = 0;
/* Save the directory to disk so we re-load it quickly on startup.
*/
dirserv_set_cached_directory(the_directory.dir, time(NULL), 0);
dirserv_set_cached_directory(the_directory->dir, time(NULL), 0);
return 0;
}
/** For authoritative directories: the current (v1) network status */
static cached_dir_t the_runningrouters = { NULL, NULL, 0, 0, 0 };
static cached_dir_t the_runningrouters = { NULL, NULL, 0, 0, 0, -1 };
/** Replace the current running-routers list with a newly generated one. */
static int
@ -1177,7 +1208,7 @@ dirserv_get_runningrouters(const char **rr, int compress)
}
/** For authoritative directories: the current (v2) network status */
static cached_dir_t the_v2_networkstatus = { NULL, NULL, 0, 0, 0 };
static cached_dir_t the_v2_networkstatus = { NULL, NULL, 0, 0, 0, -1 };
static int
should_generate_v2_networkstatus(void)
@ -1535,6 +1566,44 @@ dirserv_get_networkstatus_v2(smartlist_t *result,
}
}
/** As dirserv_get_routerdescs(), but instead of getting signed_descriptor_t
* pointers, adds copies of digests to fps_out. For a /tor/server/d/ request,
* adds descriptor digests; for other requests, adds identity digests.
*/
int
dirserv_get_routerdesc_fingerprints(smartlist_t *fps_out, const char *key,
const char **msg)
{
*msg = NULL;
if (!strcmp(key, "/tor/server/all")) {
routerlist_t *rl = router_get_routerlist();
SMARTLIST_FOREACH(rl->routers, routerinfo_t *, r,
smartlist_add(fps_out,
tor_memdup(r->cache_info.identity_digest, DIGEST_LEN)));
} else if (!strcmp(key, "/tor/server/authority")) {
routerinfo_t *ri = router_get_my_routerinfo();
if (ri)
smartlist_add(fps_out,
tor_memdup(ri->cache_info.identity_digest, DIGEST_LEN));
} else if (!strcmpstart(key, "/tor/server/d/")) {
key += strlen("/tor/server/d/");
dir_split_resource_into_fingerprints(key, fps_out, NULL, 1);
} else if (!strcmpstart(key, "/tor/server/fp/")) {
key += strlen("/tor/server/fp/");
dir_split_resource_into_fingerprints(key, fps_out, NULL, 1);
} else {
*msg = "Key not recognized";
return -1;
}
if (!smartlist_len(fps_out)) {
*msg = "Servers unavailable";
return -1;
}
return 0;
}
/** Add a signed_descriptor_t to <b>descs_out</b> for each router matching
* <b>key</b>. The key should be either
* - "/tor/server/authority" for our own routerinfo;
@ -1673,6 +1742,119 @@ dirserv_orconn_tls_done(const char *address,
}
}
/** When we're spooling data onto our outbuf, add more whenever we dip
* below this threshold. */
#define DIRSERV_BUFFER_MIN 16384
/** DOCDOC */
static int
connection_dirserv_add_servers_to_outbuf(connection_t *conn)
{
int fp;
if (!strcmpstart(conn->requested_resource, "/tor/server/d/"))
fp = 0;
else
fp = 1;
while (smartlist_len(conn->fingerprint_stack) &&
buf_datalen(conn->outbuf) < DIRSERV_BUFFER_MIN) {
char *fp = smartlist_pop_last(conn->fingerprint_stack);
signed_descriptor_t *sd = NULL;
if (fp) {
if (router_digest_is_me(fp)) {
sd = &(router_get_my_routerinfo()->cache_info);
} else {
routerinfo_t *ri = router_get_by_digest(fp);
if (ri &&
ri->cache_info.published_on > time(NULL)-ROUTER_MAX_AGE_TO_PUBLISH)
sd = &ri->cache_info;
}
} else
sd = router_get_by_descriptor_digest(fp);
tor_free(fp);
if (!sd)
continue;
if (conn->zlib_state) {
write_to_buf_zlib(conn->outbuf, conn->zlib_state,
sd->signed_descriptor_body, sd->signed_descriptor_len,
0);
} else {
write_to_buf(sd->signed_descriptor_body, sd->signed_descriptor_len,
conn->outbuf);
}
}
if (!smartlist_len(conn->fingerprint_stack)) {
/* We just wrote the last one; finish up. */
if (conn->zlib_state) {
write_to_buf_zlib(conn->outbuf, conn->zlib_state, "", 0, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
smartlist_free(conn->fingerprint_stack);
conn->fingerprint_stack = NULL;
}
return 0;
}
/** DOCDOC */
static int
connection_dirserv_add_dir_bytes_to_outbuf(connection_t *conn)
{
int bytes, remaining;
bytes = DIRSERV_BUFFER_MIN - buf_datalen(conn->outbuf);
tor_assert(bytes > 0);
if (bytes < 8192)
bytes = 8192;
remaining = conn->cached_dir->dir_z_len - conn->cached_dir_offset;
if (bytes > remaining)
bytes = remaining;
if (conn->zlib_state) {
write_to_buf_zlib(conn->outbuf, conn->zlib_state,
conn->cached_dir->dir_z + conn->cached_dir_offset,
bytes, bytes == remaining);
} else {
write_to_buf(conn->cached_dir->dir_z + conn->cached_dir_offset,
bytes, conn->outbuf);
}
conn->cached_dir_offset += bytes;
if (bytes == (int)conn->cached_dir->dir_z_len) {
/* We just wrote the last one; finish up. */
if (conn->zlib_state) {
write_to_buf_zlib(conn->outbuf, conn->zlib_state, "", 0, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
cached_dir_decref(conn->cached_dir);
conn->cached_dir = NULL;
}
return 0;
}
/** Called whenever we have flushed some directory data in state
* SERVER_WRITING. */
int
connection_dirserv_flushed_some(connection_t *conn)
{
tor_assert(conn->type == CONN_TYPE_DIR);
tor_assert(conn->state == DIR_CONN_STATE_SERVER_WRITING);
if (! (conn->fingerprint_stack || conn->cached_dir)
|| buf_datalen(conn->outbuf) > DIRSERV_BUFFER_MIN)
return 0;
if (!strcmpstart(conn->requested_resource, "/tor/server/")) {
return connection_dirserv_add_servers_to_outbuf(conn);
} else if (conn->cached_dir) {
return connection_dirserv_add_dir_bytes_to_outbuf(conn);
} else {
return 0;
}
}
/** Release all storage used by the directory server. */
void
dirserv_free_all(void)
@ -1685,10 +1867,10 @@ dirserv_free_all(void)
smartlist_free(fingerprint_list);
fingerprint_list = NULL;
}
clear_cached_dir(&the_directory);
cached_dir_decref(the_directory);
clear_cached_dir(&the_runningrouters);
clear_cached_dir(&the_v2_networkstatus);
clear_cached_dir(&cached_directory);
cached_dir_decref(cached_directory);
clear_cached_dir(&cached_runningrouters);
if (cached_v2_networkstatus) {
digestmap_free(cached_v2_networkstatus, free_cached_dir);

View File

@ -697,6 +697,11 @@ struct connection_t {
/* Used only by Dir connections */
char *requested_resource; /**< Which 'resource' did we ask the directory
* for?*/
/* Used only for server sides of some dir connections. */
smartlist_t *fingerprint_stack;
struct cached_dir_t *cached_dir;
off_t cached_dir_offset;
tor_zlib_state_t *zlib_state;
/* Used only by AP connections */
socks_request_t *socks_request; /**< SOCKS structure describing request (AP
@ -750,6 +755,7 @@ typedef struct cached_dir_t {
size_t dir_len; /**< Length of <b>dir</b> */
size_t dir_z_len; /**< Length of <b>dir_z</b> */
time_t published; /**< When was this object published */
int refcnt; /**< Reference count for this cached_dir_t. */
} cached_dir_t;
/** Information need to cache an onion router's descriptor. */
@ -1468,6 +1474,8 @@ int flush_buf(int s, buf_t *buf, size_t sz, size_t *buf_flushlen);
int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
int write_to_buf(const char *string, size_t string_len, buf_t *buf);
int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
const char *data, size_t data_len, int done);
int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
int fetch_from_buf_http(buf_t *buf,
char **headers_out, size_t max_headerlen,
@ -1899,6 +1907,7 @@ char *directory_dump_request_log(void);
/********************************* dirserv.c ***************************/
int connection_dirserv_flushed_some(connection_t *conn);
int dirserv_add_own_fingerprint(const char *nickname, crypto_pk_env_t *pk);
int dirserv_parse_fingerprint_file(const char *fname);
void dirserv_free_fingerprint_list(void);
@ -1913,7 +1922,7 @@ int dirserv_dump_directory_to_string(char **dir_out,
crypto_pk_env_t *private_key,
int complete);
void directory_set_dirty(void);
size_t dirserv_get_directory(const char **cp, int compress);
cached_dir_t *dirserv_get_directory(void);
size_t dirserv_get_runningrouters(const char **rr, int compress);
void dirserv_set_cached_directory(const char *directory, time_t when,
int is_running_routers);
@ -1921,6 +1930,8 @@ void dirserv_set_cached_networkstatus_v2(const char *directory,
const char *identity,
time_t published);
void dirserv_get_networkstatus_v2(smartlist_t *result, const char *key);
int dirserv_get_routerdesc_fingerprints(smartlist_t *fps_out, const char *key,
const char **msg);
int dirserv_get_routerdescs(smartlist_t *descs_out, const char *key,
const char **msg);
void dirserv_orconn_tls_done(const char *address,
@ -1932,6 +1943,7 @@ int authdir_wants_to_reject_router(routerinfo_t *ri, const char **msg,
int complain);
int dirserv_would_reject_router(routerstatus_t *rs);
void dirserv_free_all(void);
void cached_dir_decref(cached_dir_t *d);
/********************************* dns.c ***************************/