Pubsub: an OO layer on top of lib/dispatch

This "publish/subscribe" layer sits on top of lib/dispatch, and
tries to provide more type-safety and cross-checking for the
lower-level layer.

Even with this commit, we're still not done: more checking will come
in the next commit, and a set of usability/typesafety macros will
come after.
This commit is contained in:
Nick Mathewson 2019-01-15 09:01:20 -05:00
parent 24b945f713
commit 9e60482b80
15 changed files with 1161 additions and 0 deletions

View File

@ -0,0 +1,10 @@
orconfig.h
lib/cc/*.h
lib/container/*.h
lib/dispatch/*.h
lib/intmath/*.h
lib/log/*.h
lib/malloc/*.h
lib/pubsub/*.h
lib/string/*.h

24
src/lib/pubsub/include.am Normal file
View File

@ -0,0 +1,24 @@
noinst_LIBRARIES += src/lib/libtor-pubsub.a
if UNITTESTS_ENABLED
noinst_LIBRARIES += src/lib/libtor-pubsub-testing.a
endif
src_lib_libtor_pubsub_a_SOURCES = \
src/lib/pubsub/pubsub_build.c \
src/lib/pubsub/pubsub_publish.c
src_lib_libtor_pubsub_testing_a_SOURCES = \
$(src_lib_libtor_pubsub_a_SOURCES)
src_lib_libtor_pubsub_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS)
src_lib_libtor_pubsub_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
noinst_HEADERS += \
src/lib/pubsub/pub_binding_st.h \
src/lib/pubsub/pubsub.h \
src/lib/pubsub/pubsub_build.h \
src/lib/pubsub/pubsub_builder_st.h \
src/lib/pubsub/pubsub_connect.h \
src/lib/pubsub/pubsub_flags.h \
src/lib/pubsub/pubsub_publish.h

View File

@ -0,0 +1,36 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_build.h
* @brief Declaration of pub_binding_t.
*/
#ifndef TOR_PUB_BINDING_ST_H
#define TOR_PUB_BINDING_ST_H
#include "lib/dispatch/msgtypes.h"
struct dispatch_t;
/**
* A pub_binding_t is an opaque object that subsystems use to publish
* messages. The DISPATCH_ADD_PUB*() macros set it up.
**/
typedef struct pub_binding_t {
/**
* A pointer to a configured dispatch_t object. This is filled in
* when the dispatch_t is finally constructed.
**/
struct dispatch_t *dispatch_ptr;
/**
* A template for the msg_t fields that are filled in for this message.
* This is copied into outgoing messages, ensuring that their fields are set
* corretly.
**/
msg_t msg_template;
} pub_binding_t;
#endif

85
src/lib/pubsub/pubsub.h Normal file
View File

@ -0,0 +1,85 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub.h
* @brief Header for OO publish-subscribe functionality.
*
* This module provides a wrapper around the "dispatch" module,
* ensuring type-safety and allowing us to do static analysis on
* publication and subscriptions.
*
* With this module, we enforce:
* <ul>
* <li>that every message has (potential) publishers and subscribers;
* <li>that every message is published and subscribed from the correct
* channels, with the correct type ID, every time it is published.
* <li>that type IDs correspond to a single C type, and that the C types are
* used correctly.
* <li>that when a message is published or subscribed, it is done with
* a correct subsystem identifier
* </ul>
*
* We do this by making "publication requests" and "subscription requests"
* into objects, and doing some computation on them before we create
* a dispatch_t with them.
*
* Rather than using the dispatch module directly, a publishing module
* receives a "binding" object that it uses to send messages with the right
* settings.
*/
/*
*
* Overview: Messages are sent over channels. Before sending a message on a
* channel, or receiving a message on a channel, a subsystem needs to register
* that it publishes, or subscribes, to that message, on that channel.
*
* Messages, channels, and subsystems are represented internally as short
* integers, though they are associated with human-readable strings for
* initialization and debugging.
*
* When registering for a message, a subsystem must say whether it is an
* exclusive publisher/subscriber to that message type, or whether other
* subsystems may also publish/subscribe to it.
*
* All messages and their publishers/subscribers must be registered early in
* the initialization process.
*
* By default, it is an error for a message type to have publishers and no
* subscribers on a channel, or subscribers and no publishers on a channel.
*
* A subsystem may register for a message with a note that delivery or
* production is disabled -- for example, because the subsystem is
* disabled at compile-time. It is not an error for a message type to
* have all of its publishers or subscribers disabled.
*
* After a message is sent, it is delivered to every recipient. This
* delivery happens from the top level of the event loop; it may be
* interleaved with network events, timers, etc.
*
* Messages may have associated data. This data is typed, and is owned
* by the message. Strings, byte-arrays, and integers have built-in
* support. Other types may be added. If objects are to be sent,
* they should be identified by handle. If an object requires cleanup,
* it should be declared with an associated free function.
*
* Semantically, if two subsystems communicate only by this kind of
* message passing, neither is considered to depend on the other, though
* both are considered to have a dependency on the message and on any
* types it contains.
*
* (Or generational index?)
*/
#ifndef TOR_PUBSUB_PUBSUB_H
#define TOR_PUBSUB_PUBSUB_H
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_connect.h"
#include "lib/pubsub/pubsub_flags.h"
#include "lib/pubsub/pubsub_publish.h"
#endif

View File

@ -0,0 +1,286 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_build.c
* @brief Construct a dispatch_t in safer, more OO way.
**/
#define PUBSUB_PRIVATE
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_cfg.h"
#include "lib/dispatch/dispatch_naming.h"
#include "lib/dispatch/msgtypes.h"
#include "lib/pubsub/pubsub_flags.h"
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_build.h"
#include "lib/pubsub/pubsub_builder_st.h"
#include "lib/pubsub/pubsub_connect.h"
#include "lib/container/smartlist.h"
#include "lib/log/util_bug.h"
#include "lib/malloc/malloc.h"
#include <string.h>
/** Construct and return a new empty pubsub_items_t. */
static pubsub_items_t *
pubsub_items_new(void)
{
pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
cfg->items = smartlist_new();
cfg->type_items = smartlist_new();
return cfg;
}
/** Release all storage held in a pubsub_items_t. */
void
pubsub_items_free_(pubsub_items_t *cfg)
{
if (! cfg)
return;
SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
SMARTLIST_FOREACH(cfg->type_items,
pubsub_type_cfg_t *, item, tor_free(item));
smartlist_free(cfg->items);
smartlist_free(cfg->type_items);
tor_free(cfg);
}
/** Construct and return a new pubsub_builder_t. */
pubsub_builder_t *
pubsub_builder_new(void)
{
dispatch_naming_init();
pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
pb->cfg = dcfg_new();
pb->items = pubsub_items_new();
return pb;
}
/**
* Release all storage held by a pubsub_builder_t.
*
* You'll (mostly) only want to call this function on an error case: if you're
* constructing a dispatch_t instead, you should call
* pubsub_builder_finalize() to consume the pubsub_builder_t.
*/
void
pubsub_builder_free_(pubsub_builder_t *pb)
{
if (pb == NULL)
return;
pubsub_items_free(pb->items);
dcfg_free(pb->cfg);
tor_free(pb);
}
/**
* Create and return a pubsub_connector_t for the subsystem with ID
* <b>subsys</b> to use in adding publications, subscriptions, and types to
* <b>builder</b>.
**/
pubsub_connector_t *
pubsub_connector_for_subsystem(pubsub_builder_t *builder,
subsys_id_t subsys)
{
tor_assert(builder);
++builder->n_connectors;
pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
con->builder = builder;
con->subsys_id = subsys;
return con;
}
/**
* Release all storage held by a pubsub_connector_t.
**/
void
pubsub_connector_free_(pubsub_connector_t *con)
{
if (!con)
return;
if (con->builder) {
--con->builder->n_connectors;
tor_assert(con->builder->n_connectors >= 0);
}
tor_free(con);
}
/**
* Use <b>con</b> to add a request for being able to publish messages of type
* <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
**/
int
pubsub_add_pub_(pubsub_connector_t *con,
pub_binding_t *out,
channel_id_t channel,
message_id_t msg,
msg_type_id_t type,
unsigned flags,
const char *file,
unsigned line)
{
pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
memset(out, 0, sizeof(*out));
cfg->is_publish = true;
out->msg_template.sender = cfg->subsys = con->subsys_id;
out->msg_template.channel = cfg->channel = channel;
out->msg_template.msg = cfg->msg = msg;
out->msg_template.type = cfg->type = type;
cfg->flags = flags;
cfg->added_by_file = file;
cfg->added_by_line = line;
/* We're grabbing a pointer to the pub_binding_t so we can tell it about
* the dispatcher later on.
*/
cfg->pub_binding = out;
smartlist_add(con->builder->items->items, cfg);
if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
goto err;
if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
goto err;
return 0;
err:
++con->builder->n_errors;
return -1;
}
/**
* Use <b>con</b> to add a request for being able to publish messages of type
* <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
* passing them to the callback in <b>recv_fn</b>.
**/
int
pubsub_add_sub_(pubsub_connector_t *con,
recv_fn_t recv_fn,
channel_id_t channel,
message_id_t msg,
msg_type_id_t type,
unsigned flags,
const char *file,
unsigned line)
{
pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
cfg->is_publish = false;
cfg->subsys = con->subsys_id;
cfg->channel = channel;
cfg->msg = msg;
cfg->type = type;
cfg->flags = flags;
cfg->added_by_file = file;
cfg->added_by_line = line;
cfg->recv_fn = recv_fn;
smartlist_add(con->builder->items->items, cfg);
if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
goto err;
if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
goto err;
if (! (flags & DISP_FLAG_STUB)) {
if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
goto err;
}
return 0;
err:
++con->builder->n_errors;
return -1;
}
/**
* Use <b>con</b> to define a the functions to use for manipulating the type
* <b>type</b>. Any function pointers left as NULL will be implemented as
* no-ops.
**/
int
pubsub_connector_define_type_(pubsub_connector_t *con,
msg_type_id_t type,
dispatch_typefns_t *fns,
const char *file,
unsigned line)
{
pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
cfg->type = type;
memcpy(&cfg->fns, fns, sizeof(*fns));
cfg->subsys = con->subsys_id;
cfg->added_by_file = file;
cfg->added_by_line = line;
smartlist_add(con->builder->items->type_items, cfg);
if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
goto err;
return 0;
err:
++con->builder->n_errors;
return -1;
}
/**
* Initialize the dispatch_ptr field in every relevant publish binding
* for <b>d</b>.
*/
static void
dispatch_fill_pub_binding_backptrs(pubsub_builder_t *builder,
dispatch_t *d)
{
SMARTLIST_FOREACH_BEGIN(builder->items->items, pubsub_cfg_t *, cfg) {
if (cfg->pub_binding) {
// XXXX we could skip this for STUB publishers, and for any publishers
// XXXX where all subscribers are STUB.
cfg->pub_binding->dispatch_ptr = d;
}
} SMARTLIST_FOREACH_END(cfg);
}
/**
* Create a new dispatcher as configured in a pubsub_builder_t.
*
* Consumes and frees its input.
**/
dispatch_t *
pubsub_builder_finalize(pubsub_builder_t *builder)
{
dispatch_t *dispatcher = NULL;
tor_assert_nonfatal(builder->n_connectors == 0);
if (builder->n_errors)
goto err;
/* Coming in the next commit.
if (pubsub_builder_check(builder) < 0)
goto err;
*/
dispatcher = dispatch_new(builder->cfg);
if (!dispatcher)
goto err;
dispatch_fill_pub_binding_backptrs(builder, dispatcher);
err:
pubsub_builder_free(builder);
return dispatcher;
}

View File

@ -0,0 +1,87 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_build.h
* @brief Header used for constructing the OO publish-subscribe facility.
*
* (See pubsub.h for more general information on this API.)
**/
#ifndef TOR_PUBSUB_BUILD_H
#define TOR_PUBSUB_BUILD_H
struct dispatch_t;
/**
* A "dispatch builder" is an incomplete dispatcher, used when
* registering messages. It does not have the same integrity guarantees
* as a dispatcher. It cannot actually handle messages itself: once all
* subsystems have registered, it is converted into a dispatch_t.
**/
typedef struct pubsub_builder_t pubsub_builder_t;
/**
* A "dispatch connector" is a view of the dispatcher that a subsystem
* uses while initializing itself. It is specific to the subsystem, and
* ensures that each subsystem doesn't need to identify itself
* repeatedly while registering its messages.
**/
typedef struct pubsub_connector_t pubsub_connector_t;
/**
* Create a new pubsub_builder. This should only happen in the
* main-init code.
*/
pubsub_builder_t *pubsub_builder_new(void);
/** DOCDOC */
int pubsub_builder_check(pubsub_builder_t *);
/**
* Free a pubsub builder. This should only happen on error paths, where
* we have decided not to construct a dispatcher for some reason.
*/
#define pubsub_builder_free(db) \
FREE_AND_NULL(pubsub_builder_t, pubsub_builder_free_, (db))
/** Internal implementation of pubsub_builder_free(). */
void pubsub_builder_free_(pubsub_builder_t *);
/**
* Create a pubsub connector that a single subsystem will use to
* register its messages. The main-init code does this during susbsystem
* initialization.
*/
pubsub_connector_t *pubsub_connector_for_subsystem(pubsub_builder_t *,
subsys_id_t);
/**
* The main-init code does this after subsystem initialization.
*/
#define pubsub_connector_free(c) \
FREE_AND_NULL(pubsub_connector_t, pubsub_connector_free_, (c))
void pubsub_connector_free_(pubsub_connector_t *);
/**
* Constructs a dispatcher from a dispatch_builder, after checking that the
* invariances on the messages, channels, and connections have been
* respected.
*
* This should happen after every subsystem has initialized, and before
* entering the mainloop.
*/
struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *);
#ifdef PUBSUB_PRIVATE
struct pubsub_items_t;
#define pubsub_items_free(cfg) \
FREE_AND_NULL(pubsub_items_t, pubsub_items_free_, (cfg))
void pubsub_items_free_(struct pubsub_items_t *cfg);
#endif
#endif

View File

@ -0,0 +1,161 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_builder_st.h
*
* @brief private structures used for configuring dispatchers and messages.
*/
#ifndef TOR_PUBSUB_BUILDER_ST_H
#define TOR_PUBSUB_BUILDER_ST_H
#ifdef PUBSUB_PRIVATE
#include <stdbool.h>
#include <stddef.h>
struct dispatch_cfg_t;
struct smartlist_t;
struct pub_binding_t;
/**
* Configuration for a single publication or subscription request.
*
* These can be stored while the dispatcher is in use, but are only used for
* setup, teardown, and debugging.
*
* There are various fields in this request describing the message; all of
* them must match other descriptions of the message, or a bug has occurred.
**/
typedef struct pubsub_cfg_t {
/** True if this is a publishing request; false for a subscribing request. */
bool is_publish;
/** The system making this request. */
subsys_id_t subsys;
/** The channel on which the message is to be sent. */
channel_id_t channel;
/** The message ID to be sent or received. */
message_id_t msg;
/** The C type associated with the message. */
msg_type_id_t type;
/** One or more DISP_FLAGS_* items, combined with bitwise OR. */
unsigned flags;
/**
* Publishing only: a pub_binding object that will receive the binding for
* this request. We will finish filling this in when the dispatcher is
* constructed, so that the subsystem can publish then and not before.
*/
struct pub_binding_t *pub_binding;
/**
* Subscribing only: a function to receive message objects for this request.
*/
recv_fn_t recv_fn;
/** The file from which this message was configured */
const char *added_by_file;
/** The line at which this message was configured */
unsigned added_by_line;
} pubsub_cfg_t;
/**
* Configuration request for a single C type.
*
* These are stored while the dispatcher is in use, but are only used for
* setup, teardown, and debugging.
**/
typedef struct pubsub_type_cfg_t {
/**
* The identifier for this type.
*/
msg_type_id_t type;
/**
* Functions to use when manipulating the type.
*/
dispatch_typefns_t fns;
/** The subsystem that configured this type. */
subsys_id_t subsys;
/** The file from which this type was configured */
const char *added_by_file;
/** The line at which this type was configured */
unsigned added_by_line;
} pubsub_type_cfg_t;
/**
* The set of configuration requests for a dispatcher, as made by various
* subsystems.
**/
typedef struct pubsub_items_t {
/** List of pubsub_cfg_t. */
struct smartlist_t *items;
/** List of pubsub_type_cfg_t. */
struct smartlist_t *type_items;
} pubsub_items_t;
/**
* Type used to construct a dispatcher. We use this type to build up the
* configuration for a dispatcher, and then pass ownership of that
* configuration to the newly constructed dispatcher.
**/
struct pubsub_builder_t {
/** Number of outstanding pubsub_connector_t objects pointing to this
* pubsub_builder_t. */
int n_connectors;
/** Number of errors encountered while constructing this object so far. */
int n_errors;
/** In-progress configuration that we're constructing, as a list of the
* requests that have been made. */
pubsub_items_t *items;
/** In-progress configuration that we're constructing, in a form that can
* be converted to a dispatch_t. */
struct dispatch_cfg_t *cfg;
};
/**
* Type given to a subsystem when adding connections to a pubsub_builder_t.
* We use this type to force each subsystem to get blamed for the
* publications, subscriptions, and types that it adds.
**/
struct pubsub_connector_t {
/** The pubsub_builder that this connector refers to. */
struct pubsub_builder_t *builder;
/** The subsystem that has been given this connector. */
subsys_id_t subsys_id;
};
/**
* Helper structure used when constructing a dispatcher that sorts the
* pubsub_cfg_t objects in various ways.
**/
typedef struct pubsub_adjmap_t {
/* XXXX The next three fields are currently constructed but not yet
* XXXX used. I believe we'll want them in the future, though. -nickm
*/
/** Number of subsystems; length of the *_by_subsys arrays. */
size_t n_subsystems;
/** Array of lists of publisher pubsub_cfg_t objects, indexed by
* subsystem. */
struct smartlist_t **pub_by_subsys;
/** Array of lists of subscriber pubsub_cfg_t objects, indexed by
* subsystem. */
struct smartlist_t **sub_by_subsys;
/** Number of message IDs; length of the *_by_msg arrays. */
size_t n_msgs;
/** Array of lists of publisher pubsub_cfg_t objects, indexed by
* message ID. */
struct smartlist_t **pub_by_msg;
/** Array of lists of subscriber pubsub_cfg_t objects, indexed by
* message ID. */
struct smartlist_t **sub_by_msg;
} pubsub_adjmap_t;
#endif
#endif

View File

@ -0,0 +1,47 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_connect.h
* @brief Header for functions that add relationships to a pubsub builder.
*
* These functions are used by modules that need to add publication and
* subscription requests.
**/
#ifndef TOR_PUBSUB_CONNECT_H
#define TOR_PUBSUB_CONNECT_H
#include "lib/dispatch/msgtypes.h"
struct pub_binding_t;
struct pubsub_connector_t;
int pubsub_add_pub_(struct pubsub_connector_t *con,
struct pub_binding_t *out,
channel_id_t channel,
message_id_t msg,
msg_type_id_t type,
unsigned flags,
const char *file,
unsigned line);
int pubsub_add_sub_(struct pubsub_connector_t *con,
recv_fn_t recv_fn,
channel_id_t channel,
message_id_t msg,
msg_type_id_t type,
unsigned flags,
const char *file,
unsigned line);
int pubsub_connector_define_type_(struct pubsub_connector_t *,
msg_type_id_t,
dispatch_typefns_t *,
const char *file,
unsigned line);
#endif

View File

@ -0,0 +1,32 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_flags.h
* @brief Flags that can be set on publish/subscribe messages.
**/
#ifndef TOR_PUBSUB_FLAGS_H
#define TOR_PUBSUB_FLAGS_H
/**
* Flag for registering a message: declare that no other module is allowed to
* publish this message if we are publishing it, or subscribe to it if we are
* subscribing to it.
*/
#define DISP_FLAG_EXCL (1u<<0)
/**
* Flag for registering a message: declare that this message is a stub, and we
* will not actually publish/subscribe it, but that the dispatcher should
* treat us as if we did when typechecking.
*
* We use this so that messages aren't treated as "dangling" if they are
* potentially used by some other build of Tor.
*/
#define DISP_FLAG_STUB (1u<<1)
#endif

View File

@ -0,0 +1,70 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* @file pubsub_publish.h
* @brief Header for functions to publish using a pub_binding_t.
**/
#define PUBSUB_PRIVATE
#define DISPATCH_PRIVATE
#include "orconfig.h"
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_st.h"
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_publish.h"
#include "lib/malloc/malloc.h"
#include "lib/log/util_bug.h"
#include <string.h>
/**
* Publish a message from the publication binding <b>pub</b> using the
* auxiliary data <b>auxdata</b>.
*
* Return 0 on success, -1 on failure.
**/
int
pubsub_pub_(const pub_binding_t *pub, msg_aux_data_t auxdata)
{
dispatch_t *d = pub->dispatch_ptr;
if (BUG(! d)) {
/* Tried to publish a message before the dispatcher was configured. */
/* (Without a dispatcher, we don't know how to free auxdata.) */
return -1;
}
if (BUG(pub->msg_template.type >= d->n_types)) {
/* The type associated with this message is not known to the dispatcher. */
/* (Without a correct type, we don't know how to free auxdata.) */
return -1;
}
if (BUG(pub->msg_template.msg >= d->n_msgs) ||
BUG(pub->msg_template.channel >= d->n_queues)) {
/* The message ID or channel ID was out of bounds. */
d->typefns[pub->msg_template.type].free_fn(auxdata);
return -1;
}
if (! d->table[pub->msg_template.msg]) {
/* Fast path: nobody wants this data. */
// XXXX Faster path: we could store this in the pub_binding_t.
d->typefns[pub->msg_template.type].free_fn(auxdata);
return 0;
}
/* Construct the message object */
msg_t *m = tor_malloc(sizeof(msg_t));
memcpy(m, &pub->msg_template, sizeof(msg_t));
m->aux_data__ = auxdata;
return dispatch_send_msg_unchecked(d, m);
}

View File

@ -0,0 +1,15 @@
/* Copyright (c) 2001, Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#ifndef TOR_PUBSUB_PUBLISH_H
#define TOR_PUBSUB_PUBLISH_H
#include "lib/dispatch/msgtypes.h"
struct pub_binding_t;
int pubsub_pub_(const struct pub_binding_t *pub, msg_aux_data_t auxdata);
#endif

View File

@ -165,6 +165,7 @@ src_test_test_SOURCES += \
src/test/test_proto_misc.c \
src/test/test_protover.c \
src/test/test_pt.c \
src/test/test_pubsub_msg.c \
src/test/test_relay.c \
src/test/test_relaycell.c \
src/test/test_relaycrypt.c \

View File

@ -910,6 +910,7 @@ struct testgroup_t testgroups[] = {
{ "proto/misc/", proto_misc_tests },
{ "protover/", protover_tests },
{ "pt/", pt_tests },
{ "pubsub/msg/", pubsub_msg_tests },
{ "relay/" , relay_tests },
{ "relaycell/", relaycell_tests },
{ "relaycrypt/", relaycrypt_tests },

View File

@ -253,6 +253,7 @@ extern struct testcase_t proto_http_tests[];
extern struct testcase_t proto_misc_tests[];
extern struct testcase_t protover_tests[];
extern struct testcase_t pt_tests[];
extern struct testcase_t pubsub_msg_tests[];
extern struct testcase_t relay_tests[];
extern struct testcase_t relaycell_tests[];
extern struct testcase_t relaycrypt_tests[];

305
src/test/test_pubsub_msg.c Normal file
View File

@ -0,0 +1,305 @@
/* Copyright (c) 2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#define DISPATCH_PRIVATE
#include "test/test.h"
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_naming.h"
#include "lib/dispatch/dispatch_st.h"
#include "lib/dispatch/msgtypes.h"
#include "lib/pubsub/pubsub_flags.h"
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_build.h"
#include "lib/pubsub/pubsub_builder_st.h"
#include "lib/pubsub/pubsub_connect.h"
#include "lib/pubsub/pubsub_publish.h"
#include "lib/log/escape.h"
#include "lib/malloc/malloc.h"
#include "lib/string/printf.h"
#include <stdio.h>
#include <string.h>
static char *
ex_str_fmt(msg_aux_data_t aux)
{
return esc_for_log(aux.ptr);
}
static void
ex_str_free(msg_aux_data_t aux)
{
tor_free_(aux.ptr);
}
static dispatch_typefns_t stringfns = {
.free_fn = ex_str_free,
.fmt_fn = ex_str_fmt
};
// We're using the lowest-level publish/subscribe logic here, to avoid the
// pubsub_macros.h macros and just test the dispatch core. We'll use a string
// type for everything.
#define DECLARE_MESSAGE(suffix) \
static pub_binding_t pub_binding_##suffix; \
static int msg_received_##suffix = 0; \
static void recv_msg_##suffix(const msg_t *m) { \
(void)m; \
++msg_received_##suffix; \
} \
EAT_SEMICOLON
#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
STMT_BEGIN { \
con = pubsub_connector_for_subsystem(builder, \
get_subsys_id(#subsys)); \
pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
get_channel_id(#channel), \
get_message_id(#msg), get_msg_type_id("string"), \
(flags), __FILE__, __LINE__); \
pubsub_connector_free(con); \
} STMT_END
#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
STMT_BEGIN { \
con = pubsub_connector_for_subsystem(builder, \
get_subsys_id(#subsys)); \
pubsub_add_sub_(con, recv_msg_##hook_suffix, \
get_channel_id(#channel), \
get_message_id(#msg), get_msg_type_id("string"), \
(flags), __FILE__, __LINE__); \
pubsub_connector_free(con); \
} STMT_END
#define SEND(binding_suffix, val) \
STMT_BEGIN { \
msg_aux_data_t data_; \
data_.ptr = tor_strdup(val); \
pubsub_pub_(&pub_binding_##binding_suffix, data_); \
} STMT_END
DECLARE_MESSAGE(msg1);
DECLARE_MESSAGE(msg2);
DECLARE_MESSAGE(msg3);
DECLARE_MESSAGE(msg4);
DECLARE_MESSAGE(msg5);
static smartlist_t *strings_received = NULL;
static void
recv_msg_copy_string(const msg_t *m)
{
const char *s = m->aux_data__.ptr;
smartlist_add(strings_received, tor_strdup(s));
}
static void *
setup_dispatcher(const struct testcase_t *testcase)
{
(void)testcase;
pubsub_builder_t *builder = pubsub_builder_new();
pubsub_connector_t *con;
{
con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
pubsub_connector_define_type_(con,
get_msg_type_id("string"),
&stringfns,
"nowhere.c", 99);
pubsub_connector_free(con);
}
// message1 has one publisher and one subscriber.
ADD_PUBLISH(msg1, sys1, main, message1, 0);
ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
// message2 has a publisher and a stub subscriber.
ADD_PUBLISH(msg2, sys1, main, message2, 0);
ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
// message3 has a publisher and three subscribers.
ADD_PUBLISH(msg3, sys1, main, message3, 0);
ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
// message4 has one publisher and two subscribers, but it's on another
// channel.
ADD_PUBLISH(msg4, sys2, other, message4, 0);
ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
// message5 has a huge number of recipients.
ADD_PUBLISH(msg5, sys3, main, message5, 0);
ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
for (int i = 0; i < 1000-5; ++i) {
char *sys;
tor_asprintf(&sys, "xsys-%d", i);
con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
pubsub_add_sub_(con, recv_msg_copy_string,
get_channel_id("main"),
get_message_id("message5"),
get_msg_type_id("string"), 0, "here", 100);
pubsub_connector_free(con);
tor_free(sys);
}
return pubsub_builder_finalize(builder);
}
static int
cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
{
(void)testcase;
dispatch_t *dispatcher = dispatcher_;
dispatch_free(dispatcher);
return 1;
}
static const struct testcase_setup_t dispatcher_setup = {
setup_dispatcher, cleanup_dispatcher
};
static void
test_pubsub_msg_minimal(void *arg)
{
dispatch_t *d = arg;
tt_int_op(0, OP_EQ, msg_received_msg1);
SEND(msg1, "hello world");
tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
done:
;
}
static void
test_pubsub_msg_send_to_stub(void *arg)
{
dispatch_t *d = arg;
tt_int_op(0, OP_EQ, msg_received_msg2);
SEND(msg2, "hello silence");
tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
done:
;
}
static void
test_pubsub_msg_cancel_msgs(void *arg)
{
dispatch_t *d = arg;
tt_int_op(0, OP_EQ, msg_received_msg1);
for (int i = 0; i < 100; ++i) {
SEND(msg1, "hello world");
}
tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
// At this point, the dispatcher will be freed with queued, undelivered
// messages.
done:
;
}
struct alertfn_target {
dispatch_t *d;
channel_id_t ch;
int count;
};
static void
alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
{
struct alertfn_target *t = arg;
tt_ptr_op(d, OP_EQ, t->d);
tt_int_op(ch, OP_EQ, t->ch);
++t->count;
done:
;
}
static void
test_pubsub_msg_alertfns(void *arg)
{
dispatch_t *d = arg;
struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
tt_int_op(0, OP_EQ,
dispatch_set_alert_fn(d, get_channel_id("main"),
alertfn_generic, &ch1_a));
tt_int_op(0, OP_EQ,
dispatch_set_alert_fn(d, get_channel_id("other"),
alertfn_generic, &ch2_a));
SEND(msg3, "hello");
tt_int_op(ch1_a.count, OP_EQ, 1);
SEND(msg3, "world");
tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
SEND(msg4, "worse things happen in C");
tt_int_op(ch2_a.count, OP_EQ, 1);
// flush the first (main) channel...
tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
// now that the main channel is flushed, sending another message on it
// starts another alert.
tt_int_op(ch1_a.count, OP_EQ, 1);
SEND(msg1, "plover");
tt_int_op(ch1_a.count, OP_EQ, 2);
tt_int_op(ch2_a.count, OP_EQ, 1);
done:
;
}
/* try more than N_FAST_FNS hooks on msg5 */
static void
test_pubsub_msg_many_hooks(void *arg)
{
dispatch_t *d = arg;
strings_received = smartlist_new();
tt_int_op(0, OP_EQ, msg_received_msg5);
SEND(msg5, "hello world");
tt_int_op(0, OP_EQ, msg_received_msg5);
tt_int_op(0, OP_EQ, smartlist_len(strings_received));
tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
tt_int_op(5, OP_EQ, msg_received_msg5);
tt_int_op(995, OP_EQ, smartlist_len(strings_received));
done:
SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
smartlist_free(strings_received);
}
#define T(name) \
{ #name, test_pubsub_msg_ ## name , TT_FORK, \
&dispatcher_setup, NULL }
struct testcase_t pubsub_msg_tests[] = {
T(minimal),
T(send_to_stub),
T(cancel_msgs),
T(alertfns),
T(many_hooks),
END_OF_TESTCASES
};