diff --git a/changes/decouple_control_events b/changes/decouple_control_events new file mode 100644 index 0000000000..67c9c11f87 --- /dev/null +++ b/changes/decouple_control_events @@ -0,0 +1,8 @@ + o Code simplification and refactoring: + - When generating an event to send to the controller, we no longer + put the event over the network immediately. Instead, we queue + these events, and use a Libevent callback to deliver them. + This change simplifies Tor's callgraph by reducing the number + of functions from which all other Tor functions are reachable. + Closes ticket 16695. + diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c index b78ab3d871..4b32fc93d2 100644 --- a/src/common/compat_pthreads.c +++ b/src/common/compat_pthreads.c @@ -275,6 +275,33 @@ tor_cond_signal_all(tor_cond_t *cond) pthread_cond_broadcast(&cond->cond); } +int +tor_threadlocal_init(tor_threadlocal_t *threadlocal) +{ + int err = pthread_key_create(&threadlocal->key, NULL); + return err ? -1 : 0; +} + +void +tor_threadlocal_destroy(tor_threadlocal_t *threadlocal) +{ + pthread_key_delete(threadlocal->key); + memset(threadlocal, 0, sizeof(tor_threadlocal_t)); +} + +void * +tor_threadlocal_get(tor_threadlocal_t *threadlocal) +{ + return pthread_getspecific(threadlocal->key); +} + +void +tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value) +{ + int err = pthread_setspecific(threadlocal->key, value); + tor_assert(err == 0); +} + /** Set up common structures for use by threading. */ void tor_threads_init(void) diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h index acf3083f37..71562ba3ef 100644 --- a/src/common/compat_threads.h +++ b/src/common/compat_threads.h @@ -111,5 +111,41 @@ typedef struct alert_sockets_s { int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags); void alert_sockets_close(alert_sockets_t *socks); +typedef struct tor_threadlocal_s { +#ifdef _WIN32 + DWORD index; +#else + pthread_key_t key; +#endif +} tor_threadlocal_t; + +/** Initialize a thread-local variable. + * + * After you call this function on a tor_threadlocal_t, you can call + * tor_threadlocal_set to change the current value of this variable for the + * current thread, and tor_threadlocal_get to retrieve the current value for + * the current thread. Each thread has its own value. + **/ +int tor_threadlocal_init(tor_threadlocal_t *threadlocal); +/** + * Release all resource associated with a thread-local variable. + */ +void tor_threadlocal_destroy(tor_threadlocal_t *threadlocal); +/** + * Return the current value of a thread-local variable for this thread. + * + * It's undefined behavior to use this function if the threadlocal hasn't + * been initialized, or has been destroyed. + */ +void *tor_threadlocal_get(tor_threadlocal_t *threadlocal); +/** + * Change the current value of a thread-local variable for this thread to + * value. + * + * It's undefined behavior to use this function if the threadlocal hasn't + * been initialized, or has been destroyed. + */ +void tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value); + #endif diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c index 465ef3ebed..9a87daa871 100644 --- a/src/common/compat_winthreads.c +++ b/src/common/compat_winthreads.c @@ -124,6 +124,49 @@ tor_cond_signal_all(tor_cond_t *cond) tor_cond_signal_impl(cond, 1); } +int +tor_threadlocal_init(tor_threadlocal_t *threadlocal) +{ + threadlocal->index = TlsAlloc(); + return (threadlocal->index == TLS_OUT_OF_INDEXES) ? -1 : 0; +} + +void +tor_threadlocal_destroy(tor_threadlocal_t *threadlocal) +{ + TlsFree(threadlocal->index); + memset(threadlocal, 0, sizeof(tor_threadlocal_t)); +} + +void * +tor_threadlocal_get(tor_threadlocal_t *threadlocal) +{ + void *value = TlsGetValue(threadlocal->index); + if (value == NULL) { + DWORD err = GetLastError(); + if (err != ERROR_SUCCESS) { + char *msg = format_win32_error(err); + log_err(LD_GENERAL, "Error retrieving thread-local value: %s", msg); + tor_free(msg); + tor_assert(err == ERROR_SUCCESS); + } + } + return value; +} + +void +tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value) +{ + BOOL ok = TlsSetValue(threadlocal->index, value); + if (!ok) { + DWORD err = GetLastError(); + char *msg = format_win32_error(err); + log_err(LD_GENERAL, "Error adjusting thread-local value: %s", msg); + tor_free(msg); + tor_assert(ok); + } +} + int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv) { diff --git a/src/or/config.c b/src/or/config.c index 618c941fb4..dabf9e0acb 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1106,6 +1106,9 @@ options_act_reversible(const or_options_t *old_options, char **msg) init_libevent(options); libevent_initialized = 1; + /* This has to come up after libevent is initialized. */ + control_initialize_event_queue(); + /* * Initialize the scheduler - this has to come after * options_init_from_torrc() sets up libevent - why yes, that seems diff --git a/src/or/control.c b/src/or/control.c index a5073037ab..3638bd2e05 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -20,6 +20,7 @@ #include "circuitstats.h" #include "circuituse.h" #include "command.h" +#include "compat_libevent.h" #include "config.h" #include "confparse.h" #include "connection.h" @@ -50,6 +51,12 @@ #include #endif +#ifdef HAVE_EVENT2_EVENT_H +#include +#else +#include +#endif + #include "crypto_s2k.h" #include "procmon.h" @@ -110,17 +117,17 @@ static char last_sent_bootstrap_message[BOOTSTRAP_MSG_LEN]; static void connection_printf_to_buf(control_connection_t *conn, const char *format, ...) CHECK_PRINTF(2,3); -static void send_control_event_impl(uint16_t event, event_format_t which, +static void send_control_event_impl(uint16_t event, const char *format, va_list ap) - CHECK_PRINTF(3,0); + CHECK_PRINTF(2,0); static int control_event_status(int type, int severity, const char *format, va_list args) CHECK_PRINTF(3,0); static void send_control_done(control_connection_t *conn); -static void send_control_event(uint16_t event, event_format_t which, +static void send_control_event(uint16_t event, const char *format, ...) - CHECK_PRINTF(3,4); + CHECK_PRINTF(2,3); static int handle_control_setconf(control_connection_t *conn, uint32_t len, char *body); static int handle_control_resetconf(control_connection_t *conn, uint32_t len, @@ -181,6 +188,8 @@ static void orconn_target_get_name(char *buf, size_t len, static int get_cached_network_liveness(void); static void set_cached_network_liveness(int liveness); +static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg); + /** Given a control event code for a message event, return the corresponding * log severity. */ static INLINE int @@ -578,46 +587,217 @@ send_control_done(control_connection_t *conn) connection_write_str_to_buf("250 OK\r\n", conn); } -/** Send an event to all v1 controllers that are listening for code - * event. The event's body is given by msg. - * - * If which & SHORT_NAMES, the event contains short-format names: send - * it to controllers that haven't enabled the VERBOSE_NAMES feature. If - * which & LONG_NAMES, the event contains long-format names: send it - * to controllers that have enabled VERBOSE_NAMES. - * - * The EXTENDED_FORMAT and NONEXTENDED_FORMAT flags behave similarly with - * respect to the EXTENDED_EVENTS feature. */ -MOCK_IMPL(STATIC void, -send_control_event_string,(uint16_t event, event_format_t which, - const char *msg)) -{ - smartlist_t *conns = get_connection_array(); - (void)which; - tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_); +/** Represents an event that's queued to be sent to one or more + * controllers. */ +typedef struct queued_event_s { + uint16_t event; + char *msg; +} queued_event_t; - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { +/** Pointer to int. If this is greater than 0, we don't allow new events to be + * queued. */ +static tor_threadlocal_t block_event_queue; + +/** Holds a smartlist of queued_event_t objects that may need to be sent + * to one or more controllers */ +static smartlist_t *queued_control_events = NULL; + +/** True if the flush_queued_events_event is pending. */ +static int flush_queued_event_pending = 0; + +/** Lock to protect the above fields. */ +static tor_mutex_t *queued_control_events_lock = NULL; + +/** An event that should fire in order to flush the contents of + * queued_control_events. */ +static struct event *flush_queued_events_event = NULL; + +void +control_initialize_event_queue(void) +{ + if (queued_control_events == NULL) { + queued_control_events = smartlist_new(); + } + + if (flush_queued_events_event == NULL) { + struct event_base *b = tor_libevent_get_base(); + if (b) { + flush_queued_events_event = tor_event_new(b, + -1, 0, flush_queued_events_cb, + NULL); + tor_assert(flush_queued_events_event); + } + } + + if (queued_control_events_lock == NULL) { + queued_control_events_lock = tor_mutex_new(); + tor_threadlocal_init(&block_event_queue); + } +} + +static int * +get_block_event_queue(void) +{ + int *val = tor_threadlocal_get(&block_event_queue); + if (PREDICT_UNLIKELY(val == NULL)) { + val = tor_malloc_zero(sizeof(int)); + tor_threadlocal_set(&block_event_queue, val); + } + return val; +} + +/** Helper: inserts an event on the list of events queued to be sent to + * one or more controllers, and schedules the events to be flushed if needed. + * + * This function takes ownership of msg, and may free it. + * + * We queue these events rather than send them immediately in order to break + * the dependency in our callgraph from code that generates events for the + * controller, and the network layer at large. Otherwise, nearly every + * interesting part of Tor would potentially call every other interesting part + * of Tor. + */ +MOCK_IMPL(STATIC void, +queue_control_event_string,(uint16_t event, char *msg)) +{ + /* This is redundant with checks done elsewhere, but it's a last-ditch + * attempt to avoid queueing something we shouldn't have to queue. */ + if (PREDICT_UNLIKELY( ! EVENT_IS_INTERESTING(event) )) { + tor_free(msg); + return; + } + + int *block_event_queue = get_block_event_queue(); + if (*block_event_queue) { + tor_free(msg); + return; + } + + queued_event_t *ev = tor_malloc(sizeof(*ev)); + ev->event = event; + ev->msg = msg; + + /* No queueing an event while queueing an event */ + ++*block_event_queue; + + tor_mutex_acquire(queued_control_events_lock); + tor_assert(queued_control_events); + smartlist_add(queued_control_events, ev); + + int activate_event = 0; + if (! flush_queued_event_pending && in_main_thread()) { + activate_event = 1; + flush_queued_event_pending = 1; + } + + tor_mutex_release(queued_control_events_lock); + + --*block_event_queue; + + /* We just put an event on the queue; mark the queue to be + * flushed. We only do this from the main thread for now; otherwise, + * we'd need to incur locking overhead in Libevent or use a socket. + */ + if (activate_event) { + tor_assert(flush_queued_events_event); + event_active(flush_queued_events_event, EV_READ, 1); + } +} + +/** Release all storage held by ev. */ +static void +queued_event_free(queued_event_t *ev) +{ + if (ev == NULL) + return; + + tor_free(ev->msg); + tor_free(ev); +} + +/** Send every queued event to every controller that's interested in it, + * and remove the events from the queue. If force is true, + * then make all controllers send their data out immediately, since we + * may be about to shut down. */ +static void +queued_events_flush_all(int force) +{ + if (PREDICT_UNLIKELY(queued_control_events == NULL)) { + return; + } + smartlist_t *all_conns = get_connection_array(); + smartlist_t *controllers = smartlist_new(); + smartlist_t *queued_events; + + int *block_event_queue = get_block_event_queue(); + ++*block_event_queue; + + tor_mutex_acquire(queued_control_events_lock); + /* No queueing an event while flushing events. */ + flush_queued_event_pending = 0; + queued_events = queued_control_events; + queued_control_events = smartlist_new(); + tor_mutex_release(queued_control_events_lock); + + /* Gather all the controllers that will care... */ + SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) { if (conn->type == CONN_TYPE_CONTROL && !conn->marked_for_close && conn->state == CONTROL_CONN_STATE_OPEN) { control_connection_t *control_conn = TO_CONTROL_CONN(conn); - if (control_conn->event_mask & (((event_mask_t)1)<event; + const size_t msg_len = strlen(ev->msg); + SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *, + control_conn) { + if (control_conn->event_mask & bit) { + connection_write_to_buf(ev->msg, msg_len, TO_CONN(control_conn)); + } + } SMARTLIST_FOREACH_END(control_conn); + + queued_event_free(ev); + } SMARTLIST_FOREACH_END(ev); + + if (force) { + SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *, + control_conn) { + connection_flush(TO_CONN(control_conn)); + } SMARTLIST_FOREACH_END(control_conn); + } + + smartlist_free(queued_events); + smartlist_free(controllers); + + --*block_event_queue; +} + +/** Libevent callback: Flushes pending events to controllers that are + * interested in them */ +static void +flush_queued_events_cb(evutil_socket_t fd, short what, void *arg) +{ + (void) fd; + (void) what; + (void) arg; + queued_events_flush_all(0); +} + +/** Send an event to all v1 controllers that are listening for code + * event. The event's body is given by msg. + * + * The EXTENDED_FORMAT and NONEXTENDED_FORMAT flags behave similarly with + * respect to the EXTENDED_EVENTS feature. */ +MOCK_IMPL(STATIC void, +send_control_event_string,(uint16_t event, + const char *msg)) +{ + tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_); + queue_control_event_string(event, tor_strdup(msg)); } /** Helper for send_control_event and control_event_status: @@ -625,8 +805,8 @@ send_control_event_string,(uint16_t event, event_format_t which, * event. The event's body is created by the printf-style format in * format, and other arguments as provided. */ static void -send_control_event_impl(uint16_t event, event_format_t which, - const char *format, va_list ap) +send_control_event_impl(uint16_t event, + const char *format, va_list ap) { char *buf = NULL; int len; @@ -637,21 +817,19 @@ send_control_event_impl(uint16_t event, event_format_t which, return; } - send_control_event_string(event, which|ALL_FORMATS, buf); - - tor_free(buf); + queue_control_event_string(event, buf); } /** Send an event to all v1 controllers that are listening for code * event. The event's body is created by the printf-style format in * format, and other arguments as provided. */ static void -send_control_event(uint16_t event, event_format_t which, +send_control_event(uint16_t event, const char *format, ...) { va_list ap; va_start(ap, format); - send_control_event_impl(event, which, format, ap); + send_control_event_impl(event, format, ap); va_end(ap); } @@ -4281,7 +4459,7 @@ control_event_circuit_status(origin_circuit_t *circ, circuit_status_event_t tp, { char *circdesc = circuit_describe_status_for_controller(circ); const char *sp = strlen(circdesc) ? " " : ""; - send_control_event(EVENT_CIRCUIT_STATUS, ALL_FORMATS, + send_control_event(EVENT_CIRCUIT_STATUS, "650 CIRC %lu %s%s%s%s\r\n", (unsigned long)circ->global_identifier, status, sp, @@ -4352,7 +4530,7 @@ control_event_circuit_status_minor(origin_circuit_t *circ, { char *circdesc = circuit_describe_status_for_controller(circ); const char *sp = strlen(circdesc) ? " " : ""; - send_control_event(EVENT_CIRCUIT_STATUS_MINOR, ALL_FORMATS, + send_control_event(EVENT_CIRCUIT_STATUS_MINOR, "650 CIRC_MINOR %lu %s%s%s%s\r\n", (unsigned long)circ->global_identifier, event_desc, sp, @@ -4527,7 +4705,7 @@ control_event_stream_status(entry_connection_t *conn, stream_status_event_t tp, circ = circuit_get_by_edge_conn(ENTRY_TO_EDGE_CONN(conn)); if (circ && CIRCUIT_IS_ORIGIN(circ)) origin_circ = TO_ORIGIN_CIRCUIT(circ); - send_control_event(EVENT_STREAM_STATUS, ALL_FORMATS, + send_control_event(EVENT_STREAM_STATUS, "650 STREAM "U64_FORMAT" %s %lu %s%s%s%s\r\n", U64_PRINTF_ARG(ENTRY_TO_CONN(conn)->global_identifier), status, @@ -4599,7 +4777,7 @@ control_event_or_conn_status(or_connection_t *conn, or_conn_status_event_t tp, } orconn_target_get_name(name, sizeof(name), conn); - send_control_event(EVENT_OR_CONN_STATUS, ALL_FORMATS, + send_control_event(EVENT_OR_CONN_STATUS, "650 ORCONN %s %s%s%s%s ID="U64_FORMAT"\r\n", name, status, reason ? " REASON=" : "", @@ -4622,7 +4800,7 @@ control_event_stream_bandwidth(edge_connection_t *edge_conn) if (!edge_conn->n_read && !edge_conn->n_written) return 0; - send_control_event(EVENT_STREAM_BANDWIDTH_USED, ALL_FORMATS, + send_control_event(EVENT_STREAM_BANDWIDTH_USED, "650 STREAM_BW "U64_FORMAT" %lu %lu\r\n", U64_PRINTF_ARG(edge_conn->base_.global_identifier), (unsigned long)edge_conn->n_read, @@ -4657,7 +4835,7 @@ control_event_stream_bandwidth_used(void) if (!edge_conn->n_read && !edge_conn->n_written) continue; - send_control_event(EVENT_STREAM_BANDWIDTH_USED, ALL_FORMATS, + send_control_event(EVENT_STREAM_BANDWIDTH_USED, "650 STREAM_BW "U64_FORMAT" %lu %lu\r\n", U64_PRINTF_ARG(edge_conn->base_.global_identifier), (unsigned long)edge_conn->n_read, @@ -4686,7 +4864,7 @@ control_event_circ_bandwidth_used(void) ocirc = TO_ORIGIN_CIRCUIT(circ); if (!ocirc->n_read_circ_bw && !ocirc->n_written_circ_bw) continue; - send_control_event(EVENT_CIRC_BANDWIDTH_USED, ALL_FORMATS, + send_control_event(EVENT_CIRC_BANDWIDTH_USED, "650 CIRC_BW ID=%d READ=%lu WRITTEN=%lu\r\n", ocirc->global_identifier, (unsigned long)ocirc->n_read_circ_bw, @@ -4722,7 +4900,7 @@ control_event_conn_bandwidth(connection_t *conn) default: return 0; } - send_control_event(EVENT_CONN_BW, ALL_FORMATS, + send_control_event(EVENT_CONN_BW, "650 CONN_BW ID="U64_FORMAT" TYPE=%s " "READ=%lu WRITTEN=%lu\r\n", U64_PRINTF_ARG(conn->global_identifier), @@ -4869,7 +5047,7 @@ control_event_circuit_cell_stats(void) continue; sum_up_cell_stats_by_command(circ, cell_stats); format_cell_stats(&event_string, circ, cell_stats); - send_control_event(EVENT_CELL_STATS, ALL_FORMATS, + send_control_event(EVENT_CELL_STATS, "650 CELL_STATS %s\r\n", event_string); tor_free(event_string); } @@ -4891,7 +5069,7 @@ control_event_tb_empty(const char *bucket, uint32_t read_empty_time, if (get_options()->TestingEnableTbEmptyEvent && EVENT_IS_INTERESTING(EVENT_TB_EMPTY) && (read_empty_time > 0 || write_empty_time > 0)) { - send_control_event(EVENT_TB_EMPTY, ALL_FORMATS, + send_control_event(EVENT_TB_EMPTY, "650 TB_EMPTY %s READ=%d WRITTEN=%d " "LAST=%d\r\n", bucket, read_empty_time, write_empty_time, @@ -4924,7 +5102,7 @@ control_event_bandwidth_used(uint32_t n_read, uint32_t n_written) ++n_measurements; if (EVENT_IS_INTERESTING(EVENT_BANDWIDTH_USED)) { - send_control_event(EVENT_BANDWIDTH_USED, ALL_FORMATS, + send_control_event(EVENT_BANDWIDTH_USED, "650 BW %lu %lu\r\n", (unsigned long)n_read, (unsigned long)n_written); @@ -5023,7 +5201,11 @@ control_event_logmsg(int severity, uint32_t domain, const char *msg) default: s = "UnknownLogSeverity"; break; } ++disable_log_messages; - send_control_event(event, ALL_FORMATS, "650 %s %s\r\n", s, b?b:msg); + send_control_event(event, "650 %s %s\r\n", s, b?b:msg); + if (severity == LOG_ERR) { + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); + } --disable_log_messages; tor_free(b); } @@ -5051,7 +5233,7 @@ control_event_descriptors_changed(smartlist_t *routers) }); ids = smartlist_join_strings(names, " ", 0, NULL); tor_asprintf(&msg, "650 NEWDESC %s\r\n", ids); - send_control_event_string(EVENT_NEW_DESC, ALL_FORMATS, msg); + send_control_event_string(EVENT_NEW_DESC, msg); tor_free(ids); tor_free(msg); SMARTLIST_FOREACH(names, char *, cp, tor_free(cp)); @@ -5073,7 +5255,7 @@ control_event_address_mapped(const char *from, const char *to, time_t expires, return 0; if (expires < 3 || expires == TIME_MAX) - send_control_event(EVENT_ADDRMAP, ALL_FORMATS, + send_control_event(EVENT_ADDRMAP, "650 ADDRMAP %s %s NEVER %s%s" "CACHED=\"%s\"\r\n", from, to, error?error:"", error?" ":"", @@ -5083,7 +5265,7 @@ control_event_address_mapped(const char *from, const char *to, time_t expires, char buf2[ISO_TIME_LEN+1]; format_local_iso_time(buf,expires); format_iso_time(buf2,expires); - send_control_event(EVENT_ADDRMAP, ALL_FORMATS, + send_control_event(EVENT_ADDRMAP, "650 ADDRMAP %s %s \"%s\"" " %s%sEXPIRES=\"%s\" CACHED=\"%s\"\r\n", from, to, buf, @@ -5125,9 +5307,9 @@ control_event_or_authdir_new_descriptor(const char *action, buf = tor_malloc(totallen); strlcpy(buf, firstline, totallen); strlcpy(buf+strlen(firstline), esc, totallen); - send_control_event_string(EVENT_AUTHDIR_NEWDESCS, ALL_FORMATS, + send_control_event_string(EVENT_AUTHDIR_NEWDESCS, buf); - send_control_event_string(EVENT_AUTHDIR_NEWDESCS, ALL_FORMATS, + send_control_event_string(EVENT_AUTHDIR_NEWDESCS, "650 OK\r\n"); tor_free(esc); tor_free(buf); @@ -5163,7 +5345,7 @@ control_event_network_liveness_update(int liveness) /* Update cached liveness */ set_cached_network_liveness(1); log_debug(LD_CONTROL, "Sending NETWORK_LIVENESS UP"); - send_control_event_string(EVENT_NETWORK_LIVENESS, ALL_FORMATS, + send_control_event_string(EVENT_NETWORK_LIVENESS, "650 NETWORK_LIVENESS UP\r\n"); } /* else was already live, no-op */ @@ -5172,7 +5354,7 @@ control_event_network_liveness_update(int liveness) /* Update cached liveness */ set_cached_network_liveness(0); log_debug(LD_CONTROL, "Sending NETWORK_LIVENESS DOWN"); - send_control_event_string(EVENT_NETWORK_LIVENESS, ALL_FORMATS, + send_control_event_string(EVENT_NETWORK_LIVENESS, "650 NETWORK_LIVENESS DOWN\r\n"); } /* else was already dead, no-op */ @@ -5211,8 +5393,8 @@ control_event_networkstatus_changed_helper(smartlist_t *statuses, SMARTLIST_FOREACH(strs, char *, cp, tor_free(cp)); smartlist_free(strs); tor_free(s); - send_control_event_string(event, ALL_FORMATS, esc); - send_control_event_string(event, ALL_FORMATS, + send_control_event_string(event, esc); + send_control_event_string(event, "650 OK\r\n"); tor_free(esc); @@ -5269,7 +5451,7 @@ control_event_buildtimeout_set(buildtimeout_set_event_t type, break; } - send_control_event(EVENT_BUILDTIMEOUT_SET, ALL_FORMATS, + send_control_event(EVENT_BUILDTIMEOUT_SET, "650 BUILDTIMEOUT_SET %s %s\r\n", type_string, args); @@ -5310,7 +5492,7 @@ control_event_signal(uintptr_t signal) return -1; } - send_control_event(EVENT_SIGNAL, ALL_FORMATS, "650 SIGNAL %s\r\n", + send_control_event(EVENT_SIGNAL, "650 SIGNAL %s\r\n", signal_string); return 0; } @@ -5338,7 +5520,7 @@ control_event_networkstatus_changed_single(const routerstatus_t *rs) int control_event_my_descriptor_changed(void) { - send_control_event(EVENT_DESCCHANGED, ALL_FORMATS, "650 DESCCHANGED\r\n"); + send_control_event(EVENT_DESCCHANGED, "650 DESCCHANGED\r\n"); return 0; } @@ -5388,24 +5570,40 @@ control_event_status(int type, int severity, const char *format, va_list args) } tor_vasprintf(&user_buf, format, args); - send_control_event(type, ALL_FORMATS, "%s %s\r\n", format_buf, user_buf); + send_control_event(type, "%s %s\r\n", format_buf, user_buf); tor_free(user_buf); return 0; } +#define CONTROL_EVENT_STATUS_BODY(event, sev) \ + int r; \ + do { \ + va_list ap; \ + if (!EVENT_IS_INTERESTING(event)) \ + return 0; \ + \ + va_start(ap, format); \ + r = control_event_status((event), (sev), format, ap); \ + va_end(ap); \ + } while (0) + /** Format and send an EVENT_STATUS_GENERAL event whose main text is obtained * by formatting the arguments using the printf-style format. */ int control_event_general_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_GENERAL)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_GENERAL, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_GENERAL LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_general_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -5414,14 +5612,18 @@ control_event_general_status(int severity, const char *format, ...) int control_event_client_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_CLIENT)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_CLIENT, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_CLIENT LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_client_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -5430,14 +5632,18 @@ control_event_client_status(int severity, const char *format, ...) int control_event_server_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_SERVER)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_SERVER, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_SERVER LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_server_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -5461,7 +5667,7 @@ control_event_guard(const char *nickname, const char *digest, } else { tor_snprintf(buf, sizeof(buf), "$%s~%s", hbuf, nickname); } - send_control_event(EVENT_GUARD, ALL_FORMATS, + send_control_event(EVENT_GUARD, "650 GUARD ENTRY %s %s\r\n", buf, status); } return 0; @@ -5492,7 +5698,7 @@ control_event_conf_changed(const smartlist_t *elements) } } result = smartlist_join_strings(lines, "\r\n", 0, NULL); - send_control_event(EVENT_CONF_CHANGED, 0, + send_control_event(EVENT_CONF_CHANGED, "650-CONF_CHANGED\r\n%s\r\n650 OK\r\n", result); tor_free(result); SMARTLIST_FOREACH(lines, char *, cp, tor_free(cp)); @@ -5882,7 +6088,7 @@ MOCK_IMPL(void, void control_event_clients_seen(const char *controller_str) { - send_control_event(EVENT_CLIENTS_SEEN, 0, + send_control_event(EVENT_CLIENTS_SEEN, "650 CLIENTS_SEEN %s\r\n", controller_str); } @@ -5896,7 +6102,7 @@ void control_event_transport_launched(const char *mode, const char *transport_name, tor_addr_t *addr, uint16_t port) { - send_control_event(EVENT_TRANSPORT_LAUNCHED, ALL_FORMATS, + send_control_event(EVENT_TRANSPORT_LAUNCHED, "650 TRANSPORT_LAUNCHED %s %s %s %u\r\n", mode, transport_name, fmt_addr(addr), port); } @@ -5981,7 +6187,7 @@ control_event_hs_descriptor_requested(const rend_data_t *rend_query, return; } - send_control_event(EVENT_HS_DESC, ALL_FORMATS, + send_control_event(EVENT_HS_DESC, "650 HS_DESC REQUESTED %s %s %s %s\r\n", rend_hsaddress_str_or_unknown(rend_query->onion_address), rend_auth_type_to_string(rend_query->auth_type), @@ -6045,7 +6251,7 @@ control_event_hs_descriptor_upload(const char *service_id, return; } - send_control_event(EVENT_HS_DESC, ALL_FORMATS, + send_control_event(EVENT_HS_DESC, "650 HS_DESC UPLOAD %s UNKNOWN %s %s\r\n", service_id, node_describe_longname_by_id(id_digest), @@ -6092,7 +6298,7 @@ control_event_hs_descriptor_receive_end(const char *action, tor_asprintf(&reason_field, " REASON=%s", reason); } - send_control_event(EVENT_HS_DESC, ALL_FORMATS, + send_control_event(EVENT_HS_DESC, "650 HS_DESC %s %s %s %s%s%s\r\n", action, rend_hsaddress_str_or_unknown(onion_address), @@ -6130,7 +6336,7 @@ control_event_hs_descriptor_upload_end(const char *action, tor_asprintf(&reason_field, " REASON=%s", reason); } - send_control_event(EVENT_HS_DESC, ALL_FORMATS, + send_control_event(EVENT_HS_DESC, "650 HS_DESC %s UNKNOWN UNKNOWN %s%s\r\n", action, node_describe_longname_by_id(id_digest), @@ -6215,7 +6421,7 @@ control_event_hs_descriptor_content(const char *onion_address, } write_escaped_data(content, strlen(content), &esc_content); - send_control_event(EVENT_HS_DESC_CONTENT, ALL_FORMATS, + send_control_event(EVENT_HS_DESC_CONTENT, "650+%s %s %s %s\r\n%s650 OK\r\n", event_name, rend_hsaddress_str_or_unknown(onion_address), @@ -6252,6 +6458,16 @@ control_free_all(void) SMARTLIST_FOREACH(detached_onion_services, char *, cp, tor_free(cp)); smartlist_free(detached_onion_services); } + if (queued_control_events) { + SMARTLIST_FOREACH(queued_control_events, queued_event_t *, ev, + queued_event_free(ev)); + smartlist_free(queued_control_events); + queued_control_events = NULL; + } + if (flush_queued_events_event) { + tor_event_free(flush_queued_events_event); + flush_queued_events_event = NULL; + } } #ifdef TOR_UNIT_TESTS @@ -6262,4 +6478,3 @@ control_testing_set_global_event_mask(uint64_t mask) global_event_mask = mask; } #endif - diff --git a/src/or/control.h b/src/or/control.h index 2d02443834..574dd85002 100644 --- a/src/or/control.h +++ b/src/or/control.h @@ -12,6 +12,8 @@ #ifndef TOR_CONTROL_H #define TOR_CONTROL_H +void control_initialize_event_queue(void); + void control_update_global_event_mask(void); void control_adjust_event_log_severity(void); @@ -78,6 +80,14 @@ int control_event_client_status(int severity, const char *format, ...) CHECK_PRINTF(2,3); int control_event_server_status(int severity, const char *format, ...) CHECK_PRINTF(2,3); + +int control_event_general_error(const char *format, ...) + CHECK_PRINTF(1,2); +int control_event_client_error(const char *format, ...) + CHECK_PRINTF(1,2); +int control_event_server_error(const char *format, ...) + CHECK_PRINTF(1,2); + int control_event_guard(const char *nickname, const char *digest, const char *status); int control_event_conf_changed(const smartlist_t *elements); @@ -203,18 +213,13 @@ void control_free_all(void); /* Used only by control.c and test.c */ STATIC size_t write_escaped_data(const char *data, size_t len, char **out); STATIC size_t read_escaped_data(const char *data, size_t len, char **out); -/** Flag for event_format_t. Indicates that we should use the one standard - format. (Other formats previous existed, and are now deprecated) - */ -#define ALL_FORMATS 1 -/** Bit field of flags to select how to format a controller event. Recognized - * flag is ALL_FORMATS. */ -typedef int event_format_t; #ifdef TOR_UNIT_TESTS MOCK_DECL(STATIC void, -send_control_event_string,(uint16_t event, event_format_t which, - const char *msg)); + send_control_event_string,(uint16_t event, const char *msg)); + +MOCK_DECL(STATIC void, + queue_control_event_string,(uint16_t event, char *msg)); void control_testing_set_global_event_mask(uint64_t mask); #endif diff --git a/src/or/main.c b/src/or/main.c index 06fc541567..08fd29427b 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -1007,7 +1007,7 @@ directory_all_unreachable_cb(evutil_socket_t fd, short event, void *arg) connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_NET_UNREACHABLE); } - control_event_general_status(LOG_ERR, "DIR_ALL_UNREACHABLE"); + control_event_general_error("DIR_ALL_UNREACHABLE"); } static struct event *directory_all_unreachable_cb_event = NULL; diff --git a/src/test/test_controller_events.c b/src/test/test_controller_events.c index bd91aecd94..7b439d490d 100644 --- a/src/test/test_controller_events.c +++ b/src/test/test_controller_events.c @@ -395,12 +395,12 @@ test_cntev_event_mask(void *arg) { #name, test_cntev_ ## name, flags, 0, NULL } struct testcase_t controller_event_tests[] = { - TEST(bucket_note_empty, 0), - TEST(bucket_millis_empty, 0), - TEST(sum_up_cell_stats, 0), - TEST(append_cell_stats, 0), - TEST(format_cell_stats, 0), - TEST(event_mask, 0), + TEST(bucket_note_empty, TT_FORK), + TEST(bucket_millis_empty, TT_FORK), + TEST(sum_up_cell_stats, TT_FORK), + TEST(append_cell_stats, TT_FORK), + TEST(format_cell_stats, TT_FORK), + TEST(event_mask, TT_FORK), END_OF_TESTCASES }; diff --git a/src/test/test_hs.c b/src/test/test_hs.c index 6d01798a63..126e211858 100644 --- a/src/test/test_hs.c +++ b/src/test/test_hs.c @@ -102,13 +102,11 @@ static char *received_msg = NULL; /** Mock function for send_control_event_string */ static void -send_control_event_string_replacement(uint16_t event, event_format_t which, - const char *msg) +queue_control_event_string_replacement(uint16_t event, char *msg) { (void) event; - (void) which; tor_free(received_msg); - received_msg = tor_strdup(msg); + received_msg = msg; } /** Mock function for node_describe_longname_by_id, it returns either @@ -141,8 +139,8 @@ test_hs_desc_event(void *arg) char desc_id_base32[REND_DESC_ID_V2_LEN_BASE32 + 1]; (void) arg; - MOCK(send_control_event_string, - send_control_event_string_replacement); + MOCK(queue_control_event_string, + queue_control_event_string_replacement); MOCK(node_describe_longname_by_id, node_describe_longname_by_id_replacement); @@ -225,7 +223,7 @@ test_hs_desc_event(void *arg) smartlist_free(rend_query.hsdirs_fp); done: - UNMOCK(send_control_event_string); + UNMOCK(queue_control_event_string); UNMOCK(node_describe_longname_by_id); tor_free(received_msg); } diff --git a/src/test/test_pt.c b/src/test/test_pt.c index 996ef8666b..6c9aefc487 100644 --- a/src/test/test_pt.c +++ b/src/test/test_pt.c @@ -333,15 +333,13 @@ static uint16_t controlevent_event = 0; static smartlist_t *controlevent_msgs = NULL; static void -send_control_event_string_replacement(uint16_t event, event_format_t which, - const char *msg) +queue_control_event_string_replacement(uint16_t event, char *msg) { - (void) which; ++controlevent_n; controlevent_event = event; if (!controlevent_msgs) controlevent_msgs = smartlist_new(); - smartlist_add(controlevent_msgs, tor_strdup(msg)); + smartlist_add(controlevent_msgs, msg); } /* Test the configure_proxy() function. */ @@ -360,8 +358,8 @@ test_pt_configure_proxy(void *arg) tor_process_handle_destroy_replacement); MOCK(get_or_state, get_or_state_replacement); - MOCK(send_control_event_string, - send_control_event_string_replacement); + MOCK(queue_control_event_string, + queue_control_event_string_replacement); control_testing_set_global_event_mask(EVENT_TRANSPORT_LAUNCHED); @@ -435,7 +433,7 @@ test_pt_configure_proxy(void *arg) UNMOCK(tor_get_lines_from_handle); UNMOCK(tor_process_handle_destroy); UNMOCK(get_or_state); - UNMOCK(send_control_event_string); + UNMOCK(queue_control_event_string); if (controlevent_msgs) { SMARTLIST_FOREACH(controlevent_msgs, char *, cp, tor_free(cp)); smartlist_free(controlevent_msgs); diff --git a/src/test/test_threads.c b/src/test/test_threads.c index 2ac08d4d28..35f5dc8ea3 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -28,7 +28,7 @@ static unsigned long thread_fn_tid1, thread_fn_tid2; static void thread_test_func_(void* _s) ATTR_NORETURN; /** How many iterations have the threads in the unit test run? */ -static int t1_count = 0, t2_count = 0; +static tor_threadlocal_t count; /** Helper function for threading unit tests: This function runs in a * subthread. It grabs its own mutex (start1 or start2) to make sure that it @@ -38,19 +38,19 @@ static void thread_test_func_(void* _s) { char *s = _s; - int i, *count; + int i; tor_mutex_t *m; char buf[64]; char **cp; + int *mycount = tor_malloc_zero(sizeof(int)); + tor_threadlocal_set(&count, mycount); if (!strcmp(s, "thread 1")) { m = thread_test_start1_; cp = &thread1_name_; - count = &t1_count; thread_fn_tid1 = tor_get_thread_id(); } else { m = thread_test_start2_; cp = &thread2_name_; - count = &t2_count; thread_fn_tid2 = tor_get_thread_id(); } @@ -62,8 +62,10 @@ thread_test_func_(void* _s) for (i=0; i<10000; ++i) { tor_mutex_acquire(thread_test_mutex_); strmap_set(thread_test_strmap_, "last to run", *cp); - ++*count; tor_mutex_release(thread_test_mutex_); + int *tls_count = tor_threadlocal_get(&count); + tor_assert(tls_count == mycount); + ++*tls_count; } tor_mutex_acquire(thread_test_mutex_); strmap_set(thread_test_strmap_, s, *cp); @@ -89,6 +91,7 @@ test_threads_basic(void *arg) tv.tv_usec=100*1000; #endif (void) arg; + tt_int_op(tor_threadlocal_init(&count), OP_EQ, 0); set_main_thread(); @@ -128,7 +131,6 @@ test_threads_basic(void *arg) tor_mutex_free(thread_test_mutex_); if (timedout) { - printf("\nTimed out: %d %d", t1_count, t2_count); tt_assert(strmap_get(thread_test_strmap_, "thread 1")); tt_assert(strmap_get(thread_test_strmap_, "thread 2")); tt_assert(!timedout); diff --git a/src/test/testing_common.c b/src/test/testing_common.c index 7f387c0b3d..441024bd7d 100644 --- a/src/test/testing_common.c +++ b/src/test/testing_common.c @@ -14,6 +14,7 @@ const char tor_git_revision[] = ""; #include "orconfig.h" #include "or.h" +#include "control.h" #include "config.h" #include "rephist.h" #include "backtrace.h" @@ -237,6 +238,7 @@ main(int c, const char **v) update_approx_time(time(NULL)); options = options_new(); tor_threads_init(); + control_initialize_event_queue(); init_logging(1); configure_backtrace_handler(get_version());