mirror of
https://gitlab.torproject.org/tpo/core/tor.git
synced 2024-11-27 22:03:31 +01:00
Merge branch 'tor-github/pr/1040'
This commit is contained in:
commit
f7e8b3b68c
3
changes/ticket29976
Normal file
3
changes/ticket29976
Normal file
@ -0,0 +1,3 @@
|
||||
o Code simplification and refactoring:
|
||||
- Rework bootstrap tracking to use the new publish-subscribe
|
||||
subsystem. Closes ticket 29976.
|
@ -1256,6 +1256,8 @@ pubsub_connect(void)
|
||||
/* XXXX For each pubsub channel, its delivery strategy should be set at
|
||||
* this XXXX point, using tor_mainloop_set_delivery_strategy().
|
||||
*/
|
||||
tor_mainloop_set_delivery_strategy("orconn", DELIV_IMMEDIATE);
|
||||
tor_mainloop_set_delivery_strategy("ocirc", DELIV_IMMEDIATE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -522,14 +522,13 @@ origin_circuit_get_guard_state(origin_circuit_t *circ)
|
||||
static void
|
||||
circuit_chan_publish(const origin_circuit_t *circ, const channel_t *chan)
|
||||
{
|
||||
ocirc_event_msg_t msg;
|
||||
ocirc_chan_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
msg.type = OCIRC_MSGTYPE_CHAN;
|
||||
msg.u.chan.gid = circ->global_identifier;
|
||||
msg.u.chan.chan = chan->global_identifier;
|
||||
msg.u.chan.onehop = circ->build_state->onehop_tunnel;
|
||||
msg->gid = circ->global_identifier;
|
||||
msg->chan = chan->global_identifier;
|
||||
msg->onehop = circ->build_state->onehop_tunnel;
|
||||
|
||||
ocirc_event_publish(&msg);
|
||||
ocirc_chan_publish(msg);
|
||||
}
|
||||
|
||||
/** Start establishing the first hop of our circuit. Figure out what
|
||||
|
@ -496,17 +496,16 @@ int
|
||||
circuit_event_status(origin_circuit_t *circ, circuit_status_event_t tp,
|
||||
int reason_code)
|
||||
{
|
||||
ocirc_event_msg_t msg;
|
||||
ocirc_cevent_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
tor_assert(circ);
|
||||
|
||||
msg.type = OCIRC_MSGTYPE_CEVENT;
|
||||
msg.u.cevent.gid = circ->global_identifier;
|
||||
msg.u.cevent.evtype = tp;
|
||||
msg.u.cevent.reason = reason_code;
|
||||
msg.u.cevent.onehop = circ->build_state->onehop_tunnel;
|
||||
msg->gid = circ->global_identifier;
|
||||
msg->evtype = tp;
|
||||
msg->reason = reason_code;
|
||||
msg->onehop = circ->build_state->onehop_tunnel;
|
||||
|
||||
ocirc_event_publish(&msg);
|
||||
ocirc_cevent_publish(msg);
|
||||
return control_event_circuit_status(circ, tp, reason_code);
|
||||
}
|
||||
|
||||
@ -514,26 +513,25 @@ circuit_event_status(origin_circuit_t *circ, circuit_status_event_t tp,
|
||||
* Helper function to publish a state change message
|
||||
*
|
||||
* circuit_set_state() calls this to notify subscribers about a change
|
||||
* of the state of an origin circuit.
|
||||
* of the state of an origin circuit. @a circ must be an origin
|
||||
* circuit.
|
||||
**/
|
||||
static void
|
||||
circuit_state_publish(const circuit_t *circ)
|
||||
{
|
||||
ocirc_event_msg_t msg;
|
||||
ocirc_state_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
const origin_circuit_t *ocirc;
|
||||
|
||||
if (!CIRCUIT_IS_ORIGIN(circ))
|
||||
return;
|
||||
tor_assert(CIRCUIT_IS_ORIGIN(circ));
|
||||
ocirc = CONST_TO_ORIGIN_CIRCUIT(circ);
|
||||
/* Only inbound OR circuits can be in this state, not origin circuits. */
|
||||
tor_assert(circ->state != CIRCUIT_STATE_ONIONSKIN_PENDING);
|
||||
|
||||
msg.type = OCIRC_MSGTYPE_STATE;
|
||||
msg.u.state.gid = ocirc->global_identifier;
|
||||
msg.u.state.state = circ->state;
|
||||
msg.u.state.onehop = ocirc->build_state->onehop_tunnel;
|
||||
msg->gid = ocirc->global_identifier;
|
||||
msg->state = circ->state;
|
||||
msg->onehop = ocirc->build_state->onehop_tunnel;
|
||||
|
||||
ocirc_event_publish(&msg);
|
||||
ocirc_state_publish(msg);
|
||||
}
|
||||
|
||||
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
|
||||
@ -565,7 +563,8 @@ circuit_set_state(circuit_t *circ, uint8_t state)
|
||||
if (state == CIRCUIT_STATE_GUARD_WAIT || state == CIRCUIT_STATE_OPEN)
|
||||
tor_assert(!circ->n_chan_create_cell);
|
||||
circ->state = state;
|
||||
circuit_state_publish(circ);
|
||||
if (CIRCUIT_IS_ORIGIN(circ))
|
||||
circuit_state_publish(circ);
|
||||
}
|
||||
|
||||
/** Append to <b>out</b> all circuits in state CHAN_WAIT waiting for
|
||||
|
@ -414,13 +414,12 @@ void
|
||||
connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
|
||||
int reason)
|
||||
{
|
||||
orconn_event_msg_t msg;
|
||||
orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
msg.type = ORCONN_MSGTYPE_STATUS;
|
||||
msg.u.status.gid = conn->base_.global_identifier;
|
||||
msg.u.status.status = tp;
|
||||
msg.u.status.reason = reason;
|
||||
orconn_event_publish(&msg);
|
||||
msg->gid = conn->base_.global_identifier;
|
||||
msg->status = tp;
|
||||
msg->reason = reason;
|
||||
orconn_status_publish(msg);
|
||||
control_event_or_conn_status(conn, tp, reason);
|
||||
}
|
||||
|
||||
@ -433,26 +432,25 @@ connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
|
||||
static void
|
||||
connection_or_state_publish(const or_connection_t *conn, uint8_t state)
|
||||
{
|
||||
orconn_event_msg_t msg;
|
||||
orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
msg.type = ORCONN_MSGTYPE_STATE;
|
||||
msg.u.state.gid = conn->base_.global_identifier;
|
||||
msg->gid = conn->base_.global_identifier;
|
||||
if (conn->is_pt) {
|
||||
/* Do extra decoding because conn->proxy_type indicates the proxy
|
||||
* protocol that tor uses to talk with the transport plugin,
|
||||
* instead of PROXY_PLUGGABLE. */
|
||||
tor_assert_nonfatal(conn->proxy_type != PROXY_NONE);
|
||||
msg.u.state.proxy_type = PROXY_PLUGGABLE;
|
||||
msg->proxy_type = PROXY_PLUGGABLE;
|
||||
} else {
|
||||
msg.u.state.proxy_type = conn->proxy_type;
|
||||
msg->proxy_type = conn->proxy_type;
|
||||
}
|
||||
msg.u.state.state = state;
|
||||
msg->state = state;
|
||||
if (conn->chan) {
|
||||
msg.u.state.chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
|
||||
msg->chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
|
||||
} else {
|
||||
msg.u.state.chan = 0;
|
||||
msg->chan = 0;
|
||||
}
|
||||
orconn_event_publish(&msg);
|
||||
orconn_state_publish(msg);
|
||||
}
|
||||
|
||||
/** Call this to change or_connection_t states, so the owning channel_tls_t can
|
||||
|
@ -26,59 +26,103 @@
|
||||
#include "core/or/origin_circuit_st.h"
|
||||
#include "lib/subsys/subsys.h"
|
||||
|
||||
/** List of subscribers */
|
||||
static smartlist_t *ocirc_event_rcvrs;
|
||||
DECLARE_PUBLISH(ocirc_state);
|
||||
DECLARE_PUBLISH(ocirc_chan);
|
||||
DECLARE_PUBLISH(ocirc_cevent);
|
||||
|
||||
/** Initialize subscriber list */
|
||||
static int
|
||||
ocirc_event_init(void)
|
||||
static void
|
||||
ocirc_event_free(msg_aux_data_t u)
|
||||
{
|
||||
ocirc_event_rcvrs = smartlist_new();
|
||||
tor_free_(u.ptr);
|
||||
}
|
||||
|
||||
static char *
|
||||
ocirc_state_fmt(msg_aux_data_t u)
|
||||
{
|
||||
ocirc_state_msg_t *msg = (ocirc_state_msg_t *)u.ptr;
|
||||
char *s = NULL;
|
||||
|
||||
tor_asprintf(&s, "<gid=%"PRIu32" state=%d onehop=%d>",
|
||||
msg->gid, msg->state, msg->onehop);
|
||||
return s;
|
||||
}
|
||||
|
||||
static char *
|
||||
ocirc_chan_fmt(msg_aux_data_t u)
|
||||
{
|
||||
ocirc_chan_msg_t *msg = (ocirc_chan_msg_t *)u.ptr;
|
||||
char *s = NULL;
|
||||
|
||||
tor_asprintf(&s, "<gid=%"PRIu32" chan=%"PRIu64" onehop=%d>",
|
||||
msg->gid, msg->chan, msg->onehop);
|
||||
return s;
|
||||
}
|
||||
|
||||
static char *
|
||||
ocirc_cevent_fmt(msg_aux_data_t u)
|
||||
{
|
||||
ocirc_cevent_msg_t *msg = (ocirc_cevent_msg_t *)u.ptr;
|
||||
char *s = NULL;
|
||||
|
||||
tor_asprintf(&s, "<gid=%"PRIu32" evtype=%d reason=%d onehop=%d>",
|
||||
msg->gid, msg->evtype, msg->reason, msg->onehop);
|
||||
return s;
|
||||
}
|
||||
|
||||
static dispatch_typefns_t ocirc_state_fns = {
|
||||
.free_fn = ocirc_event_free,
|
||||
.fmt_fn = ocirc_state_fmt,
|
||||
};
|
||||
|
||||
static dispatch_typefns_t ocirc_chan_fns = {
|
||||
.free_fn = ocirc_event_free,
|
||||
.fmt_fn = ocirc_chan_fmt,
|
||||
};
|
||||
|
||||
static dispatch_typefns_t ocirc_cevent_fns = {
|
||||
.free_fn = ocirc_event_free,
|
||||
.fmt_fn = ocirc_cevent_fmt,
|
||||
};
|
||||
|
||||
static int
|
||||
ocirc_add_pubsub(struct pubsub_connector_t *connector)
|
||||
{
|
||||
if (DISPATCH_REGISTER_TYPE(connector, ocirc_state, ô_state_fns))
|
||||
return -1;
|
||||
if (DISPATCH_REGISTER_TYPE(connector, ocirc_chan, ô_chan_fns))
|
||||
return -1;
|
||||
if (DISPATCH_REGISTER_TYPE(connector, ocirc_cevent, ô_cevent_fns))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_state))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_chan))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_cevent))
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Free subscriber list */
|
||||
static void
|
||||
ocirc_event_fini(void)
|
||||
void
|
||||
ocirc_state_publish(ocirc_state_msg_t *msg)
|
||||
{
|
||||
smartlist_free(ocirc_event_rcvrs);
|
||||
PUBLISH(ocirc_state, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to messages about origin circuit events
|
||||
*
|
||||
* Register a callback function to receive messages about origin
|
||||
* circuits. The publisher calls this function synchronously.
|
||||
**/
|
||||
void
|
||||
ocirc_event_subscribe(ocirc_event_rcvr_t fn)
|
||||
ocirc_chan_publish(ocirc_chan_msg_t *msg)
|
||||
{
|
||||
tor_assert(fn);
|
||||
/* Don't duplicate subscriptions. */
|
||||
if (smartlist_contains(ocirc_event_rcvrs, fn))
|
||||
return;
|
||||
|
||||
smartlist_add(ocirc_event_rcvrs, fn);
|
||||
PUBLISH(ocirc_chan, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a message about OR connection events
|
||||
*
|
||||
* This calls the subscriber receiver function synchronously.
|
||||
**/
|
||||
void
|
||||
ocirc_event_publish(const ocirc_event_msg_t *msg)
|
||||
ocirc_cevent_publish(ocirc_cevent_msg_t *msg)
|
||||
{
|
||||
SMARTLIST_FOREACH_BEGIN(ocirc_event_rcvrs, ocirc_event_rcvr_t, fn) {
|
||||
tor_assert(fn);
|
||||
(*fn)(msg);
|
||||
} SMARTLIST_FOREACH_END(fn);
|
||||
PUBLISH(ocirc_cevent, msg);
|
||||
}
|
||||
|
||||
const subsys_fns_t sys_ocirc_event = {
|
||||
.name = "ocirc_event",
|
||||
.supported = true,
|
||||
.level = -32,
|
||||
.initialize = ocirc_event_init,
|
||||
.shutdown = ocirc_event_fini,
|
||||
.add_pubsub = ocirc_add_pubsub,
|
||||
};
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include <stdbool.h>
|
||||
|
||||
#include "lib/cc/torint.h"
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
|
||||
/** Used to indicate the type of a circuit event passed to the controller.
|
||||
* The various types are defined in control-spec.txt */
|
||||
@ -30,6 +31,8 @@ typedef struct ocirc_state_msg_t {
|
||||
bool onehop; /**< one-hop circuit? */
|
||||
} ocirc_state_msg_t;
|
||||
|
||||
DECLARE_MESSAGE(ocirc_state, ocirc_state, ocirc_state_msg_t *);
|
||||
|
||||
/**
|
||||
* Message when a channel gets associated to a circuit.
|
||||
*
|
||||
@ -44,6 +47,8 @@ typedef struct ocirc_chan_msg_t {
|
||||
bool onehop; /**< one-hop circuit? */
|
||||
} ocirc_chan_msg_t;
|
||||
|
||||
DECLARE_MESSAGE(ocirc_chan, ocirc_chan, ocirc_chan_msg_t *);
|
||||
|
||||
/**
|
||||
* Message for origin circuit status event
|
||||
*
|
||||
@ -56,34 +61,12 @@ typedef struct ocirc_cevent_msg_t {
|
||||
bool onehop; /**< one-hop circuit? */
|
||||
} ocirc_cevent_msg_t;
|
||||
|
||||
/** Discriminant values for origin circuit event message */
|
||||
typedef enum ocirc_msgtype_t {
|
||||
OCIRC_MSGTYPE_STATE,
|
||||
OCIRC_MSGTYPE_CHAN,
|
||||
OCIRC_MSGTYPE_CEVENT,
|
||||
} ocirc_msgtype_t;
|
||||
|
||||
/** Discriminated union for the actual message */
|
||||
typedef struct ocirc_event_msg_t {
|
||||
int type;
|
||||
union {
|
||||
ocirc_state_msg_t state;
|
||||
ocirc_chan_msg_t chan;
|
||||
ocirc_cevent_msg_t cevent;
|
||||
} u;
|
||||
} ocirc_event_msg_t;
|
||||
|
||||
/**
|
||||
* Receiver function pointer for origin circuit subscribers
|
||||
*
|
||||
* This function gets called synchronously by the publisher.
|
||||
**/
|
||||
typedef void (*ocirc_event_rcvr_t)(const ocirc_event_msg_t *);
|
||||
|
||||
void ocirc_event_subscribe(ocirc_event_rcvr_t fn);
|
||||
DECLARE_MESSAGE(ocirc_cevent, ocirc_cevent, ocirc_cevent_msg_t *);
|
||||
|
||||
#ifdef OCIRC_EVENT_PRIVATE
|
||||
void ocirc_event_publish(const ocirc_event_msg_t *msg);
|
||||
void ocirc_state_publish(ocirc_state_msg_t *msg);
|
||||
void ocirc_chan_publish(ocirc_chan_msg_t *msg);
|
||||
void ocirc_cevent_publish(ocirc_cevent_msg_t *msg);
|
||||
#endif
|
||||
|
||||
#endif /* !defined(TOR_OCIRC_EVENT_H) */
|
||||
|
@ -17,65 +17,83 @@
|
||||
**/
|
||||
|
||||
#include "core/or/or.h"
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
#include "lib/subsys/subsys.h"
|
||||
|
||||
#define ORCONN_EVENT_PRIVATE
|
||||
#include "core/or/orconn_event.h"
|
||||
#include "core/or/orconn_event_sys.h"
|
||||
|
||||
/** List of subscribers */
|
||||
static smartlist_t *orconn_event_rcvrs;
|
||||
DECLARE_PUBLISH(orconn_state);
|
||||
DECLARE_PUBLISH(orconn_status);
|
||||
|
||||
/** Initialize subscriber list */
|
||||
static int
|
||||
orconn_event_init(void)
|
||||
static void
|
||||
orconn_event_free(msg_aux_data_t u)
|
||||
{
|
||||
orconn_event_rcvrs = smartlist_new();
|
||||
tor_free_(u.ptr);
|
||||
}
|
||||
|
||||
static char *
|
||||
orconn_state_fmt(msg_aux_data_t u)
|
||||
{
|
||||
orconn_state_msg_t *msg = (orconn_state_msg_t *)u.ptr;
|
||||
char *s = NULL;
|
||||
|
||||
tor_asprintf(&s, "<gid=%"PRIu64" chan=%"PRIu64" proxy_type=%d state=%d>",
|
||||
msg->gid, msg->chan, msg->proxy_type, msg->state);
|
||||
return s;
|
||||
}
|
||||
|
||||
static char *
|
||||
orconn_status_fmt(msg_aux_data_t u)
|
||||
{
|
||||
orconn_status_msg_t *msg = (orconn_status_msg_t *)u.ptr;
|
||||
char *s = NULL;
|
||||
|
||||
tor_asprintf(&s, "<gid=%"PRIu64" status=%d reason=%d>",
|
||||
msg->gid, msg->status, msg->reason);
|
||||
return s;
|
||||
}
|
||||
|
||||
static dispatch_typefns_t orconn_state_fns = {
|
||||
.free_fn = orconn_event_free,
|
||||
.fmt_fn = orconn_state_fmt,
|
||||
};
|
||||
|
||||
static dispatch_typefns_t orconn_status_fns = {
|
||||
.free_fn = orconn_event_free,
|
||||
.fmt_fn = orconn_status_fmt,
|
||||
};
|
||||
|
||||
static int
|
||||
orconn_add_pubsub(struct pubsub_connector_t *connector)
|
||||
{
|
||||
if (DISPATCH_REGISTER_TYPE(connector, orconn_state, &orconn_state_fns))
|
||||
return -1;
|
||||
if (DISPATCH_REGISTER_TYPE(connector, orconn_status, &orconn_status_fns))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_PUB(connector, orconn, orconn_state) != 0)
|
||||
return -1;
|
||||
if (DISPATCH_ADD_PUB(connector, orconn, orconn_status) != 0)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Free subscriber list */
|
||||
static void
|
||||
orconn_event_fini(void)
|
||||
void
|
||||
orconn_state_publish(orconn_state_msg_t *msg)
|
||||
{
|
||||
smartlist_free(orconn_event_rcvrs);
|
||||
PUBLISH(orconn_state, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to messages about OR connection events
|
||||
*
|
||||
* Register a callback function to receive messages about ORCONNs.
|
||||
* The publisher calls this function synchronously.
|
||||
**/
|
||||
void
|
||||
orconn_event_subscribe(orconn_event_rcvr_t fn)
|
||||
orconn_status_publish(orconn_status_msg_t *msg)
|
||||
{
|
||||
tor_assert(fn);
|
||||
/* Don't duplicate subscriptions. */
|
||||
if (smartlist_contains(orconn_event_rcvrs, fn))
|
||||
return;
|
||||
|
||||
smartlist_add(orconn_event_rcvrs, fn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a message about OR connection events
|
||||
*
|
||||
* This calls the subscriber receiver function synchronously.
|
||||
**/
|
||||
void
|
||||
orconn_event_publish(const orconn_event_msg_t *msg)
|
||||
{
|
||||
SMARTLIST_FOREACH_BEGIN(orconn_event_rcvrs, orconn_event_rcvr_t, fn) {
|
||||
tor_assert(fn);
|
||||
(*fn)(msg);
|
||||
} SMARTLIST_FOREACH_END(fn);
|
||||
PUBLISH(orconn_status, msg);
|
||||
}
|
||||
|
||||
const subsys_fns_t sys_orconn_event = {
|
||||
.name = "orconn_event",
|
||||
.supported = true,
|
||||
.level = -33,
|
||||
.initialize = orconn_event_init,
|
||||
.shutdown = orconn_event_fini,
|
||||
.add_pubsub = orconn_add_pubsub,
|
||||
};
|
||||
|
@ -16,6 +16,8 @@
|
||||
#ifndef TOR_ORCONN_EVENT_H
|
||||
#define TOR_ORCONN_EVENT_H
|
||||
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
|
||||
/**
|
||||
* @name States of OR connections
|
||||
*
|
||||
@ -62,12 +64,6 @@ typedef enum or_conn_status_event_t {
|
||||
OR_CONN_EVENT_NEW = 4,
|
||||
} or_conn_status_event_t;
|
||||
|
||||
/** Discriminant values for orconn event message */
|
||||
typedef enum orconn_msgtype_t {
|
||||
ORCONN_MSGTYPE_STATE,
|
||||
ORCONN_MSGTYPE_STATUS,
|
||||
} orconn_msgtype_t;
|
||||
|
||||
/**
|
||||
* Message for orconn state update
|
||||
*
|
||||
@ -83,6 +79,8 @@ typedef struct orconn_state_msg_t {
|
||||
uint8_t state; /**< new connection state */
|
||||
} orconn_state_msg_t;
|
||||
|
||||
DECLARE_MESSAGE(orconn_state, orconn_state, orconn_state_msg_t *);
|
||||
|
||||
/**
|
||||
* Message for orconn status event
|
||||
*
|
||||
@ -95,26 +93,11 @@ typedef struct orconn_status_msg_t {
|
||||
int reason; /**< reason */
|
||||
} orconn_status_msg_t;
|
||||
|
||||
/** Discriminated union for the actual message */
|
||||
typedef struct orconn_event_msg_t {
|
||||
int type;
|
||||
union {
|
||||
orconn_state_msg_t state;
|
||||
orconn_status_msg_t status;
|
||||
} u;
|
||||
} orconn_event_msg_t;
|
||||
|
||||
/**
|
||||
* Receiver function pointer for OR subscribers
|
||||
*
|
||||
* This function gets called synchronously by the publisher.
|
||||
**/
|
||||
typedef void (*orconn_event_rcvr_t)(const orconn_event_msg_t *);
|
||||
|
||||
void orconn_event_subscribe(orconn_event_rcvr_t);
|
||||
DECLARE_MESSAGE(orconn_status, orconn_status, orconn_status_msg_t *);
|
||||
|
||||
#ifdef ORCONN_EVENT_PRIVATE
|
||||
void orconn_event_publish(const orconn_event_msg_t *);
|
||||
void orconn_state_publish(orconn_state_msg_t *);
|
||||
void orconn_status_publish(orconn_status_msg_t *);
|
||||
#endif
|
||||
|
||||
#endif /* !defined(TOR_ORCONN_EVENT_H) */
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "feature/control/btrack_circuit.h"
|
||||
#include "feature/control/btrack_orconn.h"
|
||||
#include "feature/control/btrack_sys.h"
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
#include "lib/subsys/subsys.h"
|
||||
|
||||
static int
|
||||
@ -31,8 +32,6 @@ btrack_init(void)
|
||||
{
|
||||
if (btrack_orconn_init())
|
||||
return -1;
|
||||
if (btrack_circ_init())
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -44,10 +43,22 @@ btrack_fini(void)
|
||||
btrack_circ_fini();
|
||||
}
|
||||
|
||||
static int
|
||||
btrack_add_pubsub(pubsub_connector_t *connector)
|
||||
{
|
||||
if (btrack_orconn_add_pubsub(connector))
|
||||
return -1;
|
||||
if (btrack_circ_add_pubsub(connector))
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
const subsys_fns_t sys_btrack = {
|
||||
.name = "btrack",
|
||||
.supported = true,
|
||||
.level = -30,
|
||||
.initialize = btrack_init,
|
||||
.shutdown = btrack_fini,
|
||||
.add_pubsub = btrack_add_pubsub,
|
||||
};
|
||||
|
@ -109,51 +109,53 @@ btc_update_evtype(const ocirc_cevent_msg_t *msg, btc_best_t *best,
|
||||
return false;
|
||||
}
|
||||
|
||||
DECLARE_SUBSCRIBE(ocirc_state, btc_state_rcvr);
|
||||
DECLARE_SUBSCRIBE(ocirc_cevent, btc_cevent_rcvr);
|
||||
DECLARE_SUBSCRIBE(ocirc_chan, btc_chan_rcvr);
|
||||
|
||||
static void
|
||||
btc_state_rcvr(const ocirc_state_msg_t *msg)
|
||||
btc_state_rcvr(const msg_t *msg, const ocirc_state_msg_t *arg)
|
||||
{
|
||||
(void)msg;
|
||||
log_debug(LD_BTRACK, "CIRC gid=%"PRIu32" state=%d onehop=%d",
|
||||
msg->gid, msg->state, msg->onehop);
|
||||
arg->gid, arg->state, arg->onehop);
|
||||
|
||||
btc_update_state(msg, &best_any_state, "ANY");
|
||||
if (msg->onehop)
|
||||
btc_update_state(arg, &best_any_state, "ANY");
|
||||
if (arg->onehop)
|
||||
return;
|
||||
btc_update_state(msg, &best_ap_state, "AP");
|
||||
btc_update_state(arg, &best_ap_state, "AP");
|
||||
}
|
||||
|
||||
static void
|
||||
btc_cevent_rcvr(const ocirc_cevent_msg_t *msg)
|
||||
btc_cevent_rcvr(const msg_t *msg, const ocirc_cevent_msg_t *arg)
|
||||
{
|
||||
(void)msg;
|
||||
log_debug(LD_BTRACK, "CIRC gid=%"PRIu32" evtype=%d reason=%d onehop=%d",
|
||||
msg->gid, msg->evtype, msg->reason, msg->onehop);
|
||||
arg->gid, arg->evtype, arg->reason, arg->onehop);
|
||||
|
||||
btc_update_evtype(msg, &best_any_evtype, "ANY");
|
||||
if (msg->onehop)
|
||||
btc_update_evtype(arg, &best_any_evtype, "ANY");
|
||||
if (arg->onehop)
|
||||
return;
|
||||
btc_update_evtype(msg, &best_ap_evtype, "AP");
|
||||
btc_update_evtype(arg, &best_ap_evtype, "AP");
|
||||
}
|
||||
|
||||
static void
|
||||
btc_event_rcvr(const ocirc_event_msg_t *msg)
|
||||
btc_chan_rcvr(const msg_t *msg, const ocirc_chan_msg_t *arg)
|
||||
{
|
||||
switch (msg->type) {
|
||||
case OCIRC_MSGTYPE_STATE:
|
||||
return btc_state_rcvr(&msg->u.state);
|
||||
case OCIRC_MSGTYPE_CHAN:
|
||||
log_debug(LD_BTRACK, "CIRC gid=%"PRIu32" chan=%"PRIu64" onehop=%d",
|
||||
msg->u.chan.gid, msg->u.chan.chan, msg->u.chan.onehop);
|
||||
break;
|
||||
case OCIRC_MSGTYPE_CEVENT:
|
||||
return btc_cevent_rcvr(&msg->u.cevent);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
(void)msg;
|
||||
log_debug(LD_BTRACK, "CIRC gid=%"PRIu32" chan=%"PRIu64" onehop=%d",
|
||||
arg->gid, arg->chan, arg->onehop);
|
||||
}
|
||||
|
||||
int
|
||||
btrack_circ_init(void)
|
||||
btrack_circ_add_pubsub(pubsub_connector_t *connector)
|
||||
{
|
||||
ocirc_event_subscribe(btc_event_rcvr);
|
||||
if (DISPATCH_ADD_SUB(connector, ocirc, ocirc_chan))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_SUB(connector, ocirc, ocirc_cevent))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_SUB(connector, ocirc, ocirc_state))
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,10 @@
|
||||
#ifndef TOR_BTRACK_CIRCUIT_H
|
||||
#define TOR_BTRACK_CIRCUIT_H
|
||||
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
|
||||
int btrack_circ_init(void);
|
||||
void btrack_circ_fini(void);
|
||||
int btrack_circ_add_pubsub(pubsub_connector_t *);
|
||||
|
||||
#endif /* !defined(TOR_BTRACK_CIRCUIT_H) */
|
||||
|
@ -45,6 +45,11 @@
|
||||
#include "feature/control/btrack_orconn_cevent.h"
|
||||
#include "feature/control/btrack_orconn_maps.h"
|
||||
#include "lib/log/log.h"
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
|
||||
DECLARE_SUBSCRIBE(orconn_state, bto_state_rcvr);
|
||||
DECLARE_SUBSCRIBE(orconn_status, bto_status_rcvr);
|
||||
DECLARE_SUBSCRIBE(ocirc_chan, bto_chan_rcvr);
|
||||
|
||||
/** Pair of a best ORCONN GID and with its state */
|
||||
typedef struct bto_best_t {
|
||||
@ -110,16 +115,17 @@ bto_reset_bests(void)
|
||||
* message comes from code in connection_or.c.
|
||||
**/
|
||||
static void
|
||||
bto_state_rcvr(const orconn_state_msg_t *msg)
|
||||
bto_state_rcvr(const msg_t *msg, const orconn_state_msg_t *arg)
|
||||
{
|
||||
bt_orconn_t *bto;
|
||||
|
||||
bto = bto_find_or_new(msg->gid, msg->chan);
|
||||
(void)msg;
|
||||
bto = bto_find_or_new(arg->gid, arg->chan);
|
||||
log_debug(LD_BTRACK, "ORCONN gid=%"PRIu64" chan=%"PRIu64
|
||||
" proxy_type=%d state=%d",
|
||||
msg->gid, msg->chan, msg->proxy_type, msg->state);
|
||||
bto->proxy_type = msg->proxy_type;
|
||||
bto->state = msg->state;
|
||||
arg->gid, arg->chan, arg->proxy_type, arg->state);
|
||||
bto->proxy_type = arg->proxy_type;
|
||||
bto->state = arg->state;
|
||||
if (bto->is_orig)
|
||||
bto_update_bests(bto);
|
||||
}
|
||||
@ -130,54 +136,38 @@ bto_state_rcvr(const orconn_state_msg_t *msg)
|
||||
* control.c.
|
||||
**/
|
||||
static void
|
||||
bto_status_rcvr(const orconn_status_msg_t *msg)
|
||||
bto_status_rcvr(const msg_t *msg, const orconn_status_msg_t *arg)
|
||||
{
|
||||
switch (msg->status) {
|
||||
(void)msg;
|
||||
switch (arg->status) {
|
||||
case OR_CONN_EVENT_FAILED:
|
||||
case OR_CONN_EVENT_CLOSED:
|
||||
log_info(LD_BTRACK, "ORCONN DELETE gid=%"PRIu64" status=%d reason=%d",
|
||||
msg->gid, msg->status, msg->reason);
|
||||
return bto_delete(msg->gid);
|
||||
arg->gid, arg->status, arg->reason);
|
||||
return bto_delete(arg->gid);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/** Dispatch to individual ORCONN message handlers */
|
||||
static void
|
||||
bto_event_rcvr(const orconn_event_msg_t *msg)
|
||||
{
|
||||
switch (msg->type) {
|
||||
case ORCONN_MSGTYPE_STATE:
|
||||
return bto_state_rcvr(&msg->u.state);
|
||||
case ORCONN_MSGTYPE_STATUS:
|
||||
return bto_status_rcvr(&msg->u.status);
|
||||
default:
|
||||
tor_assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create or update a cached ORCONN state for a newly launched
|
||||
* connection, including whether it's launched by an origin circuit
|
||||
* and whether it's a one-hop circuit.
|
||||
**/
|
||||
static void
|
||||
bto_chan_rcvr(const ocirc_event_msg_t *msg)
|
||||
bto_chan_rcvr(const msg_t *msg, const ocirc_chan_msg_t *arg)
|
||||
{
|
||||
bt_orconn_t *bto;
|
||||
|
||||
/* Ignore other kinds of origin circuit events; we don't need them */
|
||||
if (msg->type != OCIRC_MSGTYPE_CHAN)
|
||||
return;
|
||||
|
||||
bto = bto_find_or_new(0, msg->u.chan.chan);
|
||||
if (!bto->is_orig || (bto->is_onehop && !msg->u.chan.onehop)) {
|
||||
(void)msg;
|
||||
bto = bto_find_or_new(0, arg->chan);
|
||||
if (!bto->is_orig || (bto->is_onehop && !arg->onehop)) {
|
||||
log_debug(LD_BTRACK, "ORCONN LAUNCH chan=%"PRIu64" onehop=%d",
|
||||
msg->u.chan.chan, msg->u.chan.onehop);
|
||||
arg->chan, arg->onehop);
|
||||
}
|
||||
bto->is_orig = true;
|
||||
if (!msg->u.chan.onehop)
|
||||
if (!arg->onehop)
|
||||
bto->is_onehop = false;
|
||||
bto_update_bests(bto);
|
||||
}
|
||||
@ -190,12 +180,22 @@ int
|
||||
btrack_orconn_init(void)
|
||||
{
|
||||
bto_init_maps();
|
||||
orconn_event_subscribe(bto_event_rcvr);
|
||||
ocirc_event_subscribe(bto_chan_rcvr);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
btrack_orconn_add_pubsub(pubsub_connector_t *connector)
|
||||
{
|
||||
if (DISPATCH_ADD_SUB(connector, orconn, orconn_state))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_SUB(connector, orconn, orconn_status))
|
||||
return -1;
|
||||
if (DISPATCH_ADD_SUB(connector, ocirc, ocirc_chan))
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Clear the hash maps and reset the "best" states */
|
||||
void
|
||||
btrack_orconn_fini(void)
|
||||
|
@ -9,6 +9,8 @@
|
||||
#ifndef TOR_BTRACK_ORCONN_H
|
||||
#define TOR_BTRACK_ORCONN_H
|
||||
|
||||
#include "lib/pubsub/pubsub.h"
|
||||
|
||||
#ifdef BTRACK_ORCONN_PRIVATE
|
||||
|
||||
#include "ht.h"
|
||||
@ -33,6 +35,7 @@ typedef struct bt_orconn_t {
|
||||
#endif /* defined(BTRACK_ORCONN_PRIVATE) */
|
||||
|
||||
int btrack_orconn_init(void);
|
||||
int btrack_orconn_add_pubsub(pubsub_connector_t *);
|
||||
void btrack_orconn_fini(void);
|
||||
|
||||
#endif /* !defined(TOR_BTRACK_ORCONN_H) */
|
||||
|
@ -172,34 +172,20 @@ pubsub_cfg_dump(const pubsub_cfg_t *cfg, int severity, const char *prefix)
|
||||
|
||||
/**
|
||||
* Helper: fill a bitarray <b>out</b> with entries corresponding to the
|
||||
* subsystems listed in <b>items</b>. If any subsystem is listed more than
|
||||
* once, log a warning. Return 0 on success, -1 on failure.
|
||||
* subsystems listed in <b>items</b>.
|
||||
**/
|
||||
static int
|
||||
static void
|
||||
get_message_bitarray(const pubsub_adjmap_t *map,
|
||||
message_id_t msg,
|
||||
const smartlist_t *items,
|
||||
const char *operation,
|
||||
bitarray_t **out)
|
||||
{
|
||||
bool ok = true;
|
||||
*out = bitarray_init_zero((unsigned)map->n_subsystems);
|
||||
if (! items)
|
||||
return 0;
|
||||
return;
|
||||
|
||||
SMARTLIST_FOREACH_BEGIN(items, const pubsub_cfg_t *, cfg) {
|
||||
if (bitarray_is_set(*out, cfg->subsys)) {
|
||||
log_warn(LD_MESG|LD_BUG,
|
||||
"Message \"%s\" is configured to be %s by subsystem "
|
||||
"\"%s\" more than once.",
|
||||
get_message_id_name(msg), operation,
|
||||
get_subsys_id_name(cfg->subsys));
|
||||
ok = false;
|
||||
}
|
||||
bitarray_set(*out, cfg->subsys);
|
||||
} SMARTLIST_FOREACH_END(cfg);
|
||||
|
||||
return ok ? 0 : -1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -222,10 +208,8 @@ lint_message_graph(const pubsub_adjmap_t *map,
|
||||
bitarray_t *subscribed_by = NULL;
|
||||
bool ok = true;
|
||||
|
||||
if (get_message_bitarray(map, msg, pub, "published", &published_by) < 0)
|
||||
ok = false;
|
||||
if (get_message_bitarray(map, msg, sub, "subscribed", &subscribed_by) < 0)
|
||||
ok = false;
|
||||
get_message_bitarray(map, pub, &published_by);
|
||||
get_message_bitarray(map, sub, &subscribed_by);
|
||||
|
||||
/* Check whether any subsystem is publishing and subscribing the same
|
||||
* message. [??]
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "core/or/or.h"
|
||||
|
||||
#include "test/test.h"
|
||||
#include "test_helpers.h"
|
||||
#include "test/log_test_helpers.h"
|
||||
|
||||
#define OCIRC_EVENT_PRIVATE
|
||||
@ -11,49 +12,74 @@
|
||||
#include "core/or/ocirc_event.h"
|
||||
#include "core/or/orconn_event.h"
|
||||
|
||||
static void
|
||||
send_state(const orconn_state_msg_t *msg_in)
|
||||
{
|
||||
orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
*msg = *msg_in;
|
||||
orconn_state_publish(msg);
|
||||
}
|
||||
|
||||
static void
|
||||
send_status(const orconn_status_msg_t *msg_in)
|
||||
{
|
||||
orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
*msg = *msg_in;
|
||||
orconn_status_publish(msg);
|
||||
}
|
||||
|
||||
static void
|
||||
send_chan(const ocirc_chan_msg_t *msg_in)
|
||||
{
|
||||
ocirc_chan_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
*msg = *msg_in;
|
||||
ocirc_chan_publish(msg);
|
||||
}
|
||||
|
||||
static void
|
||||
test_btrack_launch(void *arg)
|
||||
{
|
||||
orconn_event_msg_t conn;
|
||||
ocirc_event_msg_t circ;
|
||||
orconn_state_msg_t conn;
|
||||
ocirc_chan_msg_t circ;
|
||||
|
||||
(void)arg;
|
||||
conn.type = ORCONN_MSGTYPE_STATE;
|
||||
conn.u.state.gid = 1;
|
||||
conn.u.state.chan = 1;
|
||||
conn.u.state.proxy_type = PROXY_NONE;
|
||||
conn.u.state.state = OR_CONN_STATE_CONNECTING;
|
||||
conn.gid = 1;
|
||||
conn.chan = 1;
|
||||
conn.proxy_type = PROXY_NONE;
|
||||
conn.state = OR_CONN_STATE_CONNECTING;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
orconn_event_publish(&conn);
|
||||
send_state(&conn);
|
||||
expect_log_msg_containing("ORCONN gid=1 chan=1 proxy_type=0 state=1");
|
||||
expect_no_log_msg_containing("ORCONN BEST_");
|
||||
teardown_capture_of_logs();
|
||||
|
||||
circ.type = OCIRC_MSGTYPE_CHAN;
|
||||
circ.u.chan.chan = 1;
|
||||
circ.u.chan.onehop = true;
|
||||
circ.chan = 1;
|
||||
circ.onehop = true;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
ocirc_event_publish(&circ);
|
||||
send_chan(&circ);
|
||||
expect_log_msg_containing("ORCONN LAUNCH chan=1 onehop=1");
|
||||
expect_log_msg_containing("ORCONN BEST_ANY state -1->1 gid=1");
|
||||
teardown_capture_of_logs();
|
||||
|
||||
conn.u.state.gid = 2;
|
||||
conn.u.state.chan = 2;
|
||||
conn.gid = 2;
|
||||
conn.chan = 2;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
orconn_event_publish(&conn);
|
||||
send_state(&conn);
|
||||
expect_log_msg_containing("ORCONN gid=2 chan=2 proxy_type=0 state=1");
|
||||
expect_no_log_msg_containing("ORCONN BEST_");
|
||||
teardown_capture_of_logs();
|
||||
|
||||
circ.u.chan.chan = 2;
|
||||
circ.u.chan.onehop = false;
|
||||
circ.chan = 2;
|
||||
circ.onehop = false;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
ocirc_event_publish(&circ);
|
||||
send_chan(&circ);
|
||||
expect_log_msg_containing("ORCONN LAUNCH chan=2 onehop=0");
|
||||
expect_log_msg_containing("ORCONN BEST_AP state -1->1 gid=2");
|
||||
teardown_capture_of_logs();
|
||||
@ -65,27 +91,26 @@ test_btrack_launch(void *arg)
|
||||
static void
|
||||
test_btrack_delete(void *arg)
|
||||
{
|
||||
orconn_event_msg_t conn;
|
||||
orconn_state_msg_t state;
|
||||
orconn_status_msg_t status;
|
||||
|
||||
(void)arg;
|
||||
conn.type = ORCONN_MSGTYPE_STATE;
|
||||
conn.u.state.gid = 1;
|
||||
conn.u.state.chan = 1;
|
||||
conn.u.state.proxy_type = PROXY_NONE;
|
||||
conn.u.state.state = OR_CONN_STATE_CONNECTING;
|
||||
state.gid = 1;
|
||||
state.chan = 1;
|
||||
state.proxy_type = PROXY_NONE;
|
||||
state.state = OR_CONN_STATE_CONNECTING;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
orconn_event_publish(&conn);
|
||||
send_state(&state);
|
||||
expect_log_msg_containing("ORCONN gid=1 chan=1 proxy_type=0");
|
||||
teardown_capture_of_logs();
|
||||
|
||||
conn.type = ORCONN_MSGTYPE_STATUS;
|
||||
conn.u.status.gid = 1;
|
||||
conn.u.status.status = OR_CONN_EVENT_CLOSED;
|
||||
conn.u.status.reason = 0;
|
||||
status.gid = 1;
|
||||
status.status = OR_CONN_EVENT_CLOSED;
|
||||
status.reason = 0;
|
||||
|
||||
setup_full_capture_of_logs(LOG_DEBUG);
|
||||
orconn_event_publish(&conn);
|
||||
send_status(&status);
|
||||
expect_log_msg_containing("ORCONN DELETE gid=1 status=3 reason=0");
|
||||
teardown_capture_of_logs();
|
||||
|
||||
@ -94,7 +119,7 @@ test_btrack_delete(void *arg)
|
||||
}
|
||||
|
||||
struct testcase_t btrack_tests[] = {
|
||||
{ "launch", test_btrack_launch, TT_FORK, 0, NULL },
|
||||
{ "delete", test_btrack_delete, TT_FORK, 0, NULL },
|
||||
{ "launch", test_btrack_launch, TT_FORK, &helper_pubsub_setup, NULL },
|
||||
{ "delete", test_btrack_delete, TT_FORK, &helper_pubsub_setup, NULL },
|
||||
END_OF_TESTCASES
|
||||
};
|
||||
|
@ -197,7 +197,7 @@ test_circuitstats_hoplen(void *arg)
|
||||
}
|
||||
|
||||
#define TEST_CIRCUITSTATS(name, flags) \
|
||||
{ #name, test_##name, (flags), NULL, NULL }
|
||||
{ #name, test_##name, (flags), &helper_pubsub_setup, NULL }
|
||||
|
||||
struct testcase_t circuitstats_tests[] = {
|
||||
TEST_CIRCUITSTATS(circuitstats_hoplen, TT_FORK),
|
||||
|
@ -7,6 +7,7 @@
|
||||
#define CONTROL_EVENTS_PRIVATE
|
||||
#define OCIRC_EVENT_PRIVATE
|
||||
#define ORCONN_EVENT_PRIVATE
|
||||
#include "app/main/subsysmgr.h"
|
||||
#include "core/or/or.h"
|
||||
#include "core/or/channel.h"
|
||||
#include "core/or/channeltls.h"
|
||||
@ -16,6 +17,7 @@
|
||||
#include "core/mainloop/connection.h"
|
||||
#include "feature/control/control_events.h"
|
||||
#include "test/test.h"
|
||||
#include "test/test_helpers.h"
|
||||
|
||||
#include "core/or/or_circuit_st.h"
|
||||
#include "core/or/origin_circuit_st.h"
|
||||
@ -394,38 +396,39 @@ test_cntev_dirboot_defer_orconn(void *arg)
|
||||
}
|
||||
|
||||
static void
|
||||
setup_orconn_state(orconn_event_msg_t *msg, uint64_t gid, uint64_t chan,
|
||||
setup_orconn_state(orconn_state_msg_t *msg, uint64_t gid, uint64_t chan,
|
||||
int proxy_type)
|
||||
{
|
||||
msg->type = ORCONN_MSGTYPE_STATE;
|
||||
msg->u.state.gid = gid;
|
||||
msg->u.state.chan = chan;
|
||||
msg->u.state.proxy_type = proxy_type;
|
||||
msg->gid = gid;
|
||||
msg->chan = chan;
|
||||
msg->proxy_type = proxy_type;
|
||||
}
|
||||
|
||||
static void
|
||||
send_orconn_state(orconn_event_msg_t *msg, uint8_t state)
|
||||
send_orconn_state(const orconn_state_msg_t *msg_in, uint8_t state)
|
||||
{
|
||||
msg->u.state.state = state;
|
||||
orconn_event_publish(msg);
|
||||
orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
*msg = *msg_in;
|
||||
msg->state = state;
|
||||
orconn_state_publish(msg);
|
||||
}
|
||||
|
||||
static void
|
||||
send_ocirc_chan(uint32_t gid, uint64_t chan, bool onehop)
|
||||
{
|
||||
ocirc_event_msg_t msg;
|
||||
ocirc_chan_msg_t *msg = tor_malloc(sizeof(*msg));
|
||||
|
||||
msg.type = OCIRC_MSGTYPE_CHAN;
|
||||
msg.u.chan.gid = gid;
|
||||
msg.u.chan.chan = chan;
|
||||
msg.u.chan.onehop = onehop;
|
||||
ocirc_event_publish(&msg);
|
||||
msg->gid = gid;
|
||||
msg->chan = chan;
|
||||
msg->onehop = onehop;
|
||||
ocirc_chan_publish(msg);
|
||||
}
|
||||
|
||||
static void
|
||||
test_cntev_orconn_state(void *arg)
|
||||
{
|
||||
orconn_event_msg_t conn;
|
||||
orconn_state_msg_t conn;
|
||||
|
||||
(void)arg;
|
||||
MOCK(queue_control_event_string, mock_queue_control_event_string);
|
||||
@ -442,8 +445,8 @@ test_cntev_orconn_state(void *arg)
|
||||
send_orconn_state(&conn, OR_CONN_STATE_OPEN);
|
||||
assert_bootmsg("15 TAG=handshake_done");
|
||||
|
||||
conn.u.state.gid = 2;
|
||||
conn.u.state.chan = 2;
|
||||
conn.gid = 2;
|
||||
conn.chan = 2;
|
||||
send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
|
||||
/* It doesn't know it's an origin circuit yet */
|
||||
assert_bootmsg("15 TAG=handshake_done");
|
||||
@ -464,7 +467,7 @@ test_cntev_orconn_state(void *arg)
|
||||
static void
|
||||
test_cntev_orconn_state_pt(void *arg)
|
||||
{
|
||||
orconn_event_msg_t conn;
|
||||
orconn_state_msg_t conn;
|
||||
|
||||
(void)arg;
|
||||
MOCK(queue_control_event_string, mock_queue_control_event_string);
|
||||
@ -484,8 +487,8 @@ test_cntev_orconn_state_pt(void *arg)
|
||||
assert_bootmsg("15 TAG=handshake_done");
|
||||
|
||||
send_ocirc_chan(2, 2, false);
|
||||
conn.u.state.gid = 2;
|
||||
conn.u.state.chan = 2;
|
||||
conn.gid = 2;
|
||||
conn.chan = 2;
|
||||
send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
|
||||
assert_bootmsg("76 TAG=ap_conn_pt");
|
||||
send_orconn_state(&conn, OR_CONN_STATE_PROXY_HANDSHAKING);
|
||||
@ -499,7 +502,7 @@ test_cntev_orconn_state_pt(void *arg)
|
||||
static void
|
||||
test_cntev_orconn_state_proxy(void *arg)
|
||||
{
|
||||
orconn_event_msg_t conn;
|
||||
orconn_state_msg_t conn;
|
||||
|
||||
(void)arg;
|
||||
MOCK(queue_control_event_string, mock_queue_control_event_string);
|
||||
@ -519,8 +522,8 @@ test_cntev_orconn_state_proxy(void *arg)
|
||||
assert_bootmsg("15 TAG=handshake_done");
|
||||
|
||||
send_ocirc_chan(2, 2, false);
|
||||
conn.u.state.gid = 2;
|
||||
conn.u.state.chan = 2;
|
||||
conn.gid = 2;
|
||||
conn.chan = 2;
|
||||
send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
|
||||
assert_bootmsg("78 TAG=ap_conn_proxy");
|
||||
send_orconn_state(&conn, OR_CONN_STATE_PROXY_HANDSHAKING);
|
||||
@ -534,15 +537,18 @@ test_cntev_orconn_state_proxy(void *arg)
|
||||
#define TEST(name, flags) \
|
||||
{ #name, test_cntev_ ## name, flags, 0, NULL }
|
||||
|
||||
#define T_PUBSUB(name, setup) \
|
||||
{ #name, test_cntev_ ## name, TT_FORK, &helper_pubsub_setup, NULL }
|
||||
|
||||
struct testcase_t controller_event_tests[] = {
|
||||
TEST(sum_up_cell_stats, TT_FORK),
|
||||
TEST(append_cell_stats, TT_FORK),
|
||||
TEST(format_cell_stats, TT_FORK),
|
||||
TEST(event_mask, TT_FORK),
|
||||
TEST(dirboot_defer_desc, TT_FORK),
|
||||
TEST(dirboot_defer_orconn, TT_FORK),
|
||||
TEST(orconn_state, TT_FORK),
|
||||
TEST(orconn_state_pt, TT_FORK),
|
||||
TEST(orconn_state_proxy, TT_FORK),
|
||||
T_PUBSUB(dirboot_defer_desc, TT_FORK),
|
||||
T_PUBSUB(dirboot_defer_orconn, TT_FORK),
|
||||
T_PUBSUB(orconn_state, TT_FORK),
|
||||
T_PUBSUB(orconn_state_pt, TT_FORK),
|
||||
T_PUBSUB(orconn_state_proxy, TT_FORK),
|
||||
END_OF_TESTCASES
|
||||
};
|
||||
|
@ -587,6 +587,6 @@ struct testcase_t extorport_tests[] = {
|
||||
{ "cookie_auth", test_ext_or_cookie_auth, TT_FORK, NULL, NULL },
|
||||
{ "cookie_auth_testvec", test_ext_or_cookie_auth_testvec, TT_FORK,
|
||||
NULL, NULL },
|
||||
{ "handshake", test_ext_or_handshake, TT_FORK, NULL, NULL },
|
||||
{ "handshake", test_ext_or_handshake, TT_FORK, &helper_pubsub_setup, NULL },
|
||||
END_OF_TESTCASES
|
||||
};
|
||||
|
@ -17,12 +17,17 @@
|
||||
#include "lib/buf/buffers.h"
|
||||
#include "app/config/config.h"
|
||||
#include "app/config/confparse.h"
|
||||
#include "app/main/subsysmgr.h"
|
||||
#include "core/mainloop/connection.h"
|
||||
#include "lib/crypt_ops/crypto_rand.h"
|
||||
#include "core/mainloop/mainloop.h"
|
||||
#include "feature/nodelist/nodelist.h"
|
||||
#include "core/or/relay.h"
|
||||
#include "feature/nodelist/routerlist.h"
|
||||
#include "lib/dispatch/dispatch.h"
|
||||
#include "lib/dispatch/dispatch_naming.h"
|
||||
#include "lib/pubsub/pubsub_build.h"
|
||||
#include "lib/pubsub/pubsub_connect.h"
|
||||
#include "lib/encoding/confline.h"
|
||||
#include "lib/net/resolve.h"
|
||||
|
||||
@ -303,3 +308,54 @@ helper_parse_options(const char *conf)
|
||||
}
|
||||
return opt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch alertfn callback: flush all messages right now. Implements
|
||||
* DELIV_IMMEDIATE.
|
||||
**/
|
||||
static void
|
||||
alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
|
||||
{
|
||||
(void) arg;
|
||||
dispatch_flush(d, chan, INT_MAX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup helper for tests that need pubsub active
|
||||
*
|
||||
* Does not hook up mainloop events. Does set immediate delivery for
|
||||
* all channels.
|
||||
*/
|
||||
void *
|
||||
helper_setup_pubsub(const struct testcase_t *testcase)
|
||||
{
|
||||
dispatch_t *dispatcher = NULL;
|
||||
pubsub_builder_t *builder = pubsub_builder_new();
|
||||
channel_id_t chan = get_channel_id("orconn");
|
||||
|
||||
(void)testcase;
|
||||
(void)subsystems_add_pubsub(builder);
|
||||
dispatcher = pubsub_builder_finalize(builder, NULL);
|
||||
tor_assert(dispatcher);
|
||||
dispatch_set_alert_fn(dispatcher, chan, alertfn_immediate, NULL);
|
||||
chan = get_channel_id("ocirc");
|
||||
dispatch_set_alert_fn(dispatcher, chan, alertfn_immediate, NULL);
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup helper for tests that need pubsub active
|
||||
*/
|
||||
int
|
||||
helper_cleanup_pubsub(const struct testcase_t *testcase, void *dispatcher_)
|
||||
{
|
||||
dispatch_t *dispatcher = dispatcher_;
|
||||
|
||||
(void)testcase;
|
||||
dispatch_free(dispatcher);
|
||||
return 1;
|
||||
}
|
||||
|
||||
const struct testcase_setup_t helper_pubsub_setup = {
|
||||
helper_setup_pubsub, helper_cleanup_pubsub
|
||||
};
|
||||
|
@ -7,6 +7,7 @@
|
||||
#define BUFFERS_PRIVATE
|
||||
|
||||
#include "core/or/or.h"
|
||||
#include "tinytest.h"
|
||||
|
||||
const char *get_yesterday_date_str(void);
|
||||
|
||||
@ -31,5 +32,10 @@ or_options_t *helper_parse_options(const char *conf);
|
||||
|
||||
extern const char TEST_DESCRIPTORS[];
|
||||
|
||||
void *helper_setup_pubsub(const struct testcase_t *);
|
||||
int helper_cleanup_pubsub(const struct testcase_t *, void *);
|
||||
|
||||
extern const struct testcase_setup_t helper_pubsub_setup;
|
||||
|
||||
#endif /* !defined(TOR_TEST_HELPERS_H) */
|
||||
|
||||
|
@ -493,48 +493,6 @@ test_pubsub_build_sub_many(void *arg)
|
||||
tor_free(sysname);
|
||||
}
|
||||
|
||||
/* The same subsystem can only declare one publish or subscribe. */
|
||||
static void
|
||||
test_pubsub_build_pubsub_redundant(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
pubsub_builder_t *b = NULL;
|
||||
dispatch_t *dispatcher = NULL;
|
||||
pubsub_connector_t *c = NULL;
|
||||
|
||||
b = pubsub_builder_new();
|
||||
seed_pubsub_builder_basic(b);
|
||||
pub_binding_t btmp;
|
||||
|
||||
{
|
||||
c = pubsub_connector_for_subsystem(b, get_subsys_id("sys2"));
|
||||
DISPATCH_ADD_SUB(c, main, bunch_of_coconuts);
|
||||
pubsub_add_pub_(c, &btmp, get_channel_id("main"),
|
||||
get_message_id("yes_we_have_no"),
|
||||
get_msg_type_id("string"),
|
||||
0 /* flags */,
|
||||
"somewhere.c", 22);
|
||||
pubsub_connector_free(c);
|
||||
};
|
||||
|
||||
setup_full_capture_of_logs(LOG_WARN);
|
||||
dispatcher = pubsub_builder_finalize(b, NULL);
|
||||
b = NULL;
|
||||
tt_assert(dispatcher == NULL);
|
||||
|
||||
expect_log_msg_containing(
|
||||
"Message \"yes_we_have_no\" is configured to be published by "
|
||||
"subsystem \"sys2\" more than once.");
|
||||
expect_log_msg_containing(
|
||||
"Message \"bunch_of_coconuts\" is configured to be subscribed by "
|
||||
"subsystem \"sys2\" more than once.");
|
||||
|
||||
done:
|
||||
pubsub_builder_free(b);
|
||||
dispatch_free(dispatcher);
|
||||
teardown_capture_of_logs();
|
||||
}
|
||||
|
||||
/* It's fine to declare the excl flag. */
|
||||
static void
|
||||
test_pubsub_build_excl_ok(void *arg)
|
||||
@ -614,7 +572,6 @@ struct testcase_t pubsub_build_tests[] = {
|
||||
T(pubsub_same, TT_FORK),
|
||||
T(pubsub_multi, TT_FORK),
|
||||
T(sub_many, TT_FORK),
|
||||
T(pubsub_redundant, TT_FORK),
|
||||
T(excl_ok, TT_FORK),
|
||||
T(excl_bad, TT_FORK),
|
||||
END_OF_TESTCASES
|
||||
|
Loading…
Reference in New Issue
Block a user