diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c index 2deeab5e42..9a3c56d5c6 100644 --- a/src/or/consdiffmgr.c +++ b/src/or/consdiffmgr.c @@ -74,6 +74,25 @@ typedef enum cdm_diff_status_t { CDM_DIFF_ERROR=3, } cdm_diff_status_t; +/** Which methods do we use for precompressing diffs? */ +static const compress_method_t compress_diffs_with[] = { + NO_METHOD, + GZIP_METHOD, +#ifdef HAVE_LZMA + LZMA_METHOD, +#endif +#ifdef HAVE_ZSTD + ZSTD_METHOD, +#endif +}; + +/** How many different methods will we try to use for diff compression? */ +STATIC unsigned +n_diff_compression_methods(void) +{ + return ARRAY_LENGTH(compress_diffs_with); +} + /** Hashtable node used to remember the current status of the diff * from a given sha3 digest to the current consensus. */ typedef struct cdm_diff_t { @@ -84,12 +103,15 @@ typedef struct cdm_diff_t { /** SHA3-256 digest of the consensus that this diff is _from_. (part of the * ht key) */ uint8_t from_sha3[DIGEST256_LEN]; + /** Method by which the diff is compressed. (part of the ht key */ + compress_method_t compress_method; /** One of the CDM_DIFF_* values, depending on whether this diff * is available, in progress, or impossible to compute. */ cdm_diff_status_t cdm_diff_status; /** SHA3-256 digest of the consensus that this diff is _to. */ uint8_t target_sha3[DIGEST256_LEN]; + /** Handle to the cache entry for this diff, if any. We use a handle here * to avoid thinking too hard about cache entry lifetime issues. */ consensus_cache_entry_handle_t *entry; @@ -121,9 +143,10 @@ static void consdiffmgr_set_cache_flags(void); static unsigned cdm_diff_hash(const cdm_diff_t *diff) { - uint8_t tmp[DIGEST256_LEN + 1]; + uint8_t tmp[DIGEST256_LEN + 2]; memcpy(tmp, diff->from_sha3, DIGEST256_LEN); tmp[DIGEST256_LEN] = (uint8_t) diff->flavor; + tmp[DIGEST256_LEN+1] = (uint8_t) diff->compress_method; return (unsigned) siphash24g(tmp, sizeof(tmp)); } /** Helper: compare two cdm_diff_t objects for key equality */ @@ -131,7 +154,8 @@ static int cdm_diff_eq(const cdm_diff_t *diff1, const cdm_diff_t *diff2) { return fast_memeq(diff1->from_sha3, diff2->from_sha3, DIGEST256_LEN) && - diff1->flavor == diff2->flavor; + diff1->flavor == diff2->flavor && + diff1->compress_method == diff2->compress_method; } HT_PROTOTYPE(cdm_diff_ht, cdm_diff_t, node, cdm_diff_hash, cdm_diff_eq) @@ -153,13 +177,15 @@ cdm_diff_free(cdm_diff_t *diff) static cdm_diff_t * cdm_diff_new(consensus_flavor_t flav, const uint8_t *from_sha3, - const uint8_t *target_sha3) + const uint8_t *target_sha3, + compress_method_t method) { cdm_diff_t *ent; ent = tor_malloc_zero(sizeof(cdm_diff_t)); ent->flavor = flav; memcpy(ent->from_sha3, from_sha3, DIGEST256_LEN); memcpy(ent->target_sha3, target_sha3, DIGEST256_LEN); + ent->compress_method = method; return ent; } @@ -177,18 +203,25 @@ cdm_diff_ht_check_and_note_pending(consensus_flavor_t flav, const uint8_t *target_sha3) { struct cdm_diff_t search, *ent; - memset(&search, 0, sizeof(cdm_diff_t)); - search.flavor = flav; - memcpy(search.from_sha3, from_sha3, DIGEST256_LEN); - ent = HT_FIND(cdm_diff_ht, &cdm_diff_ht, &search); - if (ent) { - tor_assert_nonfatal(ent->cdm_diff_status != CDM_DIFF_PRESENT); - return 1; + unsigned u; + int result = 0; + for (u = 0; u < n_diff_compression_methods(); ++u) { + compress_method_t method = compress_diffs_with[u]; + memset(&search, 0, sizeof(cdm_diff_t)); + search.flavor = flav; + search.compress_method = method; + memcpy(search.from_sha3, from_sha3, DIGEST256_LEN); + ent = HT_FIND(cdm_diff_ht, &cdm_diff_ht, &search); + if (ent) { + tor_assert_nonfatal(ent->cdm_diff_status != CDM_DIFF_PRESENT); + result = 1; + continue; + } + ent = cdm_diff_new(flav, from_sha3, target_sha3, method); + ent->cdm_diff_status = CDM_DIFF_IN_PROGRESS; + HT_INSERT(cdm_diff_ht, &cdm_diff_ht, ent); } - ent = cdm_diff_new(flav, from_sha3, target_sha3); - ent->cdm_diff_status = CDM_DIFF_IN_PROGRESS; - HT_INSERT(cdm_diff_ht, &cdm_diff_ht, ent); - return 0; + return result; } /** @@ -201,16 +234,18 @@ static void cdm_diff_ht_set_status(consensus_flavor_t flav, const uint8_t *from_sha3, const uint8_t *to_sha3, + compress_method_t method, int status, consensus_cache_entry_handle_t *handle) { struct cdm_diff_t search, *ent; memset(&search, 0, sizeof(cdm_diff_t)); search.flavor = flav; + search.compress_method = method, memcpy(search.from_sha3, from_sha3, DIGEST256_LEN); ent = HT_FIND(cdm_diff_ht, &cdm_diff_ht, &search); if (!ent) { - ent = cdm_diff_new(flav, from_sha3, to_sha3); + ent = cdm_diff_new(flav, from_sha3, to_sha3, method); ent->cdm_diff_status = CDM_DIFF_IN_PROGRESS; HT_INSERT(cdm_diff_ht, &cdm_diff_ht, ent); } else if (fast_memneq(ent->target_sha3, to_sha3, DIGEST256_LEN)) { @@ -500,7 +535,8 @@ consdiffmgr_find_diff_from(consensus_cache_entry_t **entry_out, consensus_flavor_t flavor, int digest_type, const uint8_t *digest, - size_t digestlen) + size_t digestlen, + compress_method_t method) { if (BUG(digest_type != DIGEST_SHA3_256) || BUG(digestlen != DIGEST256_LEN)) { @@ -511,6 +547,7 @@ consdiffmgr_find_diff_from(consensus_cache_entry_t **entry_out, cdm_diff_t search, *ent; memset(&search, 0, sizeof(search)); search.flavor = flavor; + search.compress_method = method; memcpy(search.from_sha3, digest, DIGEST256_LEN); ent = HT_FIND(cdm_diff_ht, &cdm_diff_ht, &search); @@ -820,6 +857,16 @@ consdiffmgr_diffs_load(void) int flavor = networkstatus_parse_flavor_name(lv_flavor); if (flavor < 0) continue; + const char *lv_compression = + consensus_cache_entry_get_value(diff, LABEL_COMPRESSION_TYPE); + compress_method_t method = NO_METHOD; + if (lv_compression) { + method = compression_method_get_by_name(lv_compression); + if (method == UNKNOWN_METHOD) { + continue; + } + } + uint8_t from_sha3[DIGEST256_LEN]; uint8_t to_sha3[DIGEST256_LEN]; if (cdm_entry_get_sha3_value(from_sha3, diff, LABEL_FROM_SHA3_DIGEST)<0) @@ -828,6 +875,7 @@ consdiffmgr_diffs_load(void) continue; cdm_diff_ht_set_status(flavor, from_sha3, to_sha3, + method, CDM_DIFF_PRESENT, consensus_cache_entry_handle_new(diff)); } SMARTLIST_FOREACH_END(diff); @@ -896,6 +944,18 @@ consdiffmgr_free_all(void) Thread workers =====*/ +typedef struct compressed_result_t { + config_line_t *labels; + /** + * Output: Body of the diff, as compressed. + */ + uint8_t *body; + /** + * Output: length of body_out + */ + size_t bodylen; +} compressed_result_t; + /** * An object passed to a worker thread that will try to produce a consensus * diff. @@ -914,18 +974,8 @@ typedef struct consensus_diff_worker_job_t { */ consensus_cache_entry_t *diff_to; - /** - * Output: Labels to store in the cache associated with this diff. - */ - config_line_t *labels_out; - /** - * Output: Body of the diff - */ - uint8_t *body_out; - /** - * Output: length of body_out - */ - size_t bodylen_out; + /** Output: labels and bodies */ + compressed_result_t out[ARRAY_LENGTH(compress_diffs_with)]; } consensus_diff_worker_job_t; /** Given a consensus_cache_entry_t, check whether it has a label claiming @@ -1040,23 +1090,55 @@ consensus_diff_worker_threadfn(void *state_, void *work_) return WQ_RPL_REPLY; } - /* Send the reply */ - job->body_out = (uint8_t *) consensus_diff; - job->bodylen_out = strlen(consensus_diff); + /* Compress the results and send the reply */ + tor_assert(compress_diffs_with[0] == NO_METHOD); + size_t difflen = strlen(consensus_diff); + job->out[0].body = (uint8_t *) consensus_diff; + job->out[0].bodylen = difflen; - cdm_labels_prepend_sha3(&job->labels_out, LABEL_SHA3_DIGEST, - job->body_out, job->bodylen_out); - cdm_labels_prepend_sha3(&job->labels_out, LABEL_SHA3_DIGEST_UNCOMPRESSED, - job->body_out, job->bodylen_out); - config_line_prepend(&job->labels_out, LABEL_FROM_VALID_AFTER, + config_line_t *common_labels = NULL; + cdm_labels_prepend_sha3(&common_labels, + LABEL_SHA3_DIGEST_UNCOMPRESSED, + job->out[0].body, + job->out[0].bodylen); + config_line_prepend(&common_labels, LABEL_FROM_VALID_AFTER, lv_from_valid_after); - config_line_prepend(&job->labels_out, LABEL_VALID_AFTER, lv_to_valid_after); - config_line_prepend(&job->labels_out, LABEL_FLAVOR, lv_from_flavor); - config_line_prepend(&job->labels_out, LABEL_FROM_SHA3_DIGEST, + config_line_prepend(&common_labels, LABEL_VALID_AFTER, + lv_to_valid_after); + config_line_prepend(&common_labels, LABEL_FLAVOR, lv_from_flavor); + config_line_prepend(&common_labels, LABEL_FROM_SHA3_DIGEST, lv_from_digest); - config_line_prepend(&job->labels_out, LABEL_TARGET_SHA3_DIGEST, + config_line_prepend(&common_labels, LABEL_TARGET_SHA3_DIGEST, lv_to_digest); - config_line_prepend(&job->labels_out, LABEL_DOCTYPE, DOCTYPE_CONSENSUS_DIFF); + config_line_prepend(&common_labels, LABEL_DOCTYPE, + DOCTYPE_CONSENSUS_DIFF); + + job->out[0].labels = config_lines_dup(common_labels); + cdm_labels_prepend_sha3(&job->out[0].labels, + LABEL_SHA3_DIGEST, + job->out[0].body, + job->out[0].bodylen); + + unsigned u; + for (u = 1; u < n_diff_compression_methods(); ++u) { + compress_method_t method = compress_diffs_with[u]; + const char *methodname = compression_method_get_name(method); + char *result; + size_t sz; + if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) { + job->out[u].body = (uint8_t*)result; + job->out[u].bodylen = sz; + job->out[u].labels = config_lines_dup(common_labels); + cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST, + job->out[u].body, + job->out[u].bodylen); + config_line_prepend(&job->out[u].labels, + LABEL_COMPRESSION_TYPE, + methodname); + } + } + + config_free_lines(common_labels); return WQ_RPL_REPLY; } @@ -1068,8 +1150,11 @@ consensus_diff_worker_job_free(consensus_diff_worker_job_t *job) { if (!job) return; - tor_free(job->body_out); - config_free_lines(job->labels_out); + unsigned u; + for (u = 0; u < n_diff_compression_methods(); ++u) { + config_free_lines(job->out[u].labels); + tor_free(job->out[u].body); + } consensus_cache_entry_decref(job->diff_from); consensus_cache_entry_decref(job->diff_to); tor_free(job); @@ -1117,20 +1202,35 @@ consensus_diff_worker_replyfn(void *work_) cache = 0; } - int status; - consensus_cache_entry_handle_t *handle = NULL; - if (job->body_out && job->bodylen_out && job->labels_out) { - /* Success! Store the results */ - log_info(LD_DIRSERV, "Adding consensus diff from %s to %s", - lv_from_digest, lv_to_digest); - consensus_cache_entry_t *ent = - consensus_cache_add(cdm_cache_get(), job->labels_out, - job->body_out, - job->bodylen_out); - status = CDM_DIFF_PRESENT; - handle = consensus_cache_entry_handle_new(ent); - consensus_cache_entry_decref(ent); - } else { + int status = CDM_DIFF_ERROR; + consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)]; + memset(handles, 0, sizeof(handles)); + + unsigned u; + for (u = 0; u < n_diff_compression_methods(); ++u) { + compress_method_t method = compress_diffs_with[u]; + uint8_t *body_out = job->out[u].body; + size_t bodylen_out = job->out[u].bodylen; + config_line_t *labels = job->out[u].labels; + const char *methodname = compression_method_get_name(method); + if (body_out && bodylen_out && labels) { + /* Success! Store the results */ + log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, " + "compressed with %s", + lv_from_digest, lv_to_digest, methodname); + + consensus_cache_entry_t *ent = + consensus_cache_add(cdm_cache_get(), + labels, + body_out, + bodylen_out); + + status = CDM_DIFF_PRESENT; + handles[u] = consensus_cache_entry_handle_new(ent); + consensus_cache_entry_decref(ent); + } + } + if (status != CDM_DIFF_PRESENT) { /* Failure! Nothing to do but complain */ log_warn(LD_DIRSERV, "Worker was unable to compute consensus diff " @@ -1139,10 +1239,15 @@ consensus_diff_worker_replyfn(void *work_) status = CDM_DIFF_ERROR; } - if (cache) - cdm_diff_ht_set_status(flav, from_sha3, to_sha3, status, handle); - else - consensus_cache_entry_handle_free(handle); + for (u = 0; u < ARRAY_LENGTH(handles); ++u) { + compress_method_t method = compress_diffs_with[u]; + if (cache) { + cdm_diff_ht_set_status(flav, from_sha3, to_sha3, method, status, + handles[u]); + } else { + consensus_cache_entry_handle_free(handles[u]); + } + } consensus_diff_worker_job_free(job); } diff --git a/src/or/consdiffmgr.h b/src/or/consdiffmgr.h index 982e0df3ab..12796739e9 100644 --- a/src/or/consdiffmgr.h +++ b/src/or/consdiffmgr.h @@ -28,7 +28,8 @@ consdiff_status_t consdiffmgr_find_diff_from( consensus_flavor_t flavor, int digest_type, const uint8_t *digest, - size_t digestlen); + size_t digestlen, + compress_method_t method); void consdiffmgr_rescan(void); int consdiffmgr_cleanup(void); void consdiffmgr_configure(const consdiff_cfg_t *cfg); @@ -36,6 +37,7 @@ void consdiffmgr_free_all(void); int consdiffmgr_validate(void); #ifdef CONSDIFFMGR_PRIVATE +STATIC unsigned n_diff_compression_methods(void); STATIC consensus_cache_t *cdm_cache_get(void); STATIC consensus_cache_entry_t *cdm_cache_lookup_consensus( consensus_flavor_t flavor, time_t valid_after); diff --git a/src/test/test_consdiffmgr.c b/src/test/test_consdiffmgr.c index b3fa3886f1..31ce6ce901 100644 --- a/src/test/test_consdiffmgr.c +++ b/src/test/test_consdiffmgr.c @@ -141,7 +141,8 @@ lookup_diff_from(consensus_cache_entry_t **out, uint8_t digest[DIGEST256_LEN]; crypto_digest256((char*)digest, str1, strlen(str1), DIGEST_SHA3_256); return consdiffmgr_find_diff_from(out, flav, - DIGEST_SHA3_256, digest, sizeof(digest)); + DIGEST_SHA3_256, digest, sizeof(digest), + NO_METHOD); } static int @@ -373,7 +374,8 @@ test_consdiffmgr_make_diffs(void *arg) tt_int_op(1, OP_EQ, smartlist_len(fake_cpuworker_queue)); diff_status = consdiffmgr_find_diff_from(&diff, FLAV_MICRODESC, DIGEST_SHA3_256, - md_ns_sha3, DIGEST256_LEN); + md_ns_sha3, DIGEST256_LEN, + NO_METHOD); tt_int_op(CONSDIFF_IN_PROGRESS, OP_EQ, diff_status); // Now run that process and get the diff. @@ -384,7 +386,8 @@ test_consdiffmgr_make_diffs(void *arg) // At this point we should be able to get that diff. diff_status = consdiffmgr_find_diff_from(&diff, FLAV_MICRODESC, DIGEST_SHA3_256, - md_ns_sha3, DIGEST256_LEN); + md_ns_sha3, DIGEST256_LEN, + NO_METHOD); tt_int_op(CONSDIFF_AVAILABLE, OP_EQ, diff_status); tt_assert(diff); @@ -757,7 +760,7 @@ test_consdiffmgr_cleanup_old_diffs(void *arg) /* Now add an even-more-recent consensus; this should make all previous * diffs deletable */ tt_int_op(0, OP_EQ, consdiffmgr_add_consensus(md_body[3], md_ns[3])); - tt_int_op(2, OP_EQ, consdiffmgr_cleanup()); + tt_int_op(2 * n_diff_compression_methods(), OP_EQ, consdiffmgr_cleanup()); tt_int_op(CONSDIFF_NOT_FOUND, OP_EQ, lookup_diff_from(&ent, FLAV_MICRODESC, md_body[0]));