consdiffmgr: compress incoming consensuses in the background

Also, compress them in several ways.

This breaks the unit tests; subsequent commits will make them pass
again.
This commit is contained in:
Nick Mathewson 2017-05-05 11:17:59 -04:00 committed by Alexander Færøy
parent 6da31ec484
commit 151cd121a2
No known key found for this signature in database
GPG Key ID: E15081D5D3C3DB53
2 changed files with 173 additions and 61 deletions

View File

@ -96,6 +96,24 @@ n_diff_compression_methods(void)
return ARRAY_LENGTH(compress_diffs_with);
}
/** Which methods do we use for precompressing consensuses? */
static const compress_method_t compress_consensus_with[] = {
ZLIB_METHOD,
#ifdef HAVE_LZMA
LZMA_METHOD,
#endif
#ifdef HAVE_ZSTD
ZSTD_METHOD,
#endif
};
/** How many different methods will we try to use for diff compression? */
static unsigned
n_consensus_compression_methods(void)
{
return ARRAY_LENGTH(compress_consensus_with);
}
/** Hashtable node used to remember the current status of the diff
* from a given sha3 digest to the current consensus. */
typedef struct cdm_diff_t {
@ -135,13 +153,13 @@ static consdiff_cfg_t consdiff_cfg = {
};
static int consdiffmgr_ensure_space_for_files(int n);
static int consensus_queue_compression_work(const char *consensus,
consensus_flavor_t flavor,
time_t valid_after);
static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
consensus_cache_entry_t *diff_to);
static void consdiffmgr_set_cache_flags(void);
/* Just gzip consensuses for now. */
#define COMPRESS_CONSENSUS_WITH GZIP_METHOD
/* =====
* Hashtable setup
* ===== */
@ -410,11 +428,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after)
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
consensus_cache_entry_t *result = NULL;
if (smartlist_len(matches) > 1) {
log_warn(LD_BUG, "How odd; there appear to be two matching consensuses "
"with flavor %s published at %s.",
flavname, formatted_time);
}
if (smartlist_len(matches)) {
result = smartlist_get(matches, 0);
}
@ -458,59 +471,7 @@ consdiffmgr_add_consensus(const char *consensus,
}
/* We don't have it. Add it to the cache. */
consdiffmgr_ensure_space_for_files(1);
{
size_t bodylen = strlen(consensus);
config_line_t *labels = NULL;
char formatted_time[ISO_TIME_LEN+1];
format_iso_time_nospace(formatted_time, valid_after);
const char *flavname = networkstatus_get_flavor_name(flavor);
cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
(const uint8_t *)consensus, bodylen);
{
const char *start, *end;
if (router_get_networkstatus_v3_signed_boundaries(consensus,
&start, &end) < 0) {
start = consensus;
end = consensus+bodylen;
}
cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
(const uint8_t *)start,
end - start);
}
char *body_compressed = NULL;
size_t size_compressed = 0;
if (tor_compress(&body_compressed, &size_compressed,
consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) {
config_free_lines(labels);
return -1;
}
cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST,
(const uint8_t *)body_compressed, size_compressed);
config_line_prepend(&labels, LABEL_COMPRESSION_TYPE,
compression_method_get_name(COMPRESS_CONSENSUS_WITH));
config_line_prepend(&labels, LABEL_FLAVOR, flavname);
config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
entry = consensus_cache_add(cdm_cache_get(),
labels,
(const uint8_t *)body_compressed,
size_compressed);
tor_free(body_compressed);
config_free_lines(labels);
}
if (entry) {
consensus_cache_entry_mark_for_aggressive_release(entry);
consensus_cache_entry_decref(entry);
}
cdm_cache_dirty = 1;
return entry ? 0 : -1;
return consensus_queue_compression_work(consensus, flavor, valid_after);
}
/**
@ -835,6 +796,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
if (strmap_get(have_diff_from, va) != NULL)
continue; /* we already have this one. */
smartlist_add(compute_diffs_from, ent);
/* Since we are not going to serve this as the most recent consensus
* any more, we should stop keeping it mmap'd when it's not in use.
*/
consensus_cache_entry_mark_for_aggressive_release(ent);
} SMARTLIST_FOREACH_END(ent);
log_info(LD_DIRSERV,
@ -1147,6 +1112,8 @@ store_multiple(consensus_cache_entry_handle_t **handles_out,
labels,
body_out,
bodylen_out);
if (BUG(ent == NULL))
continue;
status = CDM_DIFF_PRESENT;
handles_out[i] = consensus_cache_entry_handle_new(ent);
@ -1464,3 +1431,147 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
return -1;
}
/**
* Holds requests and replies for consensus_compress_workers.
*/
typedef struct consensus_compress_worker_job_t {
char *consensus;
size_t consensus_len;
consensus_flavor_t flavor;
time_t valid_after;
compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)];
} consensus_compress_worker_job_t;
/**
* Free all resources held in <b>job</b>
*/
static void
consensus_compress_worker_job_free(consensus_compress_worker_job_t *job)
{
if (!job)
return;
tor_free(job->consensus);
unsigned u;
for (u = 0; u < n_consensus_compression_methods(); ++u) {
config_free_lines(job->out[u].labels);
tor_free(job->out[u].body);
}
tor_free(job);
}
/**
* Worker function. This function runs inside a worker thread and receives
* a consensus_compress_worker_job_t as its input.
*/
static workqueue_reply_t
consensus_compress_worker_threadfn(void *state_, void *work_)
{
(void)state_;
consensus_compress_worker_job_t *job = work_;
consensus_flavor_t flavor = job->flavor;
const char *consensus = job->consensus;
size_t bodylen = job->consensus_len;
time_t valid_after = job->valid_after;
config_line_t *labels = NULL;
char formatted_time[ISO_TIME_LEN+1];
format_iso_time_nospace(formatted_time, valid_after);
const char *flavname = networkstatus_get_flavor_name(flavor);
cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
(const uint8_t *)consensus, bodylen);
{
const char *start, *end;
if (router_get_networkstatus_v3_signed_boundaries(consensus,
&start, &end) < 0) {
start = consensus;
end = consensus+bodylen;
}
cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
(const uint8_t *)start,
end - start);
}
config_line_prepend(&labels, LABEL_FLAVOR, flavname);
config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
compress_multiple(job->out,
n_consensus_compression_methods(),
compress_consensus_with,
(const uint8_t*)consensus, bodylen, labels);
config_free_lines(labels);
return WQ_RPL_REPLY;
}
/**
* Worker function: This function runs in the main thread, and receives
* a consensus_diff_compress_job_t that the worker thread has already
* processed.
*/
static void
consensus_compress_worker_replyfn(void *work_)
{
consensus_compress_worker_job_t *job = work_;
consensus_cache_entry_handle_t *handles[
ARRAY_LENGTH(compress_consensus_with)];
memset(handles, 0, sizeof(handles));
store_multiple(handles,
n_consensus_compression_methods(),
compress_consensus_with,
job->out,
"consensus");
cdm_cache_dirty = 1;
consensus_compress_worker_job_free(job);
}
/**
* If true, we compress in worker threads.
*/
static int background_compression = 0;
/**
* Queue a job to compress <b>consensus</b> and store its compressed
* text in the cache.
*/
static int
consensus_queue_compression_work(const char *consensus,
consensus_flavor_t flavor,
time_t valid_after)
{
consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job));
job->consensus = tor_strdup(consensus);
job->consensus_len = strlen(consensus);
job->flavor = flavor;
job->valid_after = valid_after;
if (background_compression) {
workqueue_entry_t *work;
work = cpuworker_queue_work(consensus_compress_worker_threadfn,
consensus_compress_worker_replyfn,
job);
if (!work) {
consensus_compress_worker_job_free(job);
return -1;
}
return 0;
} else {
consensus_compress_worker_threadfn(NULL, job);
consensus_compress_worker_replyfn(job);
return 0;
}
}
/**
* Tell the consdiffmgr backend to compress consensuses in worker threads.
*/
void
consdiffmgr_enable_background_compression(void)
{
// This isn't the default behavior because it would break unit tests.
background_compression = 1;
}

View File

@ -32,6 +32,7 @@ consdiff_status_t consdiffmgr_find_diff_from(
compress_method_t method);
void consdiffmgr_rescan(void);
int consdiffmgr_cleanup(void);
void consdiffmgr_enable_background_compression(void);
void consdiffmgr_configure(const consdiff_cfg_t *cfg);
struct sandbox_cfg_elem;
int consdiffmgr_register_with_sandbox(struct sandbox_cfg_elem **cfg);