diff --git a/.gitignore b/.gitignore
index 9ddd0c5385..e63576cfd4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -163,10 +163,12 @@ cscope.*
 /src/test/test-bt-cl
 /src/test/test-child
 /src/test/test-ntor-cl
+/src/test/test_workqueue
 /src/test/test.exe
 /src/test/test-bt-cl.exe
 /src/test/test-child.exe
 /src/test/test-ntor-cl.exe
+/src/test/test_workqueue.exe
 
 # /src/tools/
 /src/tools/tor-checkkey
diff --git a/changes/better_workqueues b/changes/better_workqueues
new file mode 100644
index 0000000000..32c984cb71
--- /dev/null
+++ b/changes/better_workqueues
@@ -0,0 +1,10 @@
+  o Major features:
+    - Refactor the CPU worker implementation for better performance by
+      avoiding the kernel and lengthening pipelines. The original
+      implementation used sockets to transfer data from the main thread
+      to the worker threads, and didn't allow any thread to be assigned
+      more than a single piece of work at once. The new implementation
+      avoids communications overhead by making requests in shared
+      memory, avoiding kernel IO where possible, and keeping more
+      requests in flight at once. Resolves issue #9682.
+
diff --git a/configure.ac b/configure.ac
index efeea0658e..9aac1e8c55 100644
--- a/configure.ac
+++ b/configure.ac
@@ -400,6 +400,10 @@ fi
 AC_SEARCH_LIBS(pthread_create, [pthread])
 AC_SEARCH_LIBS(pthread_detach, [pthread])
 
+AM_CONDITIONAL(THREADS_WIN32, test "$enable_threads" = "yes" && test "$bwin32" = "true")
+AM_CONDITIONAL(THREADS_PTHREADS, test "$enable_threads" = "yes" && test "$bwin32" = "false")
+AM_CONDITIONAL(THREADS_NONE, test "$enable_threads" != "yes")
+
 dnl -------------------------------------------------------------------
 dnl Check for functions before libevent, since libevent-1.2 apparently
 dnl exports strlcpy without defining it in a header.
@@ -410,6 +414,7 @@ AC_CHECK_FUNCS(
        backtrace \
        backtrace_symbols_fd \
        clock_gettime \
+       eventfd \
        flock \
        ftime \
        getaddrinfo \
@@ -424,6 +429,8 @@ AC_CHECK_FUNCS(
        localtime_r \
        lround \
        memmem \
+       pipe \
+       pipe2 \
        prctl \
        rint \
        sigaction \
@@ -437,7 +444,7 @@ AC_CHECK_FUNCS(
        sysconf \
        sysctl \
        uname \
-       usleep \
+       usleep \
        vasprintf \
        _vscprintf
 )
@@ -965,6 +972,7 @@ AC_CHECK_HEADERS(
        netinet/in6.h \
        pwd.h \
        stdint.h \
+       sys/eventfd.h \
        sys/file.h \
        sys/ioctl.h \
        sys/limits.h \
diff --git a/src/common/compat.c b/src/common/compat.c
index 6d36321193..5575316b2b 100644
--- a/src/common/compat.c
+++ b/src/common/compat.c
@@ -27,7 +27,6 @@
 #include "compat.h"
 
 #ifdef _WIN32
-#include <process.h>
 #include <windows.h>
 #include <sys/locking.h>
 #endif
@@ -2544,109 +2543,6 @@ get_uname(void)
  * Process control
  */
 
-#if defined(USE_PTHREADS)
-/** Wraps a void (*)(void*) function and its argument so we can
- * invoke them in a way pthreads would expect.
- */
-typedef struct tor_pthread_data_t {
-  void (*func)(void *);
-  void *data;
-} tor_pthread_data_t;
-/** Given a tor_pthread_data_t _data, call _data->func(d->data)
- * and free _data. Used to make sure we can call functions the way pthread
- * expects. */
-static void *
-tor_pthread_helper_fn(void *_data)
-{
-  tor_pthread_data_t *data = _data;
-  void (*func)(void*);
-  void *arg;
-  /* mask signals to worker threads to avoid SIGPIPE, etc */
-  sigset_t sigs;
-  /* We're in a subthread; don't handle any signals here. */
-  sigfillset(&sigs);
-  pthread_sigmask(SIG_SETMASK, &sigs, NULL);
-
-  func = data->func;
-  arg = data->data;
-  tor_free(_data);
-  func(arg);
-  return NULL;
-}
-/**
- * A pthread attribute to make threads start detached.
- */
-static pthread_attr_t attr_detached;
-/** True iff we've called tor_threads_init() */
-static int threads_initialized = 0;
-#endif
-
-/** Minimalist interface to run a void function in the background. On
- * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
- * func should not return, but rather should call spawn_exit.
- *
- * NOTE: if data is used, it should not be allocated on the stack,
- * since in a multithreaded environment, there is no way to be sure that
- * the caller's stack will still be around when the called function is
- * running.
- */
-int
-spawn_func(void (*func)(void *), void *data)
-{
-#if defined(USE_WIN32_THREADS)
-  int rv;
-  rv = (int)_beginthread(func, 0, data);
-  if (rv == (int)-1)
-    return -1;
-  return 0;
-#elif defined(USE_PTHREADS)
-  pthread_t thread;
-  tor_pthread_data_t *d;
-  if (PREDICT_UNLIKELY(!threads_initialized))
-    tor_threads_init();
-  d = tor_malloc(sizeof(tor_pthread_data_t));
-  d->data = data;
-  d->func = func;
-  if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
-    return -1;
-  return 0;
-#else
-  pid_t pid;
-  pid = fork();
-  if (pid<0)
-    return -1;
-  if (pid==0) {
-    /* Child */
-    func(data);
-    tor_assert(0); /* Should never reach here. */
-    return 0; /* suppress "control-reaches-end-of-non-void" warning. */
-  } else {
-    /* Parent */
-    return 0;
-  }
-#endif
-}
-
-/** End the current thread/process.
- */
-void
-spawn_exit(void)
-{
-#if defined(USE_WIN32_THREADS)
-  _endthread();
-  //we should never get here. my compiler thinks that _endthread returns, this
-  //is an attempt to fool it.
-  tor_assert(0);
-  _exit(0);
-#elif defined(USE_PTHREADS)
-  pthread_exit(NULL);
-#else
-  /* http://www.erlenstar.demon.co.uk/unix/faq_2.html says we should
-   * call _exit, not exit, from child processes. */
-  _exit(0);
-#endif
-}
-
 /** Implementation logic for compute_num_cpus(). */
 static int
 compute_num_cpus_impl(void)
@@ -2935,280 +2831,6 @@ tor_gmtime_r(const time_t *timep, struct tm *result)
 }
 #endif
 
-#if defined(USE_WIN32_THREADS)
-void
-tor_mutex_init(tor_mutex_t *m)
-{
-  InitializeCriticalSection(&m->mutex);
-}
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
-  DeleteCriticalSection(&m->mutex);
-}
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
-  tor_assert(m);
-  EnterCriticalSection(&m->mutex);
-}
-void
-tor_mutex_release(tor_mutex_t *m)
-{
-  LeaveCriticalSection(&m->mutex);
-}
-unsigned long
-tor_get_thread_id(void)
-{
-  return (unsigned long)GetCurrentThreadId();
-}
-#elif defined(USE_PTHREADS)
-/** A mutex attribute that we're going to use to tell pthreads that we want
- * "reentrant" mutexes (i.e., once we can re-lock if we're already holding
- * them.) */
-static pthread_mutexattr_t attr_reentrant;
-/** Initialize mutex so it can be locked. Every mutex must be set
- * up with tor_mutex_init() or tor_mutex_new(); not both. */
-void
-tor_mutex_init(tor_mutex_t *mutex)
-{
-  int err;
-  if (PREDICT_UNLIKELY(!threads_initialized))
-    tor_threads_init();
-  err = pthread_mutex_init(&mutex->mutex, &attr_reentrant);
-  if (PREDICT_UNLIKELY(err)) {
-    log_err(LD_GENERAL, "Error %d creating a mutex.", err);
-    tor_fragile_assert();
-  }
-}
-/** Wait until m is free, then acquire it. */
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
-  int err;
-  tor_assert(m);
-  err = pthread_mutex_lock(&m->mutex);
-  if (PREDICT_UNLIKELY(err)) {
-    log_err(LD_GENERAL, "Error %d locking a mutex.", err);
-    tor_fragile_assert();
-  }
-}
-/** Release the lock m so another thread can have it. */
-void
-tor_mutex_release(tor_mutex_t *m)
-{
-  int err;
-  tor_assert(m);
-  err = pthread_mutex_unlock(&m->mutex);
-  if (PREDICT_UNLIKELY(err)) {
-    log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
-    tor_fragile_assert();
-  }
-}
-/** Clean up the mutex m so that it no longer uses any system
- * resources. Does not free m. This function must only be called on
- * mutexes from tor_mutex_init(). */
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
-  int err;
-  tor_assert(m);
-  err = pthread_mutex_destroy(&m->mutex);
-  if (PREDICT_UNLIKELY(err)) {
-    log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
-    tor_fragile_assert();
-  }
-}
-/** Return an integer representing this thread. */
-unsigned long
-tor_get_thread_id(void)
-{
-  union {
-    pthread_t thr;
-    unsigned long id;
-  } r;
-  r.thr = pthread_self();
-  return r.id;
-}
-#endif
-
-/** Return a newly allocated, ready-for-use mutex. */
-tor_mutex_t *
-tor_mutex_new(void)
-{
-  tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
-  tor_mutex_init(m);
-  return m;
-}
-/** Release all storage and system resources held by m. */
-void
-tor_mutex_free(tor_mutex_t *m)
-{
-  if (!m)
-    return;
-  tor_mutex_uninit(m);
-  tor_free(m);
-}
-
-/* Conditions. */
-#ifdef USE_PTHREADS
-#if 0
-/** Cross-platform condition implementation. */
-struct tor_cond_t {
-  pthread_cond_t cond;
-};
-/** Return a newly allocated condition, with nobody waiting on it. */
-tor_cond_t *
-tor_cond_new(void)
-{
-  tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
-  if (pthread_cond_init(&cond->cond, NULL)) {
-    tor_free(cond);
-    return NULL;
-  }
-  return cond;
-}
-/** Release all resources held by cond. */
-void
-tor_cond_free(tor_cond_t *cond)
-{
-  if (!cond)
-    return;
-  if (pthread_cond_destroy(&cond->cond)) {
-    log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
-    return;
-  }
-  tor_free(cond);
-}
-/** Wait until one of the tor_cond_signal functions is called on cond.
- * All waiters on the condition must wait holding the same mutex.
- * Returns 0 on success, negative on failure. */
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
-  return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
-}
-/** Wake up one of the waiters on cond. */
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
-  pthread_cond_signal(&cond->cond);
-}
-/** Wake up all of the waiters on cond. */
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
-  pthread_cond_broadcast(&cond->cond);
-}
-#endif
-/** Set up common structures for use by threading. */
-void
-tor_threads_init(void)
-{
-  if (!threads_initialized) {
-    pthread_mutexattr_init(&attr_reentrant);
-    pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE);
-    tor_assert(0==pthread_attr_init(&attr_detached));
-    tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
-    threads_initialized = 1;
-    set_main_thread();
-  }
-}
-#elif defined(USE_WIN32_THREADS)
-#if 0
-static DWORD cond_event_tls_index;
-struct tor_cond_t {
-  CRITICAL_SECTION mutex;
-  smartlist_t *events;
-};
-tor_cond_t *
-tor_cond_new(void)
-{
-  tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
-  InitializeCriticalSection(&cond->mutex);
-  cond->events = smartlist_new();
-  return cond;
-}
-void
-tor_cond_free(tor_cond_t *cond)
-{
-  if (!cond)
-    return;
-  DeleteCriticalSection(&cond->mutex);
-  /* XXXX notify? */
-  smartlist_free(cond->events);
-  tor_free(cond);
-}
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
-  HANDLE event;
-  int r;
-  tor_assert(cond);
-  tor_assert(mutex);
-  event = TlsGetValue(cond_event_tls_index);
-  if (!event) {
-    event = CreateEvent(0, FALSE, FALSE, NULL);
-    TlsSetValue(cond_event_tls_index, event);
-  }
-  EnterCriticalSection(&cond->mutex);
-
-  tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
-  tor_assert(!smartlist_contains(cond->events, event));
-  smartlist_add(cond->events, event);
-
-  LeaveCriticalSection(&cond->mutex);
-
-  tor_mutex_release(mutex);
-  r = WaitForSingleObject(event, INFINITE);
-  tor_mutex_acquire(mutex);
-
-  switch (r) {
-    case WAIT_OBJECT_0: /* we got the mutex normally. */
-      break;
-    case WAIT_ABANDONED: /* holding thread exited. */
-    case WAIT_TIMEOUT: /* Should never happen. */
-      tor_assert(0);
-      break;
-    case WAIT_FAILED:
-      log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
-  }
-  return 0;
-}
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
-  HANDLE event;
-  tor_assert(cond);
-
-  EnterCriticalSection(&cond->mutex);
-
-  if ((event = smartlist_pop_last(cond->events)))
-    SetEvent(event);
-
-  LeaveCriticalSection(&cond->mutex);
-}
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
-  tor_assert(cond);
-
-  EnterCriticalSection(&cond->mutex);
-  SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
-  smartlist_clear(cond->events);
-  LeaveCriticalSection(&cond->mutex);
-}
-#endif
-void
-tor_threads_init(void)
-{
-#if 0
-  cond_event_tls_index = TlsAlloc();
-#endif
-  set_main_thread();
-}
-#endif
-
 #if defined(HAVE_MLOCKALL) && HAVE_DECL_MLOCKALL && defined(RLIMIT_MEMLOCK)
 /** Attempt to raise the current and max rlimit to infinity for our process.
  * This only needs to be done once and can probably only be done when we have
@@ -3292,23 +2914,6 @@ tor_mlockall(void)
 #endif
 }
 
-/** Identity of the "main" thread */
-static unsigned long main_thread_id = -1;
-
-/** Start considering the current thread to be the 'main thread'. This has
- * no effect on anything besides in_main_thread(). */
-void
-set_main_thread(void)
-{
-  main_thread_id = tor_get_thread_id();
-}
-/** Return true iff called from the main thread. */
-int
-in_main_thread(void)
-{
-  return main_thread_id == tor_get_thread_id();
-}
-
 /**
  * On Windows, WSAEWOULDBLOCK is not always correct: when you see it,
  * you need to ask the socket for its actual errno. Also, you need to
diff --git a/src/common/compat.h b/src/common/compat.h
index 04e8cb267c..23f8614196 100644
--- a/src/common/compat.h
+++ b/src/common/compat.h
@@ -36,9 +36,6 @@
 #ifdef HAVE_STRING_H
 #include <string.h>
 #endif
-#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
-#include <pthread.h>
-#endif
 #include
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>
 #endif
@@ -642,61 +639,10 @@ char **get_environment(void);
 
 int get_total_system_memory(size_t *mem_out);
 
-int spawn_func(void (*func)(void *), void *data);
-void spawn_exit(void) ATTR_NORETURN;
-
-#if defined(_WIN32)
-#define USE_WIN32_THREADS
-#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
-#define USE_PTHREADS
-#else
-#error "No threading system was found"
-#endif
-
 int compute_num_cpus(void);
 
-/* Because we use threads instead of processes on most platforms (Windows,
- * Linux, etc), we need locking for them. On platforms with poor thread
- * support or broken gethostbyname_r, these functions are no-ops. */
-
-/** A generic lock structure for multithreaded builds. */
-typedef struct tor_mutex_t {
-#if defined(USE_WIN32_THREADS)
-  /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
-  CRITICAL_SECTION mutex;
-#elif defined(USE_PTHREADS)
-  /** Pthreads-only: with pthreads, we implement locks with
-   * pthread_mutex_t. */
-  pthread_mutex_t mutex;
-#else
-  /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
-  int _unused;
-#endif
-} tor_mutex_t;
-
 int tor_mlockall(void);
 
-tor_mutex_t *tor_mutex_new(void);
-void tor_mutex_init(tor_mutex_t *m);
-void tor_mutex_acquire(tor_mutex_t *m);
-void tor_mutex_release(tor_mutex_t *m);
-void tor_mutex_free(tor_mutex_t *m);
-void tor_mutex_uninit(tor_mutex_t *m);
-unsigned long tor_get_thread_id(void);
-void tor_threads_init(void);
-
-void set_main_thread(void);
-int in_main_thread(void);
-
-#if 0
-typedef struct tor_cond_t tor_cond_t;
-tor_cond_t *tor_cond_new(void);
-void tor_cond_free(tor_cond_t *cond);
-int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
-void tor_cond_signal_one(tor_cond_t *cond);
-void tor_cond_signal_all(tor_cond_t *cond);
-#endif
-
 /** Macros for MIN/MAX. Never use these when the arguments could have
  * side-effects.
  * {With GCC extensions we could probably define a safer MIN/MAX. But
@@ -742,5 +688,8 @@ STATIC int tor_ersatz_socketpair(int family, int type, int protocol,
 #endif
 #endif
 
+/* This needs some of the declarations above so we include it here. */
+#include "compat_threads.h"
+
 #endif
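Review note, not part of the patch: the spawn_func()/spawn_exit() pair keeps its old contract but now lives in per-platform files below. A minimal usage sketch, with hypothetical names (worker_args_t, run_worker_loop):

    /* Sketch only: the argument must be heap-allocated, since the caller's
     * stack may be gone by the time the new thread runs (see the NOTE in
     * compat_pthreads.c below). */
    typedef struct worker_args_t { int id; } worker_args_t;

    static void
    worker_entry(void *arg)
    {
      worker_args_t *args = arg;
      run_worker_loop(args->id);   /* hypothetical */
      tor_free(args);
      spawn_exit();                /* never returns */
    }

    static int
    launch_worker(int id)
    {
      worker_args_t *args = tor_malloc_zero(sizeof(*args));
      args->id = id;
      return spawn_func(worker_entry, args);  /* 0 on success, -1 on failure */
    }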
+ */ +int +spawn_func(void (*func)(void *), void *data) +{ + pthread_t thread; + tor_pthread_data_t *d; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + d = tor_malloc(sizeof(tor_pthread_data_t)); + d->data = data; + d->func = func; + if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d)) + return -1; + return 0; +} + +/** End the current thread/process. + */ +void +spawn_exit(void) +{ + pthread_exit(NULL); +} + +/** A mutex attribute that we're going to use to tell pthreads that we want + * "recursive" mutexes (i.e., once we can re-lock if we're already holding + * them.) */ +static pthread_mutexattr_t attr_recursive; + +/** Initialize mutex so it can be locked. Every mutex must be set + * up with tor_mutex_init() or tor_mutex_new(); not both. */ +void +tor_mutex_init(tor_mutex_t *mutex) +{ + int err; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + err = pthread_mutex_init(&mutex->mutex, &attr_recursive); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d creating a mutex.", err); + tor_fragile_assert(); + } +} + +/** As tor_mutex_init, but initialize a mutex suitable that may be + * non-recursive, if the OS supports that. */ +void +tor_mutex_init_nonrecursive(tor_mutex_t *mutex) +{ + int err; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + err = pthread_mutex_init(&mutex->mutex, NULL); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d creating a mutex.", err); + tor_fragile_assert(); + } +} + +/** Wait until m is free, then acquire it. */ +void +tor_mutex_acquire(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_lock(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d locking a mutex.", err); + tor_fragile_assert(); + } +} +/** Release the lock m so another thread can have it. */ +void +tor_mutex_release(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_unlock(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d unlocking a mutex.", err); + tor_fragile_assert(); + } +} +/** Clean up the mutex m so that it no longer uses any system + * resources. Does not free m. This function must only be called on + * mutexes from tor_mutex_init(). */ +void +tor_mutex_uninit(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_destroy(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d destroying a mutex.", err); + tor_fragile_assert(); + } +} +/** Return an integer representing this thread. */ +unsigned long +tor_get_thread_id(void) +{ + union { + pthread_t thr; + unsigned long id; + } r; + r.thr = pthread_self(); + return r.id; +} + +/* Conditions. */ + +/** Initialize an already-allocated condition variable. */ +int +tor_cond_init(tor_cond_t *cond) +{ + pthread_condattr_t condattr; + + memset(cond, 0, sizeof(tor_cond_t)); + /* Default condition attribute. Might be used if clock monotonic is + * available else this won't affect anything. */ + if (pthread_condattr_init(&condattr)) { + return -1; + } + +#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) + /* Use monotonic time so when we timedwait() on it, any clock adjustment + * won't affect the timeout value. */ + if (pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC)) { + return -1; + } +#endif + if (pthread_cond_init(&cond->cond, &condattr)) { + return -1; + } + return 0; +} + +/** Release all resources held by cond, but do not free cond + * itself. 
+/** Release all resources held by cond, but do not free cond
+ * itself. */
+void
+tor_cond_uninit(tor_cond_t *cond)
+{
+  if (pthread_cond_destroy(&cond->cond)) {
+    log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
+    return;
+  }
+}
+/** Wait until one of the tor_cond_signal functions is called on cond.
+ * (If tv is set, and that amount of time passes with no signal to
+ * cond, return anyway.) All waiters on the condition must wait holding
+ * the same mutex. All signallers should hold that mutex. The mutex
+ * needs to have been allocated with tor_mutex_init_for_cond().
+ *
+ * Returns 0 on success, -1 on failure, 1 on timeout. */
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
+{
+  int r;
+  if (tv == NULL) {
+    while (1) {
+      r = pthread_cond_wait(&cond->cond, &mutex->mutex);
+      if (r == EINTR) {
+        /* EINTR should be impossible according to POSIX, but POSIX, like the
+         * Pirate's Code, is apparently treated "more like what you'd call
+         * guidelines than actual rules." */
+        continue;
+      }
+      return r ? -1 : 0;
+    }
+  } else {
+    struct timeval tvnow, tvsum;
+    struct timespec ts;
+    while (1) {
+#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
+      if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) {
+        return -1;
+      }
+      tvnow.tv_sec = ts.tv_sec;
+      tvnow.tv_usec = ts.tv_nsec / 1000;
+      timeradd(tv, &tvnow, &tvsum);
+#else
+      if (gettimeofday(&tvnow, NULL) < 0)
+        return -1;
+      timeradd(tv, &tvnow, &tvsum);
+#endif /* HAVE_CLOCK_GETTIME, CLOCK_MONOTONIC */
+
+      ts.tv_sec = tvsum.tv_sec;
+      ts.tv_nsec = tvsum.tv_usec * 1000;
+
+      r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
+      if (r == 0)
+        return 0;
+      else if (r == ETIMEDOUT)
+        return 1;
+      else if (r == EINTR)
+        continue;
+      else
+        return -1;
+    }
+  }
+}
+/** Wake up one of the waiters on cond. */
+void
+tor_cond_signal_one(tor_cond_t *cond)
+{
+  pthread_cond_signal(&cond->cond);
+}
+/** Wake up all of the waiters on cond. */
+void
+tor_cond_signal_all(tor_cond_t *cond)
+{
+  pthread_cond_broadcast(&cond->cond);
+}
+
+/** Set up common structures for use by threading. */
+void
+tor_threads_init(void)
+{
+  if (!threads_initialized) {
+    pthread_mutexattr_init(&attr_recursive);
+    pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
+    tor_assert(0==pthread_attr_init(&attr_detached));
+    tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
+    threads_initialized = 1;
+    set_main_thread();
+  }
+}
+
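The tri-state return of tor_cond_wait() just defined (0 = signaled, 1 = timeout, -1 = error) is easy to misuse; a hedged sketch of the expected caller-side loop, where "work_ready" stands in for any predicate the caller maintains:

    static int work_ready = 0;

    /* Sketch: re-check the predicate after every wakeup, and treat the
     * timeout and error cases separately. */
    static int
    wait_for_work(tor_cond_t *cond, tor_mutex_t *lock)
    {
      struct timeval timeout = { 5, 0 }; /* five seconds */
      int status = 0;
      tor_mutex_acquire(lock);
      while (!work_ready) {
        int r = tor_cond_wait(cond, lock, &timeout);
        if (r != 0) {       /* 1 == timed out, -1 == error */
          status = r;
          break;
        }
      }
      tor_mutex_release(lock);
      return status;
    }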
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
new file mode 100644
index 0000000000..3b79292cdb
--- /dev/null
+++ b/src/common/compat_threads.c
@@ -0,0 +1,303 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define _GNU_SOURCE
+
+#include "orconfig.h"
+#include <stdlib.h>
+#include "compat.h"
+#include "compat_threads.h"
+
+#include "util.h"
+#include "torlog.h"
+
+#ifdef HAVE_SYS_EVENTFD_H
+#include <sys/eventfd.h>
+#endif
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+/** Return a newly allocated, ready-for-use mutex. */
+tor_mutex_t *
+tor_mutex_new(void)
+{
+  tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
+  tor_mutex_init(m);
+  return m;
+}
+/** Return a newly allocated, ready-for-use mutex. This one might be
+ * non-recursive, if that's faster. */
+tor_mutex_t *
+tor_mutex_new_nonrecursive(void)
+{
+  tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
+  tor_mutex_init_nonrecursive(m);
+  return m;
+}
+/** Release all storage and system resources held by m. */
+void
+tor_mutex_free(tor_mutex_t *m)
+{
+  if (!m)
+    return;
+  tor_mutex_uninit(m);
+  tor_free(m);
+}
+
+/** Allocate and return a new condition variable. */
+tor_cond_t *
+tor_cond_new(void)
+{
+  tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+  if (tor_cond_init(cond)<0)
+    tor_free(cond);
+  return cond;
+}
+
+/** Free all storage held in c. */
+void
+tor_cond_free(tor_cond_t *c)
+{
+  if (!c)
+    return;
+  tor_cond_uninit(c);
+  tor_free(c);
+}
+
+/** Identity of the "main" thread */
+static unsigned long main_thread_id = -1;
+
+/** Start considering the current thread to be the 'main thread'. This has
+ * no effect on anything besides in_main_thread(). */
+void
+set_main_thread(void)
+{
+  main_thread_id = tor_get_thread_id();
+}
+/** Return true iff called from the main thread. */
+int
+in_main_thread(void)
+{
+  return main_thread_id == tor_get_thread_id();
+}
+
+#if defined(HAVE_EVENTFD) || defined(HAVE_PIPE)
+/* non-interruptible versions */
+static int
+write_ni(int fd, const void *buf, size_t n)
+{
+  int r;
+ again:
+  r = write(fd, buf, n);
+  if (r < 0 && errno == EINTR)
+    goto again;
+  return r;
+}
+static int
+read_ni(int fd, void *buf, size_t n)
+{
+  int r;
+ again:
+  r = read(fd, buf, n);
+  if (r < 0 && errno == EINTR)
+    goto again;
+  return r;
+}
+#endif
+
+/* non-interruptible versions */
+static int
+send_ni(int fd, const void *buf, size_t n, int flags)
+{
+  int r;
+ again:
+  r = send(fd, buf, n, flags);
+  if (r < 0 && errno == EINTR)
+    goto again;
+  return r;
+}
+
+static int
+recv_ni(int fd, void *buf, size_t n, int flags)
+{
+  int r;
+ again:
+  r = recv(fd, buf, n, flags);
+  if (r < 0 && errno == EINTR)
+    goto again;
+  return r;
+}
+
+#ifdef HAVE_EVENTFD
+static int
+eventfd_alert(int fd)
+{
+  uint64_t u = 1;
+  int r = write_ni(fd, (void*)&u, sizeof(u));
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+
+static int
+eventfd_drain(int fd)
+{
+  uint64_t u = 0;
+  int r = read_ni(fd, (void*)&u, sizeof(u));
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+#endif
+
+#ifdef HAVE_PIPE
+static int
+pipe_alert(int fd)
+{
+  ssize_t r = write(fd, "x", 1);
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+
+static int
+pipe_drain(int fd)
+{
+  char buf[32];
+  ssize_t r;
+  while ((r = read(fd, buf, sizeof(buf))) >= 0)
+    ;
+  if (r == 0 || errno != EAGAIN)
+    return -1;
+  return 0;
+}
+#endif
+
+static int
+sock_alert(tor_socket_t fd)
+{
+  ssize_t r = send_ni(fd, "x", 1, 0);
+  if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+    return -1;
+  return 0;
+}
+
+static int
+sock_drain(tor_socket_t fd)
+{
+  char buf[32];
+  ssize_t r;
+  while ((r = recv_ni(fd, buf, sizeof(buf), 0)) >= 0)
+    ;
+  if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+    return -1;
+  return 0;
+}
+
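The three alert/drain pairs above share one invariant: after any number of alert_fn() calls, a single drain_fn() call leaves the read side quiet again. eventfd gets this for free (one 8-byte read clears the counter), while the pipe and socketpair variants must loop until EAGAIN, which is why pipe_drain() and sock_drain() read in a loop. A sketch of the two ends, assuming an already-created alert_sockets_t:

    /* Sketch: waking the main thread (any thread may call this)... */
    static void
    alert_main_thread(alert_sockets_t *socks)
    {
      if (socks->alert_fn(socks->write_fd) < 0) {
        /* coalesced alerts are fine; a real error means the fd is broken */
      }
    }

    /* ...and quieting the descriptor again (main thread only). */
    static void
    on_alert_readable(alert_sockets_t *socks)
    {
      if (socks->drain_fn(socks->read_fd) < 0) {
        /* likewise: EAGAIN is consumed inside the drain functions */
      }
    }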
+/** Allocate a new set of alert sockets, and set the appropriate function
+ * pointers, in socks_out. */
+int
+alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
+{
+  tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET };
+
+#ifdef HAVE_EVENTFD
+  /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter
+   * associated with a single file descriptor. */
+#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
+  if (!(flags & ASOCKS_NOEVENTFD2))
+    socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+#endif
+  if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) {
+    socks[0] = eventfd(0,0);
+    if (socks[0] >= 0) {
+      if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+          set_socket_nonblocking(socks[0]) < 0) {
+        close(socks[0]);
+        return -1;
+      }
+    }
+  }
+  if (socks[0] >= 0) {
+    socks_out->read_fd = socks_out->write_fd = socks[0];
+    socks_out->alert_fn = eventfd_alert;
+    socks_out->drain_fn = eventfd_drain;
+    return 0;
+  }
+#endif
+
+#ifdef HAVE_PIPE2
+  /* Now we're going to try pipes. First, try the pipe2() syscall, if we
+   * have it, so we can save some calls... */
+  if (!(flags & ASOCKS_NOPIPE2) &&
+      pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
+    socks_out->alert_fn = pipe_alert;
+    socks_out->drain_fn = pipe_drain;
+    return 0;
+  }
+#endif
+
+#ifdef HAVE_PIPE
+  /* Now try the regular pipe() syscall. Pipes have a bit lower overhead
+   * than socketpairs, from what we can tell. */
+  if (!(flags & ASOCKS_NOPIPE) &&
+      pipe(socks) == 0) {
+    if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+        fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
+        set_socket_nonblocking(socks[0]) < 0 ||
+        set_socket_nonblocking(socks[1]) < 0) {
+      close(socks[0]);
+      close(socks[1]);
+      return -1;
+    }
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
+    socks_out->alert_fn = pipe_alert;
+    socks_out->drain_fn = pipe_drain;
+    return 0;
+  }
+#endif
+
+  /* If nothing else worked, fall back on socketpair(). */
+  if (!(flags & ASOCKS_NOSOCKETPAIR) &&
+      tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
+    if (set_socket_nonblocking(socks[0]) < 0 ||
+        set_socket_nonblocking(socks[1]) < 0) {
+      tor_close_socket(socks[0]);
+      tor_close_socket(socks[1]);
+      return -1;
+    }
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
+    socks_out->alert_fn = sock_alert;
+    socks_out->drain_fn = sock_drain;
+    return 0;
+  }
+  return -1;
+}
+
+/** Close the sockets in socks. */
+void
+alert_sockets_close(alert_sockets_t *socks)
+{
+  if (socks->alert_fn == sock_alert) {
+    /* they are sockets. */
+    tor_close_socket(socks->read_fd);
+    tor_close_socket(socks->write_fd);
+  } else {
+    close(socks->read_fd);
+    if (socks->write_fd != socks->read_fd)
+      close(socks->write_fd);
+  }
+  socks->read_fd = socks->write_fd = -1;
+}
+
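A sketch of typical construction, with the flags left zero so alert_sockets_create() picks the best backend in the order just implemented (eventfd with flags, eventfd, pipe2, pipe, socketpair), and the read end registered with libevent; "base" and "on_alert_cb" are assumed to exist in the caller:

    static int
    setup_alerts(struct event_base *base, alert_sockets_t *socks)
    {
      struct event *ev;
      if (alert_sockets_create(socks, 0) < 0)
        return -1;
      ev = event_new(base, socks->read_fd, EV_READ|EV_PERSIST,
                     on_alert_cb, socks);
      if (!ev || event_add(ev, NULL) < 0)
        return -1;
      return 0;
    }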
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
new file mode 100644
index 0000000000..acf3083f37
--- /dev/null
+++ b/src/common/compat_threads.h
@@ -0,0 +1,115 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_COMPAT_THREADS_H
+#define TOR_COMPAT_THREADS_H
+
+#include "orconfig.h"
+#include "torint.h"
+#include "testsupport.h"
+
+#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
+#include <pthread.h>
+#endif
+
+#if defined(_WIN32)
+#define USE_WIN32_THREADS
+#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
+#define USE_PTHREADS
+#else
+#error "No threading system was found"
+#endif
+
+int spawn_func(void (*func)(void *), void *data);
+void spawn_exit(void) ATTR_NORETURN;
+
+/* Because we use threads instead of processes on most platforms (Windows,
+ * Linux, etc), we need locking for them. On platforms with poor thread
+ * support or broken gethostbyname_r, these functions are no-ops. */
+
+/** A generic lock structure for multithreaded builds. */
+typedef struct tor_mutex_t {
+#if defined(USE_WIN32_THREADS)
+  /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
+  CRITICAL_SECTION mutex;
+#elif defined(USE_PTHREADS)
+  /** Pthreads-only: with pthreads, we implement locks with
+   * pthread_mutex_t. */
+  pthread_mutex_t mutex;
+#else
+  /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
+  int _unused;
+#endif
+} tor_mutex_t;
+
+tor_mutex_t *tor_mutex_new(void);
+tor_mutex_t *tor_mutex_new_nonrecursive(void);
+void tor_mutex_init(tor_mutex_t *m);
+void tor_mutex_init_nonrecursive(tor_mutex_t *m);
+void tor_mutex_acquire(tor_mutex_t *m);
+void tor_mutex_release(tor_mutex_t *m);
+void tor_mutex_free(tor_mutex_t *m);
+void tor_mutex_uninit(tor_mutex_t *m);
+unsigned long tor_get_thread_id(void);
+void tor_threads_init(void);
+
+/** Conditions need nonrecursive mutexes with pthreads. */
+#define tor_mutex_init_for_cond(m) tor_mutex_init_nonrecursive(m)
+
+void set_main_thread(void);
+int in_main_thread(void);
+
+typedef struct tor_cond_t {
+#ifdef USE_PTHREADS
+  pthread_cond_t cond;
+#elif defined(USE_WIN32_THREADS)
+  HANDLE event;
+
+  CRITICAL_SECTION lock;
+  int n_waiting;
+  int n_to_wake;
+  int generation;
+#else
+#error no known condition implementation.
+#endif
+} tor_cond_t;
+
+tor_cond_t *tor_cond_new(void);
+void tor_cond_free(tor_cond_t *cond);
+int tor_cond_init(tor_cond_t *cond);
+void tor_cond_uninit(tor_cond_t *cond);
+int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
+                  const struct timeval *tv);
+void tor_cond_signal_one(tor_cond_t *cond);
+void tor_cond_signal_all(tor_cond_t *cond);
+
+/** Helper type used to manage waking up the main thread while it's in
+ * the libevent main loop. Used by the work queue code. */
+typedef struct alert_sockets_s {
+  /* XXXX This structure needs a better name. */
+  /** Socket that the main thread should listen for EV_READ events on.
+   * Note that this socket may be a regular fd on a non-Windows platform.
+   */
+  tor_socket_t read_fd;
+  /** Socket to use when alerting the main thread. */
+  tor_socket_t write_fd;
+  /** Function to alert the main thread */
+  int (*alert_fn)(tor_socket_t write_fd);
+  /** Function to make the main thread no longer alerted. */
+  int (*drain_fn)(tor_socket_t read_fd);
+} alert_sockets_t;
+
+/* Flags to disable one or more alert_sockets backends. */
+#define ASOCKS_NOEVENTFD2   (1u<<0)
+#define ASOCKS_NOEVENTFD    (1u<<1)
+#define ASOCKS_NOPIPE2      (1u<<2)
+#define ASOCKS_NOPIPE       (1u<<3)
+#define ASOCKS_NOSOCKETPAIR (1u<<4)
+
+int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
+void alert_sockets_close(alert_sockets_t *socks);
+
+#endif
+
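The producer/consumer pattern this header is designed for, as a sketch (the item_queue_* helpers are hypothetical); the Win32 emulation below has to make exactly this pattern work without native condition variables:

    static void
    producer(tor_mutex_t *lock, tor_cond_t *cond)
    {
      tor_mutex_acquire(lock);
      item_queue_push();           /* hypothetical */
      tor_mutex_release(lock);
      tor_cond_signal_one(cond);   /* wake exactly one consumer */
    }

    static void
    consumer(tor_mutex_t *lock, tor_cond_t *cond)
    {
      tor_mutex_acquire(lock);
      while (item_queue_empty())   /* hypothetical */
        tor_cond_wait(cond, lock, NULL);
      item_queue_pop();            /* hypothetical */
      tor_mutex_release(lock);
    }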
+ * + * NOTE: if data is used, it should not be allocated on the stack, + * since in a multithreaded environment, there is no way to be sure that + * the caller's stack will still be around when the called function is + * running. + */ +int +spawn_func(void (*func)(void *), void *data) +{ + int rv; + rv = (int)_beginthread(func, 0, data); + if (rv == (int)-1) + return -1; + return 0; +} + +/** End the current thread/process. + */ +void +spawn_exit(void) +{ + _endthread(); + //we should never get here. my compiler thinks that _endthread returns, this + //is an attempt to fool it. + tor_assert(0); + _exit(0); +} + +void +tor_mutex_init(tor_mutex_t *m) +{ + InitializeCriticalSection(&m->mutex); +} +void +tor_mutex_init_nonrecursive(tor_mutex_t *m) +{ + InitializeCriticalSection(&m->mutex); +} + +void +tor_mutex_uninit(tor_mutex_t *m) +{ + DeleteCriticalSection(&m->mutex); +} +void +tor_mutex_acquire(tor_mutex_t *m) +{ + tor_assert(m); + EnterCriticalSection(&m->mutex); +} +void +tor_mutex_release(tor_mutex_t *m) +{ + LeaveCriticalSection(&m->mutex); +} +unsigned long +tor_get_thread_id(void) +{ + return (unsigned long)GetCurrentThreadId(); +} + +int +tor_cond_init(tor_cond_t *cond) +{ + memset(cond, 0, sizeof(tor_cond_t)); + if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) { + return -1; + } + if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) { + DeleteCriticalSection(&cond->lock); + return -1; + } + cond->n_waiting = cond->n_to_wake = cond->generation = 0; + return 0; +} +void +tor_cond_uninit(tor_cond_t *cond) +{ + DeleteCriticalSection(&cond->lock); + CloseHandle(cond->event); +} + +static void +tor_cond_signal_impl(tor_cond_t *cond, int broadcast) +{ + EnterCriticalSection(&cond->lock); + if (broadcast) + cond->n_to_wake = cond->n_waiting; + else + ++cond->n_to_wake; + cond->generation++; + SetEvent(cond->event); + LeaveCriticalSection(&cond->lock); +} +void +tor_cond_signal_one(tor_cond_t *cond) +{ + tor_cond_signal_impl(cond, 0); +} +void +tor_cond_signal_all(tor_cond_t *cond) +{ + tor_cond_signal_impl(cond, 1); +} + +int +tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv) +{ + CRITICAL_SECTION *lock = &lock_->mutex; + int generation_at_start; + int waiting = 1; + int result = -1; + DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime; + if (tv) + ms_orig = ms = tv->tv_sec*1000 + (tv->tv_usec+999)/1000; + + EnterCriticalSection(&cond->lock); + ++cond->n_waiting; + generation_at_start = cond->generation; + LeaveCriticalSection(&cond->lock); + + LeaveCriticalSection(lock); + + startTime = GetTickCount(); + do { + DWORD res; + res = WaitForSingleObject(cond->event, ms); + EnterCriticalSection(&cond->lock); + if (cond->n_to_wake && + cond->generation != generation_at_start) { + --cond->n_to_wake; + --cond->n_waiting; + result = 0; + waiting = 0; + goto out; + } else if (res != WAIT_OBJECT_0) { + result = (res==WAIT_TIMEOUT) ? 1 : -1; + --cond->n_waiting; + waiting = 0; + goto out; + } else if (ms != INFINITE) { + endTime = GetTickCount(); + if (startTime + ms_orig <= endTime) { + result = 1; /* Timeout */ + --cond->n_waiting; + waiting = 0; + goto out; + } else { + ms = startTime + ms_orig - endTime; + } + } + /* If we make it here, we are still waiting. */ + if (cond->n_to_wake == 0) { + /* There is nobody else who should wake up; reset + * the event. 
+       * the event. */
+      ResetEvent(cond->event);
+    }
+  out:
+    LeaveCriticalSection(&cond->lock);
+  } while (waiting);
+
+  EnterCriticalSection(lock);
+
+  EnterCriticalSection(&cond->lock);
+  if (!cond->n_waiting)
+    ResetEvent(cond->event);
+  LeaveCriticalSection(&cond->lock);
+
+  return result;
+}
+
+void
+tor_threads_init(void)
+{
+  set_main_thread();
+}
+
diff --git a/src/common/include.am b/src/common/include.am
index 6441596199..14838ab555 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -54,10 +54,18 @@ endif
 
 LIBDONNA += $(LIBED25519_REF10)
 
+if THREADS_PTHREADS
+threads_impl_source=src/common/compat_pthreads.c
+endif
+if THREADS_WIN32
+threads_impl_source=src/common/compat_winthreads.c
+endif
+
 LIBOR_A_SOURCES = \
   src/common/address.c \
   src/common/backtrace.c \
   src/common/compat.c \
+  src/common/compat_threads.c \
  src/common/container.c \
  src/common/di_ops.c \
  src/common/log.c \
@@ -66,10 +74,12 @@ LIBOR_A_SOURCES = \
  src/common/util.c \
  src/common/util_codedigest.c \
  src/common/util_process.c \
  src/common/sandbox.c \
+  src/common/workqueue.c \
  src/ext/csiphash.c \
  src/ext/trunnel/trunnel.c \
  $(libor_extra_source) \
-  $(libor_mempool_source)
+  $(libor_mempool_source) \
+  $(threads_impl_source)
 
 LIBOR_CRYPTO_A_SOURCES = \
  src/common/aes.c \
@@ -102,7 +112,6 @@ src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
 src_common_libor_crypto_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
 src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
 
-
 COMMONHEADERS = \
  src/common/address.h \
  src/common/backtrace.h \
@@ -110,6 +119,7 @@ COMMONHEADERS = \
  src/common/ciphers.inc \
  src/common/compat.h \
  src/common/compat_libevent.h \
+  src/common/compat_threads.h \
  src/common/container.h \
  src/common/crypto.h \
  src/common/crypto_curve25519.h \
@@ -128,6 +138,7 @@ COMMONHEADERS = \
  src/common/tortls.h \
  src/common/util.h \
  src/common/util_process.h \
+  src/common/workqueue.h \
  $(libor_mempool_header)
 
 noinst_HEADERS+= $(COMMONHEADERS)
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
new file mode 100644
index 0000000000..77a4fbc3f6
--- /dev/null
+++ b/src/common/workqueue.c
@@ -0,0 +1,490 @@
+/* Copyright (c) 2013-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include "compat.h"
+#include "compat_threads.h"
+#include "util.h"
+#include "workqueue.h"
+#include "tor_queue.h"
+#include "torlog.h"
+
+struct threadpool_s {
+  /** An array of pointers to workerthread_t: one for each running worker
+   * thread. */
+  struct workerthread_s **threads;
+
+  /** Condition variable that we wait on when we have no work, and which
+   * gets signaled when our queue becomes nonempty. */
+  tor_cond_t condition;
+  /** Queue of pending work that we have to do. */
+  TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+
+  /** The current 'update generation' of the threadpool. Any thread that is
+   * at an earlier generation needs to run the update function. */
+  unsigned generation;
+
+  /** Function that should be run for updates on each thread. */
+  int (*update_fn)(void *, void *);
+  /** Function to free update arguments if they can't be run. */
+  void (*free_update_arg_fn)(void *);
+  /** Array of n_threads update arguments. */
+  void **update_args;
+
+  /** Number of elements in threads. */
+  int n_threads;
+  /** Mutex to protect all the above fields. */
+  tor_mutex_t lock;
+
+  /** A reply queue to use when constructing new threads. */
+  replyqueue_t *reply_queue;
+
+  /** Functions used to allocate and free thread state. */
+  void *(*new_thread_state_fn)(void*);
+  void (*free_thread_state_fn)(void*);
+  void *new_thread_state_arg;
+};
+
+struct workqueue_entry_s {
+  /** The next workqueue_entry_t that's pending on the same thread or
+   * reply queue. */
+  TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+  /** The threadpool to which this workqueue_entry_t was assigned. This field
+   * is set when the workqueue_entry_t is created, and won't be cleared until
+   * after it's handled in the main thread. */
+  struct threadpool_s *on_pool;
+  /** True iff this entry is waiting for a worker to start processing it. */
+  uint8_t pending;
+  /** Function to run in the worker thread. */
+  int (*fn)(void *state, void *arg);
+  /** Function to run while processing the reply queue. */
+  void (*reply_fn)(void *arg);
+  /** Argument for the above functions. */
+  void *arg;
+};
+
+struct replyqueue_s {
+  /** Mutex to protect the answers field */
+  tor_mutex_t lock;
+  /** Doubly-linked list of answers that the reply queue needs to handle. */
+  TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
+
+  /** Mechanism to wake up the main thread when it is receiving answers. */
+  alert_sockets_t alert;
+};
+
+/** A worker thread represents a single thread in a thread pool. To avoid
+ * contention, each gets its own queue. This breaks the guarantee that
+ * queued work will get executed strictly in order. */
+typedef struct workerthread_s {
+  /** Which thread is this? In range 0..in_pool->n_threads-1 */
+  int index;
+  /** The pool this thread is a part of. */
+  struct threadpool_s *in_pool;
+  /** User-supplied state field that we pass to the worker functions of each
+   * work item. */
+  void *state;
+  /** Reply queue to which we pass our results. */
+  replyqueue_t *reply_queue;
+  /** The current update generation of this thread */
+  unsigned generation;
+} workerthread_t;
+
+static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
+
+/** Allocate and return a new workqueue_entry_t, set up to run the function
+ * fn in the worker thread, and reply_fn in the main
+ * thread. See threadpool_queue_work() for full documentation. */
+static workqueue_entry_t *
+workqueue_entry_new(int (*fn)(void*, void*),
+                    void (*reply_fn)(void*),
+                    void *arg)
+{
+  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
+  ent->fn = fn;
+  ent->reply_fn = reply_fn;
+  ent->arg = arg;
+  return ent;
+}
+
+/**
+ * Release all storage held in ent. Call only when ent is not on
+ * any queue.
+ */
+static void
+workqueue_entry_free(workqueue_entry_t *ent)
+{
+  if (!ent)
+    return;
+  memset(ent, 0xf0, sizeof(*ent));
+  tor_free(ent);
+}
+
+/**
+ * Cancel a workqueue_entry_t that has been returned from
+ * threadpool_queue_work.
+ *
+ * You must not call this function on any work whose reply function has been
+ * executed in the main thread; that will cause undefined behavior (probably,
+ * a crash).
+ *
+ * If the work is cancelled, this function returns the argument passed to the
+ * work function. It is the caller's responsibility to free this storage.
+ *
+ * This function will have no effect if the worker thread has already executed
+ * or begun to execute the work item. In that case, it will return NULL.
+ */
+void *
+workqueue_entry_cancel(workqueue_entry_t *ent)
+{
+  int cancelled = 0;
+  void *result = NULL;
+  tor_mutex_acquire(&ent->on_pool->lock);
+  if (ent->pending) {
+    TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
+    cancelled = 1;
+    result = ent->arg;
+  }
+  tor_mutex_release(&ent->on_pool->lock);
+
+  if (cancelled) {
+    tor_free(ent);
+  }
+  return result;
+}
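The ownership rule encoded above, as a usage sketch: a non-NULL return means the caller got the request back and must free the argument itself, with whatever destructor matches what it queued (here the hypothetical job_free()); NULL means the worker already owns it and the reply path will run.

    static void
    maybe_abort_request(workqueue_entry_t *ent)
    {
      void *job = workqueue_entry_cancel(ent);
      if (job) {
        job_free(job);  /* hypothetical destructor for the queued arg */
      }
      /* else: too late; the reply function remains responsible for it. */
    }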
+ */ +void * +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + void *result = NULL; + tor_mutex_acquire(&ent->on_pool->lock); + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); + cancelled = 1; + result = ent->arg; + } + tor_mutex_release(&ent->on_pool->lock); + + if (cancelled) { + tor_free(ent); + } + return result; +} + +/**DOCDOC + + must hold lock */ +static int +worker_thread_has_work(workerthread_t *thread) +{ + return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || + thread->generation != thread->in_pool->generation; +} + +/** + * Main function for the worker thread. + */ +static void +worker_thread_main(void *thread_) +{ + workerthread_t *thread = thread_; + threadpool_t *pool = thread->in_pool; + workqueue_entry_t *work; + int result; + + tor_mutex_acquire(&pool->lock); + while (1) { + /* lock must be held at this point. */ + while (worker_thread_has_work(thread)) { + /* lock must be held at this point. */ + if (thread->in_pool->generation != thread->generation) { + void *arg = thread->in_pool->update_args[thread->index]; + thread->in_pool->update_args[thread->index] = NULL; + int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + thread->generation = thread->in_pool->generation; + tor_mutex_release(&pool->lock); + + int r = update_fn(thread->state, arg); + + if (r < 0) { + return; + } + + tor_mutex_acquire(&pool->lock); + continue; + } + work = TOR_TAILQ_FIRST(&pool->work); + TOR_TAILQ_REMOVE(&pool->work, work, next_work); + work->pending = 0; + tor_mutex_release(&pool->lock); + + /* We run the work function without holding the thread lock. This + * is the main thread's first opportunity to give us more work. */ + result = work->fn(thread->state, work->arg); + + /* Queue the reply for the main thread. */ + queue_reply(thread->reply_queue, work); + + /* We may need to exit the thread. */ + if (result >= WQ_RPL_ERROR) { + return; + } + tor_mutex_acquire(&pool->lock); + } + /* At this point the lock is held, and there is no work in this thread's + * queue. */ + + /* TODO: support an idle-function */ + + /* Okay. Now, wait till somebody has work for us. */ + if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { + log_warn(LD_GENERAL, "Fail tor_cond_wait."); + } + } +} + +/** Put a reply on the reply queue. The reply must not currently be on + * any thread's work queue. */ +static void +queue_reply(replyqueue_t *queue, workqueue_entry_t *work) +{ + int was_empty; + tor_mutex_acquire(&queue->lock); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + + if (was_empty) { + if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { + /* XXXX complain! */ + } + } +} + +/** Allocate and start a new worker thread to use state object state, + * and send responses to replyqueue. */ +static workerthread_t * +workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) +{ + workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); + thr->state = state; + thr->reply_queue = replyqueue; + thr->in_pool = pool; + + if (spawn_func(worker_thread_main, thr) < 0) { + log_err(LD_GENERAL, "Can't launch worker thread."); + return NULL; + } + + return thr; +} + +/** + * Queue an item of work for a thread in a thread pool. The function + * fn will be run in a worker thread, and will receive as arguments the + * thread's state object, and the provided object arg. It must return + * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN. 
+ * + * Regardless of its return value, the function reply_fn will later be + * run in the main thread when it invokes replyqueue_process(), and will + * receive as its argument the same arg object. It's the reply + * function's responsibility to free the work object. + * + * On success, return a workqueue_entry_t object that can be passed to + * workqueue_entry_cancel(). On failure, return NULL. + * + * Note that because each thread has its own work queue, work items may not + * be executed strictly in order. + */ +workqueue_entry_t * +threadpool_queue_work(threadpool_t *pool, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + ent->on_pool = pool; + ent->pending = 1; + + tor_mutex_acquire(&pool->lock); + + TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + + tor_mutex_release(&pool->lock); + + tor_cond_signal_one(&pool->condition); + + return ent; +} + +/** + * Queue a copy of a work item for every thread in a pool. This can be used, + * for example, to tell the threads to update some parameter in their states. + * + * Arguments are as for threadpool_queue_work, except that the + * arg value is passed to dup_fn once per each thread to + * make a copy of it. + * + * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update + * will be run. If a new update is scheduled before the old update finishes + * running, then the new will replace the old in any threads that haven't run + * it yet. + * + * Return 0 on success, -1 on failure. + */ +int +threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg) +{ + int i, n_threads; + void (*old_args_free_fn)(void *arg); + void **old_args; + void **new_args; + + tor_mutex_acquire(&pool->lock); + n_threads = pool->n_threads; + old_args = pool->update_args; + old_args_free_fn = pool->free_update_arg_fn; + + new_args = tor_calloc(n_threads, sizeof(void*)); + for (i = 0; i < n_threads; ++i) { + if (dup_fn) + new_args[i] = dup_fn(arg); + else + new_args[i] = arg; + } + + pool->update_args = new_args; + pool->free_update_arg_fn = free_fn; + pool->update_fn = fn; + ++pool->generation; + + tor_mutex_release(&pool->lock); + + tor_cond_signal_all(&pool->condition); + + if (old_args) { + for (i = 0; i < n_threads; ++i) { + if (old_args[i] && old_args_free_fn) + old_args_free_fn(old_args[i]); + } + tor_free(old_args); + } + + return 0; +} + +/** Launch threads until we have n. */ +static int +threadpool_start_threads(threadpool_t *pool, int n) +{ + tor_mutex_acquire(&pool->lock); + + if (pool->n_threads < n) + pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n); + + while (pool->n_threads < n) { + void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); + workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); + thr->index = pool->n_threads; + + if (!thr) { + tor_mutex_release(&pool->lock); + return -1; + } + pool->threads[pool->n_threads++] = thr; + } + tor_mutex_release(&pool->lock); + + return 0; +} + +/** + * Construct a new thread pool with n worker threads, configured to + * send their output to replyqueue. The threads' states will be + * constructed with the new_thread_state_fn call, receiving arg + * as its argument. When the threads close, they will call + * free_thread_state_fn on their states. 
+ */ +threadpool_t * +threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg) +{ + threadpool_t *pool; + pool = tor_malloc_zero(sizeof(threadpool_t)); + tor_mutex_init_nonrecursive(&pool->lock); + tor_cond_init(&pool->condition); + TOR_TAILQ_INIT(&pool->work); + + pool->new_thread_state_fn = new_thread_state_fn; + pool->new_thread_state_arg = arg; + pool->free_thread_state_fn = free_thread_state_fn; + pool->reply_queue = replyqueue; + + if (threadpool_start_threads(pool, n_threads) < 0) { + tor_mutex_uninit(&pool->lock); + tor_free(pool); + return NULL; + } + + return pool; +} + +/** Return the reply queue associated with a given thread pool. */ +replyqueue_t * +threadpool_get_replyqueue(threadpool_t *tp) +{ + return tp->reply_queue; +} + +/** Allocate a new reply queue. Reply queues are used to pass results from + * worker threads to the main thread. Since the main thread is running an + * IO-centric event loop, it needs to get woken up with means other than a + * condition variable. */ +replyqueue_t * +replyqueue_new(uint32_t alertsocks_flags) +{ + replyqueue_t *rq; + + rq = tor_malloc_zero(sizeof(replyqueue_t)); + if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { + tor_free(rq); + return NULL; + } + + tor_mutex_init(&rq->lock); + TOR_TAILQ_INIT(&rq->answers); + + return rq; +} + +/** + * Return the "read socket" for a given reply queue. The main thread should + * listen for read events on this socket, and call replyqueue_process() every + * time it triggers. + */ +tor_socket_t +replyqueue_get_socket(replyqueue_t *rq) +{ + return rq->alert.read_fd; +} + +/** + * Process all pending replies on a reply queue. The main thread should call + * this function every time the socket returned by replyqueue_get_socket() is + * readable. + */ +void +replyqueue_process(replyqueue_t *queue) +{ + if (queue->alert.drain_fn(queue->alert.read_fd) < 0) { + static ratelim_t warn_limit = RATELIM_INIT(7200); + log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, + "Failure from drain_fd"); + } + + tor_mutex_acquire(&queue->lock); + while (!TOR_TAILQ_EMPTY(&queue->answers)) { + /* lock must be held at this point.*/ + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + work->on_pool = NULL; + + work->reply_fn(work->arg); + workqueue_entry_free(work); + + tor_mutex_acquire(&queue->lock); + } + + tor_mutex_release(&queue->lock); +} + diff --git a/src/common/workqueue.h b/src/common/workqueue.h new file mode 100644 index 0000000000..92e82b8a48 --- /dev/null +++ b/src/common/workqueue.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_WORKQUEUE_H +#define TOR_WORKQUEUE_H + +#include "compat.h" + +/** A replyqueue is used to tell the main thread about the outcome of + * work that we queued for the the workers. */ +typedef struct replyqueue_s replyqueue_t; +/** A thread-pool manages starting threads and passing work to them. */ +typedef struct threadpool_s threadpool_t; +/** A workqueue entry represents a request that has been passed to a thread + * pool. */ +typedef struct workqueue_entry_s workqueue_entry_t; + +/** Possible return value from a work function: indicates success. 
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
new file mode 100644
index 0000000000..92e82b8a48
--- /dev/null
+++ b/src/common/workqueue.h
@@ -0,0 +1,48 @@
+/* Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_WORKQUEUE_H
+#define TOR_WORKQUEUE_H
+
+#include "compat.h"
+
+/** A replyqueue is used to tell the main thread about the outcome of
+ * work that we queued for the workers. */
+typedef struct replyqueue_s replyqueue_t;
+/** A thread-pool manages starting threads and passing work to them. */
+typedef struct threadpool_s threadpool_t;
+/** A workqueue entry represents a request that has been passed to a thread
+ * pool. */
+typedef struct workqueue_entry_s workqueue_entry_t;
+
+/** Possible return value from a work function: indicates success. */
+#define WQ_RPL_REPLY 0
+/** Possible return value from a work function: indicates fatal error */
+#define WQ_RPL_ERROR 1
+/** Possible return value from a work function: indicates thread is shutting
+ * down. */
+#define WQ_RPL_SHUTDOWN 2
+
+workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
+                                         int (*fn)(void *, void *),
+                                         void (*reply_fn)(void *),
+                                         void *arg);
+int threadpool_queue_update(threadpool_t *pool,
+                            void *(*dup_fn)(void *),
+                            int (*fn)(void *, void *),
+                            void (*free_fn)(void *),
+                            void *arg);
+void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
+threadpool_t *threadpool_new(int n_threads,
+                             replyqueue_t *replyqueue,
+                             void *(*new_thread_state_fn)(void*),
+                             void (*free_thread_state_fn)(void*),
+                             void *arg);
+replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
+
+replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
+tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
+void replyqueue_process(replyqueue_t *queue);
+
+#endif
+
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 36ba3bffb7..d964e66922 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -745,6 +745,7 @@ circuit_free(circuit_t *circ)
 {
   void *mem;
   size_t memlen;
+  int should_free = 1;
   if (!circ)
     return;
 
@@ -784,6 +785,8 @@ circuit_free(circuit_t *circ)
     memlen = sizeof(or_circuit_t);
     tor_assert(circ->magic == OR_CIRCUIT_MAGIC);
 
+    should_free = (ocirc->workqueue_entry == NULL);
+
     crypto_cipher_free(ocirc->p_crypto);
     crypto_digest_free(ocirc->p_digest);
     crypto_cipher_free(ocirc->n_crypto);
@@ -826,8 +829,18 @@ circuit_free(circuit_t *circ)
    * "active" checks will be violated. */
   cell_queue_clear(&circ->n_chan_cells);
 
-  memwipe(mem, 0xAA, memlen); /* poison memory */
-  tor_free(mem);
+  if (should_free) {
+    memwipe(mem, 0xAA, memlen); /* poison memory */
+    tor_free(mem);
+  } else {
+    /* If we made it here, this is an or_circuit_t that still has a pending
+     * cpuworker request which we weren't able to cancel. Instead, set up
+     * the magic value so that when the reply comes back, we'll know to
+     * discard the reply and free this structure.
+     */
+    memwipe(mem, 0xAA, memlen);
+    circ->magic = DEAD_CIRCUIT_MAGIC;
+  }
 }
 
 /** Deallocate the linked list circ->cpath, and remove the cpath from
diff --git a/src/or/command.c b/src/or/command.c
index 6dde2a9b7e..c4a0f9baeb 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
   /* hand it off to the cpuworkers, and then return. */
   if (connection_or_digest_is_known_relay(chan->identity_digest))
     rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
-  if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
+  if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
     log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
Closing."); circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT); return; diff --git a/src/or/config.c b/src/or/config.c index baff4198ee..91fbe970d9 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1730,7 +1730,7 @@ options_act(const or_options_t *old_options) if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL))) inform_testing_reachability(); } - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); if (dns_reset()) return -1; } else { diff --git a/src/or/connection.c b/src/or/connection.c index ccd823131d..97fdee732e 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -29,7 +29,6 @@ #include "connection_edge.h" #include "connection_or.h" #include "control.h" -#include "cpuworker.h" #include "directory.h" #include "dirserv.h" #include "dns.h" @@ -130,7 +129,6 @@ conn_type_to_string(int type) case CONN_TYPE_AP: return "Socks"; case CONN_TYPE_DIR_LISTENER: return "Directory listener"; case CONN_TYPE_DIR: return "Directory"; - case CONN_TYPE_CPUWORKER: return "CPU worker"; case CONN_TYPE_CONTROL_LISTENER: return "Control listener"; case CONN_TYPE_CONTROL: return "Control"; case CONN_TYPE_EXT_OR: return "Extended OR"; @@ -213,12 +211,6 @@ conn_state_to_string(int type, int state) case DIR_CONN_STATE_SERVER_WRITING: return "writing"; } break; - case CONN_TYPE_CPUWORKER: - switch (state) { - case CPUWORKER_STATE_IDLE: return "idle"; - case CPUWORKER_STATE_BUSY_ONION: return "busy with onion"; - } - break; case CONN_TYPE_CONTROL: switch (state) { case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)"; @@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn) case CONN_TYPE_CONTROL: case CONN_TYPE_OR: case CONN_TYPE_EXT_OR: - case CONN_TYPE_CPUWORKER: return 1; default: return 0; @@ -2417,7 +2408,6 @@ connection_mark_all_noncontrol_connections(void) if (conn->marked_for_close) continue; switch (conn->type) { - case CONN_TYPE_CPUWORKER: case CONN_TYPE_CONTROL_LISTENER: case CONN_TYPE_CONTROL: break; @@ -4511,8 +4501,6 @@ connection_process_inbuf(connection_t *conn, int package_partial) package_partial); case CONN_TYPE_DIR: return connection_dir_process_inbuf(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_process_inbuf(conn); case CONN_TYPE_CONTROL: return connection_control_process_inbuf(TO_CONTROL_CONN(conn)); default: @@ -4572,8 +4560,6 @@ connection_finished_flushing(connection_t *conn) return connection_edge_finished_flushing(TO_EDGE_CONN(conn)); case CONN_TYPE_DIR: return connection_dir_finished_flushing(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_finished_flushing(conn); case CONN_TYPE_CONTROL: return connection_control_finished_flushing(TO_CONTROL_CONN(conn)); default: @@ -4629,8 +4615,6 @@ connection_reached_eof(connection_t *conn) return connection_edge_reached_eof(TO_EDGE_CONN(conn)); case CONN_TYPE_DIR: return connection_dir_reached_eof(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_reached_eof(conn); case CONN_TYPE_CONTROL: return connection_control_reached_eof(TO_CONTROL_CONN(conn)); default: @@ -4836,10 +4820,6 @@ assert_connection_ok(connection_t *conn, time_t now) tor_assert(conn->purpose >= DIR_PURPOSE_MIN_); tor_assert(conn->purpose <= DIR_PURPOSE_MAX_); break; - case CONN_TYPE_CPUWORKER: - tor_assert(conn->state >= CPUWORKER_STATE_MIN_); - tor_assert(conn->state <= CPUWORKER_STATE_MAX_); - break; case CONN_TYPE_CONTROL: tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_); tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_); @@ -4940,9 
+4920,7 @@ proxy_type_to_string(int proxy_type) } /** Call connection_free_() on every connection in our array, and release all - * storage held by connection.c. This is used by cpuworkers and dnsworkers - * when they fork, so they don't keep resources held open (especially - * sockets). + * storage held by connection.c. * * Don't do the checks in connection_free(), because they will * fail. diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 340fbec620..39d2079994 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -5,84 +5,98 @@ /** * \file cpuworker.c - * \brief Implements a farm of 'CPU worker' processes to perform - * CPU-intensive tasks in another thread or process, to not - * interrupt the main thread. + * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities + * out to worker threads. * * Right now, we only use this for processing onionskins. **/ #include "or.h" -#include "buffers.h" #include "channel.h" -#include "channeltls.h" #include "circuitbuild.h" #include "circuitlist.h" -#include "config.h" -#include "connection.h" #include "connection_or.h" +#include "config.h" #include "cpuworker.h" #include "main.h" #include "onion.h" #include "rephist.h" #include "router.h" +#include "workqueue.h" -/** The maximum number of cpuworker processes we will keep around. */ -#define MAX_CPUWORKERS 16 -/** The minimum number of cpuworker processes we will keep around. */ -#define MIN_CPUWORKERS 1 +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif -/** The tag specifies which circuit this onionskin was from. */ -#define TAG_LEN 12 +static void queue_pending_tasks(void); -/** How many cpuworkers we have running right now. */ -static int num_cpuworkers=0; -/** How many of the running cpuworkers have an assigned task right now. */ -static int num_cpuworkers_busy=0; -/** We need to spawn new cpuworkers whenever we rotate the onion keys - * on platforms where execution contexts==processes. This variable stores - * the last time we got a key rotation event. */ -static time_t last_rotation_time=0; +typedef struct worker_state_s { + int generation; + server_onion_keys_t *onion_keys; +} worker_state_t; -static void cpuworker_main(void *data) ATTR_NORETURN; -static int spawn_cpuworker(void); -static void spawn_enough_cpuworkers(void); -static void process_pending_task(connection_t *cpuworker); +static void * +worker_state_new(void *arg) +{ + worker_state_t *ws; + (void)arg; + ws = tor_malloc_zero(sizeof(worker_state_t)); + ws->onion_keys = server_onion_keys_new(); + return ws; +} +static void +worker_state_free(void *arg) +{ + worker_state_t *ws = arg; + server_onion_keys_free(ws->onion_keys); + tor_free(ws); +} + +static replyqueue_t *replyqueue = NULL; +static threadpool_t *threadpool = NULL; +static struct event *reply_event = NULL; + +static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; + +static int total_pending_tasks = 0; +static int max_pending_tasks = 128; + +static void +replyqueue_process_cb(evutil_socket_t sock, short events, void *arg) +{ + replyqueue_t *rq = arg; + (void) sock; + (void) events; + replyqueue_process(rq); +} /** Initialize the cpuworker subsystem. */ void cpu_init(void) { - cpuworkers_rotate(); -} - -/** Called when we're done sending a request to a cpuworker. */ -int -connection_cpu_finished_flushing(connection_t *conn) -{ - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); - return 0; -} - -/** Pack global_id and circ_id; set *tag to the result. (See note on - * cpuworker_main for wire format.) 
*/ -static void -tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id) -{ - /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/ - /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/ - set_uint64(tag, chan_id); - set_uint32(tag+8, circ_id); -} - -/** Unpack tag into addr, port, and circ_id. - */ -static void -tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) -{ - *chan_id = get_uint64(tag); - *circ_id = get_uint32(tag+8); + if (!replyqueue) { + replyqueue = replyqueue_new(0); + } + if (!reply_event) { + reply_event = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(replyqueue), + EV_READ|EV_PERSIST, + replyqueue_process_cb, + replyqueue); + event_add(reply_event, NULL); + } + if (!threadpool) { + threadpool = threadpool_new(get_num_cpus(get_options()), + replyqueue, + worker_state_new, + worker_state_free, + NULL); + } + /* Total voodoo. Can we make this more sensible? */ + max_pending_tasks = get_num_cpus(get_options()) * 64; + crypto_seed_weak_rng(&request_sample_rng); } /** Magic numbers to make sure our cpuworker_requests don't grow any @@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) typedef struct cpuworker_request_t { /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job */ - uint8_t tag[TAG_LEN]; - /** Task code. Must be one of CPUWORKER_TASK_* */ - uint8_t task; /** Flag: Are we timing this request? */ unsigned timed : 1; @@ -114,8 +124,7 @@ typedef struct cpuworker_request_t { typedef struct cpuworker_reply_t { /** Magic number; must be CPUWORKER_REPLY_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job; matches the request's tag.*/ - uint8_t tag[TAG_LEN]; + /** True iff we got a successful request. */ uint8_t success; @@ -142,42 +151,39 @@ typedef struct cpuworker_reply_t { uint8_t rend_auth_material[DIGEST_LEN]; } cpuworker_reply_t; -/** Called when the onion key has changed and we need to spawn new - * cpuworkers. Close all currently idle cpuworkers, and mark the last - * rotation time as now. - */ -void -cpuworkers_rotate(void) +typedef struct cpuworker_job_u { + or_circuit_t *circ; + union { + cpuworker_request_t request; + cpuworker_reply_t reply; + } u; +} cpuworker_job_t; + +static int +update_state_threadfn(void *state_, void *work_) { - connection_t *cpuworker; - while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE))) { - connection_mark_for_close(cpuworker); - --num_cpuworkers; - } - last_rotation_time = time(NULL); - if (server_mode(get_options())) - spawn_enough_cpuworkers(); + worker_state_t *state = state_; + worker_state_t *update = work_; + server_onion_keys_free(state->onion_keys); + state->onion_keys = update->onion_keys; + update->onion_keys = NULL; + ++state->generation; + return WQ_RPL_REPLY; } -/** If the cpuworker closes the connection, - * mark it as closed and spawn a new one as needed. */ -int -connection_cpu_reached_eof(connection_t *conn) +/** Called when the onion key has changed: queue an update job so that each + * worker thread swaps in a freshly generated onion-key state. + */ +void +cpuworkers_rotate_keyinfo(void) { - log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly."); - if (conn->state != CPUWORKER_STATE_IDLE) { - /* the circ associated with this cpuworker will have to wait until - * it gets culled in run_connection_housekeeping(), since we have - * no way to find out which circ it was. 
*/ - log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ."); - num_cpuworkers_busy--; + if (threadpool_queue_update(threadpool, + worker_state_new, + update_state_threadfn, + worker_state_free, + NULL)) { + log_warn(LD_OR, "Failed to queue key update for worker threads."); } - num_cpuworkers--; - spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up - spinning. */ - connection_mark_for_close(conn); - return 0; } /** Indexed by handshake type: how many onionskins have we processed and @@ -197,8 +203,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1]; * time. (microseconds) */ #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000) -static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; - /** Return true iff we'd like to measure a handshake of type * onionskin_type. Call only from the main thread. */ static int @@ -286,428 +290,266 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type, onionskin_type_name, (unsigned)overhead, relative_overhead*100); } -/** Called when we get data from a cpuworker. If the answer is not complete, - * wait for a complete answer. If the answer is complete, - * process it as appropriate. - */ -int -connection_cpu_process_inbuf(connection_t *conn) +/** Handle a reply from the worker threads. */ +static void +cpuworker_onion_handshake_replyfn(void *work_) { - uint64_t chan_id; - circid_t circ_id; - channel_t *p_chan = NULL; - circuit_t *circ; + cpuworker_job_t *job = work_; + cpuworker_reply_t rpl; + or_circuit_t *circ = NULL; - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); + --total_pending_tasks; - if (!connection_get_inbuf_len(conn)) - return 0; + /* Could avoid this, but doesn't matter. */ + memcpy(&rpl, &job->u.reply, sizeof(rpl)); - if (conn->state == CPUWORKER_STATE_BUSY_ONION) { - cpuworker_reply_t rpl; - if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t)) - return 0; /* not yet */ - tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t)); + tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC); - connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn); - - tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC); - - if (rpl.timed && rpl.success && - rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) { - /* Time how long this request took. The handshake_type check should be - needless, but let's leave it in to be safe. */ - struct timeval tv_end, tv_diff; - int64_t usec_roundtrip; - tor_gettimeofday(&tv_end); - timersub(&tv_end, &rpl.started_at, &tv_diff); - usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; - if (usec_roundtrip >= 0 && - usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) { - ++onionskins_n_processed[rpl.handshake_type]; - onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec; - onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip; - if (onionskins_n_processed[rpl.handshake_type] >= 500000) { - /* Scale down every 500000 handshakes. On a busy server, that's - * less impressive than it sounds. */ - onionskins_n_processed[rpl.handshake_type] /= 2; - onionskins_usec_internal[rpl.handshake_type] /= 2; - onionskins_usec_roundtrip[rpl.handshake_type] /= 2; - } + if (rpl.timed && rpl.success && + rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) { + /* Time how long this request took. The handshake_type check should be + needless, but let's leave it in to be safe. 
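(Without the check, a corrupted handshake_type could index past the end of the onionskins_* stats arrays below.)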
*/ + struct timeval tv_end, tv_diff; + int64_t usec_roundtrip; + tor_gettimeofday(&tv_end); + timersub(&tv_end, &rpl.started_at, &tv_diff); + usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; + if (usec_roundtrip >= 0 && + usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) { + ++onionskins_n_processed[rpl.handshake_type]; + onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec; + onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip; + if (onionskins_n_processed[rpl.handshake_type] >= 500000) { + /* Scale down every 500000 handshakes. On a busy server, that's + * less impressive than it sounds. */ + onionskins_n_processed[rpl.handshake_type] /= 2; + onionskins_usec_internal[rpl.handshake_type] /= 2; + onionskins_usec_roundtrip[rpl.handshake_type] /= 2; } } - /* parse out the circ it was talking about */ - tag_unpack(rpl.tag, &chan_id, &circ_id); - circ = NULL; - log_debug(LD_OR, - "Unpacking cpuworker reply, chan_id is " U64_FORMAT - ", circ_id is %u", - U64_PRINTF_ARG(chan_id), (unsigned)circ_id); - p_chan = channel_find_by_global_id(chan_id); - - if (p_chan) - circ = circuit_get_by_circid_channel(circ_id, p_chan); - - if (rpl.success == 0) { - log_debug(LD_OR, - "decoding onionskin failed. " - "(Old key or bad software.) Closing."); - if (circ) - circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL); - goto done_processing; - } - if (!circ) { - /* This happens because somebody sends us a destroy cell and the - * circuit goes away, while the cpuworker is working. This is also - * why our tag doesn't include a pointer to the circ, because we'd - * never know if it's still valid. - */ - log_debug(LD_OR,"processed onion for a circ that's gone. Dropping."); - goto done_processing; - } - tor_assert(! CIRCUIT_IS_ORIGIN(circ)); - if (onionskin_answer(TO_OR_CIRCUIT(circ), - &rpl.created_cell, - (const char*)rpl.keys, - rpl.rend_auth_material) < 0) { - log_warn(LD_OR,"onionskin_answer failed. Closing."); - circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL); - goto done_processing; - } - log_debug(LD_OR,"onionskin_answer succeeded. Yay."); - } else { - tor_assert(0); /* don't ask me to do handshakes yet */ } + circ = job->circ; + + log_debug(LD_OR, + "Unpacking cpuworker reply %p, circ=%p, success=%d", + job, circ, rpl.success); + + if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) { + /* The circuit was supposed to get freed while the reply was + * pending. Instead, it got left for us to free so that we wouldn't freak + * out when the job->circ field wound up pointing to nothing. */ + log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory."); + circ->base_.magic = 0; + tor_free(circ); + goto done_processing; + } + + circ->workqueue_entry = NULL; + + if (rpl.success == 0) { + log_debug(LD_OR, + "decoding onionskin failed. " + "(Old key or bad software.) Closing."); + if (circ) + circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL); + goto done_processing; + } + + if (onionskin_answer(circ, + &rpl.created_cell, + (const char*)rpl.keys, + rpl.rend_auth_material) < 0) { + log_warn(LD_OR,"onionskin_answer failed. Closing."); + circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL); + goto done_processing; + } + log_debug(LD_OR,"onionskin_answer succeeded. 
Yay."); + + done_processing: - conn->state = CPUWORKER_STATE_IDLE; - num_cpuworkers_busy--; - if (conn->timestamp_created < last_rotation_time) { - connection_mark_for_close(conn); - num_cpuworkers--; - spawn_enough_cpuworkers(); - } else { - process_pending_task(conn); - } - return 0; + memwipe(&rpl, 0, sizeof(rpl)); + memwipe(job, 0, sizeof(*job)); + tor_free(job); + queue_pending_tasks(); } -/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair. - * Read and writes from fdarray[1]. Reads requests, writes answers. - * - * Request format: - * cpuworker_request_t. - * Response format: - * cpuworker_reply_t - */ -static void -cpuworker_main(void *data) +/** Implementation function for onion handshake requests. */ +static int +cpuworker_onion_handshake_threadfn(void *state_, void *work_) { - /* For talking to the parent thread/process */ - tor_socket_t *fdarray = data; - tor_socket_t fd; + worker_state_t *state = state_; + cpuworker_job_t *job = work_; /* variables for onion processing */ - server_onion_keys_t onion_keys; + server_onion_keys_t *onion_keys = state->onion_keys; cpuworker_request_t req; cpuworker_reply_t rpl; - fd = fdarray[1]; /* this side is ours */ - tor_free(data); + memcpy(&req, &job->u.request, sizeof(req)); - setup_server_onion_keys(&onion_keys); - - for (;;) { - if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) { - log_info(LD_OR, "read request failed. Exiting."); - goto end; - } - tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + memset(&rpl, 0, sizeof(rpl)); + const create_cell_t *cc = &req.create_cell; + created_cell_t *cell_out = &rpl.created_cell; + struct timeval tv_start = {0,0}, tv_end; + int n; + rpl.timed = req.timed; + rpl.started_at = req.started_at; + rpl.handshake_type = cc->handshake_type; + if (req.timed) + tor_gettimeofday(&tv_start); + n = onion_skin_server_handshake(cc->handshake_type, + cc->onionskin, cc->handshake_len, + onion_keys, + cell_out->reply, + rpl.keys, CPATH_KEY_MATERIAL_LEN, + rpl.rend_auth_material); + if (n < 0) { + /* failure */ + log_debug(LD_OR,"onion_skin_server_handshake failed."); memset(&rpl, 0, sizeof(rpl)); - - if (req.task == CPUWORKER_TASK_ONION) { - const create_cell_t *cc = &req.create_cell; - created_cell_t *cell_out = &rpl.created_cell; - struct timeval tv_start = {0,0}, tv_end; - int n; - rpl.timed = req.timed; - rpl.started_at = req.started_at; - rpl.handshake_type = cc->handshake_type; - if (req.timed) - tor_gettimeofday(&tv_start); - n = onion_skin_server_handshake(cc->handshake_type, - cc->onionskin, cc->handshake_len, - &onion_keys, - cell_out->reply, - rpl.keys, CPATH_KEY_MATERIAL_LEN, - rpl.rend_auth_material); - if (n < 0) { - /* failure */ - log_debug(LD_OR,"onion_skin_server_handshake failed."); - memset(&rpl, 0, sizeof(rpl)); - memcpy(rpl.tag, req.tag, TAG_LEN); - rpl.success = 0; - } else { - /* success */ - log_debug(LD_OR,"onion_skin_server_handshake succeeded."); - memcpy(rpl.tag, req.tag, TAG_LEN); - cell_out->handshake_len = n; - switch (cc->cell_type) { - case CELL_CREATE: - cell_out->cell_type = CELL_CREATED; break; - case CELL_CREATE2: - cell_out->cell_type = CELL_CREATED2; break; - case CELL_CREATE_FAST: - cell_out->cell_type = CELL_CREATED_FAST; break; - default: - tor_assert(0); - goto end; - } - rpl.success = 1; - } - rpl.magic = CPUWORKER_REPLY_MAGIC; - if (req.timed) { - struct timeval tv_diff; - int64_t usec; - tor_gettimeofday(&tv_end); - timersub(&tv_end, &tv_start, &tv_diff); - usec = 
((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; - if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY) - rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY; - else - rpl.n_usec = (uint32_t) usec; - } - if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) { - log_err(LD_BUG,"writing response buf failed. Exiting."); - goto end; - } - log_debug(LD_OR,"finished writing response."); - } else if (req.task == CPUWORKER_TASK_SHUTDOWN) { - log_info(LD_OR,"Clean shutdown: exiting"); - goto end; + rpl.success = 0; + } else { + /* success */ + log_debug(LD_OR,"onion_skin_server_handshake succeeded."); + cell_out->handshake_len = n; + switch (cc->cell_type) { + case CELL_CREATE: + cell_out->cell_type = CELL_CREATED; break; + case CELL_CREATE2: + cell_out->cell_type = CELL_CREATED2; break; + case CELL_CREATE_FAST: + cell_out->cell_type = CELL_CREATED_FAST; break; + default: + tor_assert(0); + return WQ_RPL_SHUTDOWN; } - memwipe(&req, 0, sizeof(req)); - memwipe(&rpl, 0, sizeof(req)); + rpl.success = 1; } - end: + rpl.magic = CPUWORKER_REPLY_MAGIC; + if (req.timed) { + struct timeval tv_diff; + int64_t usec; + tor_gettimeofday(&tv_end); + timersub(&tv_end, &tv_start, &tv_diff); + usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; + if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY) + rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY; + else + rpl.n_usec = (uint32_t) usec; + } + + memcpy(&job->u.reply, &rpl, sizeof(rpl)); + memwipe(&req, 0, sizeof(req)); memwipe(&rpl, 0, sizeof(req)); - release_server_onion_keys(&onion_keys); - tor_close_socket(fd); - crypto_thread_cleanup(); - spawn_exit(); + return WQ_RPL_REPLY; } -/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed. - */ -static int -spawn_cpuworker(void) -{ - tor_socket_t *fdarray; - tor_socket_t fd; - connection_t *conn; - int err; - - fdarray = tor_calloc(2, sizeof(tor_socket_t)); - if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) { - log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s", - tor_socket_strerror(-err)); - tor_free(fdarray); - return -1; - } - - tor_assert(SOCKET_OK(fdarray[0])); - tor_assert(SOCKET_OK(fdarray[1])); - - fd = fdarray[0]; - if (spawn_func(cpuworker_main, (void*)fdarray) < 0) { - tor_close_socket(fdarray[0]); - tor_close_socket(fdarray[1]); - tor_free(fdarray); - return -1; - } - log_debug(LD_OR,"just spawned a cpu worker."); - - conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX); - - /* set up conn so it's got all the data we need to remember */ - conn->s = fd; - conn->address = tor_strdup("localhost"); - tor_addr_make_unspec(&conn->addr); - - if (set_socket_nonblocking(fd) == -1) { - connection_free(conn); /* this closes fd */ - return -1; - } - - if (connection_add(conn) < 0) { /* no space, forget it */ - log_warn(LD_NET,"connection_add for cpuworker failed. Giving up."); - connection_free(conn); /* this closes fd */ - return -1; - } - - conn->state = CPUWORKER_STATE_IDLE; - connection_start_reading(conn); - - return 0; /* success */ -} - -/** If we have too few or too many active cpuworkers, try to spawn new ones - * or kill idle ones. - */ +/** Take pending tasks from the queue and assign them to cpuworkers. 
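Stops as soon as the onion queue runs dry or max_pending_tasks requests are in flight.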
*/ static void -spawn_enough_cpuworkers(void) -{ - int num_cpuworkers_needed = get_num_cpus(get_options()); - int reseed = 0; - - if (num_cpuworkers_needed < MIN_CPUWORKERS) - num_cpuworkers_needed = MIN_CPUWORKERS; - if (num_cpuworkers_needed > MAX_CPUWORKERS) - num_cpuworkers_needed = MAX_CPUWORKERS; - - while (num_cpuworkers < num_cpuworkers_needed) { - if (spawn_cpuworker() < 0) { - log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later."); - return; - } - num_cpuworkers++; - reseed++; - } - - if (reseed) - crypto_seed_weak_rng(&request_sample_rng); -} - -/** Take a pending task from the queue and assign it to 'cpuworker'. */ -static void -process_pending_task(connection_t *cpuworker) +queue_pending_tasks(void) { or_circuit_t *circ; create_cell_t *onionskin = NULL; - tor_assert(cpuworker); + while (total_pending_tasks < max_pending_tasks) { + circ = onion_next_task(&onionskin); - /* for now only process onion tasks */ + if (!circ) + return; - circ = onion_next_task(&onionskin); - if (!circ) - return; - if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin)) - log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); -} - -/** How long should we let a cpuworker stay busy before we give - * up on it and decide that we have a bug or infinite loop? - * This value is high because some servers with low memory/cpu - * sometimes spend an hour or more swapping, and Tor starves. */ -#define CPUWORKER_BUSY_TIMEOUT (60*60*12) - -/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get - * stuck in the 'busy' state, even though the cpuworker process thinks of - * itself as idle. I don't know why. But here's a workaround to kill any - * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT. - */ -static void -cull_wedged_cpuworkers(void) -{ - time_t now = time(NULL); - smartlist_t *conns = get_connection_array(); - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (!conn->marked_for_close && - conn->type == CONN_TYPE_CPUWORKER && - conn->state == CPUWORKER_STATE_BUSY_ONION && - conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) { - log_notice(LD_BUG, - "closing wedged cpuworker. Can somebody find the bug?"); - num_cpuworkers_busy--; - num_cpuworkers--; - connection_mark_for_close(conn); - } - } SMARTLIST_FOREACH_END(conn); + if (assign_onionskin_to_cpuworker(circ, onionskin)) + log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); + } } /** Try to tell a cpuworker to perform the public key operations necessary to * respond to onionskin for the circuit circ. * - * If cpuworker is defined, assert that he's idle, and use him. Else, - * look for an idle cpuworker and use him. If none idle, queue task onto the - * pending onion list and return. Return 0 if we successfully assign the - * task, or -1 on failure. + * Return 0 if we successfully assign the task, or -1 on failure. */ int -assign_onionskin_to_cpuworker(connection_t *cpuworker, - or_circuit_t *circ, +assign_onionskin_to_cpuworker(or_circuit_t *circ, create_cell_t *onionskin) { + workqueue_entry_t *queue_entry; + cpuworker_job_t *job; cpuworker_request_t req; - time_t now = approx_time(); - static time_t last_culled_cpuworkers = 0; int should_time; - /* Checking for wedged cpuworkers requires a linear search over all - * connections, so let's do it only once a minute. 
- */ -#define CULL_CPUWORKERS_INTERVAL 60 - - if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) { - cull_wedged_cpuworkers(); - spawn_enough_cpuworkers(); - last_culled_cpuworkers = now; + if (!circ->p_chan) { + log_info(LD_OR,"circ->p_chan gone. Failing circ."); + tor_free(onionskin); + return -1; } - if (1) { - if (num_cpuworkers_busy == num_cpuworkers) { - log_debug(LD_OR,"No idle cpuworkers. Queuing."); - if (onion_pending_add(circ, onionskin) < 0) { - tor_free(onionskin); - return -1; - } - return 0; - } - - if (!cpuworker) - cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE); - - tor_assert(cpuworker); - - if (!circ->p_chan) { - log_info(LD_OR,"circ->p_chan gone. Failing circ."); + if (total_pending_tasks >= max_pending_tasks) { + log_debug(LD_OR,"No idle cpuworkers. Queuing."); + if (onion_pending_add(circ, onionskin) < 0) { tor_free(onionskin); return -1; } - - if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest)) - rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type); - - should_time = should_time_request(onionskin->handshake_type); - memset(&req, 0, sizeof(req)); - req.magic = CPUWORKER_REQUEST_MAGIC; - tag_pack(req.tag, circ->p_chan->global_identifier, - circ->p_circ_id); - req.timed = should_time; - - cpuworker->state = CPUWORKER_STATE_BUSY_ONION; - /* touch the lastwritten timestamp, since that's how we check to - * see how long it's been since we asked the question, and sometimes - * we check before the first call to connection_handle_write(). */ - cpuworker->timestamp_lastwritten = now; - num_cpuworkers_busy++; - - req.task = CPUWORKER_TASK_ONION; - memcpy(&req.create_cell, onionskin, sizeof(create_cell_t)); - - tor_free(onionskin); - - if (should_time) - tor_gettimeofday(&req.started_at); - - connection_write_to_buf((void*)&req, sizeof(req), cpuworker); - memwipe(&req, 0, sizeof(req)); + return 0; } + + if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest)) + rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type); + + should_time = should_time_request(onionskin->handshake_type); + memset(&req, 0, sizeof(req)); + req.magic = CPUWORKER_REQUEST_MAGIC; + req.timed = should_time; + + memcpy(&req.create_cell, onionskin, sizeof(create_cell_t)); + + tor_free(onionskin); + + if (should_time) + tor_gettimeofday(&req.started_at); + + job = tor_malloc_zero(sizeof(cpuworker_job_t)); + job->circ = circ; + memcpy(&job->u.request, &req, sizeof(req)); + memwipe(&req, 0, sizeof(req)); + + ++total_pending_tasks; + queue_entry = threadpool_queue_work(threadpool, + cpuworker_onion_handshake_threadfn, + cpuworker_onion_handshake_replyfn, + job); + if (!queue_entry) { + log_warn(LD_BUG, "Couldn't queue work on threadpool"); + tor_free(job); + return -1; + } + + log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)", + job, queue_entry, job->circ); + + circ->workqueue_entry = queue_entry; + return 0; } +/** If circ has a pending handshake that hasn't been processed yet, + * remove it from the worker queue. */ +void +cpuworker_cancel_circ_handshake(or_circuit_t *circ) +{ + cpuworker_job_t *job; + if (circ->workqueue_entry == NULL) + return; + + job = workqueue_entry_cancel(circ->workqueue_entry); + if (job) { + /* It successfully cancelled. 
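The worker never claimed the job, so it is ours again: the reply function will never run, and we must wipe and free the job here.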
*/ + memwipe(job, 0xe0, sizeof(*job)); + tor_free(job); + } + + circ->workqueue_entry = NULL; +} + diff --git a/src/or/cpuworker.h b/src/or/cpuworker.h index 2a2b37a975..70a595e472 100644 --- a/src/or/cpuworker.h +++ b/src/or/cpuworker.h @@ -13,19 +13,17 @@ #define TOR_CPUWORKER_H void cpu_init(void); -void cpuworkers_rotate(void); -int connection_cpu_finished_flushing(connection_t *conn); -int connection_cpu_reached_eof(connection_t *conn); -int connection_cpu_process_inbuf(connection_t *conn); +void cpuworkers_rotate_keyinfo(void); + struct create_cell_t; -int assign_onionskin_to_cpuworker(connection_t *cpuworker, - or_circuit_t *circ, +int assign_onionskin_to_cpuworker(or_circuit_t *circ, struct create_cell_t *onionskin); uint64_t estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type); void cpuworker_log_onionskin_overhead(int severity, int onionskin_type, const char *onionskin_type_name); +void cpuworker_cancel_circ_handshake(or_circuit_t *circ); #endif diff --git a/src/or/main.c b/src/or/main.c index abf3230c4c..136043c117 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now) get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) { log_info(LD_GENERAL,"Rotating onion key."); rotate_onion_key(); - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); if (router_rebuild_descriptor(1)<0) { log_info(LD_CONFIG, "Couldn't rebuild router descriptor"); } @@ -1960,9 +1960,9 @@ do_hup(void) * force a retry there. */ if (server_mode(options)) { - /* Restart cpuworker and dnsworker processes, so they get up-to-date + /* Update cpuworker and dnsworker processes, so they get up-to-date * configuration options. */ - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); dns_reset(); } return 0; diff --git a/src/or/onion.c b/src/or/onion.c index 3723a3e11e..43fb63c832 100644 --- a/src/or/onion.c +++ b/src/or/onion.c @@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ) victim = circ->onionqueue_entry; if (victim) onion_queue_entry_remove(victim); + + cpuworker_cancel_circ_handshake(circ); } /** Remove a queue entry victim from the queue, unlinking it from @@ -339,25 +341,25 @@ clear_pending_onions(void) /* ============================================================ */ -/** Fill in a server_onion_keys_t object at keys with all of the keys +/** Return a new server_onion_keys_t object with all of the keys * and other info we might need to do onion handshakes. (We make a copy of * our keys for each cpuworker to avoid race conditions with the main thread, * and to avoid locking) */ -void -setup_server_onion_keys(server_onion_keys_t *keys) +server_onion_keys_t * +server_onion_keys_new(void) { - memset(keys, 0, sizeof(server_onion_keys_t)); + server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t)); memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN); dup_onion_keys(&keys->onion_key, &keys->last_onion_key); keys->curve25519_key_map = construct_ntor_key_map(); keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t)); curve25519_keypair_generate(keys->junk_keypair, 0); + return keys; } -/** Release all storage held in keys, but do not free keys - * itself (as it's likely to be stack-allocated.) */ +/** Release all storage held in keys. */ void -release_server_onion_keys(server_onion_keys_t *keys) +server_onion_keys_free(server_onion_keys_t *keys) { if (! 
keys) return; @@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys) crypto_pk_free(keys->last_onion_key); ntor_key_map_free(keys->curve25519_key_map); tor_free(keys->junk_keypair); - memset(keys, 0, sizeof(server_onion_keys_t)); + memwipe(keys, 0, sizeof(server_onion_keys_t)); + tor_free(keys); } /** Release whatever storage is held in state, depending on its diff --git a/src/or/onion.h b/src/or/onion.h index 35619879e4..96050083f8 100644 --- a/src/or/onion.h +++ b/src/or/onion.h @@ -30,8 +30,8 @@ typedef struct server_onion_keys_t { #define MAX_ONIONSKIN_CHALLENGE_LEN 255 #define MAX_ONIONSKIN_REPLY_LEN 255 -void setup_server_onion_keys(server_onion_keys_t *keys); -void release_server_onion_keys(server_onion_keys_t *keys); +server_onion_keys_t *server_onion_keys_new(void); +void server_onion_keys_free(server_onion_keys_t *keys); void onion_handshake_state_release(onion_handshake_state_t *state); diff --git a/src/or/or.h b/src/or/or.h index f821d0b405..ef217fbca8 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -213,8 +213,7 @@ typedef enum { #define CONN_TYPE_DIR_LISTENER 8 /** Type for HTTP connections to the directory server. */ #define CONN_TYPE_DIR 9 -/** Connection from the main process to a CPU worker process. */ -#define CONN_TYPE_CPUWORKER 10 +/* Type 10 is unused. */ /** Type for listening for connections from user interface process. */ #define CONN_TYPE_CONTROL_LISTENER 11 /** Type for connections from user interface process. */ @@ -276,17 +275,6 @@ typedef enum { /** State for any listener connection. */ #define LISTENER_STATE_READY 0 -#define CPUWORKER_STATE_MIN_ 1 -/** State for a connection to a cpuworker process that's idle. */ -#define CPUWORKER_STATE_IDLE 1 -/** State for a connection to a cpuworker process that's processing a - * handshake. */ -#define CPUWORKER_STATE_BUSY_ONION 2 -#define CPUWORKER_STATE_MAX_ 2 - -#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION -#define CPUWORKER_TASK_SHUTDOWN 255 - #define OR_CONN_STATE_MIN_ 1 /** State for a connection to an OR: waiting for connect() to finish. */ #define OR_CONN_STATE_CONNECTING 1 @@ -2707,8 +2695,14 @@ typedef struct { time_t expiry_time; } cpath_build_state_t; +/** "magic" value for an origin_circuit_t */ #define ORIGIN_CIRCUIT_MAGIC 0x35315243u +/** "magic" value for an or_circuit_t */ #define OR_CIRCUIT_MAGIC 0x98ABC04Fu +/** "magic" value for a circuit that would have been freed by circuit_free, + * but which we're keeping around until a cpuworker reply arrives. See + * circuit_free() for more documentation. */ +#define DEAD_CIRCUIT_MAGIC 0xdeadc14c struct create_cell_t; @@ -3128,6 +3122,9 @@ typedef struct or_circuit_t { /** Pointer to an entry on the onion queue, if this circuit is waiting for a * chance to give an onionskin to a cpuworker. Used only in onion.c */ struct onion_queue_t *onionqueue_entry; + /** Pointer to a workqueue entry, if this circuit has given an onionskin to + * a cpuworker and is waiting for a response. Used only in cpuworker.c */ + struct workqueue_entry_s *workqueue_entry; /** The circuit_id used in the previous (backward) hop of this circuit. 
*/ circid_t p_circ_id; diff --git a/src/test/include.am b/src/test/include.am index 134c2ff56c..3c59a8b3c7 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -2,7 +2,7 @@ TESTS += src/test/test noinst_PROGRAMS+= src/test/bench if UNITTESTS_ENABLED -noinst_PROGRAMS+= src/test/test src/test/test-child +noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue endif src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \ @@ -47,6 +47,7 @@ src_test_test_SOURCES = \ src/test/test_routerkeys.c \ src/test/test_scheduler.c \ src/test/test_socks.c \ + src/test/test_threads.c \ src/test/test_util.c \ src/test/test_config.c \ src/test/test_hs.c \ @@ -63,6 +64,11 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c +src_test_test_workqueue_SOURCES = \ + src/test/test_workqueue.c +src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS) +src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) + src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \ @@ -81,6 +87,15 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ +src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ + @TOR_LDFLAGS_libevent@ +src_test_test_workqueue_LDADD = src/or/libtor-testing.a \ + src/common/libor-testing.a \ + src/common/libor-crypto-testing.a $(LIBDONNA) \ + src/common/libor-event-testing.a \ + @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ + @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ + noinst_HEADERS+= \ src/test/fakechans.h \ src/test/test.h \ diff --git a/src/test/test.c b/src/test/test.c index 65d15f12bd..fc5290f0b9 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1258,6 +1258,23 @@ test_stats(void *arg) tor_free(s); } +static void * +passthrough_test_setup(const struct testcase_t *testcase) +{ + return testcase->setup_data; +} +static int +passthrough_test_cleanup(const struct testcase_t *testcase, void *ptr) +{ + (void)testcase; + (void)ptr; + return 1; +} + +const struct testcase_setup_t passthrough_setup = { + passthrough_test_setup, passthrough_test_cleanup +}; + #define ENT(name) \ { #name, test_ ## name , 0, NULL, NULL } #define FORK(name) \ @@ -1297,6 +1314,7 @@ extern struct testcase_t cell_queue_tests[]; extern struct testcase_t options_tests[]; extern struct testcase_t socks_tests[]; extern struct testcase_t entrynodes_tests[]; +extern struct testcase_t thread_tests[]; extern struct testcase_t extorport_tests[]; extern struct testcase_t controller_event_tests[]; extern struct testcase_t logging_tests[]; @@ -1324,6 +1342,7 @@ static struct testgroup_t testgroups[] = { { "container/", container_tests }, { "util/", util_tests }, { "util/logging/", logging_tests }, + { "util/thread/", thread_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "dir/", dir_tests }, diff --git a/src/test/test.h b/src/test/test.h index 48037a5ba3..b8057c59bf 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -158,5 +158,7 @@ crypto_pk_t *pk_generate(int idx); #define NS_MOCK(name) MOCK(name, NS(name)) #define NS_UNMOCK(name) UNMOCK(name) +extern const struct testcase_setup_t passthrough_setup; + #endif diff --git a/src/test/test_crypto.c b/src/test/test_crypto.c index 4a5a12c50a..8426c715a4 100644 --- a/src/test/test_crypto.c +++ b/src/test/test_crypto.c @@ -1975,30 
+1975,14 @@ test_crypto_siphash(void *arg) ; } -static void * -pass_data_setup_fn(const struct testcase_t *testcase) -{ - return testcase->setup_data; -} -static int -pass_data_cleanup_fn(const struct testcase_t *testcase, void *ptr) -{ - (void)ptr; - (void)testcase; - return 1; -} -static const struct testcase_setup_t pass_data = { - pass_data_setup_fn, pass_data_cleanup_fn -}; - #define CRYPTO_LEGACY(name) \ { #name, test_crypto_ ## name , 0, NULL, NULL } struct testcase_t crypto_tests[] = { CRYPTO_LEGACY(formats), CRYPTO_LEGACY(rng), - { "aes_AES", test_crypto_aes, TT_FORK, &pass_data, (void*)"aes" }, - { "aes_EVP", test_crypto_aes, TT_FORK, &pass_data, (void*)"evp" }, + { "aes_AES", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"aes" }, + { "aes_EVP", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"evp" }, CRYPTO_LEGACY(sha), CRYPTO_LEGACY(pk), { "pk_fingerprints", test_crypto_pk_fingerprints, TT_FORK, NULL, NULL }, @@ -2006,23 +1990,25 @@ struct testcase_t crypto_tests[] = { CRYPTO_LEGACY(dh), CRYPTO_LEGACY(s2k_rfc2440), #ifdef HAVE_LIBSCRYPT_H - { "s2k_scrypt", test_crypto_s2k_general, 0, &pass_data, + { "s2k_scrypt", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"scrypt" }, - { "s2k_scrypt_low", test_crypto_s2k_general, 0, &pass_data, + { "s2k_scrypt_low", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"scrypt-low" }, #endif - { "s2k_pbkdf2", test_crypto_s2k_general, 0, &pass_data, + { "s2k_pbkdf2", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"pbkdf2" }, - { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &pass_data, + { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"rfc2440" }, - { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &pass_data, + { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"rfc2440-legacy" }, { "s2k_errors", test_crypto_s2k_errors, 0, NULL, NULL }, { "scrypt_vectors", test_crypto_scrypt_vectors, 0, NULL, NULL }, { "pbkdf2_vectors", test_crypto_pbkdf2_vectors, 0, NULL, NULL }, { "pwbox", test_crypto_pwbox, 0, NULL, NULL }, - { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"aes" }, - { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"evp" }, + { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &passthrough_setup, + (void*)"aes" }, + { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &passthrough_setup, + (void*)"evp" }, CRYPTO_LEGACY(base32_decode), { "kdf_TAP", test_crypto_kdf_TAP, 0, NULL, NULL }, { "hkdf_sha256", test_crypto_hkdf_sha256, 0, NULL, NULL }, diff --git a/src/test/test_threads.c b/src/test/test_threads.c new file mode 100644 index 0000000000..2ac08d4d28 --- /dev/null +++ b/src/test/test_threads.c @@ -0,0 +1,316 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "or.h" +#include "compat_threads.h" +#include "test.h" + +/** mutex for thread test to stop the threads hitting data at the same time. */ +static tor_mutex_t *thread_test_mutex_ = NULL; +/** mutexes for the thread test to make sure that the threads have to + * interleave somewhat. */ +static tor_mutex_t *thread_test_start1_ = NULL, + *thread_test_start2_ = NULL; +/** Shared strmap for the thread test. 
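Both worker threads write to it under thread_test_mutex_, and the main thread polls it to tell when they have finished.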
*/ +static strmap_t *thread_test_strmap_ = NULL; +/** The name of thread1 for the thread test */ +static char *thread1_name_ = NULL; +/** The name of thread2 for the thread test */ +static char *thread2_name_ = NULL; + +static int thread_fns_failed = 0; + +static unsigned long thread_fn_tid1, thread_fn_tid2; + +static void thread_test_func_(void* _s) ATTR_NORETURN; + +/** How many iterations have the threads in the unit test run? */ +static int t1_count = 0, t2_count = 0; + +/** Helper function for threading unit tests: This function runs in a + * subthread. It grabs its own mutex (start1 or start2) to make sure that it + * should start, then it repeatedly alters _test_thread_strmap protected by + * thread_test_mutex_. */ +static void +thread_test_func_(void* _s) +{ + char *s = _s; + int i, *count; + tor_mutex_t *m; + char buf[64]; + char **cp; + if (!strcmp(s, "thread 1")) { + m = thread_test_start1_; + cp = &thread1_name_; + count = &t1_count; + thread_fn_tid1 = tor_get_thread_id(); + } else { + m = thread_test_start2_; + cp = &thread2_name_; + count = &t2_count; + thread_fn_tid2 = tor_get_thread_id(); + } + + tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); + *cp = tor_strdup(buf); + + tor_mutex_acquire(m); + + for (i=0; i<10000; ++i) { + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, "last to run", *cp); + ++*count; + tor_mutex_release(thread_test_mutex_); + } + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, s, *cp); + if (in_main_thread()) + ++thread_fns_failed; + tor_mutex_release(thread_test_mutex_); + + tor_mutex_release(m); + + spawn_exit(); +} + +/** Run unit tests for threading logic. */ +static void +test_threads_basic(void *arg) +{ + char *s1 = NULL, *s2 = NULL; + int done = 0, timedout = 0; + time_t started; +#ifndef _WIN32 + struct timeval tv; + tv.tv_sec=0; + tv.tv_usec=100*1000; +#endif + (void) arg; + + set_main_thread(); + + thread_test_mutex_ = tor_mutex_new(); + thread_test_start1_ = tor_mutex_new(); + thread_test_start2_ = tor_mutex_new(); + thread_test_strmap_ = strmap_new(); + s1 = tor_strdup("thread 1"); + s2 = tor_strdup("thread 2"); + tor_mutex_acquire(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + spawn_func(thread_test_func_, s1); + spawn_func(thread_test_func_, s2); + tor_mutex_release(thread_test_start2_); + tor_mutex_release(thread_test_start1_); + started = time(NULL); + while (!done) { + tor_mutex_acquire(thread_test_mutex_); + strmap_assert_ok(thread_test_strmap_); + if (strmap_get(thread_test_strmap_, "thread 1") && + strmap_get(thread_test_strmap_, "thread 2")) { + done = 1; + } else if (time(NULL) > started + 150) { + timedout = done = 1; + } + tor_mutex_release(thread_test_mutex_); +#ifndef _WIN32 + /* Prevent the main thread from starving the worker threads. */ + select(0, NULL, NULL, NULL, &tv); +#endif + } + tor_mutex_acquire(thread_test_start1_); + tor_mutex_release(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + tor_mutex_release(thread_test_start2_); + + tor_mutex_free(thread_test_mutex_); + + if (timedout) { + printf("\nTimed out: %d %d", t1_count, t2_count); + tt_assert(strmap_get(thread_test_strmap_, "thread 1")); + tt_assert(strmap_get(thread_test_strmap_, "thread 2")); + tt_assert(!timedout); + } + + /* different thread IDs. 
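Each worker stored its own tor_get_thread_id() under its key, so the two entries must differ.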
*/ + tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "thread 2"))); + tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "last to run")) || + !strcmp(strmap_get(thread_test_strmap_, "thread 2"), + strmap_get(thread_test_strmap_, "last to run"))); + + tt_int_op(thread_fns_failed, ==, 0); + tt_int_op(thread_fn_tid1, !=, thread_fn_tid2); + + done: + tor_free(s1); + tor_free(s2); + tor_free(thread1_name_); + tor_free(thread2_name_); + if (thread_test_strmap_) + strmap_free(thread_test_strmap_, NULL); + if (thread_test_start1_) + tor_mutex_free(thread_test_start1_); + if (thread_test_start2_) + tor_mutex_free(thread_test_start2_); +} + +typedef struct cv_testinfo_s { + tor_cond_t *cond; + tor_mutex_t *mutex; + int value; + int addend; + int shutdown; + int n_shutdown; + int n_wakeups; + int n_timeouts; + int n_threads; + const struct timeval *tv; +} cv_testinfo_t; + +static cv_testinfo_t * +cv_testinfo_new(void) +{ + cv_testinfo_t *i = tor_malloc_zero(sizeof(*i)); + i->cond = tor_cond_new(); + i->mutex = tor_mutex_new_nonrecursive(); + return i; +} + +static void +cv_testinfo_free(cv_testinfo_t *i) +{ + if (!i) + return; + tor_cond_free(i->cond); + tor_mutex_free(i->mutex); + tor_free(i); +} + +static void cv_test_thr_fn_(void *arg) ATTR_NORETURN; + +static void +cv_test_thr_fn_(void *arg) +{ + cv_testinfo_t *i = arg; + int tid, r; + + tor_mutex_acquire(i->mutex); + tid = i->n_threads++; + tor_mutex_release(i->mutex); + (void) tid; + + tor_mutex_acquire(i->mutex); + while (1) { + if (i->addend) { + i->value += i->addend; + i->addend = 0; + } + + if (i->shutdown) { + ++i->n_shutdown; + i->shutdown = 0; + tor_mutex_release(i->mutex); + spawn_exit(); + } + r = tor_cond_wait(i->cond, i->mutex, i->tv); + ++i->n_wakeups; + if (r == 1) { + ++i->n_timeouts; + tor_mutex_release(i->mutex); + spawn_exit(); + } + } +} + +static void +test_threads_conditionvar(void *arg) +{ + cv_testinfo_t *ti=NULL; + const struct timeval msec100 = { 0, 100*1000 }; + const int timeout = !strcmp(arg, "tv"); + + ti = cv_testinfo_new(); + if (timeout) { + ti->tv = &msec100; + } + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + + tor_mutex_acquire(ti->mutex); + ti->addend = 7; + ti->shutdown = 1; + tor_cond_signal_one(ti->cond); + tor_mutex_release(ti->mutex); + +#define SPIN() \ + while (1) { \ + tor_mutex_acquire(ti->mutex); \ + if (ti->addend == 0) { \ + break; \ + } \ + tor_mutex_release(ti->mutex); \ + } + + SPIN(); + + ti->addend = 30; + ti->shutdown = 1; + tor_cond_signal_all(ti->cond); + tor_mutex_release(ti->mutex); + SPIN(); + + ti->addend = 1000; + if (! timeout) ti->shutdown = 1; + tor_cond_signal_one(ti->cond); + tor_mutex_release(ti->mutex); + SPIN(); + ti->addend = 300; + if (! 
timeout) ti->shutdown = 1; + tor_cond_signal_all(ti->cond); + tor_mutex_release(ti->mutex); + + SPIN(); + tor_mutex_release(ti->mutex); + + tt_int_op(ti->value, ==, 1337); + if (!timeout) { + tt_int_op(ti->n_shutdown, ==, 4); + } else { +#ifdef _WIN32 + Sleep(500); /* msec */ +#elif defined(HAVE_USLEEP) + usleep(500*1000); /* usec */ +#else + { + struct timeval tv = { 0, 500*1000 }; + select(0, NULL, NULL, NULL, &tv); + } +#endif + tor_mutex_acquire(ti->mutex); + tt_int_op(ti->n_shutdown, ==, 2); + tt_int_op(ti->n_timeouts, ==, 2); + tor_mutex_release(ti->mutex); + } + + done: + cv_testinfo_free(ti); +} + +#define THREAD_TEST(name) \ + { #name, test_threads_##name, TT_FORK, NULL, NULL } + +struct testcase_t thread_tests[] = { + THREAD_TEST(basic), + { "conditionvar", test_threads_conditionvar, TT_FORK, + &passthrough_setup, (void*)"no-tv" }, + { "conditionvar_timeout", test_threads_conditionvar, TT_FORK, + &passthrough_setup, (void*)"tv" }, + END_OF_TESTCASES +}; + diff --git a/src/test/test_util.c b/src/test/test_util.c index 2ee1d6dfc1..b53c8fc7a3 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -1607,142 +1607,6 @@ test_util_pow2(void *arg) ; } -/** mutex for thread test to stop the threads hitting data at the same time. */ -static tor_mutex_t *thread_test_mutex_ = NULL; -/** mutexes for the thread test to make sure that the threads have to - * interleave somewhat. */ -static tor_mutex_t *thread_test_start1_ = NULL, - *thread_test_start2_ = NULL; -/** Shared strmap for the thread test. */ -static strmap_t *thread_test_strmap_ = NULL; -/** The name of thread1 for the thread test */ -static char *thread1_name_ = NULL; -/** The name of thread2 for the thread test */ -static char *thread2_name_ = NULL; - -static void thread_test_func_(void* _s) ATTR_NORETURN; - -/** How many iterations have the threads in the unit test run? */ -static int t1_count = 0, t2_count = 0; - -/** Helper function for threading unit tests: This function runs in a - * subthread. It grabs its own mutex (start1 or start2) to make sure that it - * should start, then it repeatedly alters _test_thread_strmap protected by - * thread_test_mutex_. */ -static void -thread_test_func_(void* _s) -{ - char *s = _s; - int i, *count; - tor_mutex_t *m; - char buf[64]; - char **cp; - if (!strcmp(s, "thread 1")) { - m = thread_test_start1_; - cp = &thread1_name_; - count = &t1_count; - } else { - m = thread_test_start2_; - cp = &thread2_name_; - count = &t2_count; - } - - tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); - *cp = tor_strdup(buf); - - tor_mutex_acquire(m); - - for (i=0; i<10000; ++i) { - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, "last to run", *cp); - ++*count; - tor_mutex_release(thread_test_mutex_); - } - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, s, *cp); - tor_mutex_release(thread_test_mutex_); - - tor_mutex_release(m); - - spawn_exit(); -} - -/** Run unit tests for threading logic. 
*/ -static void -test_util_threads(void *arg) -{ - char *s1 = NULL, *s2 = NULL; - int done = 0, timedout = 0; - time_t started; -#ifndef _WIN32 - struct timeval tv; - tv.tv_sec=0; - tv.tv_usec=100*1000; -#endif - (void)arg; - thread_test_mutex_ = tor_mutex_new(); - thread_test_start1_ = tor_mutex_new(); - thread_test_start2_ = tor_mutex_new(); - thread_test_strmap_ = strmap_new(); - s1 = tor_strdup("thread 1"); - s2 = tor_strdup("thread 2"); - tor_mutex_acquire(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - spawn_func(thread_test_func_, s1); - spawn_func(thread_test_func_, s2); - tor_mutex_release(thread_test_start2_); - tor_mutex_release(thread_test_start1_); - started = time(NULL); - while (!done) { - tor_mutex_acquire(thread_test_mutex_); - strmap_assert_ok(thread_test_strmap_); - if (strmap_get(thread_test_strmap_, "thread 1") && - strmap_get(thread_test_strmap_, "thread 2")) { - done = 1; - } else if (time(NULL) > started + 150) { - timedout = done = 1; - } - tor_mutex_release(thread_test_mutex_); -#ifndef _WIN32 - /* Prevent the main thread from starving the worker threads. */ - select(0, NULL, NULL, NULL, &tv); -#endif - } - tor_mutex_acquire(thread_test_start1_); - tor_mutex_release(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - tor_mutex_release(thread_test_start2_); - - tor_mutex_free(thread_test_mutex_); - - if (timedout) { - printf("\nTimed out: %d %d", t1_count, t2_count); - tt_assert(strmap_get(thread_test_strmap_, "thread 1")); - tt_assert(strmap_get(thread_test_strmap_, "thread 2")); - tt_assert(!timedout); - } - - /* different thread IDs. */ - tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "thread 2"))); - tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "last to run")) || - !strcmp(strmap_get(thread_test_strmap_, "thread 2"), - strmap_get(thread_test_strmap_, "last to run"))); - - done: - tor_free(s1); - tor_free(s2); - tor_free(thread1_name_); - tor_free(thread2_name_); - if (thread_test_strmap_) - strmap_free(thread_test_strmap_, NULL); - if (thread_test_start1_) - tor_mutex_free(thread_test_start1_); - if (thread_test_start2_) - tor_mutex_free(thread_test_start2_); -} - /** Run unit tests for compression functions */ static void test_util_gzip(void *arg) @@ -4783,23 +4647,6 @@ test_util_socket(void *arg) tor_close_socket(fd4); } -static void * -socketpair_test_setup(const struct testcase_t *testcase) -{ - return testcase->setup_data; -} -static int -socketpair_test_cleanup(const struct testcase_t *testcase, void *ptr) -{ - (void)testcase; - (void)ptr; - return 1; -} - -static const struct testcase_setup_t socketpair_setup = { - socketpair_test_setup, socketpair_test_cleanup -}; - /* Test for socketpair and ersatz_socketpair(). We test them both, since * the latter is a tolerably good way to exersize tor_accept_socket(). 
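(ersatz_socketpair() emulates socketpair() over loopback TCP on platforms without the native call.)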
*/ static void @@ -4928,7 +4775,6 @@ struct testcase_t util_tests[] = { UTIL_LEGACY(memarea), UTIL_LEGACY(control_formats), UTIL_LEGACY(mmap), - UTIL_LEGACY(threads), UTIL_LEGACY(sscanf), UTIL_LEGACY(format_time_interval), UTIL_LEGACY(path_is_relative), @@ -4975,10 +4821,10 @@ struct testcase_t util_tests[] = { UTIL_TEST(mathlog, 0), UTIL_TEST(weak_random, 0), UTIL_TEST(socket, TT_FORK), - { "socketpair", test_util_socketpair, TT_FORK, &socketpair_setup, + { "socketpair", test_util_socketpair, TT_FORK, &passthrough_setup, (void*)"0" }, { "socketpair_ersatz", test_util_socketpair, TT_FORK, - &socketpair_setup, (void*)"1" }, + &passthrough_setup, (void*)"1" }, UTIL_TEST(max_mem, 0), UTIL_TEST(hostname_validation, 0), UTIL_TEST(ipv4_validation, 0), diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c new file mode 100644 index 0000000000..aaff5069be --- /dev/null +++ b/src/test/test_workqueue.c @@ -0,0 +1,409 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include <stdio.h> +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + +static int opt_verbose = 0; +static int opt_n_threads = 8; +static int opt_n_items = 10000; +static int opt_n_inflight = 1000; +static int opt_n_lowwater = 250; +static int opt_n_cancel = 0; +static int opt_ratio_rsa = 5; + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; + int is_shutdown; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! 
+  bitarray_set(handled, serial);
+  tor_mutex_release(&bitmap_mutex);
+#else
+  (void)serial;
+#endif
+}
+
+/* Runs in a worker thread: sign the job's message with the per-thread
+ * RSA key, and put the signature back into the job's buffer. */
+static int
+workqueue_do_rsa(void *state, void *work)
+{
+  rsa_work_t *rw = work;
+  state_t *st = state;
+  crypto_pk_t *rsa = st->rsa;
+  uint8_t sig[256];
+  int len;
+
+  tor_assert(st->magic == 13371337);
+
+  len = crypto_pk_private_sign(rsa, (char*)sig, 256,
+                               (char*)rw->msg, rw->msglen);
+  if (len < 0) {
+    rw->msglen = 0;
+    return WQ_RPL_ERROR;
+  }
+
+  memset(rw->msg, 0, sizeof(rw->msg));
+  rw->msglen = len;
+  memcpy(rw->msg, sig, len);
+  ++st->n_handled;
+
+  mark_handled(rw->serial);
+
+  return WQ_RPL_REPLY;
+}
+
+/* Runs in a worker thread: free the per-thread state and tell the
+ * thread to shut down. */
+static int
+workqueue_do_shutdown(void *state, void *work)
+{
+  (void)work;
+  crypto_pk_free(((state_t*)state)->rsa);
+  tor_free(state);
+  return WQ_RPL_SHUTDOWN;
+}
+
+/* Runs in a worker thread: complete a curve25519 handshake with the
+ * per-thread key, and store the output in the job. */
+static int
+workqueue_do_ecdh(void *state, void *work)
+{
+  ecdh_work_t *ew = work;
+  uint8_t output[CURVE25519_OUTPUT_LEN];
+  state_t *st = state;
+
+  tor_assert(st->magic == 13371337);
+
+  curve25519_handshake(output, &st->ecdh, &ew->u.pk);
+  memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
+  ++st->n_handled;
+  mark_handled(ew->serial);
+  return WQ_RPL_REPLY;
+}
+
+static void *
+new_state(void *arg)
+{
+  state_t *st;
+  (void)arg;
+
+  st = tor_malloc(sizeof(*st));
+  /* Every thread gets its own keys; not a problem for benchmarking. */
+  st->rsa = crypto_pk_new();
+  if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
+    crypto_pk_free(st->rsa);
+    tor_free(st);
+    return NULL;
+  }
+  curve25519_secret_key_generate(&st->ecdh, 0);
+  st->magic = 13371337;
+  return st;
+}
+
+static void
+free_state(void *arg)
+{
+  state_t *st = arg;
+  crypto_pk_free(st->rsa);
+  tor_free(st);
+}
+
+static tor_weak_rng_t weak_rng;
+static int n_sent = 0;
+static int rsa_sent = 0;
+static int ecdh_sent = 0;
+static int n_received = 0;
+
+#ifdef TRACK_RESPONSES
+bitarray_t *received;
+#endif
+
+/* Runs on the main thread: count (and, optionally, record) one reply. */
+static void
+handle_reply(void *arg)
+{
+#ifdef TRACK_RESPONSES
+  rsa_work_t *rw = arg; /* Naughty cast, but we only look at the serial,
+                         * which sits at the same offset in both job types. */
+  tor_assert(! bitarray_is_set(received, rw->serial));
+  bitarray_set(received,rw->serial);
+#endif
+
+  tor_free(arg);
+  ++n_received;
+}
+
+/* Queue one job, chosen at random between RSA (slow) and ECDH (fast). */
+static workqueue_entry_t *
+add_work(threadpool_t *tp)
+{
+  int add_rsa =
+    opt_ratio_rsa == 0 ||
+    tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
+
+  if (add_rsa) {
+    rsa_work_t *w = tor_malloc_zero(sizeof(*w));
+    w->serial = n_sent++;
+    crypto_rand((char*)w->msg, 20);
+    w->msglen = 20;
+    ++rsa_sent;
+    return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
+  } else {
+    ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
+    w->serial = n_sent++;
+    /* Not strictly right, but this is just for benchmarks. */
+    crypto_rand((char*)w->u.pk.public_key, 32);
+    ++ecdh_sent;
+    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
+  }
+}
+
+static int n_failed_cancel = 0;
+static int n_successful_cancel = 0;
+
+/* Queue n jobs, then try to cancel up to opt_n_cancel of them at random. */
+static int
+add_n_work_items(threadpool_t *tp, int n)
+{
+  int n_queued = 0;
+  int n_try_cancel = 0, i;
+  workqueue_entry_t **to_cancel;
+  workqueue_entry_t *ent;
+
+  to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
+
+  while (n_queued++ < n) {
+    ent = add_work(tp);
+    if (! ent) {
+      tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+      tor_free(to_cancel); /* Don't leak the cancellation list on failure. */
+      return -1;
+    }
+    if (n_try_cancel < opt_n_cancel &&
+        tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
+      to_cancel[n_try_cancel++] = ent;
+    }
+  }
+
+  for (i = 0; i < n_try_cancel; ++i) {
+    void *work = workqueue_entry_cancel(to_cancel[i]);
+    if (! work) {
+      n_failed_cancel++;
+    } else {
+      n_successful_cancel++;
+      tor_free(work);
+    }
+  }
+
+  tor_free(to_cancel);
+  return 0;
+}
+
+static int shutting_down = 0;
+
+/* Main-thread libevent callback: fires when the reply queue's alert
+ * socket becomes readable.  Drains the replies, tops the queue back up
+ * once it falls below the low-water mark, and starts the shutdown once
+ * everything has been answered. */
+static void
+replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+{
+  threadpool_t *tp = arg;
+  replyqueue_t *rq = threadpool_get_replyqueue(tp);
+
+  int old_r = n_received;
+  (void) sock;
+  (void) what;
+
+  replyqueue_process(rq);
+  if (old_r == n_received)
+    return;
+
+  if (opt_verbose) {
+    printf("%d / %d", n_received, n_sent);
+    if (opt_n_cancel)
+      printf(" (%d cancelled, %d uncancellable)",
+             n_successful_cancel, n_failed_cancel);
+    puts("");
+  }
+#ifdef TRACK_RESPONSES
+  {
+    int i;
+    tor_mutex_acquire(&bitmap_mutex);
+    for (i = 0; i < opt_n_items; ++i) {
+      if (bitarray_is_set(received, i))
+        putc('o', stdout);
+      else if (bitarray_is_set(handled, i))
+        putc('!', stdout);
+      else
+        putc('.', stdout);
+    }
+    puts("");
+    tor_mutex_release(&bitmap_mutex);
+  }
+#endif
+
+  if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
+    int n_to_send = n_received + opt_n_inflight - n_sent;
+    if (n_to_send > opt_n_items - n_sent)
+      n_to_send = opt_n_items - n_sent;
+    add_n_work_items(tp, n_to_send);
+  }
+
+  if (shutting_down == 0 &&
+      n_received+n_successful_cancel == n_sent &&
+      n_sent >= opt_n_items) {
+    shutting_down = 1;
+    threadpool_queue_update(tp, NULL,
+                            workqueue_do_shutdown, NULL, NULL);
+  }
+}
+
+static void
+help(void)
+{
+  puts(
+     "Options:\n"
+     "  -N <items>    Run this many items of work\n"
+     "  -T <threads>  Use this many threads\n"
+     "  -I <inflight> Have no more than this many requests queued at once\n"
+     "  -L <lowwater> Add items whenever fewer than this many are pending\n"
+     "  -C <cancel>   Try to cancel N items of every batch that we add\n"
+     "  -R <ratio>    Make one out of this many items be a slow (RSA) one\n"
+     "  --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
+     "     Disable one of the alert_socket backends.");
+}
+
+int
+main(int argc, char **argv)
+{
+  replyqueue_t *rq;
+  threadpool_t *tp;
+  int i;
+  tor_libevent_cfg evcfg;
+  struct event *ev;
+  uint32_t as_flags = 0;
+
+  for (i = 1; i < argc; ++i) {
+    if (!strcmp(argv[i], "-v")) {
+      opt_verbose = 1;
+    } else if (!strcmp(argv[i], "-T") && i+1<argc) {
+      opt_n_threads = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-N") && i+1<argc) {
+      opt_n_items = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-I") && i+1<argc) {
+      opt_n_inflight = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-L") && i+1<argc) {
+      opt_n_lowwater = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-R") && i+1<argc) {
+      opt_ratio_rsa = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-C") && i+1<argc) {
+      opt_n_cancel = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "--no-eventfd")) {
+      as_flags |= ASOCKS_NOEVENTFD;
+    } else if (!strcmp(argv[i], "--no-eventfd2")) {
+      as_flags |= ASOCKS_NOEVENTFD2;
+    } else if (!strcmp(argv[i], "--no-pipe")) {
+      as_flags |= ASOCKS_NOPIPE;
+    } else if (!strcmp(argv[i], "--no-pipe2")) {
+      as_flags |= ASOCKS_NOPIPE2;
+    } else if (!strcmp(argv[i], "--no-socketpair")) {
+      as_flags |= ASOCKS_NOSOCKETPAIR;
+    } else {
+      help();
+      return 1;
+    }
+  }
+
+  if (opt_n_threads < 1 ||
+      opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
+      opt_n_cancel > opt_n_inflight ||
+      opt_ratio_rsa < 0) {
+    help();
+    return 1;
+  }
+
+  init_logging(1);
+  crypto_global_init(1, NULL, NULL);
+  crypto_seed_rng(1);
+
+  rq = replyqueue_new(as_flags);
+  tor_assert(rq);
+  tp = threadpool_new(opt_n_threads,
+                      rq, new_state, free_state, NULL);
+  tor_assert(tp);
+
+  crypto_seed_weak_rng(&weak_rng);
+
+  memset(&evcfg, 0, sizeof(evcfg));
+  tor_libevent_initialize(&evcfg);
+
+  ev = tor_event_new(tor_libevent_get_base(),
+                     replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
+                     replysock_readable_cb, tp);
+
+  event_add(ev, NULL);
+
+#ifdef TRACK_RESPONSES
+  handled = bitarray_init_zero(opt_n_items);
+  received = bitarray_init_zero(opt_n_items);
+  tor_mutex_init(&bitmap_mutex);
+  handled_len = opt_n_items;
+#endif
+
+  for (i = 0; i < opt_n_inflight; ++i) {
+    if (! add_work(tp)) {
+      puts("Couldn't add work.");
+      return 1;
+    }
+  }
+
+  {
+    /* Failsafe: stop the benchmark after 30 seconds no matter what. */
+    struct timeval limit = { 30, 0 };
+    tor_event_base_loopexit(tor_libevent_get_base(), &limit);
+  }
+
+  event_base_loop(tor_libevent_get_base(), 0);
+
+  if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
+    printf("%d vs %d\n", n_sent, opt_n_items);
+    printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
+    puts("FAIL");
+    return 1;
+  } else {
+    puts("OK");
+    return 0;
+  }
+}
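
Note for reviewers: the benchmark above is also the clearest illustration of how
the new threadpool/replyqueue API fits together, so here is that pattern reduced
to its minimum. This sketch is not part of the patch; it uses only entry points
that already appear in test_workqueue.c (replyqueue_new(), threadpool_new(),
threadpool_queue_work(), threadpool_get_replyqueue(), replyqueue_get_socket(),
replyqueue_process(), WQ_RPL_REPLY), and assumes the same headers. The
square_job_t type, the square_* and trivial_state_* functions, and
example_setup() are hypothetical names invented for the example; error handling
is elided.

    /* Hypothetical minimal consumer of the new workqueue API. */
    typedef struct square_job_t {
      int in;
      int out;
    } square_job_t;

    /* Worker thread: do the "expensive" computation in place. */
    static int
    square_fn(void *state, void *work)
    {
      square_job_t *job = work;
      (void)state;             /* this example needs no per-thread state */
      job->out = job->in * job->in;
      return WQ_RPL_REPLY;     /* hand the job back to the main thread */
    }

    /* Main thread: runs for each finished job when the reply queue drains. */
    static void
    square_reply(void *work)
    {
      square_job_t *job = work;
      printf("%d squared is %d\n", job->in, job->out);
      tor_free(job);
    }

    /* Trivial per-thread state constructor/destructor; the patch only shows
     * threadpool_new() with real state functions, and a non-NULL return
     * means success, so we allocate a dummy byte. */
    static void *
    trivial_state_new(void *arg)
    {
      (void)arg;
      return tor_malloc_zero(1);
    }
    static void
    trivial_state_free(void *st)
    {
      tor_free(st);
    }

    /* Main thread: libevent callback for the reply queue's alert socket,
     * exactly as replysock_readable_cb does above. */
    static void
    reply_ready_cb(tor_socket_t sock, short what, void *arg)
    {
      threadpool_t *tp = arg;
      (void)sock; (void)what;
      replyqueue_process(threadpool_get_replyqueue(tp));
    }

    /* Wiring, as in main() above: a reply queue, a pool, a read event,
     * and one queued job. */
    static void
    example_setup(void)
    {
      replyqueue_t *rq = replyqueue_new(0);
      threadpool_t *tp = threadpool_new(4, rq, trivial_state_new,
                                        trivial_state_free, NULL);
      struct event *ev = tor_event_new(tor_libevent_get_base(),
                                       replyqueue_get_socket(rq),
                                       EV_READ|EV_PERSIST,
                                       reply_ready_cb, tp);
      square_job_t *job = tor_malloc_zero(sizeof(*job));
      event_add(ev, NULL);
      job->in = 7;
      threadpool_queue_work(tp, square_fn, square_reply, job);
    }

The shape is the same one the benchmark exercises at scale: requests and
replies travel through shared memory, and the alert socket only signals
readiness, which is the point of the refactoring.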