Merge branch 'better_workqueue_v3_squashed'

Nick Mathewson 2015-01-21 14:47:16 -05:00
commit 23fc1691b6
29 changed files with 2603 additions and 1157 deletions

2
.gitignore vendored

@ -163,10 +163,12 @@ cscope.*
/src/test/test-bt-cl
/src/test/test-child
/src/test/test-ntor-cl
/src/test/test_workqueue
/src/test/test.exe
/src/test/test-bt-cl.exe
/src/test/test-child.exe
/src/test/test-ntor-cl.exe
/src/test/test_workqueue.exe
# /src/tools/
/src/tools/tor-checkkey

10
changes/better_workqueues Normal file

@ -0,0 +1,10 @@
o Major features:
- Refactor the CPU worker implementation for better performance by
avoiding the kernel and lengthening pipelines. The original
implementation used sockets to transfer data from the main thread
to the worker threads, and didn't allow any thread to be assigned
more than a single piece of work at once. The new implementation
avoids communications overhead by making requests in shared
memory, avoiding kernel IO where possible, and keeping more
requests in flight at once. Resolves issue #9682.
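For orientation, a minimal sketch of how a caller is meant to drive the new
API (the names come from the new workqueue.h in this commit; the thread-state
functions, job payloads, and event-loop wiring here are illustrative only):

  static void *job_state_new(void *arg) { (void)arg; return NULL; }
  static void job_state_free(void *state) { (void)state; }
  /* Runs in a worker thread; returns WQ_RPL_REPLY on success. */
  static int job_fn(void *state, void *job) { (void)state; (void)job; return WQ_RPL_REPLY; }
  /* Runs later in the main thread; owns and must free the job. */
  static void job_reply_fn(void *job) { tor_free(job); }

  replyqueue_t *rq = replyqueue_new(0);
  threadpool_t *tp = threadpool_new(4, rq, job_state_new, job_state_free, NULL);
  int i;
  for (i = 0; i < 64; ++i)  /* many requests can be in flight at once */
    threadpool_queue_work(tp, job_fn, job_reply_fn, tor_malloc(16));
  /* Event loop: watch replyqueue_get_socket(rq) for read events, and call
   * replyqueue_process(rq) each time it fires to run the reply functions. */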

configure.ac

@ -400,6 +400,10 @@ fi
AC_SEARCH_LIBS(pthread_create, [pthread])
AC_SEARCH_LIBS(pthread_detach, [pthread])
AM_CONDITIONAL(THREADS_WIN32, test "$enable_threads" = "yes" && test "$bwin32" = "true")
AM_CONDITIONAL(THREADS_PTHREADS, test "$enable_threads" = "yes" && test "$bwin32" = "false")
AM_CONDITIONAL(THREADS_NONE, test "$enable_threads" != "yes")
dnl -------------------------------------------------------------------
dnl Check for functions before libevent, since libevent-1.2 apparently
dnl exports strlcpy without defining it in a header.
@ -410,6 +414,7 @@ AC_CHECK_FUNCS(
backtrace \
backtrace_symbols_fd \
clock_gettime \
eventfd \
flock \
ftime \
getaddrinfo \
@ -424,6 +429,8 @@ AC_CHECK_FUNCS(
localtime_r \
lround \
memmem \
pipe \
pipe2 \
prctl \
rint \
sigaction \
@ -437,7 +444,7 @@ AC_CHECK_FUNCS(
sysconf \
sysctl \
uname \
usleep \
vasprintf \
_vscprintf
)
@ -965,6 +972,7 @@ AC_CHECK_HEADERS(
netinet/in6.h \
pwd.h \
stdint.h \
sys/eventfd.h \
sys/file.h \
sys/ioctl.h \
sys/limits.h \

src/common/compat.c

@ -27,7 +27,6 @@
#include "compat.h"
#ifdef _WIN32
#include <process.h>
#include <windows.h>
#include <sys/locking.h>
#endif
@ -2544,109 +2543,6 @@ get_uname(void)
* Process control
*/
#if defined(USE_PTHREADS)
/** Wraps a void (*)(void*) function and its argument so we can
* invoke them in a way pthreads would expect.
*/
typedef struct tor_pthread_data_t {
void (*func)(void *);
void *data;
} tor_pthread_data_t;
/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(_data->data)
* and free _data. Used to make sure we can call functions the way pthread
* expects. */
static void *
tor_pthread_helper_fn(void *_data)
{
tor_pthread_data_t *data = _data;
void (*func)(void*);
void *arg;
/* mask signals to worker threads to avoid SIGPIPE, etc */
sigset_t sigs;
/* We're in a subthread; don't handle any signals here. */
sigfillset(&sigs);
pthread_sigmask(SIG_SETMASK, &sigs, NULL);
func = data->func;
arg = data->data;
tor_free(_data);
func(arg);
return NULL;
}
/**
* A pthread attribute to make threads start detached.
*/
static pthread_attr_t attr_detached;
/** True iff we've called tor_threads_init() */
static int threads_initialized = 0;
#endif
/** Minimalist interface to run a void function in the background. On
* Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
* func should not return, but rather should call spawn_exit.
*
* NOTE: if <b>data</b> is used, it should not be allocated on the stack,
* since in a multithreaded environment, there is no way to be sure that
* the caller's stack will still be around when the called function is
* running.
*/
int
spawn_func(void (*func)(void *), void *data)
{
#if defined(USE_WIN32_THREADS)
int rv;
rv = (int)_beginthread(func, 0, data);
if (rv == (int)-1)
return -1;
return 0;
#elif defined(USE_PTHREADS)
pthread_t thread;
tor_pthread_data_t *d;
if (PREDICT_UNLIKELY(!threads_initialized))
tor_threads_init();
d = tor_malloc(sizeof(tor_pthread_data_t));
d->data = data;
d->func = func;
if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
return -1;
return 0;
#else
pid_t pid;
pid = fork();
if (pid<0)
return -1;
if (pid==0) {
/* Child */
func(data);
tor_assert(0); /* Should never reach here. */
return 0; /* suppress "control-reaches-end-of-non-void" warning. */
} else {
/* Parent */
return 0;
}
#endif
}
/** End the current thread/process.
*/
void
spawn_exit(void)
{
#if defined(USE_WIN32_THREADS)
_endthread();
//we should never get here. my compiler thinks that _endthread returns, this
//is an attempt to fool it.
tor_assert(0);
_exit(0);
#elif defined(USE_PTHREADS)
pthread_exit(NULL);
#else
/* http://www.erlenstar.demon.co.uk/unix/faq_2.html says we should
* call _exit, not exit, from child processes. */
_exit(0);
#endif
}
/** Implementation logic for compute_num_cpus(). */
static int
compute_num_cpus_impl(void)
@ -2935,280 +2831,6 @@ tor_gmtime_r(const time_t *timep, struct tm *result)
}
#endif
#if defined(USE_WIN32_THREADS)
void
tor_mutex_init(tor_mutex_t *m)
{
InitializeCriticalSection(&m->mutex);
}
void
tor_mutex_uninit(tor_mutex_t *m)
{
DeleteCriticalSection(&m->mutex);
}
void
tor_mutex_acquire(tor_mutex_t *m)
{
tor_assert(m);
EnterCriticalSection(&m->mutex);
}
void
tor_mutex_release(tor_mutex_t *m)
{
LeaveCriticalSection(&m->mutex);
}
unsigned long
tor_get_thread_id(void)
{
return (unsigned long)GetCurrentThreadId();
}
#elif defined(USE_PTHREADS)
/** A mutex attribute that we're going to use to tell pthreads that we want
* "reentrant" mutexes (i.e., once we can re-lock if we're already holding
* them.) */
static pthread_mutexattr_t attr_reentrant;
/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
* up with tor_mutex_init() or tor_mutex_new(); not both. */
void
tor_mutex_init(tor_mutex_t *mutex)
{
int err;
if (PREDICT_UNLIKELY(!threads_initialized))
tor_threads_init();
err = pthread_mutex_init(&mutex->mutex, &attr_reentrant);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d creating a mutex.", err);
tor_fragile_assert();
}
}
/** Wait until <b>m</b> is free, then acquire it. */
void
tor_mutex_acquire(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_lock(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d locking a mutex.", err);
tor_fragile_assert();
}
}
/** Release the lock <b>m</b> so another thread can have it. */
void
tor_mutex_release(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_unlock(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
tor_fragile_assert();
}
}
/** Clean up the mutex <b>m</b> so that it no longer uses any system
* resources. Does not free <b>m</b>. This function must only be called on
* mutexes from tor_mutex_init(). */
void
tor_mutex_uninit(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_destroy(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
tor_fragile_assert();
}
}
/** Return an integer representing this thread. */
unsigned long
tor_get_thread_id(void)
{
union {
pthread_t thr;
unsigned long id;
} r;
r.thr = pthread_self();
return r.id;
}
#endif
/** Return a newly allocated, ready-for-use mutex. */
tor_mutex_t *
tor_mutex_new(void)
{
tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
tor_mutex_init(m);
return m;
}
/** Release all storage and system resources held by <b>m</b>. */
void
tor_mutex_free(tor_mutex_t *m)
{
if (!m)
return;
tor_mutex_uninit(m);
tor_free(m);
}
/* Conditions. */
#ifdef USE_PTHREADS
#if 0
/** Cross-platform condition implementation. */
struct tor_cond_t {
pthread_cond_t cond;
};
/** Return a newly allocated condition, with nobody waiting on it. */
tor_cond_t *
tor_cond_new(void)
{
tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
if (pthread_cond_init(&cond->cond, NULL)) {
tor_free(cond);
return NULL;
}
return cond;
}
/** Release all resources held by <b>cond</b>. */
void
tor_cond_free(tor_cond_t *cond)
{
if (!cond)
return;
if (pthread_cond_destroy(&cond->cond)) {
log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
return;
}
tor_free(cond);
}
/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
* All waiters on the condition must wait holding the same <b>mutex</b>.
* Returns 0 on success, negative on failure. */
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
{
return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
}
/** Wake up one of the waiters on <b>cond</b>. */
void
tor_cond_signal_one(tor_cond_t *cond)
{
pthread_cond_signal(&cond->cond);
}
/** Wake up all of the waiters on <b>cond</b>. */
void
tor_cond_signal_all(tor_cond_t *cond)
{
pthread_cond_broadcast(&cond->cond);
}
#endif
/** Set up common structures for use by threading. */
void
tor_threads_init(void)
{
if (!threads_initialized) {
pthread_mutexattr_init(&attr_reentrant);
pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE);
tor_assert(0==pthread_attr_init(&attr_detached));
tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
threads_initialized = 1;
set_main_thread();
}
}
#elif defined(USE_WIN32_THREADS)
#if 0
static DWORD cond_event_tls_index;
struct tor_cond_t {
CRITICAL_SECTION mutex;
smartlist_t *events;
};
tor_cond_t *
tor_cond_new(void)
{
tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
InitializeCriticalSection(&cond->mutex);
cond->events = smartlist_new();
return cond;
}
void
tor_cond_free(tor_cond_t *cond)
{
if (!cond)
return;
DeleteCriticalSection(&cond->mutex);
/* XXXX notify? */
smartlist_free(cond->events);
tor_free(cond);
}
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
{
HANDLE event;
int r;
tor_assert(cond);
tor_assert(mutex);
event = TlsGetValue(cond_event_tls_index);
if (!event) {
event = CreateEvent(0, FALSE, FALSE, NULL);
TlsSetValue(cond_event_tls_index, event);
}
EnterCriticalSection(&cond->mutex);
tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
tor_assert(!smartlist_contains(cond->events, event));
smartlist_add(cond->events, event);
LeaveCriticalSection(&cond->mutex);
tor_mutex_release(mutex);
r = WaitForSingleObject(event, INFINITE);
tor_mutex_acquire(mutex);
switch (r) {
case WAIT_OBJECT_0: /* we got the mutex normally. */
break;
case WAIT_ABANDONED: /* holding thread exited. */
case WAIT_TIMEOUT: /* Should never happen. */
tor_assert(0);
break;
case WAIT_FAILED:
log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
}
return 0;
}
void
tor_cond_signal_one(tor_cond_t *cond)
{
HANDLE event;
tor_assert(cond);
EnterCriticalSection(&cond->mutex);
if ((event = smartlist_pop_last(cond->events)))
SetEvent(event);
LeaveCriticalSection(&cond->mutex);
}
void
tor_cond_signal_all(tor_cond_t *cond)
{
tor_assert(cond);
EnterCriticalSection(&cond->mutex);
SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
smartlist_clear(cond->events);
LeaveCriticalSection(&cond->mutex);
}
#endif
void
tor_threads_init(void)
{
#if 0
cond_event_tls_index = TlsAlloc();
#endif
set_main_thread();
}
#endif
#if defined(HAVE_MLOCKALL) && HAVE_DECL_MLOCKALL && defined(RLIMIT_MEMLOCK)
/** Attempt to raise the current and max rlimit to infinity for our process.
* This only needs to be done once and can probably only be done when we have
@ -3292,23 +2914,6 @@ tor_mlockall(void)
#endif
}
/** Identity of the "main" thread */
static unsigned long main_thread_id = -1;
/** Start considering the current thread to be the 'main thread'. This has
* no effect on anything besides in_main_thread(). */
void
set_main_thread(void)
{
main_thread_id = tor_get_thread_id();
}
/** Return true iff called from the main thread. */
int
in_main_thread(void)
{
return main_thread_id == tor_get_thread_id();
}
/**
* On Windows, WSAEWOULDBLOCK is not always correct: when you see it,
* you need to ask the socket for its actual errno. Also, you need to

src/common/compat.h

@ -36,9 +36,6 @@
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
#include <pthread.h>
#endif
#include <stdarg.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
@ -642,61 +639,10 @@ char **get_environment(void);
int get_total_system_memory(size_t *mem_out);
int spawn_func(void (*func)(void *), void *data);
void spawn_exit(void) ATTR_NORETURN;
#if defined(_WIN32)
#define USE_WIN32_THREADS
#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
#define USE_PTHREADS
#else
#error "No threading system was found"
#endif
int compute_num_cpus(void);
/* Because we use threads instead of processes on most platforms (Windows,
* Linux, etc), we need locking for them. On platforms with poor thread
* support or broken gethostbyname_r, these functions are no-ops. */
/** A generic lock structure for multithreaded builds. */
typedef struct tor_mutex_t {
#if defined(USE_WIN32_THREADS)
/** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
CRITICAL_SECTION mutex;
#elif defined(USE_PTHREADS)
/** Pthreads-only: with pthreads, we implement locks with
* pthread_mutex_t. */
pthread_mutex_t mutex;
#else
/** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
int _unused;
#endif
} tor_mutex_t;
int tor_mlockall(void);
tor_mutex_t *tor_mutex_new(void);
void tor_mutex_init(tor_mutex_t *m);
void tor_mutex_acquire(tor_mutex_t *m);
void tor_mutex_release(tor_mutex_t *m);
void tor_mutex_free(tor_mutex_t *m);
void tor_mutex_uninit(tor_mutex_t *m);
unsigned long tor_get_thread_id(void);
void tor_threads_init(void);
void set_main_thread(void);
int in_main_thread(void);
#if 0
typedef struct tor_cond_t tor_cond_t;
tor_cond_t *tor_cond_new(void);
void tor_cond_free(tor_cond_t *cond);
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
void tor_cond_signal_one(tor_cond_t *cond);
void tor_cond_signal_all(tor_cond_t *cond);
#endif
/** Macros for MIN/MAX. Never use these when the arguments could have
* side-effects.
* {With GCC extensions we could probably define a safer MIN/MAX. But
@ -742,5 +688,8 @@ STATIC int tor_ersatz_socketpair(int family, int type, int protocol,
#endif
#endif
/* This needs some of the declarations above so we include it here. */
#include "compat_threads.h"
#endif

285
src/common/compat_pthreads.c Normal file

@ -0,0 +1,285 @@
/* Copyright (c) 2003-2004, Roger Dingledine
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "orconfig.h"
#include <pthread.h>
#include <signal.h>
#include <time.h>
#include "compat.h"
#include "torlog.h"
#include "util.h"
/** Wraps a void (*)(void*) function and its argument so we can
* invoke them in a way pthreads would expect.
*/
typedef struct tor_pthread_data_t {
void (*func)(void *);
void *data;
} tor_pthread_data_t;
/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(_data->data)
* and free _data. Used to make sure we can call functions the way pthread
* expects. */
static void *
tor_pthread_helper_fn(void *_data)
{
tor_pthread_data_t *data = _data;
void (*func)(void*);
void *arg;
/* mask signals to worker threads to avoid SIGPIPE, etc */
sigset_t sigs;
/* We're in a subthread; don't handle any signals here. */
sigfillset(&sigs);
pthread_sigmask(SIG_SETMASK, &sigs, NULL);
func = data->func;
arg = data->data;
tor_free(_data);
func(arg);
return NULL;
}
/**
* A pthread attribute to make threads start detached.
*/
static pthread_attr_t attr_detached;
/** True iff we've called tor_threads_init() */
static int threads_initialized = 0;
/** Minimalist interface to run a void function in the background. On
* Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
* func should not return, but rather should call spawn_exit.
*
* NOTE: if <b>data</b> is used, it should not be allocated on the stack,
* since in a multithreaded environment, there is no way to be sure that
* the caller's stack will still be around when the called function is
* running.
*/
int
spawn_func(void (*func)(void *), void *data)
{
pthread_t thread;
tor_pthread_data_t *d;
if (PREDICT_UNLIKELY(!threads_initialized))
tor_threads_init();
d = tor_malloc(sizeof(tor_pthread_data_t));
d->data = data;
d->func = func;
if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
return -1;
return 0;
}
/** End the current thread/process.
*/
void
spawn_exit(void)
{
pthread_exit(NULL);
}
/** A mutex attribute that we're going to use to tell pthreads that we want
* "recursive" mutexes (i.e., once we can re-lock if we're already holding
* them.) */
static pthread_mutexattr_t attr_recursive;
/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
* up with tor_mutex_init() or tor_mutex_new(); not both. */
void
tor_mutex_init(tor_mutex_t *mutex)
{
int err;
if (PREDICT_UNLIKELY(!threads_initialized))
tor_threads_init();
err = pthread_mutex_init(&mutex->mutex, &attr_recursive);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d creating a mutex.", err);
tor_fragile_assert();
}
}
/** As tor_mutex_init, but initialize a mutex that may be
* non-recursive, if the OS supports that. */
void
tor_mutex_init_nonrecursive(tor_mutex_t *mutex)
{
int err;
if (PREDICT_UNLIKELY(!threads_initialized))
tor_threads_init();
err = pthread_mutex_init(&mutex->mutex, NULL);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d creating a mutex.", err);
tor_fragile_assert();
}
}
/** Wait until <b>m</b> is free, then acquire it. */
void
tor_mutex_acquire(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_lock(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d locking a mutex.", err);
tor_fragile_assert();
}
}
/** Release the lock <b>m</b> so another thread can have it. */
void
tor_mutex_release(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_unlock(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
tor_fragile_assert();
}
}
/** Clean up the mutex <b>m</b> so that it no longer uses any system
* resources. Does not free <b>m</b>. This function must only be called on
* mutexes from tor_mutex_init(). */
void
tor_mutex_uninit(tor_mutex_t *m)
{
int err;
tor_assert(m);
err = pthread_mutex_destroy(&m->mutex);
if (PREDICT_UNLIKELY(err)) {
log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
tor_fragile_assert();
}
}
/** Return an integer representing this thread. */
unsigned long
tor_get_thread_id(void)
{
union {
pthread_t thr;
unsigned long id;
} r;
r.thr = pthread_self();
return r.id;
}
/* Conditions. */
/** Initialize an already-allocated condition variable. */
int
tor_cond_init(tor_cond_t *cond)
{
pthread_condattr_t condattr;
memset(cond, 0, sizeof(tor_cond_t));
/* Start with the default condition attributes; if a monotonic clock is
* available, we adjust them below, and otherwise they are used as-is. */
if (pthread_condattr_init(&condattr)) {
return -1;
}
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
/* Use monotonic time so when we timedwait() on it, any clock adjustment
* won't affect the timeout value. */
if (pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC)) {
return -1;
}
#endif
if (pthread_cond_init(&cond->cond, &condattr)) {
return -1;
}
return 0;
}
/** Release all resources held by <b>cond</b>, but do not free <b>cond</b>
* itself. */
void
tor_cond_uninit(tor_cond_t *cond)
{
if (pthread_cond_destroy(&cond->cond)) {
log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
return;
}
}
/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
* (If <b>tv</b> is set, and that amount of time passes with no signal to
* <b>cond</b>, return anyway.) All waiters on the condition must wait holding
* the same <b>mutex</b>. All signallers should hold that mutex. The mutex
* needs to have been allocated with tor_mutex_init_for_cond().
*
* Returns 0 on success, -1 on failure, 1 on timeout. */
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
{
int r;
if (tv == NULL) {
while (1) {
r = pthread_cond_wait(&cond->cond, &mutex->mutex);
if (r == EINTR) {
/* EINTR should be impossible according to POSIX, but POSIX, like the
* Pirate's Code, is apparently treated "more like what you'd call
* guidelines than actual rules." */
continue;
}
return r ? -1 : 0;
}
} else {
struct timeval tvnow, tvsum;
struct timespec ts;
while (1) {
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) {
return -1;
}
tvnow.tv_sec = ts.tv_sec;
tvnow.tv_usec = ts.tv_nsec / 1000;
timeradd(tv, &tvnow, &tvsum);
#else
if (gettimeofday(&tvnow, NULL) < 0)
return -1;
timeradd(tv, &tvnow, &tvsum);
#endif /* HAVE_CLOCK_GETTIME, CLOCK_MONOTONIC */
ts.tv_sec = tvsum.tv_sec;
ts.tv_nsec = tvsum.tv_usec * 1000;
r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
if (r == 0)
return 0;
else if (r == ETIMEDOUT)
return 1;
else if (r == EINTR)
continue;
else
return -1;
}
}
}
/** Wake up one of the waiters on <b>cond</b>. */
void
tor_cond_signal_one(tor_cond_t *cond)
{
pthread_cond_signal(&cond->cond);
}
/** Wake up all of the waiters on <b>cond</b>. */
void
tor_cond_signal_all(tor_cond_t *cond)
{
pthread_cond_broadcast(&cond->cond);
}
/** Set up common structures for use by threading. */
void
tor_threads_init(void)
{
if (!threads_initialized) {
pthread_mutexattr_init(&attr_recursive);
pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
tor_assert(0==pthread_attr_init(&attr_detached));
tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
threads_initialized = 1;
set_main_thread();
}
}
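The 0 / 1 / -1 return convention of tor_cond_wait() above is meant to be
consumed from a predicate loop; a minimal sketch, where work_ready() stands in
for whatever condition the caller actually waits on (the mutex comes from
tor_mutex_init_for_cond() in compat_threads.h, since conditions need a
nonrecursive mutex):

  tor_mutex_t lock;
  tor_cond_t cond;
  tor_mutex_init_for_cond(&lock);
  tor_cond_init(&cond);

  struct timeval timeout = { 5, 0 };  /* give up after five seconds */
  tor_mutex_acquire(&lock);
  while (!work_ready()) {
    int r = tor_cond_wait(&cond, &lock, &timeout);
    if (r == 1 || r < 0)
      break;  /* timed out, or the wait itself failed */
    /* r == 0: signaled; loop around and re-check the predicate. */
  }
  tor_mutex_release(&lock);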

303
src/common/compat_threads.c Normal file

@ -0,0 +1,303 @@
/* Copyright (c) 2003-2004, Roger Dingledine
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#define _GNU_SOURCE
#include "orconfig.h"
#include <stdlib.h>
#include "compat.h"
#include "compat_threads.h"
#include "util.h"
#include "torlog.h"
#ifdef HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
/** Return a newly allocated, ready-for-use mutex. */
tor_mutex_t *
tor_mutex_new(void)
{
tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
tor_mutex_init(m);
return m;
}
/** Return a newly allocated, ready-for-use mutex. This one might be
* non-recursive, if that's faster. */
tor_mutex_t *
tor_mutex_new_nonrecursive(void)
{
tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
tor_mutex_init_nonrecursive(m);
return m;
}
/** Release all storage and system resources held by <b>m</b>. */
void
tor_mutex_free(tor_mutex_t *m)
{
if (!m)
return;
tor_mutex_uninit(m);
tor_free(m);
}
/** Allocate and return a new condition variable. */
tor_cond_t *
tor_cond_new(void)
{
tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
if (tor_cond_init(cond)<0)
tor_free(cond);
return cond;
}
/** Free all storage held in <b>c</b>. */
void
tor_cond_free(tor_cond_t *c)
{
if (!c)
return;
tor_cond_uninit(c);
tor_free(c);
}
/** Identity of the "main" thread */
static unsigned long main_thread_id = -1;
/** Start considering the current thread to be the 'main thread'. This has
* no effect on anything besides in_main_thread(). */
void
set_main_thread(void)
{
main_thread_id = tor_get_thread_id();
}
/** Return true iff called from the main thread. */
int
in_main_thread(void)
{
return main_thread_id == tor_get_thread_id();
}
#if defined(HAVE_EVENTFD) || defined(HAVE_PIPE)
/* non-interruptible versions */
static int
write_ni(int fd, const void *buf, size_t n)
{
int r;
again:
r = write(fd, buf, n);
if (r < 0 && errno == EINTR)
goto again;
return r;
}
static int
read_ni(int fd, void *buf, size_t n)
{
int r;
again:
r = read(fd, buf, n);
if (r < 0 && errno == EINTR)
goto again;
return r;
}
#endif
/* non-interruptible versions */
static int
send_ni(int fd, const void *buf, size_t n, int flags)
{
int r;
again:
r = send(fd, buf, n, flags);
if (r < 0 && errno == EINTR)
goto again;
return r;
}
static int
recv_ni(int fd, void *buf, size_t n, int flags)
{
int r;
again:
r = recv(fd, buf, n, flags);
if (r < 0 && errno == EINTR)
goto again;
return r;
}
#ifdef HAVE_EVENTFD
static int
eventfd_alert(int fd)
{
uint64_t u = 1;
int r = write_ni(fd, (void*)&u, sizeof(u));
if (r < 0 && errno != EAGAIN)
return -1;
return 0;
}
static int
eventfd_drain(int fd)
{
uint64_t u = 0;
int r = read_ni(fd, (void*)&u, sizeof(u));
if (r < 0 && errno != EAGAIN)
return -1;
return 0;
}
#endif
#ifdef HAVE_PIPE
static int
pipe_alert(int fd)
{
ssize_t r = write_ni(fd, "x", 1);
if (r < 0 && errno != EAGAIN)
return -1;
return 0;
}
static int
pipe_drain(int fd)
{
char buf[32];
ssize_t r;
while ((r = read_ni(fd, buf, sizeof(buf))) > 0)
;
if (r == 0 || errno != EAGAIN)
return -1;
return 0;
}
#endif
static int
sock_alert(tor_socket_t fd)
{
ssize_t r = send_ni(fd, "x", 1, 0);
if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
return -1;
return 0;
}
static int
sock_drain(tor_socket_t fd)
{
char buf[32];
ssize_t r;
while ((r = recv_ni(fd, buf, sizeof(buf), 0)) > 0)
;
if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
return -1;
return 0;
}
/** Allocate a new set of alert sockets, and set the appropriate function
* pointers, in <b>socks_out</b>. */
int
alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
{
tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET };
#ifdef HAVE_EVENTFD
/* First, we try the Linux eventfd() syscall. This gives a 64-bit counter
* associated with a single file descriptor. */
#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
if (!(flags & ASOCKS_NOEVENTFD2))
socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
#endif
if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) {
socks[0] = eventfd(0,0);
if (socks[0] >= 0) {
if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
set_socket_nonblocking(socks[0]) < 0) {
close(socks[0]);
return -1;
}
}
}
if (socks[0] >= 0) {
socks_out->read_fd = socks_out->write_fd = socks[0];
socks_out->alert_fn = eventfd_alert;
socks_out->drain_fn = eventfd_drain;
return 0;
}
#endif
#ifdef HAVE_PIPE2
/* Now we're going to try pipes. First try the pipe2() syscall, if we
* have it, so we can save some calls... */
if (!(flags & ASOCKS_NOPIPE2) &&
pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
socks_out->read_fd = socks[0];
socks_out->write_fd = socks[1];
socks_out->alert_fn = pipe_alert;
socks_out->drain_fn = pipe_drain;
return 0;
}
#endif
#ifdef HAVE_PIPE
/* Now try the regular pipe() syscall. Pipes have a bit lower overhead than
* socketpairs, fwict. */
if (!(flags & ASOCKS_NOPIPE) &&
pipe(socks) == 0) {
if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
set_socket_nonblocking(socks[0]) < 0 ||
set_socket_nonblocking(socks[1]) < 0) {
close(socks[0]);
close(socks[1]);
return -1;
}
socks_out->read_fd = socks[0];
socks_out->write_fd = socks[1];
socks_out->alert_fn = pipe_alert;
socks_out->drain_fn = pipe_drain;
return 0;
}
#endif
/* If nothing else worked, fall back on socketpair(). */
if (!(flags & ASOCKS_NOSOCKETPAIR) &&
tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
if (set_socket_nonblocking(socks[0]) < 0 ||
set_socket_nonblocking(socks[1]) < 0) {
tor_close_socket(socks[0]);
tor_close_socket(socks[1]);
return -1;
}
socks_out->read_fd = socks[0];
socks_out->write_fd = socks[1];
socks_out->alert_fn = sock_alert;
socks_out->drain_fn = sock_drain;
return 0;
}
return -1;
}
/** Close the sockets in <b>socks</b>. */
void
alert_sockets_close(alert_sockets_t *socks)
{
if (socks->alert_fn == sock_alert) {
/* they are sockets. */
tor_close_socket(socks->read_fd);
tor_close_socket(socks->write_fd);
} else {
close(socks->read_fd);
if (socks->write_fd != socks->read_fd)
close(socks->write_fd);
}
socks->read_fd = socks->write_fd = -1;
}
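A minimal sketch of the wake-up pattern these alert sockets support, using
plain poll() instead of libevent to stay self-contained (the shared queue
being drained is whatever structure the worker threads actually fill):

  #include <poll.h>

  alert_sockets_t alert;
  if (alert_sockets_create(&alert, 0) < 0)
    return -1;

  /* Worker thread, after pushing a result onto the shared queue: */
  alert.alert_fn(alert.write_fd);

  /* Main thread, inside its IO loop: */
  struct pollfd pfd = { alert.read_fd, POLLIN, 0 };
  if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN)) {
    alert.drain_fn(alert.read_fd);  /* reset the eventfd/pipe/socketpair */
    /* ... pop and handle everything on the shared queue ... */
  }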

115
src/common/compat_threads.h Normal file

@ -0,0 +1,115 @@
/* Copyright (c) 2003-2004, Roger Dingledine
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#ifndef TOR_COMPAT_THREADS_H
#define TOR_COMPAT_THREADS_H
#include "orconfig.h"
#include "torint.h"
#include "testsupport.h"
#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
#include <pthread.h>
#endif
#if defined(_WIN32)
#define USE_WIN32_THREADS
#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
#define USE_PTHREADS
#else
#error "No threading system was found"
#endif
int spawn_func(void (*func)(void *), void *data);
void spawn_exit(void) ATTR_NORETURN;
/* Because we use threads instead of processes on most platforms (Windows,
* Linux, etc), we need locking for them. On platforms with poor thread
* support or broken gethostbyname_r, these functions are no-ops. */
/** A generic lock structure for multithreaded builds. */
typedef struct tor_mutex_t {
#if defined(USE_WIN32_THREADS)
/** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
CRITICAL_SECTION mutex;
#elif defined(USE_PTHREADS)
/** Pthreads-only: with pthreads, we implement locks with
* pthread_mutex_t. */
pthread_mutex_t mutex;
#else
/** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
int _unused;
#endif
} tor_mutex_t;
tor_mutex_t *tor_mutex_new(void);
tor_mutex_t *tor_mutex_new_nonrecursive(void);
void tor_mutex_init(tor_mutex_t *m);
void tor_mutex_init_nonrecursive(tor_mutex_t *m);
void tor_mutex_acquire(tor_mutex_t *m);
void tor_mutex_release(tor_mutex_t *m);
void tor_mutex_free(tor_mutex_t *m);
void tor_mutex_uninit(tor_mutex_t *m);
unsigned long tor_get_thread_id(void);
void tor_threads_init(void);
/** Conditions need nonrecursive mutexes with pthreads. */
#define tor_mutex_init_for_cond(m) tor_mutex_init_nonrecursive(m)
void set_main_thread(void);
int in_main_thread(void);
typedef struct tor_cond_t {
#ifdef USE_PTHREADS
pthread_cond_t cond;
#elif defined(USE_WIN32_THREADS)
HANDLE event;
CRITICAL_SECTION lock;
int n_waiting;
int n_to_wake;
int generation;
#else
#error no known condition implementation.
#endif
} tor_cond_t;
tor_cond_t *tor_cond_new(void);
void tor_cond_free(tor_cond_t *cond);
int tor_cond_init(tor_cond_t *cond);
void tor_cond_uninit(tor_cond_t *cond);
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
const struct timeval *tv);
void tor_cond_signal_one(tor_cond_t *cond);
void tor_cond_signal_all(tor_cond_t *cond);
/** Helper type used to manage waking up the main thread while it's in
* the libevent main loop. Used by the work queue code. */
typedef struct alert_sockets_s {
/* XXXX This structure needs a better name. */
/** Socket that the main thread should listen for EV_READ events on.
* Note that this socket may be a regular fd on a non-Windows platform.
*/
tor_socket_t read_fd;
/** Socket to use when alerting the main thread. */
tor_socket_t write_fd;
/** Function to alert the main thread */
int (*alert_fn)(tor_socket_t write_fd);
/** Function to make the main thread no longer alerted. */
int (*drain_fn)(tor_socket_t read_fd);
} alert_sockets_t;
/* Flags to disable one or more alert_sockets backends. */
#define ASOCKS_NOEVENTFD2 (1u<<0)
#define ASOCKS_NOEVENTFD (1u<<1)
#define ASOCKS_NOPIPE2 (1u<<2)
#define ASOCKS_NOPIPE (1u<<3)
#define ASOCKS_NOSOCKETPAIR (1u<<4)
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
void alert_sockets_close(alert_sockets_t *socks);
#endif

196
src/common/compat_winthreads.c Normal file

@ -0,0 +1,196 @@
/* Copyright (c) 2003-2004, Roger Dingledine
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "compat.h"
#include <windows.h>
#include <process.h>
#include "util.h"
#include "container.h"
#include "torlog.h"
/* This value is more or less total cargo-cult */
#define SPIN_COUNT 2000
/** Minimalist interface to run a void function in the background. On
* Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
* func should not return, but rather should call spawn_exit.
*
* NOTE: if <b>data</b> is used, it should not be allocated on the stack,
* since in a multithreaded environment, there is no way to be sure that
* the caller's stack will still be around when the called function is
* running.
*/
int
spawn_func(void (*func)(void *), void *data)
{
int rv;
rv = (int)_beginthread(func, 0, data);
if (rv == (int)-1)
return -1;
return 0;
}
/** End the current thread/process.
*/
void
spawn_exit(void)
{
_endthread();
//we should never get here. my compiler thinks that _endthread returns, this
//is an attempt to fool it.
tor_assert(0);
_exit(0);
}
void
tor_mutex_init(tor_mutex_t *m)
{
InitializeCriticalSection(&m->mutex);
}
void
tor_mutex_init_nonrecursive(tor_mutex_t *m)
{
InitializeCriticalSection(&m->mutex);
}
void
tor_mutex_uninit(tor_mutex_t *m)
{
DeleteCriticalSection(&m->mutex);
}
void
tor_mutex_acquire(tor_mutex_t *m)
{
tor_assert(m);
EnterCriticalSection(&m->mutex);
}
void
tor_mutex_release(tor_mutex_t *m)
{
LeaveCriticalSection(&m->mutex);
}
unsigned long
tor_get_thread_id(void)
{
return (unsigned long)GetCurrentThreadId();
}
int
tor_cond_init(tor_cond_t *cond)
{
memset(cond, 0, sizeof(tor_cond_t));
if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
return -1;
}
if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
DeleteCriticalSection(&cond->lock);
return -1;
}
cond->n_waiting = cond->n_to_wake = cond->generation = 0;
return 0;
}
void
tor_cond_uninit(tor_cond_t *cond)
{
DeleteCriticalSection(&cond->lock);
CloseHandle(cond->event);
}
static void
tor_cond_signal_impl(tor_cond_t *cond, int broadcast)
{
EnterCriticalSection(&cond->lock);
if (broadcast)
cond->n_to_wake = cond->n_waiting;
else
++cond->n_to_wake;
cond->generation++;
SetEvent(cond->event);
LeaveCriticalSection(&cond->lock);
}
void
tor_cond_signal_one(tor_cond_t *cond)
{
tor_cond_signal_impl(cond, 0);
}
void
tor_cond_signal_all(tor_cond_t *cond)
{
tor_cond_signal_impl(cond, 1);
}
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv)
{
CRITICAL_SECTION *lock = &lock_->mutex;
int generation_at_start;
int waiting = 1;
int result = -1;
DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime;
if (tv)
ms_orig = ms = tv->tv_sec*1000 + (tv->tv_usec+999)/1000;
EnterCriticalSection(&cond->lock);
++cond->n_waiting;
generation_at_start = cond->generation;
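/* Snapshotting the generation while registered as a waiter matters: the
* check below only consumes a wakeup once the generation has advanced past
* this point, so a thread that starts waiting after a signal cannot steal a
* wakeup intended for threads that were already waiting. */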
LeaveCriticalSection(&cond->lock);
LeaveCriticalSection(lock);
startTime = GetTickCount();
do {
DWORD res;
res = WaitForSingleObject(cond->event, ms);
EnterCriticalSection(&cond->lock);
if (cond->n_to_wake &&
cond->generation != generation_at_start) {
--cond->n_to_wake;
--cond->n_waiting;
result = 0;
waiting = 0;
goto out;
} else if (res != WAIT_OBJECT_0) {
result = (res==WAIT_TIMEOUT) ? 1 : -1;
--cond->n_waiting;
waiting = 0;
goto out;
} else if (ms != INFINITE) {
endTime = GetTickCount();
if (startTime + ms_orig <= endTime) {
result = 1; /* Timeout */
--cond->n_waiting;
waiting = 0;
goto out;
} else {
ms = startTime + ms_orig - endTime;
}
}
/* If we make it here, we are still waiting. */
if (cond->n_to_wake == 0) {
/* There is nobody else who should wake up; reset
* the event. */
ResetEvent(cond->event);
}
out:
LeaveCriticalSection(&cond->lock);
} while (waiting);
EnterCriticalSection(lock);
EnterCriticalSection(&cond->lock);
if (!cond->n_waiting)
ResetEvent(cond->event);
LeaveCriticalSection(&cond->lock);
return result;
}
void
tor_threads_init(void)
{
set_main_thread();
}

src/common/include.am

@ -54,10 +54,18 @@ endif
LIBDONNA += $(LIBED25519_REF10)
if THREADS_PTHREADS
threads_impl_source=src/common/compat_pthreads.c
endif
if THREADS_WIN32
threads_impl_source=src/common/compat_winthreads.c
endif
LIBOR_A_SOURCES = \
src/common/address.c \
src/common/backtrace.c \
src/common/compat.c \
src/common/compat_threads.c \
src/common/container.c \
src/common/di_ops.c \
src/common/log.c \
@ -66,10 +74,12 @@ LIBOR_A_SOURCES = \
src/common/util_codedigest.c \
src/common/util_process.c \
src/common/sandbox.c \
src/common/workqueue.c \
src/ext/csiphash.c \
src/ext/trunnel/trunnel.c \
$(libor_extra_source) \
$(libor_mempool_source)
$(libor_mempool_source) \
$(threads_impl_source)
LIBOR_CRYPTO_A_SOURCES = \
src/common/aes.c \
@ -102,7 +112,6 @@ src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_crypto_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
COMMONHEADERS = \
src/common/address.h \
src/common/backtrace.h \
@ -110,6 +119,7 @@ COMMONHEADERS = \
src/common/ciphers.inc \
src/common/compat.h \
src/common/compat_libevent.h \
src/common/compat_threads.h \
src/common/container.h \
src/common/crypto.h \
src/common/crypto_curve25519.h \
@ -128,6 +138,7 @@ COMMONHEADERS = \
src/common/tortls.h \
src/common/util.h \
src/common/util_process.h \
src/common/workqueue.h \
$(libor_mempool_header)
noinst_HEADERS+= $(COMMONHEADERS)

490
src/common/workqueue.c Normal file

@ -0,0 +1,490 @@
/* copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "orconfig.h"
#include "compat.h"
#include "compat_threads.h"
#include "util.h"
#include "workqueue.h"
#include "tor_queue.h"
#include "torlog.h"
struct threadpool_s {
/** An array of pointers to workerthread_t: one for each running worker
* thread. */
struct workerthread_s **threads;
/** Condition variable that we wait on when we have no work, and which
* gets signaled when our queue becomes nonempty. */
tor_cond_t condition;
/** Queue of pending work that we have to do. */
TOR_TAILQ_HEAD(, workqueue_entry_s) work;
/** The current 'update generation' of the threadpool. Any thread that is
* at an earlier generation needs to run the update function. */
unsigned generation;
/** Function that should be run for updates on each thread. */
int (*update_fn)(void *, void *);
/** Function to free update arguments if they can't be run. */
void (*free_update_arg_fn)(void *);
/** Array of n_threads update arguments. */
void **update_args;
/** Number of elements in threads. */
int n_threads;
/** Mutex to protect all the above fields. */
tor_mutex_t lock;
/** A reply queue to use when constructing new threads. */
replyqueue_t *reply_queue;
/** Functions used to allocate and free thread state. */
void *(*new_thread_state_fn)(void*);
void (*free_thread_state_fn)(void*);
void *new_thread_state_arg;
};
struct workqueue_entry_s {
/** The next workqueue_entry_t that's pending on the same thread or
* reply queue. */
TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
/** The threadpool to which this workqueue_entry_t was assigned. This field
* is set when the workqueue_entry_t is created, and won't be cleared until
* after it's handled in the main thread. */
struct threadpool_s *on_pool;
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
/** Function to run in the worker thread. */
int (*fn)(void *state, void *arg);
/** Function to run while processing the reply queue. */
void (*reply_fn)(void *arg);
/** Argument for the above functions. */
void *arg;
};
struct replyqueue_s {
/** Mutex to protect the answers field */
tor_mutex_t lock;
/** Doubly-linked list of answers that the reply queue needs to handle. */
TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
/** Mechanism to wake up the main thread when it is receiving answers. */
alert_sockets_t alert;
};
/** A worker thread represents a single thread in a thread pool. To avoid
* contention, each gets its own queue. This breaks the guarantee that
* queued work will get executed strictly in order. */
typedef struct workerthread_s {
/** Which thread is this? In range 0..in_pool->n_threads-1 */
int index;
/** The pool this thread is a part of. */
struct threadpool_s *in_pool;
/** User-supplied state field that we pass to the worker functions of each
* work item. */
void *state;
/** Reply queue to which we pass our results. */
replyqueue_t *reply_queue;
/** The current update generation of this thread */
unsigned generation;
} workerthread_t;
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
/** Allocate and return a new workqueue_entry_t, set up to run the function
* <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
* thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(int (*fn)(void*, void*),
void (*reply_fn)(void*),
void *arg)
{
workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
ent->fn = fn;
ent->reply_fn = reply_fn;
ent->arg = arg;
return ent;
}
/**
* Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
* any queue.
*/
static void
workqueue_entry_free(workqueue_entry_t *ent)
{
if (!ent)
return;
memset(ent, 0xf0, sizeof(*ent));
tor_free(ent);
}
/**
* Cancel a workqueue_entry_t that has been returned from
* threadpool_queue_work.
*
* You must not call this function on any work whose reply function has been
* executed in the main thread; that will cause undefined behavior (probably,
* a crash).
*
* If the work is cancelled, this function returns the argument passed to the
* work function. It is the caller's responsibility to free this storage.
*
* This function will have no effect if the worker thread has already executed
* or begun to execute the work item. In that case, it will return NULL.
*/
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
int cancelled = 0;
void *result = NULL;
tor_mutex_acquire(&ent->on_pool->lock);
if (ent->pending) {
TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
cancelled = 1;
result = ent->arg;
}
tor_mutex_release(&ent->on_pool->lock);
if (cancelled) {
tor_free(ent);
}
return result;
}
/** Return true iff <b>thread</b> has queued work waiting for it, or an
* update function it has not yet run. Caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
thread->generation != thread->in_pool->generation;
}
/**
* Main function for the worker thread.
*/
static void
worker_thread_main(void *thread_)
{
workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
int result;
tor_mutex_acquire(&pool->lock);
while (1) {
/* lock must be held at this point. */
while (worker_thread_has_work(thread)) {
/* lock must be held at this point. */
if (thread->in_pool->generation != thread->generation) {
void *arg = thread->in_pool->update_args[thread->index];
thread->in_pool->update_args[thread->index] = NULL;
int (*update_fn)(void*,void*) = thread->in_pool->update_fn;
thread->generation = thread->in_pool->generation;
tor_mutex_release(&pool->lock);
int r = update_fn(thread->state, arg);
if (r < 0) {
return;
}
tor_mutex_acquire(&pool->lock);
continue;
}
work = TOR_TAILQ_FIRST(&pool->work);
TOR_TAILQ_REMOVE(&pool->work, work, next_work);
work->pending = 0;
tor_mutex_release(&pool->lock);
/* We run the work function without holding the thread lock. This
* is the main thread's first opportunity to give us more work. */
result = work->fn(thread->state, work->arg);
/* Queue the reply for the main thread. */
queue_reply(thread->reply_queue, work);
/* We may need to exit the thread. */
if (result >= WQ_RPL_ERROR) {
return;
}
tor_mutex_acquire(&pool->lock);
}
/* At this point the lock is held, and there is no work in this thread's
* queue. */
/* TODO: support an idle-function */
/* Okay. Now, wait till somebody has work for us. */
if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
log_warn(LD_GENERAL, "Fail tor_cond_wait.");
}
}
}
/** Put a reply on the reply queue. The reply must not currently be on
* any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
int was_empty;
tor_mutex_acquire(&queue->lock);
was_empty = TOR_TAILQ_EMPTY(&queue->answers);
TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
if (was_empty) {
if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
/* XXXX complain! */
}
}
}
/** Allocate and start a new worker thread to use state object <b>state</b>,
* and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
thr->state = state;
thr->reply_queue = replyqueue;
thr->in_pool = pool;
if (spawn_func(worker_thread_main, thr) < 0) {
log_err(LD_GENERAL, "Can't launch worker thread.");
tor_free(thr);
return NULL;
}
return thr;
}
/**
* Queue an item of work for a thread in a thread pool. The function
* <b>fn</b> will be run in a worker thread, and will receive as arguments the
* thread's state object, and the provided object <b>arg</b>. It must return
* one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
*
* Regardless of its return value, the function <b>reply_fn</b> will later be
* run in the main thread when it invokes replyqueue_process(), and will
* receive as its argument the same <b>arg</b> object. It's the reply
* function's responsibility to free the work object.
*
* On success, return a workqueue_entry_t object that can be passed to
* workqueue_entry_cancel(). On failure, return NULL.
*
* Note that because each thread has its own work queue, work items may not
* be executed strictly in order.
*/
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
ent->on_pool = pool;
ent->pending = 1;
tor_mutex_acquire(&pool->lock);
TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
tor_mutex_release(&pool->lock);
tor_cond_signal_one(&pool->condition);
return ent;
}
/**
* Queue a copy of a work item for every thread in a pool. This can be used,
* for example, to tell the threads to update some parameter in their states.
*
* Arguments are as for <b>threadpool_queue_work</b>, except that the
* <b>arg</b> value is passed to <b>dup_fn</b> once for each thread to
* make a copy of it.
*
* UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
* will be run. If a new update is scheduled before the old update finishes
* running, then the new will replace the old in any threads that haven't run
* it yet.
*
* Return 0 on success, -1 on failure.
*/
int
threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*free_fn)(void *),
void *arg)
{
int i, n_threads;
void (*old_args_free_fn)(void *arg);
void **old_args;
void **new_args;
tor_mutex_acquire(&pool->lock);
n_threads = pool->n_threads;
old_args = pool->update_args;
old_args_free_fn = pool->free_update_arg_fn;
new_args = tor_calloc(n_threads, sizeof(void*));
for (i = 0; i < n_threads; ++i) {
if (dup_fn)
new_args[i] = dup_fn(arg);
else
new_args[i] = arg;
}
pool->update_args = new_args;
pool->free_update_arg_fn = free_fn;
pool->update_fn = fn;
++pool->generation;
tor_mutex_release(&pool->lock);
tor_cond_signal_all(&pool->condition);
if (old_args) {
for (i = 0; i < n_threads; ++i) {
if (old_args[i] && old_args_free_fn)
old_args_free_fn(old_args[i]);
}
tor_free(old_args);
}
return 0;
}
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
tor_mutex_acquire(&pool->lock);
if (pool->n_threads < n)
pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n);
while (pool->n_threads < n) {
void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
if (!thr) {
tor_mutex_release(&pool->lock);
return -1;
}
thr->index = pool->n_threads;
pool->threads[pool->n_threads++] = thr;
}
tor_mutex_release(&pool->lock);
return 0;
}
/**
* Construct a new thread pool with <b>n</b> worker threads, configured to
* send their output to <b>replyqueue</b>. The threads' states will be
* constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
* as its argument. When the threads close, they will call
* <b>free_thread_state_fn</b> on their states.
*/
threadpool_t *
threadpool_new(int n_threads,
replyqueue_t *replyqueue,
void *(*new_thread_state_fn)(void*),
void (*free_thread_state_fn)(void*),
void *arg)
{
threadpool_t *pool;
pool = tor_malloc_zero(sizeof(threadpool_t));
tor_mutex_init_nonrecursive(&pool->lock);
tor_cond_init(&pool->condition);
TOR_TAILQ_INIT(&pool->work);
pool->new_thread_state_fn = new_thread_state_fn;
pool->new_thread_state_arg = arg;
pool->free_thread_state_fn = free_thread_state_fn;
pool->reply_queue = replyqueue;
if (threadpool_start_threads(pool, n_threads) < 0) {
tor_mutex_uninit(&pool->lock);
tor_free(pool);
return NULL;
}
return pool;
}
/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
return tp->reply_queue;
}
/** Allocate a new reply queue. Reply queues are used to pass results from
* worker threads to the main thread. Since the main thread is running an
* IO-centric event loop, it needs to get woken up by means other than a
* condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
replyqueue_t *rq;
rq = tor_malloc_zero(sizeof(replyqueue_t));
if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
tor_free(rq);
return NULL;
}
tor_mutex_init(&rq->lock);
TOR_TAILQ_INIT(&rq->answers);
return rq;
}
/**
* Return the "read socket" for a given reply queue. The main thread should
* listen for read events on this socket, and call replyqueue_process() every
* time it triggers.
*/
tor_socket_t
replyqueue_get_socket(replyqueue_t *rq)
{
return rq->alert.read_fd;
}
/**
* Process all pending replies on a reply queue. The main thread should call
* this function every time the socket returned by replyqueue_get_socket() is
* readable.
*/
void
replyqueue_process(replyqueue_t *queue)
{
if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
static ratelim_t warn_limit = RATELIM_INIT(7200);
log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
"Failure from drain_fd");
}
tor_mutex_acquire(&queue->lock);
while (!TOR_TAILQ_EMPTY(&queue->answers)) {
/* lock must be held at this point.*/
workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
work->on_pool = NULL;
work->reply_fn(work->arg);
workqueue_entry_free(work);
tor_mutex_acquire(&queue->lock);
}
tor_mutex_release(&queue->lock);
}
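To make the update mechanism above concrete, a hedged sketch of rotating
per-thread state with threadpool_queue_update(); worker_state_t,
worker_state_free(), and server_onion_keys_new() mirror the cpuworker changes
later in this commit, and the ownership conventions shown are illustrative:

  static void *
  rotation_dup(void *arg)
  {
    (void)arg;
    worker_state_t *ws = tor_malloc_zero(sizeof(*ws));
    ws->onion_keys = server_onion_keys_new();  /* one fresh copy per thread */
    return ws;
  }
  static int
  rotation_apply(void *state_, void *update_)
  {
    worker_state_t *state = state_, *update = update_;
    server_onion_keys_free(state->onion_keys);
    state->onion_keys = update->onion_keys;  /* steal the new keys */
    update->onion_keys = NULL;
    worker_state_free(update);  /* an update that runs owns its argument */
    return WQ_RPL_REPLY;
  }

  /* Updates that get superseded before a thread runs them are released
   * with the free function passed here. */
  threadpool_queue_update(pool, rotation_dup, rotation_apply,
                          worker_state_free, NULL);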

48
src/common/workqueue.h Normal file

@ -0,0 +1,48 @@
/* Copyright (c) 2013, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#ifndef TOR_WORKQUEUE_H
#define TOR_WORKQUEUE_H
#include "compat.h"
/** A replyqueue is used to tell the main thread about the outcome of
* work that we queued for the workers. */
typedef struct replyqueue_s replyqueue_t;
/** A thread-pool manages starting threads and passing work to them. */
typedef struct threadpool_s threadpool_t;
/** A workqueue entry represents a request that has been passed to a thread
* pool. */
typedef struct workqueue_entry_s workqueue_entry_t;
/** Possible return value from a work function: indicates success. */
#define WQ_RPL_REPLY 0
/** Possible return value from a work function: indicates fatal error */
#define WQ_RPL_ERROR 1
/** Possible return value from a work function: indicates thread is shutting
* down. */
#define WQ_RPL_SHUTDOWN 2
workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg);
int threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*free_fn)(void *),
void *arg);
void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
threadpool_t *threadpool_new(int n_threads,
replyqueue_t *replyqueue,
void *(*new_thread_state_fn)(void*),
void (*free_thread_state_fn)(void*),
void *arg);
replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
void replyqueue_process(replyqueue_t *queue);
#endif

src/or/circuitlist.c

@ -745,6 +745,7 @@ circuit_free(circuit_t *circ)
{
void *mem;
size_t memlen;
int should_free = 1;
if (!circ)
return;
@ -784,6 +785,8 @@ circuit_free(circuit_t *circ)
memlen = sizeof(or_circuit_t);
tor_assert(circ->magic == OR_CIRCUIT_MAGIC);
should_free = (ocirc->workqueue_entry == NULL);
crypto_cipher_free(ocirc->p_crypto);
crypto_digest_free(ocirc->p_digest);
crypto_cipher_free(ocirc->n_crypto);
@ -826,8 +829,18 @@ circuit_free(circuit_t *circ)
* "active" checks will be violated. */
cell_queue_clear(&circ->n_chan_cells);
memwipe(mem, 0xAA, memlen); /* poison memory */
tor_free(mem);
if (should_free) {
memwipe(mem, 0xAA, memlen); /* poison memory */
tor_free(mem);
} else {
/* If we made it here, this is an or_circuit_t that still has a pending
* cpuworker request which we weren't able to cancel. Instead, set up
* the magic value so that when the reply comes back, we'll know to discard
* the reply and free this structure.
*/
memwipe(mem, 0xAA, memlen);
circ->magic = DEAD_CIRCUIT_MAGIC;
}
}
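The matching check lives on the cpuworker reply path (not shown in this
excerpt); roughly, and as a sketch only, the reply function can then
recognize and reclaim the deferred structure:

  /* In the reply handler, before touching the circuit: */
  if (circ->magic == DEAD_CIRCUIT_MAGIC) {
    /* circuit_free() ran while our request was in flight; drop the
     * reply and release the storage it left behind for us. */
    circ->magic = 0;
    tor_free(circ);
    return;
  }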
/** Deallocate the linked list circ-><b>cpath</b>, and remove the cpath from

src/or/command.c

@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
/* hand it off to the cpuworkers, and then return. */
if (connection_or_digest_is_known_relay(chan->identity_digest))
rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT);
return;

src/or/config.c

@ -1730,7 +1730,7 @@ options_act(const or_options_t *old_options)
if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
inform_testing_reachability();
}
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
if (dns_reset())
return -1;
} else {

src/or/connection.c

@ -29,7 +29,6 @@
#include "connection_edge.h"
#include "connection_or.h"
#include "control.h"
#include "cpuworker.h"
#include "directory.h"
#include "dirserv.h"
#include "dns.h"
@ -130,7 +129,6 @@ conn_type_to_string(int type)
case CONN_TYPE_AP: return "Socks";
case CONN_TYPE_DIR_LISTENER: return "Directory listener";
case CONN_TYPE_DIR: return "Directory";
case CONN_TYPE_CPUWORKER: return "CPU worker";
case CONN_TYPE_CONTROL_LISTENER: return "Control listener";
case CONN_TYPE_CONTROL: return "Control";
case CONN_TYPE_EXT_OR: return "Extended OR";
@ -213,12 +211,6 @@ conn_state_to_string(int type, int state)
case DIR_CONN_STATE_SERVER_WRITING: return "writing";
}
break;
case CONN_TYPE_CPUWORKER:
switch (state) {
case CPUWORKER_STATE_IDLE: return "idle";
case CPUWORKER_STATE_BUSY_ONION: return "busy with onion";
}
break;
case CONN_TYPE_CONTROL:
switch (state) {
case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)";
@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_CONTROL:
case CONN_TYPE_OR:
case CONN_TYPE_EXT_OR:
case CONN_TYPE_CPUWORKER:
return 1;
default:
return 0;
@ -2417,7 +2408,6 @@ connection_mark_all_noncontrol_connections(void)
if (conn->marked_for_close)
continue;
switch (conn->type) {
case CONN_TYPE_CPUWORKER:
case CONN_TYPE_CONTROL_LISTENER:
case CONN_TYPE_CONTROL:
break;
@ -4511,8 +4501,6 @@ connection_process_inbuf(connection_t *conn, int package_partial)
package_partial);
case CONN_TYPE_DIR:
return connection_dir_process_inbuf(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_process_inbuf(conn);
case CONN_TYPE_CONTROL:
return connection_control_process_inbuf(TO_CONTROL_CONN(conn));
default:
@ -4572,8 +4560,6 @@ connection_finished_flushing(connection_t *conn)
return connection_edge_finished_flushing(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_finished_flushing(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_finished_flushing(conn);
case CONN_TYPE_CONTROL:
return connection_control_finished_flushing(TO_CONTROL_CONN(conn));
default:
@ -4629,8 +4615,6 @@ connection_reached_eof(connection_t *conn)
return connection_edge_reached_eof(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_reached_eof(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_reached_eof(conn);
case CONN_TYPE_CONTROL:
return connection_control_reached_eof(TO_CONTROL_CONN(conn));
default:
@ -4836,10 +4820,6 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->purpose >= DIR_PURPOSE_MIN_);
tor_assert(conn->purpose <= DIR_PURPOSE_MAX_);
break;
case CONN_TYPE_CPUWORKER:
tor_assert(conn->state >= CPUWORKER_STATE_MIN_);
tor_assert(conn->state <= CPUWORKER_STATE_MAX_);
break;
case CONN_TYPE_CONTROL:
tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_);
tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_);
@ -4940,9 +4920,7 @@ proxy_type_to_string(int proxy_type)
}
/** Call connection_free_() on every connection in our array, and release all
* storage held by connection.c. This is used by cpuworkers and dnsworkers
* when they fork, so they don't keep resources held open (especially
* sockets).
* storage held by connection.c.
*
* Don't do the checks in connection_free(), because they will
* fail.


@ -5,84 +5,98 @@
/**
* \file cpuworker.c
* \brief Implements a farm of 'CPU worker' processes to perform
* CPU-intensive tasks in another thread or process, to not
* interrupt the main thread.
* \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
out to worker threads.
*
* Right now, we only use this for processing onionskins.
**/
#include "or.h"
#include "buffers.h"
#include "channel.h"
#include "channeltls.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "config.h"
#include "connection.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"
/** The maximum number of cpuworker processes we will keep around. */
#define MAX_CPUWORKERS 16
/** The minimum number of cpuworker processes we will keep around. */
#define MIN_CPUWORKERS 1
#ifdef HAVE_EVENT2_EVENT_H
#include <event2/event.h>
#else
#include <event.h>
#endif
/** The tag specifies which circuit this onionskin was from. */
#define TAG_LEN 12
static void queue_pending_tasks(void);
/** How many cpuworkers we have running right now. */
static int num_cpuworkers=0;
/** How many of the running cpuworkers have an assigned task right now. */
static int num_cpuworkers_busy=0;
/** We need to spawn new cpuworkers whenever we rotate the onion keys
* on platforms where execution contexts==processes. This variable stores
* the last time we got a key rotation event. */
static time_t last_rotation_time=0;
typedef struct worker_state_s {
int generation;
server_onion_keys_t *onion_keys;
} worker_state_t;
static void cpuworker_main(void *data) ATTR_NORETURN;
static int spawn_cpuworker(void);
static void spawn_enough_cpuworkers(void);
static void process_pending_task(connection_t *cpuworker);
static void *
worker_state_new(void *arg)
{
worker_state_t *ws;
(void)arg;
ws = tor_malloc_zero(sizeof(worker_state_t));
ws->onion_keys = server_onion_keys_new();
return ws;
}
static void
worker_state_free(void *arg)
{
worker_state_t *ws = arg;
server_onion_keys_free(ws->onion_keys);
tor_free(ws);
}
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;
static struct event *reply_event = NULL;
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
static int total_pending_tasks = 0;
static int max_pending_tasks = 128;
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
replyqueue_t *rq = arg;
(void) sock;
(void) events;
replyqueue_process(rq);
}
/** Initialize the cpuworker subsystem.
*/
void
cpu_init(void)
{
cpuworkers_rotate();
}
/** Called when we're done sending a request to a cpuworker. */
int
connection_cpu_finished_flushing(connection_t *conn)
{
tor_assert(conn);
tor_assert(conn->type == CONN_TYPE_CPUWORKER);
return 0;
}
/** Pack global_id and circ_id; set *tag to the result. (See note on
* cpuworker_main for wire format.) */
static void
tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
{
/*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
/*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
set_uint64(tag, chan_id);
set_uint32(tag+8, circ_id);
}
/** Unpack <b>tag</b> into addr, port, and circ_id.
*/
static void
tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
{
*chan_id = get_uint64(tag);
*circ_id = get_uint32(tag+8);
if (!replyqueue) {
replyqueue = replyqueue_new(0);
}
if (!reply_event) {
reply_event = tor_event_new(tor_libevent_get_base(),
replyqueue_get_socket(replyqueue),
EV_READ|EV_PERSIST,
replyqueue_process_cb,
replyqueue);
event_add(reply_event, NULL);
}
if (!threadpool) {
threadpool = threadpool_new(get_num_cpus(get_options()),
replyqueue,
worker_state_new,
worker_state_free,
NULL);
}
/* Total voodoo. Can we make this more sensible? */
max_pending_tasks = get_num_cpus(get_options()) * 64;
crypto_seed_weak_rng(&request_sample_rng);
}
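To make the new control flow concrete, here is a minimal sketch of one job's round trip through the pool that cpu_init() builds. The toy job type and both functions are hypothetical; the calls and signatures are the ones this file uses. The thread function runs on a worker thread; once it returns WQ_RPL_REPLY, the reply function runs on the main thread after the reply event fires and replyqueue_process() drains the queue.

typedef struct toy_job_t {       /* hypothetical job type */
  int input;
  int output;
} toy_job_t;

static int
toy_threadfn(void *state_, void *work_)  /* runs on a worker thread */
{
  toy_job_t *job = work_;
  (void)state_;                  /* per-thread state from worker_state_new() */
  job->output = job->input * 2;  /* the CPU-intensive part goes here */
  return WQ_RPL_REPLY;           /* ask for toy_replyfn() on the main thread */
}

static void
toy_replyfn(void *work_)         /* runs on the main thread */
{
  toy_job_t *job = work_;
  log_debug(LD_OR, "toy job finished: %d", job->output);
  tor_free(job);
}

/* Launch with: threadpool_queue_work(threadpool, toy_threadfn,
 *                                    toy_replyfn, job); */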
/** Magic numbers to make sure our cpuworker_requests don't grow any
@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
typedef struct cpuworker_request_t {
/** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
uint32_t magic;
/** Opaque tag to identify the job */
uint8_t tag[TAG_LEN];
/** Task code. Must be one of CPUWORKER_TASK_* */
uint8_t task;
/** Flag: Are we timing this request? */
unsigned timed : 1;
@ -114,8 +124,7 @@ typedef struct cpuworker_request_t {
typedef struct cpuworker_reply_t {
/** Magic number; must be CPUWORKER_REPLY_MAGIC. */
uint32_t magic;
/** Opaque tag to identify the job; matches the request's tag.*/
uint8_t tag[TAG_LEN];
/** True iff we got a successful request. */
uint8_t success;
@ -142,42 +151,39 @@ typedef struct cpuworker_reply_t {
uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;
/** Called when the onion key has changed and we need to spawn new
* cpuworkers. Close all currently idle cpuworkers, and mark the last
* rotation time as now.
*/
void
cpuworkers_rotate(void)
typedef struct cpuworker_job_u {
or_circuit_t *circ;
union {
cpuworker_request_t request;
cpuworker_reply_t reply;
} u;
} cpuworker_job_t;
static int
update_state_threadfn(void *state_, void *work_)
{
connection_t *cpuworker;
while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
CPUWORKER_STATE_IDLE))) {
connection_mark_for_close(cpuworker);
--num_cpuworkers;
}
last_rotation_time = time(NULL);
if (server_mode(get_options()))
spawn_enough_cpuworkers();
worker_state_t *state = state_;
worker_state_t *update = work_;
server_onion_keys_free(state->onion_keys);
state->onion_keys = update->onion_keys;
update->onion_keys = NULL;
++state->generation;
return WQ_RPL_REPLY;
}
/** If the cpuworker closes the connection,
* mark it as closed and spawn a new one as needed. */
int
connection_cpu_reached_eof(connection_t *conn)
/** Called when the onion key has changed: queue an update so that every
CPU worker thread regenerates its state (and keys) from the new onion key.
*/
void
cpuworkers_rotate_keyinfo(void)
{
log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
if (conn->state != CPUWORKER_STATE_IDLE) {
/* the circ associated with this cpuworker will have to wait until
* it gets culled in run_connection_housekeeping(), since we have
* no way to find out which circ it was. */
log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
num_cpuworkers_busy--;
if (threadpool_queue_update(threadpool,
worker_state_new,
update_state_threadfn,
worker_state_free,
NULL)) {
log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
num_cpuworkers--;
spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
spinning. */
connection_mark_for_close(conn);
return 0;
}
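A note on the contract here, inferred from this call site rather than quoted from workqueue.c: threadpool_queue_update() builds one fresh argument per worker thread with the supplied constructor, runs the update function with it on that thread at a safe point between jobs, and frees what remains. A hypothetical sketch, per worker thread i:

/* Hypothetical sketch of what the call above requests:
 *
 *   void *arg_i = worker_state_new(NULL);   (fresh onion keys)
 *   update_state_threadfn(state_i, arg_i);  (runs on thread i; steals the keys)
 *   worker_state_free(arg_i);               (frees the emptied husk)
 */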
/** Indexed by handshake type: how many onionskins have we processed and
@ -197,8 +203,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
* time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
/** Return true iff we'd like to measure a handshake of type
* <b>onionskin_type</b>. Call only from the main thread. */
static int
@ -286,428 +290,266 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}
/** Called when we get data from a cpuworker. If the answer is not complete,
* wait for a complete answer. If the answer is complete,
* process it as appropriate.
*/
int
connection_cpu_process_inbuf(connection_t *conn)
/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
uint64_t chan_id;
circid_t circ_id;
channel_t *p_chan = NULL;
circuit_t *circ;
cpuworker_job_t *job = work_;
cpuworker_reply_t rpl;
or_circuit_t *circ = NULL;
tor_assert(conn);
tor_assert(conn->type == CONN_TYPE_CPUWORKER);
--total_pending_tasks;
if (!connection_get_inbuf_len(conn))
return 0;
/* Could avoid this, but doesn't matter. */
memcpy(&rpl, &job->u.reply, sizeof(rpl));
if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
cpuworker_reply_t rpl;
if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
return 0; /* not yet */
tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
if (rpl.timed && rpl.success &&
rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
/* Time how long this request took. The handshake_type check should be
needless, but let's leave it in to be safe. */
struct timeval tv_end, tv_diff;
int64_t usec_roundtrip;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &rpl.started_at, &tv_diff);
usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec_roundtrip >= 0 &&
usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
++onionskins_n_processed[rpl.handshake_type];
onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
/* Scale down every 500000 handshakes. On a busy server, that's
* less impressive than it sounds. */
onionskins_n_processed[rpl.handshake_type] /= 2;
onionskins_usec_internal[rpl.handshake_type] /= 2;
onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
}
if (rpl.timed && rpl.success &&
rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
/* Time how long this request took. The handshake_type check should be
needless, but let's leave it in to be safe. */
struct timeval tv_end, tv_diff;
int64_t usec_roundtrip;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &rpl.started_at, &tv_diff);
usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec_roundtrip >= 0 &&
usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
++onionskins_n_processed[rpl.handshake_type];
onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
/* Scale down every 500000 handshakes. On a busy server, that's
* less impressive than it sounds. */
onionskins_n_processed[rpl.handshake_type] /= 2;
onionskins_usec_internal[rpl.handshake_type] /= 2;
onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
}
}
/* parse out the circ it was talking about */
tag_unpack(rpl.tag, &chan_id, &circ_id);
circ = NULL;
log_debug(LD_OR,
"Unpacking cpuworker reply, chan_id is " U64_FORMAT
", circ_id is %u",
U64_PRINTF_ARG(chan_id), (unsigned)circ_id);
p_chan = channel_find_by_global_id(chan_id);
if (p_chan)
circ = circuit_get_by_circid_channel(circ_id, p_chan);
if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
"(Old key or bad software.) Closing.");
if (circ)
circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
goto done_processing;
}
if (!circ) {
/* This happens because somebody sends us a destroy cell and the
* circuit goes away, while the cpuworker is working. This is also
* why our tag doesn't include a pointer to the circ, because we'd
* never know if it's still valid.
*/
log_debug(LD_OR,"processed onion for a circ that's gone. Dropping.");
goto done_processing;
}
tor_assert(! CIRCUIT_IS_ORIGIN(circ));
if (onionskin_answer(TO_OR_CIRCUIT(circ),
&rpl.created_cell,
(const char*)rpl.keys,
rpl.rend_auth_material) < 0) {
log_warn(LD_OR,"onionskin_answer failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
} else {
tor_assert(0); /* don't ask me to do handshakes yet */
}
circ = job->circ;
log_debug(LD_OR,
"Unpacking cpuworker reply %p, circ=%p, success=%d",
job, circ, rpl.success);
if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
/* The circuit was supposed to get freed while the reply was
* pending. Instead, it got left for us to free so that we wouldn't freak
* out when the job->circ field wound up pointing to nothing. */
log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
circ->base_.magic = 0;
tor_free(circ);
goto done_processing;
}
circ->workqueue_entry = NULL;
if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
"(Old key or bad software.) Closing.");
if (circ)
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
goto done_processing;
}
if (onionskin_answer(circ,
&rpl.created_cell,
(const char*)rpl.keys,
rpl.rend_auth_material) < 0) {
log_warn(LD_OR,"onionskin_answer failed. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
done_processing:
conn->state = CPUWORKER_STATE_IDLE;
num_cpuworkers_busy--;
if (conn->timestamp_created < last_rotation_time) {
connection_mark_for_close(conn);
num_cpuworkers--;
spawn_enough_cpuworkers();
} else {
process_pending_task(conn);
}
return 0;
memwipe(&rpl, 0, sizeof(rpl));
memwipe(job, 0, sizeof(*job));
tor_free(job);
queue_pending_tasks();
}
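circuit_free() is not part of this diff, but the DEAD_CIRCUIT_MAGIC handshake above implies what the freeing side must do when a pending handshake cannot be cancelled. A hypothetical sketch (the real logic lives in circuitlist.c):

  /* Sketch of the freeing side, under the assumptions above: */
  if (or_circ->workqueue_entry) {
    /* A worker thread still holds a pointer to this circuit and the job
     * could not be cancelled: leave the memory allocated and tag it so
     * that cpuworker_onion_handshake_replyfn() frees it instead. */
    circ->magic = DEAD_CIRCUIT_MAGIC;
    return;
  }
  tor_free(circ); /* normal path: no cpuworker reply is pending */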
/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair.
* Read and writes from fdarray[1]. Reads requests, writes answers.
*
* Request format:
* cpuworker_request_t.
* Response format:
* cpuworker_reply_t
*/
static void
cpuworker_main(void *data)
/** Implementation function for onion handshake requests. */
static int
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
/* For talking to the parent thread/process */
tor_socket_t *fdarray = data;
tor_socket_t fd;
worker_state_t *state = state_;
cpuworker_job_t *job = work_;
/* variables for onion processing */
server_onion_keys_t onion_keys;
server_onion_keys_t *onion_keys = state->onion_keys;
cpuworker_request_t req;
cpuworker_reply_t rpl;
fd = fdarray[1]; /* this side is ours */
tor_free(data);
memcpy(&req, &job->u.request, sizeof(req));
setup_server_onion_keys(&onion_keys);
for (;;) {
if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
log_info(LD_OR, "read request failed. Exiting.");
goto end;
}
tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
memset(&rpl, 0, sizeof(rpl));
const create_cell_t *cc = &req.create_cell;
created_cell_t *cell_out = &rpl.created_cell;
struct timeval tv_start = {0,0}, tv_end;
int n;
rpl.timed = req.timed;
rpl.started_at = req.started_at;
rpl.handshake_type = cc->handshake_type;
if (req.timed)
tor_gettimeofday(&tv_start);
n = onion_skin_server_handshake(cc->handshake_type,
cc->onionskin, cc->handshake_len,
onion_keys,
cell_out->reply,
rpl.keys, CPATH_KEY_MATERIAL_LEN,
rpl.rend_auth_material);
if (n < 0) {
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
memset(&rpl, 0, sizeof(rpl));
if (req.task == CPUWORKER_TASK_ONION) {
const create_cell_t *cc = &req.create_cell;
created_cell_t *cell_out = &rpl.created_cell;
struct timeval tv_start = {0,0}, tv_end;
int n;
rpl.timed = req.timed;
rpl.started_at = req.started_at;
rpl.handshake_type = cc->handshake_type;
if (req.timed)
tor_gettimeofday(&tv_start);
n = onion_skin_server_handshake(cc->handshake_type,
cc->onionskin, cc->handshake_len,
&onion_keys,
cell_out->reply,
rpl.keys, CPATH_KEY_MATERIAL_LEN,
rpl.rend_auth_material);
if (n < 0) {
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
memset(&rpl, 0, sizeof(rpl));
memcpy(rpl.tag, req.tag, TAG_LEN);
rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
memcpy(rpl.tag, req.tag, TAG_LEN);
cell_out->handshake_len = n;
switch (cc->cell_type) {
case CELL_CREATE:
cell_out->cell_type = CELL_CREATED; break;
case CELL_CREATE2:
cell_out->cell_type = CELL_CREATED2; break;
case CELL_CREATE_FAST:
cell_out->cell_type = CELL_CREATED_FAST; break;
default:
tor_assert(0);
goto end;
}
rpl.success = 1;
}
rpl.magic = CPUWORKER_REPLY_MAGIC;
if (req.timed) {
struct timeval tv_diff;
int64_t usec;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &tv_start, &tv_diff);
usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
else
rpl.n_usec = (uint32_t) usec;
}
if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
log_err(LD_BUG,"writing response buf failed. Exiting.");
goto end;
}
log_debug(LD_OR,"finished writing response.");
} else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
log_info(LD_OR,"Clean shutdown: exiting");
goto end;
rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
cell_out->handshake_len = n;
switch (cc->cell_type) {
case CELL_CREATE:
cell_out->cell_type = CELL_CREATED; break;
case CELL_CREATE2:
cell_out->cell_type = CELL_CREATED2; break;
case CELL_CREATE_FAST:
cell_out->cell_type = CELL_CREATED_FAST; break;
default:
tor_assert(0);
return WQ_RPL_SHUTDOWN;
}
memwipe(&req, 0, sizeof(req));
memwipe(&rpl, 0, sizeof(req));
rpl.success = 1;
}
end:
rpl.magic = CPUWORKER_REPLY_MAGIC;
if (req.timed) {
struct timeval tv_diff;
int64_t usec;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &tv_start, &tv_diff);
usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
else
rpl.n_usec = (uint32_t) usec;
}
memcpy(&job->u.reply, &rpl, sizeof(rpl));
memwipe(&req, 0, sizeof(req));
memwipe(&rpl, 0, sizeof(rpl));
release_server_onion_keys(&onion_keys);
tor_close_socket(fd);
crypto_thread_cleanup();
spawn_exit();
return WQ_RPL_REPLY;
}
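Taking the reply and thread functions together, the life cycle of a cpuworker_job_t looks like this; the request and reply can share a union because the request is copied out in full before the reply is written back:

/*  main thread:    job->u.request = req;                (assign_onionskin_to_cpuworker)
 *                  threadpool_queue_work(...);
 *  worker thread:  memcpy(&req, &job->u.request, ...);  (consume the request)
 *                  onion_skin_server_handshake(...);
 *                  memcpy(&job->u.reply, &rpl, ...);    (overwrite with the reply)
 *  main thread:    memcpy(&rpl, &job->u.reply, ...);    (replyfn; then free the job)
 */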
/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed.
*/
static int
spawn_cpuworker(void)
{
tor_socket_t *fdarray;
tor_socket_t fd;
connection_t *conn;
int err;
fdarray = tor_calloc(2, sizeof(tor_socket_t));
if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
tor_socket_strerror(-err));
tor_free(fdarray);
return -1;
}
tor_assert(SOCKET_OK(fdarray[0]));
tor_assert(SOCKET_OK(fdarray[1]));
fd = fdarray[0];
if (spawn_func(cpuworker_main, (void*)fdarray) < 0) {
tor_close_socket(fdarray[0]);
tor_close_socket(fdarray[1]);
tor_free(fdarray);
return -1;
}
log_debug(LD_OR,"just spawned a cpu worker.");
conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
/* set up conn so it's got all the data we need to remember */
conn->s = fd;
conn->address = tor_strdup("localhost");
tor_addr_make_unspec(&conn->addr);
if (set_socket_nonblocking(fd) == -1) {
connection_free(conn); /* this closes fd */
return -1;
}
if (connection_add(conn) < 0) { /* no space, forget it */
log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
connection_free(conn); /* this closes fd */
return -1;
}
conn->state = CPUWORKER_STATE_IDLE;
connection_start_reading(conn);
return 0; /* success */
}
/** If we have too few or too many active cpuworkers, try to spawn new ones
* or kill idle ones.
*/
/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
spawn_enough_cpuworkers(void)
{
int num_cpuworkers_needed = get_num_cpus(get_options());
int reseed = 0;
if (num_cpuworkers_needed < MIN_CPUWORKERS)
num_cpuworkers_needed = MIN_CPUWORKERS;
if (num_cpuworkers_needed > MAX_CPUWORKERS)
num_cpuworkers_needed = MAX_CPUWORKERS;
while (num_cpuworkers < num_cpuworkers_needed) {
if (spawn_cpuworker() < 0) {
log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
return;
}
num_cpuworkers++;
reseed++;
}
if (reseed)
crypto_seed_weak_rng(&request_sample_rng);
}
/** Take a pending task from the queue and assign it to 'cpuworker'. */
static void
process_pending_task(connection_t *cpuworker)
queue_pending_tasks(void)
{
or_circuit_t *circ;
create_cell_t *onionskin = NULL;
tor_assert(cpuworker);
while (total_pending_tasks < max_pending_tasks) {
circ = onion_next_task(&onionskin);
/* for now only process onion tasks */
if (!circ)
return;
circ = onion_next_task(&onionskin);
if (!circ)
return;
if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
}
/** How long should we let a cpuworker stay busy before we give
* up on it and decide that we have a bug or infinite loop?
* This value is high because some servers with low memory/cpu
* sometimes spend an hour or more swapping, and Tor starves. */
#define CPUWORKER_BUSY_TIMEOUT (60*60*12)
/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get
* stuck in the 'busy' state, even though the cpuworker process thinks of
* itself as idle. I don't know why. But here's a workaround to kill any
* cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT.
*/
static void
cull_wedged_cpuworkers(void)
{
time_t now = time(NULL);
smartlist_t *conns = get_connection_array();
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
if (!conn->marked_for_close &&
conn->type == CONN_TYPE_CPUWORKER &&
conn->state == CPUWORKER_STATE_BUSY_ONION &&
conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
log_notice(LD_BUG,
"closing wedged cpuworker. Can somebody find the bug?");
num_cpuworkers_busy--;
num_cpuworkers--;
connection_mark_for_close(conn);
}
} SMARTLIST_FOREACH_END(conn);
if (assign_onionskin_to_cpuworker(circ, onionskin))
log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
}
}
/** Try to tell a cpuworker to perform the public key operations necessary to
* respond to <b>onionskin</b> for the circuit <b>circ</b>.
*
* If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else,
* look for an idle cpuworker and use him. If none idle, queue task onto the
* pending onion list and return. Return 0 if we successfully assign the
* task, or -1 on failure.
* Return 0 if we successfully assign the task, or -1 on failure.
*/
int
assign_onionskin_to_cpuworker(connection_t *cpuworker,
or_circuit_t *circ,
assign_onionskin_to_cpuworker(or_circuit_t *circ,
create_cell_t *onionskin)
{
workqueue_entry_t *queue_entry;
cpuworker_job_t *job;
cpuworker_request_t req;
time_t now = approx_time();
static time_t last_culled_cpuworkers = 0;
int should_time;
/* Checking for wedged cpuworkers requires a linear search over all
* connections, so let's do it only once a minute.
*/
#define CULL_CPUWORKERS_INTERVAL 60
if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
cull_wedged_cpuworkers();
spawn_enough_cpuworkers();
last_culled_cpuworkers = now;
if (!circ->p_chan) {
log_info(LD_OR,"circ->p_chan gone. Failing circ.");
tor_free(onionskin);
return -1;
}
if (1) {
if (num_cpuworkers_busy == num_cpuworkers) {
log_debug(LD_OR,"No idle cpuworkers. Queuing.");
if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
return -1;
}
return 0;
}
if (!cpuworker)
cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
CPUWORKER_STATE_IDLE);
tor_assert(cpuworker);
if (!circ->p_chan) {
log_info(LD_OR,"circ->p_chan gone. Failing circ.");
if (total_pending_tasks >= max_pending_tasks) {
log_debug(LD_OR,"No idle cpuworkers. Queuing.");
if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
return -1;
}
if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
should_time = should_time_request(onionskin->handshake_type);
memset(&req, 0, sizeof(req));
req.magic = CPUWORKER_REQUEST_MAGIC;
tag_pack(req.tag, circ->p_chan->global_identifier,
circ->p_circ_id);
req.timed = should_time;
cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
/* touch the lastwritten timestamp, since that's how we check to
* see how long it's been since we asked the question, and sometimes
* we check before the first call to connection_handle_write(). */
cpuworker->timestamp_lastwritten = now;
num_cpuworkers_busy++;
req.task = CPUWORKER_TASK_ONION;
memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
tor_free(onionskin);
if (should_time)
tor_gettimeofday(&req.started_at);
connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
memwipe(&req, 0, sizeof(req));
return 0;
}
if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
should_time = should_time_request(onionskin->handshake_type);
memset(&req, 0, sizeof(req));
req.magic = CPUWORKER_REQUEST_MAGIC;
req.timed = should_time;
memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
tor_free(onionskin);
if (should_time)
tor_gettimeofday(&req.started_at);
job = tor_malloc_zero(sizeof(cpuworker_job_t));
job->circ = circ;
memcpy(&job->u.request, &req, sizeof(req));
memwipe(&req, 0, sizeof(req));
++total_pending_tasks;
queue_entry = threadpool_queue_work(threadpool,
cpuworker_onion_handshake_threadfn,
cpuworker_onion_handshake_replyfn,
job);
if (!queue_entry) {
log_warn(LD_BUG, "Couldn't queue work on threadpool");
tor_free(job);
return -1;
}
log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
job, queue_entry, job->circ);
circ->workqueue_entry = queue_entry;
return 0;
}
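The total_pending_tasks / max_pending_tasks pair implements simple back-pressure: once get_num_cpus() * 64 requests are in flight (512 on an eight-core relay, for example), new onionskins wait in the onion queue via onion_pending_add(), and queue_pending_tasks() pulls them back in as replies drain.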
/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
* remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
cpuworker_job_t *job;
if (circ->workqueue_entry == NULL)
return;
job = workqueue_entry_cancel(circ->workqueue_entry);
if (job) {
/* It successfully cancelled: we own the job again, so wipe and free it. */
memwipe(job, 0xe0, sizeof(*job));
tor_free(job);
/* Clear the entry only on success: if cancellation failed, a worker
* already owns the job, and cpuworker_onion_handshake_replyfn() must
* still see the entry so it can dispose of the circuit safely. */
circ->workqueue_entry = NULL;
}
}


@ -13,19 +13,17 @@
#define TOR_CPUWORKER_H
void cpu_init(void);
void cpuworkers_rotate(void);
int connection_cpu_finished_flushing(connection_t *conn);
int connection_cpu_reached_eof(connection_t *conn);
int connection_cpu_process_inbuf(connection_t *conn);
void cpuworkers_rotate_keyinfo(void);
struct create_cell_t;
int assign_onionskin_to_cpuworker(connection_t *cpuworker,
or_circuit_t *circ,
int assign_onionskin_to_cpuworker(or_circuit_t *circ,
struct create_cell_t *onionskin);
uint64_t estimated_usec_for_onionskins(uint32_t n_requests,
uint16_t onionskin_type);
void cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
const char *onionskin_type_name);
void cpuworker_cancel_circ_handshake(or_circuit_t *circ);
#endif


@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now)
get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) {
log_info(LD_GENERAL,"Rotating onion key.");
rotate_onion_key();
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
if (router_rebuild_descriptor(1)<0) {
log_info(LD_CONFIG, "Couldn't rebuild router descriptor");
}
@ -1960,9 +1960,9 @@ do_hup(void)
* force a retry there. */
if (server_mode(options)) {
/* Restart cpuworker and dnsworker processes, so they get up-to-date
/* Update cpuworker and dnsworker processes, so they get up-to-date
* configuration options. */
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
dns_reset();
}
return 0;


@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ)
victim = circ->onionqueue_entry;
if (victim)
onion_queue_entry_remove(victim);
cpuworker_cancel_circ_handshake(circ);
}
/** Remove a queue entry <b>victim</b> from the queue, unlinking it from
@ -339,25 +341,25 @@ clear_pending_onions(void)
/* ============================================================ */
/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys
/** Return a new server_onion_keys_t object with all of the keys
* and other info we might need to do onion handshakes. (We make a copy of
* our keys for each cpuworker to avoid race conditions with the main thread,
* and to avoid locking) */
void
setup_server_onion_keys(server_onion_keys_t *keys)
server_onion_keys_t *
server_onion_keys_new(void)
{
memset(keys, 0, sizeof(server_onion_keys_t));
server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t));
memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN);
dup_onion_keys(&keys->onion_key, &keys->last_onion_key);
keys->curve25519_key_map = construct_ntor_key_map();
keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t));
curve25519_keypair_generate(keys->junk_keypair, 0);
return keys;
}
/** Release all storage held in <b>keys</b>, but do not free <b>keys</b>
* itself (as it's likely to be stack-allocated.) */
/** Release all storage held in <b>keys</b>. */
void
release_server_onion_keys(server_onion_keys_t *keys)
server_onion_keys_free(server_onion_keys_t *keys)
{
if (! keys)
return;
@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys)
crypto_pk_free(keys->last_onion_key);
ntor_key_map_free(keys->curve25519_key_map);
tor_free(keys->junk_keypair);
memset(keys, 0, sizeof(server_onion_keys_t));
memwipe(keys, 0, sizeof(server_onion_keys_t));
tor_free(keys);
}
/** Release whatever storage is held in <b>state</b>, depending on its


@ -30,8 +30,8 @@ typedef struct server_onion_keys_t {
#define MAX_ONIONSKIN_CHALLENGE_LEN 255
#define MAX_ONIONSKIN_REPLY_LEN 255
void setup_server_onion_keys(server_onion_keys_t *keys);
void release_server_onion_keys(server_onion_keys_t *keys);
server_onion_keys_t *server_onion_keys_new(void);
void server_onion_keys_free(server_onion_keys_t *keys);
void onion_handshake_state_release(onion_handshake_state_t *state);


@ -213,8 +213,7 @@ typedef enum {
#define CONN_TYPE_DIR_LISTENER 8
/** Type for HTTP connections to the directory server. */
#define CONN_TYPE_DIR 9
/** Connection from the main process to a CPU worker process. */
#define CONN_TYPE_CPUWORKER 10
/* Type 10 is unused. */
/** Type for listening for connections from user interface process. */
#define CONN_TYPE_CONTROL_LISTENER 11
/** Type for connections from user interface process. */
@ -276,17 +275,6 @@ typedef enum {
/** State for any listener connection. */
#define LISTENER_STATE_READY 0
#define CPUWORKER_STATE_MIN_ 1
/** State for a connection to a cpuworker process that's idle. */
#define CPUWORKER_STATE_IDLE 1
/** State for a connection to a cpuworker process that's processing a
* handshake. */
#define CPUWORKER_STATE_BUSY_ONION 2
#define CPUWORKER_STATE_MAX_ 2
#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION
#define CPUWORKER_TASK_SHUTDOWN 255
#define OR_CONN_STATE_MIN_ 1
/** State for a connection to an OR: waiting for connect() to finish. */
#define OR_CONN_STATE_CONNECTING 1
@ -2707,8 +2695,14 @@ typedef struct {
time_t expiry_time;
} cpath_build_state_t;
/** "magic" value for an origin_circuit_t */
#define ORIGIN_CIRCUIT_MAGIC 0x35315243u
/** "magic" value for an or_circuit_t */
#define OR_CIRCUIT_MAGIC 0x98ABC04Fu
/** "magic" value for a circuit that would have been freed by circuit_free,
* but which we're keeping around until a cpuworker reply arrives. See
* circuit_free() for more documentation. */
#define DEAD_CIRCUIT_MAGIC 0xdeadc14c
struct create_cell_t;
@ -3128,6 +3122,9 @@ typedef struct or_circuit_t {
/** Pointer to an entry on the onion queue, if this circuit is waiting for a
* chance to give an onionskin to a cpuworker. Used only in onion.c */
struct onion_queue_t *onionqueue_entry;
/** Pointer to a workqueue entry, if this circuit has given an onionskin to
* a cpuworker and is waiting for a response. Used only in cpuworker.c */
struct workqueue_entry_s *workqueue_entry;
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;


@ -2,7 +2,7 @@ TESTS += src/test/test
noinst_PROGRAMS+= src/test/bench
if UNITTESTS_ENABLED
noinst_PROGRAMS+= src/test/test src/test/test-child
noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue
endif
src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \
@ -47,6 +47,7 @@ src_test_test_SOURCES = \
src/test/test_routerkeys.c \
src/test/test_scheduler.c \
src/test/test_socks.c \
src/test/test_threads.c \
src/test/test_util.c \
src/test/test_config.c \
src/test/test_hs.c \
@ -63,6 +64,11 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS)
src_test_bench_SOURCES = \
src/test/bench.c
src_test_test_workqueue_SOURCES = \
src/test/test_workqueue.c
src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS)
src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
@TOR_LDFLAGS_libevent@
src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \
@ -81,6 +87,15 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \
@TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \
@TOR_SYSTEMD_LIBS@
src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
@TOR_LDFLAGS_libevent@
src_test_test_workqueue_LDADD = src/or/libtor-testing.a \
src/common/libor-testing.a \
src/common/libor-crypto-testing.a $(LIBDONNA) \
src/common/libor-event-testing.a \
@TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \
@TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@
noinst_HEADERS+= \
src/test/fakechans.h \
src/test/test.h \


@ -1258,6 +1258,23 @@ test_stats(void *arg)
tor_free(s);
}
static void *
passthrough_test_setup(const struct testcase_t *testcase)
{
return testcase->setup_data;
}
static int
passthrough_test_cleanup(const struct testcase_t *testcase, void *ptr)
{
(void)testcase;
(void)ptr;
return 1;
}
const struct testcase_setup_t passthrough_setup = {
passthrough_test_setup, passthrough_test_cleanup
};
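passthrough_setup lets a testcase hand a constant to its test function: whatever is stored in testcase->setup_data arrives as the function's void *arg. For example, from the crypto tests later in this commit:

  { "aes_AES", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"aes" },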
#define ENT(name) \
{ #name, test_ ## name , 0, NULL, NULL }
#define FORK(name) \
@ -1297,6 +1314,7 @@ extern struct testcase_t cell_queue_tests[];
extern struct testcase_t options_tests[];
extern struct testcase_t socks_tests[];
extern struct testcase_t entrynodes_tests[];
extern struct testcase_t thread_tests[];
extern struct testcase_t extorport_tests[];
extern struct testcase_t controller_event_tests[];
extern struct testcase_t logging_tests[];
@ -1324,6 +1342,7 @@ static struct testgroup_t testgroups[] = {
{ "container/", container_tests },
{ "util/", util_tests },
{ "util/logging/", logging_tests },
{ "util/thread/", thread_tests },
{ "cellfmt/", cell_format_tests },
{ "cellqueue/", cell_queue_tests },
{ "dir/", dir_tests },


@ -158,5 +158,7 @@ crypto_pk_t *pk_generate(int idx);
#define NS_MOCK(name) MOCK(name, NS(name))
#define NS_UNMOCK(name) UNMOCK(name)
extern const struct testcase_setup_t passthrough_setup;
#endif


@ -1975,30 +1975,14 @@ test_crypto_siphash(void *arg)
;
}
static void *
pass_data_setup_fn(const struct testcase_t *testcase)
{
return testcase->setup_data;
}
static int
pass_data_cleanup_fn(const struct testcase_t *testcase, void *ptr)
{
(void)ptr;
(void)testcase;
return 1;
}
static const struct testcase_setup_t pass_data = {
pass_data_setup_fn, pass_data_cleanup_fn
};
#define CRYPTO_LEGACY(name) \
{ #name, test_crypto_ ## name , 0, NULL, NULL }
struct testcase_t crypto_tests[] = {
CRYPTO_LEGACY(formats),
CRYPTO_LEGACY(rng),
{ "aes_AES", test_crypto_aes, TT_FORK, &pass_data, (void*)"aes" },
{ "aes_EVP", test_crypto_aes, TT_FORK, &pass_data, (void*)"evp" },
{ "aes_AES", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"aes" },
{ "aes_EVP", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"evp" },
CRYPTO_LEGACY(sha),
CRYPTO_LEGACY(pk),
{ "pk_fingerprints", test_crypto_pk_fingerprints, TT_FORK, NULL, NULL },
@ -2006,23 +1990,25 @@ struct testcase_t crypto_tests[] = {
CRYPTO_LEGACY(dh),
CRYPTO_LEGACY(s2k_rfc2440),
#ifdef HAVE_LIBSCRYPT_H
{ "s2k_scrypt", test_crypto_s2k_general, 0, &pass_data,
{ "s2k_scrypt", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"scrypt" },
{ "s2k_scrypt_low", test_crypto_s2k_general, 0, &pass_data,
{ "s2k_scrypt_low", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"scrypt-low" },
#endif
{ "s2k_pbkdf2", test_crypto_s2k_general, 0, &pass_data,
{ "s2k_pbkdf2", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"pbkdf2" },
{ "s2k_rfc2440_general", test_crypto_s2k_general, 0, &pass_data,
{ "s2k_rfc2440_general", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"rfc2440" },
{ "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &pass_data,
{ "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"rfc2440-legacy" },
{ "s2k_errors", test_crypto_s2k_errors, 0, NULL, NULL },
{ "scrypt_vectors", test_crypto_scrypt_vectors, 0, NULL, NULL },
{ "pbkdf2_vectors", test_crypto_pbkdf2_vectors, 0, NULL, NULL },
{ "pwbox", test_crypto_pwbox, 0, NULL, NULL },
{ "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"aes" },
{ "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"evp" },
{ "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &passthrough_setup,
(void*)"aes" },
{ "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &passthrough_setup,
(void*)"evp" },
CRYPTO_LEGACY(base32_decode),
{ "kdf_TAP", test_crypto_kdf_TAP, 0, NULL, NULL },
{ "hkdf_sha256", test_crypto_hkdf_sha256, 0, NULL, NULL },

src/test/test_threads.c (new file, 316 lines)

@ -0,0 +1,316 @@
/* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "orconfig.h"
#include "or.h"
#include "compat_threads.h"
#include "test.h"
/** mutex for thread test to stop the threads hitting data at the same time. */
static tor_mutex_t *thread_test_mutex_ = NULL;
/** mutexes for the thread test to make sure that the threads have to
* interleave somewhat. */
static tor_mutex_t *thread_test_start1_ = NULL,
*thread_test_start2_ = NULL;
/** Shared strmap for the thread test. */
static strmap_t *thread_test_strmap_ = NULL;
/** The name of thread1 for the thread test */
static char *thread1_name_ = NULL;
/** The name of thread2 for the thread test */
static char *thread2_name_ = NULL;
static int thread_fns_failed = 0;
static unsigned long thread_fn_tid1, thread_fn_tid2;
static void thread_test_func_(void* _s) ATTR_NORETURN;
/** How many iterations have the threads in the unit test run? */
static int t1_count = 0, t2_count = 0;
/** Helper function for threading unit tests: This function runs in a
* subthread. It grabs its own mutex (start1 or start2) to make sure that it
* is allowed to start, then it repeatedly alters thread_test_strmap_ while
* holding thread_test_mutex_. */
static void
thread_test_func_(void* _s)
{
char *s = _s;
int i, *count;
tor_mutex_t *m;
char buf[64];
char **cp;
if (!strcmp(s, "thread 1")) {
m = thread_test_start1_;
cp = &thread1_name_;
count = &t1_count;
thread_fn_tid1 = tor_get_thread_id();
} else {
m = thread_test_start2_;
cp = &thread2_name_;
count = &t2_count;
thread_fn_tid2 = tor_get_thread_id();
}
tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id());
*cp = tor_strdup(buf);
tor_mutex_acquire(m);
for (i=0; i<10000; ++i) {
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, "last to run", *cp);
++*count;
tor_mutex_release(thread_test_mutex_);
}
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, s, *cp);
if (in_main_thread())
++thread_fns_failed;
tor_mutex_release(thread_test_mutex_);
tor_mutex_release(m);
spawn_exit();
}
/** Run unit tests for threading logic. */
static void
test_threads_basic(void *arg)
{
char *s1 = NULL, *s2 = NULL;
int done = 0, timedout = 0;
time_t started;
#ifndef _WIN32
struct timeval tv;
tv.tv_sec=0;
tv.tv_usec=100*1000;
#endif
(void) arg;
set_main_thread();
thread_test_mutex_ = tor_mutex_new();
thread_test_start1_ = tor_mutex_new();
thread_test_start2_ = tor_mutex_new();
thread_test_strmap_ = strmap_new();
s1 = tor_strdup("thread 1");
s2 = tor_strdup("thread 2");
tor_mutex_acquire(thread_test_start1_);
tor_mutex_acquire(thread_test_start2_);
spawn_func(thread_test_func_, s1);
spawn_func(thread_test_func_, s2);
tor_mutex_release(thread_test_start2_);
tor_mutex_release(thread_test_start1_);
started = time(NULL);
while (!done) {
tor_mutex_acquire(thread_test_mutex_);
strmap_assert_ok(thread_test_strmap_);
if (strmap_get(thread_test_strmap_, "thread 1") &&
strmap_get(thread_test_strmap_, "thread 2")) {
done = 1;
} else if (time(NULL) > started + 150) {
timedout = done = 1;
}
tor_mutex_release(thread_test_mutex_);
#ifndef _WIN32
/* Prevent the main thread from starving the worker threads. */
select(0, NULL, NULL, NULL, &tv);
#endif
}
tor_mutex_acquire(thread_test_start1_);
tor_mutex_release(thread_test_start1_);
tor_mutex_acquire(thread_test_start2_);
tor_mutex_release(thread_test_start2_);
tor_mutex_free(thread_test_mutex_);
if (timedout) {
printf("\nTimed out: %d %d", t1_count, t2_count);
tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
tt_assert(!timedout);
}
/* different thread IDs. */
tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"),
strmap_get(thread_test_strmap_, "thread 2")));
tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"),
strmap_get(thread_test_strmap_, "last to run")) ||
!strcmp(strmap_get(thread_test_strmap_, "thread 2"),
strmap_get(thread_test_strmap_, "last to run")));
tt_int_op(thread_fns_failed, ==, 0);
tt_int_op(thread_fn_tid1, !=, thread_fn_tid2);
done:
tor_free(s1);
tor_free(s2);
tor_free(thread1_name_);
tor_free(thread2_name_);
if (thread_test_strmap_)
strmap_free(thread_test_strmap_, NULL);
if (thread_test_start1_)
tor_mutex_free(thread_test_start1_);
if (thread_test_start2_)
tor_mutex_free(thread_test_start2_);
}
typedef struct cv_testinfo_s {
tor_cond_t *cond;
tor_mutex_t *mutex;
int value;
int addend;
int shutdown;
int n_shutdown;
int n_wakeups;
int n_timeouts;
int n_threads;
const struct timeval *tv;
} cv_testinfo_t;
static cv_testinfo_t *
cv_testinfo_new(void)
{
cv_testinfo_t *i = tor_malloc_zero(sizeof(*i));
i->cond = tor_cond_new();
i->mutex = tor_mutex_new_nonrecursive();
return i;
}
static void
cv_testinfo_free(cv_testinfo_t *i)
{
if (!i)
return;
tor_cond_free(i->cond);
tor_mutex_free(i->mutex);
tor_free(i);
}
static void cv_test_thr_fn_(void *arg) ATTR_NORETURN;
static void
cv_test_thr_fn_(void *arg)
{
cv_testinfo_t *i = arg;
int tid, r;
tor_mutex_acquire(i->mutex);
tid = i->n_threads++;
tor_mutex_release(i->mutex);
(void) tid;
tor_mutex_acquire(i->mutex);
while (1) {
if (i->addend) {
i->value += i->addend;
i->addend = 0;
}
if (i->shutdown) {
++i->n_shutdown;
i->shutdown = 0;
tor_mutex_release(i->mutex);
spawn_exit();
}
r = tor_cond_wait(i->cond, i->mutex, i->tv);
++i->n_wakeups;
if (r == 1) {
++i->n_timeouts;
tor_mutex_release(i->mutex);
spawn_exit();
}
}
}
static void
test_threads_conditionvar(void *arg)
{
cv_testinfo_t *ti=NULL;
const struct timeval msec100 = { 0, 100*1000 };
const int timeout = !strcmp(arg, "tv");
ti = cv_testinfo_new();
if (timeout) {
ti->tv = &msec100;
}
spawn_func(cv_test_thr_fn_, ti);
spawn_func(cv_test_thr_fn_, ti);
spawn_func(cv_test_thr_fn_, ti);
spawn_func(cv_test_thr_fn_, ti);
tor_mutex_acquire(ti->mutex);
ti->addend = 7;
ti->shutdown = 1;
tor_cond_signal_one(ti->cond);
tor_mutex_release(ti->mutex);
#define SPIN() \
while (1) { \
tor_mutex_acquire(ti->mutex); \
if (ti->addend == 0) { \
break; \
} \
tor_mutex_release(ti->mutex); \
}
SPIN();
ti->addend = 30;
ti->shutdown = 1;
tor_cond_signal_all(ti->cond);
tor_mutex_release(ti->mutex);
SPIN();
ti->addend = 1000;
if (! timeout) ti->shutdown = 1;
tor_cond_signal_one(ti->cond);
tor_mutex_release(ti->mutex);
SPIN();
ti->addend = 300;
if (! timeout) ti->shutdown = 1;
tor_cond_signal_all(ti->cond);
tor_mutex_release(ti->mutex);
SPIN();
tor_mutex_release(ti->mutex);
tt_int_op(ti->value, ==, 1337);
if (!timeout) {
tt_int_op(ti->n_shutdown, ==, 4);
} else {
#ifdef _WIN32
Sleep(500); /* msec */
#elif defined(HAVE_USLEEP)
usleep(500*1000); /* usec */
#else
{
struct timeval tv = { 0, 500*1000 };
select(0, NULL, NULL, NULL, &tv);
}
#endif
tor_mutex_acquire(ti->mutex);
tt_int_op(ti->n_shutdown, ==, 2);
tt_int_op(ti->n_timeouts, ==, 2);
tor_mutex_release(ti->mutex);
}
done:
cv_testinfo_free(ti);
}
#define THREAD_TEST(name) \
{ #name, test_threads_##name, TT_FORK, NULL, NULL }
struct testcase_t thread_tests[] = {
THREAD_TEST(basic),
{ "conditionvar", test_threads_conditionvar, TT_FORK,
&passthrough_setup, (void*)"no-tv" },
{ "conditionvar_timeout", test_threads_conditionvar, TT_FORK,
&passthrough_setup, (void*)"tv" },
END_OF_TESTCASES
};


@ -1607,142 +1607,6 @@ test_util_pow2(void *arg)
;
}
/** mutex for thread test to stop the threads hitting data at the same time. */
static tor_mutex_t *thread_test_mutex_ = NULL;
/** mutexes for the thread test to make sure that the threads have to
* interleave somewhat. */
static tor_mutex_t *thread_test_start1_ = NULL,
*thread_test_start2_ = NULL;
/** Shared strmap for the thread test. */
static strmap_t *thread_test_strmap_ = NULL;
/** The name of thread1 for the thread test */
static char *thread1_name_ = NULL;
/** The name of thread2 for the thread test */
static char *thread2_name_ = NULL;
static void thread_test_func_(void* _s) ATTR_NORETURN;
/** How many iterations have the threads in the unit test run? */
static int t1_count = 0, t2_count = 0;
/** Helper function for threading unit tests: This function runs in a
* subthread. It grabs its own mutex (start1 or start2) to make sure that it
* should start, then it repeatedly alters _test_thread_strmap protected by
* thread_test_mutex_. */
static void
thread_test_func_(void* _s)
{
char *s = _s;
int i, *count;
tor_mutex_t *m;
char buf[64];
char **cp;
if (!strcmp(s, "thread 1")) {
m = thread_test_start1_;
cp = &thread1_name_;
count = &t1_count;
} else {
m = thread_test_start2_;
cp = &thread2_name_;
count = &t2_count;
}
tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id());
*cp = tor_strdup(buf);
tor_mutex_acquire(m);
for (i=0; i<10000; ++i) {
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, "last to run", *cp);
++*count;
tor_mutex_release(thread_test_mutex_);
}
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, s, *cp);
tor_mutex_release(thread_test_mutex_);
tor_mutex_release(m);
spawn_exit();
}
/** Run unit tests for threading logic. */
static void
test_util_threads(void *arg)
{
char *s1 = NULL, *s2 = NULL;
int done = 0, timedout = 0;
time_t started;
#ifndef _WIN32
struct timeval tv;
tv.tv_sec=0;
tv.tv_usec=100*1000;
#endif
(void)arg;
thread_test_mutex_ = tor_mutex_new();
thread_test_start1_ = tor_mutex_new();
thread_test_start2_ = tor_mutex_new();
thread_test_strmap_ = strmap_new();
s1 = tor_strdup("thread 1");
s2 = tor_strdup("thread 2");
tor_mutex_acquire(thread_test_start1_);
tor_mutex_acquire(thread_test_start2_);
spawn_func(thread_test_func_, s1);
spawn_func(thread_test_func_, s2);
tor_mutex_release(thread_test_start2_);
tor_mutex_release(thread_test_start1_);
started = time(NULL);
while (!done) {
tor_mutex_acquire(thread_test_mutex_);
strmap_assert_ok(thread_test_strmap_);
if (strmap_get(thread_test_strmap_, "thread 1") &&
strmap_get(thread_test_strmap_, "thread 2")) {
done = 1;
} else if (time(NULL) > started + 150) {
timedout = done = 1;
}
tor_mutex_release(thread_test_mutex_);
#ifndef _WIN32
/* Prevent the main thread from starving the worker threads. */
select(0, NULL, NULL, NULL, &tv);
#endif
}
tor_mutex_acquire(thread_test_start1_);
tor_mutex_release(thread_test_start1_);
tor_mutex_acquire(thread_test_start2_);
tor_mutex_release(thread_test_start2_);
tor_mutex_free(thread_test_mutex_);
if (timedout) {
printf("\nTimed out: %d %d", t1_count, t2_count);
tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
tt_assert(!timedout);
}
/* different thread IDs. */
tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"),
strmap_get(thread_test_strmap_, "thread 2")));
tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"),
strmap_get(thread_test_strmap_, "last to run")) ||
!strcmp(strmap_get(thread_test_strmap_, "thread 2"),
strmap_get(thread_test_strmap_, "last to run")));
done:
tor_free(s1);
tor_free(s2);
tor_free(thread1_name_);
tor_free(thread2_name_);
if (thread_test_strmap_)
strmap_free(thread_test_strmap_, NULL);
if (thread_test_start1_)
tor_mutex_free(thread_test_start1_);
if (thread_test_start2_)
tor_mutex_free(thread_test_start2_);
}
/** Run unit tests for compression functions */
static void
test_util_gzip(void *arg)
@ -4783,23 +4647,6 @@ test_util_socket(void *arg)
tor_close_socket(fd4);
}
static void *
socketpair_test_setup(const struct testcase_t *testcase)
{
return testcase->setup_data;
}
static int
socketpair_test_cleanup(const struct testcase_t *testcase, void *ptr)
{
(void)testcase;
(void)ptr;
return 1;
}
static const struct testcase_setup_t socketpair_setup = {
socketpair_test_setup, socketpair_test_cleanup
};
/* Test for socketpair and ersatz_socketpair(). We test them both, since
the latter is a tolerably good way to exercise tor_accept_socket(). */
static void
@ -4928,7 +4775,6 @@ struct testcase_t util_tests[] = {
UTIL_LEGACY(memarea),
UTIL_LEGACY(control_formats),
UTIL_LEGACY(mmap),
UTIL_LEGACY(threads),
UTIL_LEGACY(sscanf),
UTIL_LEGACY(format_time_interval),
UTIL_LEGACY(path_is_relative),
@ -4975,10 +4821,10 @@ struct testcase_t util_tests[] = {
UTIL_TEST(mathlog, 0),
UTIL_TEST(weak_random, 0),
UTIL_TEST(socket, TT_FORK),
{ "socketpair", test_util_socketpair, TT_FORK, &socketpair_setup,
{ "socketpair", test_util_socketpair, TT_FORK, &passthrough_setup,
(void*)"0" },
{ "socketpair_ersatz", test_util_socketpair, TT_FORK,
&socketpair_setup, (void*)"1" },
&passthrough_setup, (void*)"1" },
UTIL_TEST(max_mem, 0),
UTIL_TEST(hostname_validation, 0),
UTIL_TEST(ipv4_validation, 0),

src/test/test_workqueue.c (new file, 409 lines)

@ -0,0 +1,409 @@
/* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "or.h"
#include "compat_threads.h"
#include "onion.h"
#include "workqueue.h"
#include "crypto.h"
#include "crypto_curve25519.h"
#include "compat_libevent.h"
#include <stdio.h>
#ifdef HAVE_EVENT2_EVENT_H
#include <event2/event.h>
#else
#include <event.h>
#endif
static int opt_verbose = 0;
static int opt_n_threads = 8;
static int opt_n_items = 10000;
static int opt_n_inflight = 1000;
static int opt_n_lowwater = 250;
static int opt_n_cancel = 0;
static int opt_ratio_rsa = 5;
#ifdef TRACK_RESPONSES
tor_mutex_t bitmap_mutex;
int handled_len;
bitarray_t *handled;
#endif
typedef struct state_s {
int magic;
int n_handled;
crypto_pk_t *rsa;
curve25519_secret_key_t ecdh;
int is_shutdown;
} state_t;
typedef struct rsa_work_s {
int serial;
uint8_t msg[128];
uint8_t msglen;
} rsa_work_t;
typedef struct ecdh_work_s {
int serial;
union {
curve25519_public_key_t pk;
uint8_t msg[32];
} u;
} ecdh_work_t;
static void
mark_handled(int serial)
{
#ifdef TRACK_RESPONSES
tor_mutex_acquire(&bitmap_mutex);
tor_assert(serial < handled_len);
tor_assert(! bitarray_is_set(handled, serial));
bitarray_set(handled, serial);
tor_mutex_release(&bitmap_mutex);
#else
(void)serial;
#endif
}
static int
workqueue_do_rsa(void *state, void *work)
{
rsa_work_t *rw = work;
state_t *st = state;
crypto_pk_t *rsa = st->rsa;
uint8_t sig[256];
int len;
tor_assert(st->magic == 13371337);
len = crypto_pk_private_sign(rsa, (char*)sig, 256,
(char*)rw->msg, rw->msglen);
if (len < 0) {
rw->msglen = 0;
return WQ_RPL_ERROR;
}
memset(rw->msg, 0, sizeof(rw->msg));
rw->msglen = len;
memcpy(rw->msg, sig, len);
++st->n_handled;
mark_handled(rw->serial);
return WQ_RPL_REPLY;
}
static int
workqueue_do_shutdown(void *state, void *work)
{
(void)work;
crypto_pk_free(((state_t*)state)->rsa);
tor_free(state);
return WQ_RPL_SHUTDOWN;
}
static int
workqueue_do_ecdh(void *state, void *work)
{
ecdh_work_t *ew = work;
uint8_t output[CURVE25519_OUTPUT_LEN];
state_t *st = state;
tor_assert(st->magic == 13371337);
curve25519_handshake(output, &st->ecdh, &ew->u.pk);
memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
++st->n_handled;
mark_handled(ew->serial);
return WQ_RPL_REPLY;
}
static void *
new_state(void *arg)
{
state_t *st;
(void)arg;
st = tor_malloc(sizeof(*st));
/* Every thread gets its own keys; not a problem for benchmarking. */
st->rsa = crypto_pk_new();
if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
crypto_pk_free(st->rsa);
tor_free(st);
return NULL;
}
curve25519_secret_key_generate(&st->ecdh, 0);
st->magic = 13371337;
return st;
}
static void
free_state(void *arg)
{
state_t *st = arg;
crypto_pk_free(st->rsa);
tor_free(st);
}
static tor_weak_rng_t weak_rng;
static int n_sent = 0;
static int rsa_sent = 0;
static int ecdh_sent = 0;
static int n_received = 0;
#ifdef TRACK_RESPONSES
bitarray_t *received;
#endif
static void
handle_reply(void *arg)
{
#ifdef TRACK_RESPONSES
rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
tor_assert(! bitarray_is_set(received, rw->serial));
bitarray_set(received,rw->serial);
#endif
tor_free(arg);
++n_received;
}
static workqueue_entry_t *
add_work(threadpool_t *tp)
{
int add_rsa =
opt_ratio_rsa == 0 ||
tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
if (add_rsa) {
rsa_work_t *w = tor_malloc_zero(sizeof(*w));
w->serial = n_sent++;
crypto_rand((char*)w->msg, 20);
w->msglen = 20;
++rsa_sent;
return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
} else {
ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
w->serial = n_sent++;
/* Not strictly right, but this is just for benchmarks. */
crypto_rand((char*)w->u.pk.public_key, 32);
++ecdh_sent;
return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
}
}
static int n_failed_cancel = 0;
static int n_successful_cancel = 0;
static int
add_n_work_items(threadpool_t *tp, int n)
{
int n_queued = 0;
int n_try_cancel = 0, i;
workqueue_entry_t **to_cancel;
workqueue_entry_t *ent;
to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
while (n_queued++ < n) {
ent = add_work(tp);
if (! ent) {
tor_event_base_loopexit(tor_libevent_get_base(), NULL);
tor_free(to_cancel);
return -1;
}
if (n_try_cancel < opt_n_cancel &&
tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
to_cancel[n_try_cancel++] = ent;
}
}
for (i = 0; i < n_try_cancel; ++i) {
void *work = workqueue_entry_cancel(to_cancel[i]);
if (! work) {
n_failed_cancel++;
} else {
n_successful_cancel++;
tor_free(work);
}
}
tor_free(to_cancel);
return 0;
}
static int shutting_down = 0;
static void
replysock_readable_cb(tor_socket_t sock, short what, void *arg)
{
threadpool_t *tp = arg;
replyqueue_t *rq = threadpool_get_replyqueue(tp);
int old_r = n_received;
(void) sock;
(void) what;
replyqueue_process(rq);
if (old_r == n_received)
return;
if (opt_verbose) {
printf("%d / %d", n_received, n_sent);
if (opt_n_cancel)
printf(" (%d cancelled, %d uncancellable)",
n_successful_cancel, n_failed_cancel);
puts("");
}
#ifdef TRACK_RESPONSES
int i; /* debug-only block: declared here so the loop below compiles */
tor_mutex_acquire(&bitmap_mutex);
for (i = 0; i < opt_n_items; ++i) {
if (bitarray_is_set(received, i))
putc('o', stdout);
else if (bitarray_is_set(handled, i))
putc('!', stdout);
else
putc('.', stdout);
}
puts("");
tor_mutex_release(&bitmap_mutex);
#endif
if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
int n_to_send = n_received + opt_n_inflight - n_sent;
if (n_to_send > opt_n_items - n_sent)
n_to_send = opt_n_items - n_sent;
add_n_work_items(tp, n_to_send);
}
if (shutting_down == 0 &&
n_received+n_successful_cancel == n_sent &&
n_sent >= opt_n_items) {
shutting_down = 1;
threadpool_queue_update(tp, NULL,
workqueue_do_shutdown, NULL, NULL);
}
}
static void
help(void)
{
puts(
"Options:\n"
" -N <items> Run this many items of work\n"
" -T <threads> Use this many threads\n"
" -I <inflight> Have no more than this many requests queued at once\n"
" -L <lowwater> Add items whenever fewer than this many are pending\n"
" -C <cancel> Try to cancel N items of every batch that we add\n"
" -R <ratio> Make one out of this many items be a slow (RSA) one\n"
" --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
" Disable one of the alert_socket backends.");
}
int
main(int argc, char **argv)
{
replyqueue_t *rq;
threadpool_t *tp;
int i;
tor_libevent_cfg evcfg;
struct event *ev;
uint32_t as_flags = 0;
for (i = 1; i < argc; ++i) {
if (!strcmp(argv[i], "-v")) {
opt_verbose = 1;
} else if (!strcmp(argv[i], "-T") && i+1<argc) {
opt_n_threads = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-N") && i+1<argc) {
opt_n_items = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-I") && i+1<argc) {
opt_n_inflight = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-L") && i+1<argc) {
opt_n_lowwater = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-R") && i+1<argc) {
opt_ratio_rsa = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-C") && i+1<argc) {
opt_n_cancel = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--no-eventfd2")) {
as_flags |= ASOCKS_NOEVENTFD2;
} else if (!strcmp(argv[i], "--no-eventfd")) {
as_flags |= ASOCKS_NOEVENTFD;
} else if (!strcmp(argv[i], "--no-pipe2")) {
as_flags |= ASOCKS_NOPIPE2;
} else if (!strcmp(argv[i], "--no-pipe")) {
as_flags |= ASOCKS_NOPIPE;
} else if (!strcmp(argv[i], "--no-socketpair")) {
as_flags |= ASOCKS_NOSOCKETPAIR;
} else if (!strcmp(argv[i], "-h")) {
help();
return 0;
} else {
help();
return 1;
}
}
if (opt_n_threads < 1 ||
opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
opt_n_cancel > opt_n_inflight ||
opt_ratio_rsa < 0) {
help();
return 1;
}
init_logging(1);
crypto_global_init(1, NULL, NULL);
crypto_seed_rng(1);
rq = replyqueue_new(as_flags);
tor_assert(rq);
tp = threadpool_new(opt_n_threads,
rq, new_state, free_state, NULL);
tor_assert(tp);
crypto_seed_weak_rng(&weak_rng);
memset(&evcfg, 0, sizeof(evcfg));
tor_libevent_initialize(&evcfg);
ev = tor_event_new(tor_libevent_get_base(),
replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
replysock_readable_cb, tp);
event_add(ev, NULL);
#ifdef TRACK_RESPONSES
handled = bitarray_init_zero(opt_n_items);
received = bitarray_init_zero(opt_n_items);
tor_mutex_init(&bitmap_mutex);
handled_len = opt_n_items;
#endif
for (i = 0; i < opt_n_inflight; ++i) {
if (! add_work(tp)) {
puts("Couldn't add work.");
return 1;
}
}
{
struct timeval limit = { 30, 0 };
tor_event_base_loopexit(tor_libevent_get_base(), &limit);
}
event_base_loop(tor_libevent_get_base(), 0);
if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
printf("%d vs %d\n", n_sent, opt_n_items);
printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
puts("FAIL");
return 1;
} else {
puts("OK");
return 0;
}
}