diff --git a/.gitignore b/.gitignore index f4f6dacbb0..47b23c6470 100644 --- a/.gitignore +++ b/.gitignore @@ -202,6 +202,8 @@ uptime-*.json /src/lib/libtor-osinfo-testing.a /src/lib/libtor-process.a /src/lib/libtor-process-testing.a +/src/lib/libtor-pubsub.a +/src/lib/libtor-pubsub-testing.a /src/lib/libtor-sandbox.a /src/lib/libtor-sandbox-testing.a /src/lib/libtor-string.a diff --git a/Makefile.am b/Makefile.am index 36d9725f38..44cfe17877 100644 --- a/Makefile.am +++ b/Makefile.am @@ -41,6 +41,7 @@ TOR_UTIL_LIBS = \ src/lib/libtor-geoip.a \ src/lib/libtor-process.a \ src/lib/libtor-buf.a \ + src/lib/libtor-pubsub.a \ src/lib/libtor-dispatch.a \ src/lib/libtor-time.a \ src/lib/libtor-fs.a \ @@ -73,6 +74,7 @@ TOR_UTIL_TESTING_LIBS = \ src/lib/libtor-geoip-testing.a \ src/lib/libtor-process-testing.a \ src/lib/libtor-buf-testing.a \ + src/lib/libtor-pubsub-testing.a \ src/lib/libtor-dispatch-testing.a \ src/lib/libtor-time-testing.a \ src/lib/libtor-fs-testing.a \ diff --git a/src/include.am b/src/include.am index c6c351c806..77c126ba45 100644 --- a/src/include.am +++ b/src/include.am @@ -25,6 +25,7 @@ include src/lib/malloc/include.am include src/lib/net/include.am include src/lib/osinfo/include.am include src/lib/process/include.am +include src/lib/pubsub/include.am include src/lib/sandbox/include.am include src/lib/string/include.am include src/lib/subsys/include.am diff --git a/src/lib/pubsub/include.am b/src/lib/pubsub/include.am index 9856c94a5d..0ab2fd7b33 100644 --- a/src/lib/pubsub/include.am +++ b/src/lib/pubsub/include.am @@ -7,6 +7,7 @@ endif src_lib_libtor_pubsub_a_SOURCES = \ src/lib/pubsub/pubsub_build.c \ + src/lib/pubsub/pubsub_check.c \ src/lib/pubsub/pubsub_publish.c src_lib_libtor_pubsub_testing_a_SOURCES = \ diff --git a/src/lib/pubsub/pubsub_build.c b/src/lib/pubsub/pubsub_build.c index 72f2eacea8..7e4ab5ba8f 100644 --- a/src/lib/pubsub/pubsub_build.c +++ b/src/lib/pubsub/pubsub_build.c @@ -268,10 +268,8 @@ 
pubsub_builder_finalize(pubsub_builder_t *builder) if (builder->n_errors) goto err; - /* Coming in the next commit. if (pubsub_builder_check(builder) < 0) goto err; - */ dispatcher = dispatch_new(builder->cfg); diff --git a/src/lib/pubsub/pubsub_check.c b/src/lib/pubsub/pubsub_check.c new file mode 100644 index 0000000000..1b3853d8bb --- /dev/null +++ b/src/lib/pubsub/pubsub_check.c @@ -0,0 +1,368 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_check.c + * @brief Enforce various requirements on a pubsub_builder. + **/ + +#define PUBSUB_PRIVATE + +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pubsub_builder_st.h" +#include "lib/pubsub/pubsub_build.h" + +#include "lib/container/bitarray.h" +#include "lib/container/smartlist.h" +#include "lib/log/util_bug.h" +#include "lib/malloc/malloc.h" +#include "lib/string/compat_string.h" + +#include + +static void pubsub_adjmap_add(pubsub_adjmap_t *map, + const pubsub_cfg_t *item); + +/** + * Helper: construct and return a new pubsub_adjacency_map from cfg. + * Return NULL on error. 
+ **/ +static pubsub_adjmap_t * +pubsub_build_adjacency_map(const pubsub_items_t *cfg) +{ + pubsub_adjmap_t *map = tor_malloc_zero(sizeof(*map)); + const size_t n_subsystems = get_num_subsys_ids(); + const size_t n_msgs = get_num_message_ids(); + + map->n_subsystems = n_subsystems; + map->n_msgs = n_msgs; + + map->pub_by_subsys = tor_calloc(n_subsystems, sizeof(smartlist_t*)); + map->sub_by_subsys = tor_calloc(n_subsystems, sizeof(smartlist_t*)); + map->pub_by_msg = tor_calloc(n_msgs, sizeof(smartlist_t*)); + map->sub_by_msg = tor_calloc(n_msgs, sizeof(smartlist_t*)); + + SMARTLIST_FOREACH_BEGIN(cfg->items, const pubsub_cfg_t *, item) { + pubsub_adjmap_add(map, item); + } SMARTLIST_FOREACH_END(item); + + return map; +} + +/** + * Helper: add a single pubsub_cfg_t to an adjacency map. + **/ +static void +pubsub_adjmap_add(pubsub_adjmap_t *map, + const pubsub_cfg_t *item) +{ + smartlist_t **by_subsys; + smartlist_t **by_msg; + + tor_assert(item->subsys < map->n_subsystems); + tor_assert(item->msg < map->n_msgs); + + if (item->is_publish) { + by_subsys = &map->pub_by_subsys[item->subsys]; + by_msg = &map->pub_by_msg[item->msg]; + } else { + by_subsys = &map->sub_by_subsys[item->subsys]; + by_msg = &map->sub_by_msg[item->msg]; + } + + if (! *by_subsys) + *by_subsys = smartlist_new(); + if (! *by_msg) + *by_msg = smartlist_new(); + smartlist_add(*by_subsys, (void*) item); + smartlist_add(*by_msg, (void *) item); +} + +/** + * Release all storage held by m and set m to NULL. + **/ +#define pubsub_adjmap_free(m) \ + FREE_AND_NULL(pubsub_adjmap_t, pubsub_adjmap_free_, m) + +/** + * Free every element of an n-element array of smartlists, then + * free the array itself. + **/ +static void +pubsub_adjmap_free_helper(smartlist_t **lsts, size_t n) +{ + if (!lsts) + return; + + for (unsigned i = 0; i < n; ++i) { + smartlist_free(lsts[i]); + } + tor_free(lsts); +} + +/** + * Release all storage held by map. 
+ **/ +static void +pubsub_adjmap_free_(pubsub_adjmap_t *map) +{ + if (!map) + return; + pubsub_adjmap_free_helper(map->pub_by_subsys, map->n_subsystems); + pubsub_adjmap_free_helper(map->sub_by_subsys, map->n_subsystems); + pubsub_adjmap_free_helper(map->pub_by_msg, map->n_msgs); + pubsub_adjmap_free_helper(map->sub_by_msg, map->n_msgs); + tor_free(map); +} + +/** + * Helper: return the length of sl, or 0 if sl is NULL. + **/ +static int +smartlist_len_opt(const smartlist_t *sl) +{ + if (sl) + return smartlist_len(sl); + else + return 0; +} + +/** Return a pointer to a statically allocated string encoding the + * dispatcher flags in flags. */ +static const char * +format_flags(unsigned flags) +{ + static char buf[32]; + buf[0] = 0; + if (flags & DISP_FLAG_EXCL) { + strlcat(buf, " EXCL", sizeof(buf)); + } + if (flags & DISP_FLAG_STUB) { + strlcat(buf, " STUB", sizeof(buf)); + } + return buf[0] ? buf+1 : buf; +} + +/** + * Log a message containing a description of cfg at severity, prefixed + * by the string prefix. + */ +static void +pubsub_cfg_dump(const pubsub_cfg_t *cfg, int severity, const char *prefix) +{ + if (!prefix) + prefix = 0; + + tor_log(severity, LD_MESG, + "%s%s %s: %s{%s} on %s (%s) <%u %u %u %u %x> [%s:%d]", + prefix, + get_subsys_id_name(cfg->subsys), + cfg->is_publish ? "PUB" : "SUB", + get_message_id_name(cfg->msg), + get_msg_type_id_name(cfg->type), + get_channel_id_name(cfg->channel), + format_flags(cfg->flags), + cfg->subsys, cfg->msg, cfg->type, cfg->channel, cfg->flags, + cfg->added_by_file, cfg->added_by_line); +} + +/** + * Check whether there are any errors or inconsistencies for the message + * described by msg in map. If there are problems, log about + * them, and return -1. Otherwise return 0. + **/ +static int +lint_message(const pubsub_adjmap_t *map, message_id_t msg) +{ + /* NOTE: Some of the checks in this function are maybe over-zealous, and we + * might not want to have them forever. I've marked them with [?] below. 
+ */ + if (BUG(msg >= map->n_msgs)) + return 0; // LCOV_EXCL_LINE + + const smartlist_t *pub = map->pub_by_msg[msg]; + const smartlist_t *sub = map->sub_by_msg[msg]; + + const size_t n_pub = smartlist_len_opt(pub); + const size_t n_sub = smartlist_len_opt(sub); + + if (n_pub == 0 && n_sub == 0) { + log_info(LD_MESG, "Nobody is publishing or subscribing to message %u " + "(%s).", + msg, get_message_id_name(msg)); + return 0; // No publishers or subscribers: nothing to do. + } + + /* We'll set this to false if there are any problems. */ + bool ok = true; + + /* First make sure that if there are publishers, there are subscribers. */ + if (n_pub == 0) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) has subscribers, but no publishers.", + msg, get_message_id_name(msg)); + ok = false; + } else if (n_sub == 0) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) has publishers, but no subscribers.", + msg, get_message_id_name(msg)); + ok = false; + } + + /* The 'all' list has the publishers and the subscribers. */ + smartlist_t *all = smartlist_new(); + if (pub) + smartlist_add_all(all, pub); + if (sub) + smartlist_add_all(all, sub); + const pubsub_cfg_t *item0 = smartlist_get(all, 0); + + /* Indicates which subsystems we've found publishing/subscribing here. */ + bitarray_t *published_by = bitarray_init_zero((unsigned)map->n_subsystems); + bitarray_t *subscribed_by = bitarray_init_zero((unsigned)map->n_subsystems); + bool pub_excl = false, sub_excl = false, chan_same = true, type_same = true; + + /* Make sure that the messages all have the same channel and type; + * check whether the DISP_FLAG_EXCL flag is used; + * and if any subsystem is publishing or subscribing to it twice [??]. 
+ */ + SMARTLIST_FOREACH_BEGIN(all, const pubsub_cfg_t *, cfg) { + if (cfg->channel != item0->channel) { + chan_same = false; + } + if (cfg->type != item0->type) { + type_same = false; + } + if (cfg->flags & DISP_FLAG_EXCL) { + if (cfg->is_publish) + pub_excl = true; + else + sub_excl = true; + } + if (cfg->is_publish) { + if (bitarray_is_set(published_by, cfg->subsys)) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) is configured to be published by subsystem " + "%u (%s) more than once.", + msg, get_message_id_name(msg), + cfg->subsys, get_subsys_id_name(cfg->subsys)); + ok = false; + } + bitarray_set(published_by, cfg->subsys); + } else { + if (bitarray_is_set(subscribed_by, cfg->subsys)) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) is configured to be subscribed by subsystem " + "%u (%s) more than once.", + msg, get_message_id_name(msg), + cfg->subsys, get_subsys_id_name(cfg->subsys)); + ok = false; + } + bitarray_set(subscribed_by, cfg->subsys); + } + } SMARTLIST_FOREACH_END(cfg); + + /* Check whether any subsystem is publishing and subscribing the same + * message. [??] + */ + for (unsigned i = 0; i < map->n_subsystems; ++i) { + if (bitarray_is_set(published_by, i) && + bitarray_is_set(subscribed_by, i)) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) is published and subscribed by the same " + "subsystem %u (%s)", + msg, get_message_id_name(msg), + i, get_subsys_id_name(i)); + ok = false; + } + } + + if (! chan_same) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) is associated with multiple inconsistent " + "channels.", + msg, get_message_id_name(msg)); + ok = false; + } + if (! type_same) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) is associated with multiple inconsistent " + "message types.", + msg, get_message_id_name(msg)); + ok = false; + } + + /* Enforce exclusive-ness for publishers and subscribers that have asked for + * it. 
+ */ + if (pub_excl && smartlist_len(pub) > 1) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) has multiple publishers, but at least one is " + "marked as exclusive.", + msg, get_message_id_name(msg)); + ok = false; + } + if (sub_excl && smartlist_len(sub) > 1) { + log_warn(LD_MESG|LD_BUG, + "Message %u (%s) has multiple subscribers, but at least one is " + "marked as exclusive.", + msg, get_message_id_name(msg)); + ok = false; + } + + if (!ok) { + /* There was a problem -- let's log all the publishers and subscribers on + * this message */ + SMARTLIST_FOREACH(all, pubsub_cfg_t *, cfg, + pubsub_cfg_dump(cfg, LOG_WARN, " ")); + } + + smartlist_free(all); + bitarray_free(published_by); + bitarray_free(subscribed_by); + + return ok ? 0 : -1; +} + +/** + * Check all the messages in map for consistency. Return 0 on success, + * -1 on problems. + **/ +static int +pubsub_adjmap_check(const pubsub_adjmap_t *map) +{ + bool all_ok = true; + for (unsigned i = 0; i < map->n_msgs; ++i) { + if (lint_message(map, i) < 0) { + all_ok = false; + } + } + return all_ok ? 0 : -1; +} + +/** + * Check builder for consistency and various constraints. Return 0 on success, + * -1 on failure. + **/ +int +pubsub_builder_check(pubsub_builder_t *builder) +{ + pubsub_adjmap_t *map = pubsub_build_adjacency_map(builder->items); + int rv = -1; + + if (!map) + goto err; // should be impossible + + if (pubsub_adjmap_check(map) < 0) + goto err; + + rv = 0; + err: + pubsub_adjmap_free(map); + return rv; +}