Merge branch 'pubsub_squashed'

This commit is contained in:
Nick Mathewson 2016-05-11 13:26:29 -04:00
commit 00ee62b8a5
6 changed files with 396 additions and 0 deletions

View File

@ -67,6 +67,7 @@ LIBOR_A_SOURCES = \
src/common/di_ops.c \
src/common/log.c \
src/common/memarea.c \
src/common/pubsub.c \
src/common/util.c \
src/common/util_bug.c \
src/common/util_format.c \
@ -134,6 +135,7 @@ COMMONHEADERS = \
src/common/memarea.h \
src/common/linux_syscalls.inc \
src/common/procmon.h \
src/common/pubsub.h \
src/common/sandbox.h \
src/common/testsupport.h \
src/common/timers.h \

129
src/common/pubsub.c Normal file
View File

@ -0,0 +1,129 @@
/* Copyright (c) 2016, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file pubsub.c
*
* \brief DOCDOC
*/
#include "orconfig.h"
#include "pubsub.h"
#include "container.h"
/** Helper: insert <b>s</b> into <b>topic's</b> list of subscribers, keeping
* them sorted in priority order. */
static void
subscriber_insert(pubsub_topic_t *topic, pubsub_subscriber_t *s)
{
int i;
smartlist_t *sl = topic->subscribers;
for (i = 0; i < smartlist_len(sl); ++i) {
pubsub_subscriber_t *other = smartlist_get(sl, i);
if (s->priority < other->priority) {
break;
}
}
smartlist_insert(sl, i, s);
}
/**
* Add a new subscriber to <b>topic</b>, where (when an event is triggered),
* we'll notify the function <b>fn</b> by passing it <b>subscriber_data</b>.
* Return a handle to the subscribe which can later be passed to
* pubsub_unsubscribe_().
*
* Functions are called in priority order, from lowest to highest.
*
* See pubsub.h for <b>subscribe_flags</b>.
*/
const pubsub_subscriber_t *
pubsub_subscribe_(pubsub_topic_t *topic,
pubsub_subscriber_fn_t fn,
void *subscriber_data,
unsigned subscribe_flags,
unsigned priority)
{
tor_assert(! topic->locked);
if (subscribe_flags & SUBSCRIBE_ATSTART) {
tor_assert(topic->n_events_fired == 0);
}
pubsub_subscriber_t *r = tor_malloc_zero(sizeof(r));
r->priority = priority;
r->subscriber_flags = subscribe_flags;
r->fn = fn;
r->subscriber_data = subscriber_data;
if (topic->subscribers == NULL) {
topic->subscribers = smartlist_new();
}
subscriber_insert(topic, r);
return r;
}
/**
* Remove the subscriber <b>s</b> from <b>topic</b>. After calling this
* function, <b>s</b> may no longer be used.
*/
int
pubsub_unsubscribe_(pubsub_topic_t *topic,
const pubsub_subscriber_t *s)
{
tor_assert(! topic->locked);
smartlist_t *sl = topic->subscribers;
if (sl == NULL)
return -1;
int i = smartlist_pos(sl, s);
if (i == -1)
return -1;
pubsub_subscriber_t *tmp = smartlist_get(sl, i);
tor_assert(tmp == s);
smartlist_del_keeporder(sl, i);
tor_free(tmp);
return 0;
}
/**
* For every subscriber s in <b>topic</b>, invoke notify_fn on s and
* event_data. Return 0 if there were no nonzero return values, and -1 if
* there were any.
*/
int
pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
void *event_data, unsigned notify_flags)
{
tor_assert(! topic->locked);
(void) notify_flags;
smartlist_t *sl = topic->subscribers;
int n_bad = 0;
++topic->n_events_fired;
if (sl == NULL)
return -1;
topic->locked = 1;
SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
int r = notify_fn(s, event_data);
if (r != 0)
++n_bad;
} SMARTLIST_FOREACH_END(s);
topic->locked = 0;
return (n_bad == 0) ? 0 : -1;
}
/**
* Release all storage held by <b>topic</b>.
*/
void
pubsub_clear_(pubsub_topic_t *topic)
{
tor_assert(! topic->locked);
smartlist_t *sl = topic->subscribers;
if (sl == NULL)
return;
SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
tor_free(s);
} SMARTLIST_FOREACH_END(s);
smartlist_free(sl);
topic->subscribers = NULL;
topic->n_events_fired = 0;
}

177
src/common/pubsub.h Normal file
View File

@ -0,0 +1,177 @@
/* Copyright (c) 2016, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file pubsub.h
* \brief Macros to implement publish/subscribe abstractions.
*
* To use these macros, call DECLARE_PUBSUB_TOPIC() with an identifier to use
* as your topic. Below, I'm going to assume you say DECLARE_PUBSUB_TOPIC(T).
*
* Doing this will declare the following types:
* typedef struct T_event_data_t T_event_data_t; // you define this struct
* typedef struct T_subscriber_data_t T_subscriber_data_t; // this one too.
* typedef struct T_subscriber_t T_subscriber_t; // opaque
* typedef int (*T_subscriber_fn_t)(T_event_data_t*, T_subscriber_data_t*);
*
* and it will declare the following functions:
* const T_subscriber_t *T_subscribe(T_subscriber_fn_t,
* T_subscriber_data_t *,
* unsigned flags,
* unsigned priority);
* int T_unsubscribe(const T_subscriber_t *)
*
* Elsewhere you can say DECLARE_NOTIFY_PUBSUB_TOPIC(static, T), which declares:
* static int T_notify(T_event_data_t *, unsigned notify_flags);
* static void T_clear(void);
*
* And in some C file, you would define these functions with:
* IMPLEMENT_PUBSUB_TOPIC(static, T).
*
* The implementations will be small typesafe wrappers over generic versions
* of the above functions.
*
* To use the typesafe functions, you add any number of subscribers with
* T_subscribe(). Each has an associated function pointer, data pointer,
* and priority. Later, you can invoke T_notify() to declare that the
* event has occurred. Each of the subscribers will be invoked once.
**/
#ifndef TOR_PUBSUB_H
#define TOR_PUBSUB_H
#include "torint.h"
/**
* Flag for T_subscribe: die with an assertion failure if the event
* have ever been published before. Used when a subscriber must absolutely
* never have missed an event.
*/
#define SUBSCRIBE_ATSTART (1u<<0)
#define DECLARE_PUBSUB_STRUCT_TYPES(name) \
/* You define this type. */ \
typedef struct name ## _event_data_t name ## _event_data_t; \
/* You define this type. */ \
typedef struct name ## _subscriber_data_t name ## _subscriber_data_t;
#define DECLARE_PUBSUB_TOPIC(name) \
/* This type is opaque. */ \
typedef struct name ## _subscriber_t name ## _subscriber_t; \
/* You declare functions matching this type. */ \
typedef int (*name ## _subscriber_fn_t)( \
name ## _event_data_t *data, \
name ## _subscriber_data_t *extra); \
/* Call this function to subscribe to a topic. */ \
const name ## _subscriber_t *name ## _subscribe( \
name##_subscriber_fn_t subscriber, \
name##_subscriber_data_t *extra_data, \
unsigned flags, \
unsigned priority); \
/* Call this function to unsubscribe from a topic. */ \
int name ## _unsubscribe(const name##_subscriber_t *s);
#define DECLARE_NOTIFY_PUBSUB_TOPIC(linkage, name) \
/* Call this function to notify all subscribers. Flags not yet used. */ \
linkage int name ## _notify(name ## _event_data_t *data, unsigned flags); \
/* Call this function to release storage held by the topic. */ \
linkage void name ## _clear(void);
/**
* Type used to hold a generic function for a subscriber.
*
* [Yes, it is safe to cast to this, so long as we cast back to the original
* type before calling. From C99: "A pointer to a function of one type may be
* converted to a pointer to a function of another type and back again; the
* result shall compare equal to the original pointer."]
*/
typedef int (*pubsub_subscriber_fn_t)(void *, void *);
/**
* Helper type to implement pubsub abstraction. Don't use this directly.
* It represents a subscriber.
*/
typedef struct pubsub_subscriber_t {
/** Function to invoke when the event triggers. */
pubsub_subscriber_fn_t fn;
/** Data associated with this subscriber. */
void *subscriber_data;
/** Priority for this subscriber. Low priorities happen first. */
unsigned priority;
/** Flags set on this subscriber. Not yet used.*/
unsigned subscriber_flags;
} pubsub_subscriber_t;
/**
* Helper type to implement pubsub abstraction. Don't use this directly.
* It represents a topic, and keeps a record of subscribers.
*/
typedef struct pubsub_topic_t {
/** List of subscribers to this topic. May be NULL. */
struct smartlist_t *subscribers;
/** Total number of times that pubsub_notify_() has ever been called on this
* topic. */
uint64_t n_events_fired;
/** True iff we're running 'notify' on this topic, and shouldn't allow
* any concurrent modifications or events. */
unsigned locked;
} pubsub_topic_t;
const pubsub_subscriber_t *pubsub_subscribe_(pubsub_topic_t *topic,
pubsub_subscriber_fn_t fn,
void *subscriber_data,
unsigned subscribe_flags,
unsigned priority);
int pubsub_unsubscribe_(pubsub_topic_t *topic, const pubsub_subscriber_t *sub);
void pubsub_clear_(pubsub_topic_t *topic);
typedef int (*pubsub_notify_fn_t)(pubsub_subscriber_t *subscriber,
void *notify_data);
int pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
void *notify_data, unsigned notify_flags);
#define IMPLEMENT_PUBSUB_TOPIC(notify_linkage, name) \
static pubsub_topic_t name ## _topic_ = { NULL, 0, 0 }; \
const name ## _subscriber_t * \
name ## _subscribe(name##_subscriber_fn_t subscriber, \
name##_subscriber_data_t *extra_data, \
unsigned flags, \
unsigned priority) \
{ \
const pubsub_subscriber_t *s; \
s = pubsub_subscribe_(&name##_topic_, \
(pubsub_subscriber_fn_t)subscriber, \
extra_data, \
flags, \
priority); \
return (const name##_subscriber_t *)s; \
} \
int \
name ## _unsubscribe(const name##_subscriber_t *subscriber) \
{ \
return pubsub_unsubscribe_(&name##_topic_, \
(const pubsub_subscriber_t *)subscriber); \
} \
static int \
name##_call_the_notify_fn_(pubsub_subscriber_t *subscriber, \
void *notify_data) \
{ \
name ## _subscriber_fn_t fn; \
fn = (name ## _subscriber_fn_t) subscriber->fn; \
return fn(notify_data, subscriber->subscriber_data); \
} \
notify_linkage int \
name ## _notify(name ## _event_data_t *event_data, unsigned flags) \
{ \
return pubsub_notify_(&name##_topic_, \
name##_call_the_notify_fn_, \
event_data, \
flags); \
} \
notify_linkage void \
name ## _clear(void) \
{ \
pubsub_clear_(&name##_topic_); \
}
#endif /* TOR_PUBSUB_H */

View File

@ -100,6 +100,7 @@ src_test_test_SOURCES = \
src/test/test_policy.c \
src/test/test_procmon.c \
src/test/test_pt.c \
src/test/test_pubsub.c \
src/test/test_relay.c \
src/test/test_relaycell.c \
src/test/test_rendcache.c \

View File

@ -1159,6 +1159,7 @@ extern struct testcase_t oom_tests[];
extern struct testcase_t options_tests[];
extern struct testcase_t policy_tests[];
extern struct testcase_t procmon_tests[];
extern struct testcase_t pubsub_tests[];
extern struct testcase_t pt_tests[];
extern struct testcase_t relay_tests[];
extern struct testcase_t relaycell_tests[];
@ -1231,6 +1232,7 @@ struct testgroup_t testgroups[] = {
{ "util/format/", util_format_tests },
{ "util/logging/", logging_tests },
{ "util/process/", util_process_tests },
{ "util/pubsub/", pubsub_tests },
{ "util/thread/", thread_tests },
{ "util/handle/", handle_tests },
{ "dns/", dns_tests },

85
src/test/test_pubsub.c Normal file
View File

@ -0,0 +1,85 @@
/* Copyright (c) 2016, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file test_pubsub.c
* \brief Unit tests for publish-subscribe abstraction.
**/
#include "or.h"
#include "test.h"
#include "pubsub.h"
DECLARE_PUBSUB_STRUCT_TYPES(foobar)
DECLARE_PUBSUB_TOPIC(foobar)
DECLARE_NOTIFY_PUBSUB_TOPIC(static, foobar)
IMPLEMENT_PUBSUB_TOPIC(static, foobar)
struct foobar_event_data_t {
unsigned u;
const char *s;
};
struct foobar_subscriber_data_t {
const char *name;
long l;
};
static int
foobar_sub1(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
{
ev->u += 10;
mine->l += 100;
return 0;
}
static int
foobar_sub2(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
{
ev->u += 5;
mine->l += 50;
return 0;
}
static void
test_pubsub_basic(void *arg)
{
(void)arg;
foobar_subscriber_data_t subdata1 = { "hi", 0 };
foobar_subscriber_data_t subdata2 = { "wow", 0 };
const foobar_subscriber_t *sub1;
const foobar_subscriber_t *sub2;
foobar_event_data_t ed = { 0, "x" };
foobar_event_data_t ed2 = { 0, "y" };
sub1 = foobar_subscribe(foobar_sub1, &subdata1, SUBSCRIBE_ATSTART, 100);
tt_assert(sub1);
foobar_notify(&ed, 0);
tt_int_op(subdata1.l, OP_EQ, 100);
tt_int_op(subdata2.l, OP_EQ, 0);
tt_int_op(ed.u, OP_EQ, 10);
sub2 = foobar_subscribe(foobar_sub2, &subdata2, 0, 5);
tt_assert(sub2);
foobar_notify(&ed2, 0);
tt_int_op(subdata1.l, OP_EQ, 200);
tt_int_op(subdata2.l, OP_EQ, 50);
tt_int_op(ed2.u, OP_EQ, 15);
foobar_unsubscribe(sub1);
foobar_notify(&ed, 0);
tt_int_op(subdata1.l, OP_EQ, 200);
tt_int_op(subdata2.l, OP_EQ, 100);
tt_int_op(ed.u, OP_EQ, 15);
done:
foobar_clear();
}
struct testcase_t pubsub_tests[] = {
{ "pubsub_basic", test_pubsub_basic, TT_FORK, NULL, NULL },
END_OF_TESTCASES
};