diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 59b54a600a..a2e406521f 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -164,6 +164,7 @@ tor_get_thread_id(void)
 
 /* Conditions. */
 
+/** Initialize an already-allocated condition variable. */
 int
 tor_cond_init(tor_cond_t *cond)
 {
@@ -173,7 +174,9 @@ tor_cond_init(tor_cond_t *cond)
   }
   return 0;
 }
-/** Release all resources held by cond. */
+
+/** Release all resources held by cond, but do not free cond
+ * itself. */
 void
 tor_cond_uninit(tor_cond_t *cond)
 {
@@ -183,7 +186,11 @@ tor_cond_uninit(tor_cond_t *cond)
   }
 }
 /** Wait until one of the tor_cond_signal functions is called on cond.
- * All waiters on the condition must wait holding the same mutex.
+ * (If tv is set, and that amount of time passes with no signal to
+ * cond, return anyway. All waiters on the condition must wait holding
+ * the same mutex. All signallers should hold that mutex. The mutex
+ * needs to have been allocated with tor_mutex_init_for_cond().)
+ *
  * Returns 0 on success, -1 on failure, 1 on timeout. */
 int
 tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
index 98bdbbcf5e..024c627cf1 100644
--- a/src/common/compat_threads.c
+++ b/src/common/compat_threads.c
@@ -40,6 +40,7 @@ tor_mutex_free(tor_mutex_t *m)
   tor_free(m);
 }
 
+/** Allocate and return a new condition variable. */
 tor_cond_t *
 tor_cond_new(void)
 {
@@ -48,6 +49,8 @@ tor_cond_new(void)
     tor_free(cond);
   return cond;
 }
+
+/** Free all storage held in c. */
 void
 tor_cond_free(tor_cond_t *c)
 {
@@ -140,13 +143,16 @@ sock_drain(tor_socket_t fd)
   return 0;
 }
 
-/** Allocate a new set of alert sockets. DOCDOC */
+/** Allocate a new set of alert sockets, and set the appropriate function
+ * pointers, in socks_out. */
 int
 alert_sockets_create(alert_sockets_t *socks_out)
 {
   tor_socket_t socks[2];
 
 #ifdef HAVE_EVENTFD
+  /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter
+   * associated with a single file descriptor. */
#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
   socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
 #else
@@ -171,6 +177,8 @@ alert_sockets_create(alert_sockets_t *socks_out)
 #endif
 
 #ifdef HAVE_PIPE2
+  /* Now we're going to try pipes. First try the pipe2() syscall, if we
+   * have it, so we can save some calls... */
   if (pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
     socks_out->read_fd = socks[0];
     socks_out->write_fd = socks[1];
@@ -181,6 +189,8 @@ alert_sockets_create(alert_sockets_t *socks_out)
 #endif
 
 #ifdef HAVE_PIPE
+  /* Now try the regular pipe() syscall. Pipes have a bit lower overhead than
+   * socketpairs, fwict. */
   if (pipe(socks) == 0) {
     if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
         fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
@@ -198,12 +208,35 @@ alert_sockets_create(alert_sockets_t *socks_out)
   }
 #endif
 
+  /* If nothing else worked, fall back on socketpair(). */
   if (tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
-    set_socket_nonblocking(socks[0]);
-    set_socket_nonblocking(socks[1]);
+    if (set_socket_nonblocking(socks[0]) < 0 ||
+        set_socket_nonblocking(socks[1])) {
+      tor_close_socket(socks[0]);
+      tor_close_socket(socks[1]);
+      return -1;
+    }
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
     socks_out->alert_fn = sock_alert;
     socks_out->drain_fn = sock_drain;
     return 0;
   }
   return -1;
 }
+
+/** Close the sockets in socks. */
+void
+alert_sockets_close(alert_sockets_t *socks)
+{
+  if (socks->alert_fn == sock_alert) {
+    /* they are sockets. */
+    tor_close_socket(socks->read_fd);
+    tor_close_socket(socks->write_fd);
+  } else {
+    close(socks->read_fd);
+    if (socks->write_fd != socks->read_fd)
+      close(socks->write_fd);
+  }
+  socks->read_fd = socks->write_fd = -1;
+}
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index b053136c15..9070f13e80 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -82,15 +82,23 @@ int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
 void tor_cond_signal_one(tor_cond_t *cond);
 void tor_cond_signal_all(tor_cond_t *cond);
 
-/** DOCDOC */
+/** Helper type used to manage waking up the main thread while it's in
+ * the libevent main loop. Used by the work queue code. */
 typedef struct alert_sockets_s {
-  /*XXX needs a better name */
+  /* XXXX This structure needs a better name. */
+  /** Socket that the main thread should listen for EV_READ events on.
+   * Note that this socket may be a regular fd on a non-Windows platform.
+   */
   tor_socket_t read_fd;
+  /** Socket to use when alerting the main thread. */
   tor_socket_t write_fd;
+  /** Function to alert the main thread */
   int (*alert_fn)(tor_socket_t write_fd);
+  /** Function to make the main thread no longer alerted. */
   int (*drain_fn)(tor_socket_t read_fd);
 } alert_sockets_t;
 
 int alert_sockets_create(alert_sockets_t *socks_out);
+void alert_sockets_close(alert_sockets_t *socks);
 
 #endif
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index c4b64de58b..e07787b404 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -9,67 +9,84 @@
 #include "tor_queue.h"
 #include "torlog.h"
 
-/*
-  design:
+struct threadpool_s {
+  /** An array of pointers to workerthread_t: one for each running worker
+   * thread. */
+  struct workerthread_s **threads;
+  /** Index of the next thread that we'll give work to.*/
+  int next_for_work;
 
-  each thread has its own queue, try to keep at least elements min..max cycles
-  worth of work on each queue.
+  /** Number of elements in threads. */
+  int n_threads;
+  /** Mutex to protect all the above fields. */
+  tor_mutex_t lock;
 
-keep array of threads; round-robin between them.
+  /** A reply queue to use when constructing new threads. */
+  replyqueue_t *reply_queue;
 
-  When out of work, work-steal.
-
-  alert threads with condition variables.
-
-  alert main thread with fd, since it's libevent.
-
-
- */
+  /** Functions used to allocate and free thread state. */
+  void *(*new_thread_state_fn)(void*);
+  void (*free_thread_state_fn)(void*);
+  void *new_thread_state_arg;
+};
 
 struct workqueue_entry_s {
+  /** The next workqueue_entry_t that's pending on the same thread or
+   * reply queue. */
   TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+  /** The thread to which this workqueue_entry_t was assigned. This field
+   * is set when the workqueue_entry_t is created, and won't be cleared until
+   * after it's handled in the main thread. */
   struct workerthread_s *on_thread;
+  /** True iff this entry is waiting for a worker to start processing it. */
   uint8_t pending;
+  /** Function to run in the worker thread. */
   int (*fn)(void *state, void *arg);
+  /** Function to run while processing the reply queue. */
   void (*reply_fn)(void *arg);
+  /** Argument for the above functions. */
   void *arg;
 };
 
 struct replyqueue_s {
+  /** Mutex to protect the answers field */
   tor_mutex_t lock;
+  /** Doubly-linked list of answers that the reply queue needs to handle.
+   */
   TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
 
-  alert_sockets_t alert; // lock not held on this.
+  /** Mechanism to wake up the main thread when it is receiving answers. */
+  alert_sockets_t alert;
 };
 
+/** A worker thread represents a single thread in a thread pool. To avoid
+ * contention, each gets its own queue. This breaks the guarantee that
+ * queued work will get executed strictly in order. */
 typedef struct workerthread_s {
+  /** Lock to protect all fields of this thread and its queue. */
   tor_mutex_t lock;
+  /** Condition variable that we wait on when we have no work, and which
+   * gets signaled when our queue becomes nonempty. */
   tor_cond_t condition;
+  /** Queue of pending work that we have to do. */
   TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+  /** True iff this thread is currently in its loop. */
   unsigned is_running;
+  /** True iff this thread has crashed or is shut down for some reason. */
   unsigned is_shut_down;
+  /** True if we're waiting for more elements to get added to the queue. */
   unsigned waiting;
+  /** User-supplied state field that we pass to the worker functions of each
+   * work item. */
   void *state;
+  /** Reply queue to which we pass our results. */
   replyqueue_t *reply_queue;
 } workerthread_t;
 
-struct threadpool_s {
-  workerthread_t **threads;
-  int next_for_work;
-
-  tor_mutex_t lock;
-  int n_threads;
-
-  replyqueue_t *reply_queue;
-
-  void *(*new_thread_state_fn)(void*);
-  void (*free_thread_state_fn)(void*);
-  void *new_thread_state_arg;
-
-};
 
 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
 
+/** Allocate and return a new workqueue_entry_t, set up to run the function
+ * fn in the worker thread, and reply_fn in the main
+ * thread. See threadpool_queue_work() for full documentation. */
 static workqueue_entry_t *
 workqueue_entry_new(int (*fn)(void*, void*),
                     void (*reply_fn)(void*),
@@ -82,6 +99,10 @@ workqueue_entry_new(int (*fn)(void*, void*),
   return ent;
 }
 
+/**
+ * Release all storage held in ent. Call only when ent is not on
+ * any queue.
+ */
 static void
 workqueue_entry_free(workqueue_entry_t *ent)
 {
@@ -90,6 +111,20 @@ workqueue_entry_free(workqueue_entry_t *ent)
   tor_free(ent);
 }
 
+/**
+ * Cancel a workqueue_entry_t that has been returned from
+ * threadpool_queue_work.
+ *
+ * You must not call this function on any work whose reply function has been
+ * executed in the main thread; that will cause undefined behavior (probably,
+ * a crash).
+ *
+ * If the work is cancelled, this function returns 1. It is the caller's
+ * responsibility to free any storage in the work function's arguments.
+ *
+ * This function will have no effect if the worker thread has already executed
+ * or begun to execute the work item. In that case, it will return 0.
+ */
 int
 workqueue_entry_cancel(workqueue_entry_t *ent)
 {
@@ -107,6 +142,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
   return cancelled;
 }
 
+/**
+ * Main function for the worker thread.
+ */
 static void
 worker_thread_main(void *thread_)
 {
@@ -115,23 +153,26 @@ worker_thread_main(void *thread_)
   int result;
 
   tor_mutex_acquire(&thread->lock);
-  thread->is_running = 1;
   while (1) {
-    /* lock held. */
+    /* lock must be held at this point. */
     while (!TOR_TAILQ_EMPTY(&thread->work)) {
-      /* lock held. */
+      /* lock must be held at this point. */
      work = TOR_TAILQ_FIRST(&thread->work);
       TOR_TAILQ_REMOVE(&thread->work, work, next_work);
       work->pending = 0;
       tor_mutex_release(&thread->lock);
 
+      /* We run the work function without holding the thread lock. This
+       * is the main thread's first opportunity to give us more work. */
       result = work->fn(thread->state, work->arg);
+      /* Queue the reply for the main thread. */
       queue_reply(thread->reply_queue, work);
 
       tor_mutex_acquire(&thread->lock);
+      /* We may need to exit the thread. */
       if (result >= WQ_RPL_ERROR) {
         thread->is_running = 0;
         thread->is_shut_down = 1;
@@ -139,19 +180,23 @@
         return;
       }
     }
 
-    /* Lock held; no work in this thread's queue. */
+    /* At this point the lock is held, and there is no work in this thread's
+     * queue. */
 
     /* TODO: Try work-stealing. */
-    /* TODO: support an idle-function */
 
+    /* Okay. Now, wait till somebody has work for us. */
    thread->waiting = 1;
-    if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0)
-      /* ERR */
+    if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) {
+      /* XXXX ERROR */
+    }
     thread->waiting = 0;
   }
 }
 
+/** Put a reply on the reply queue. The reply must not currently be on
+ * any thread's work queue. */
 static void
 queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
 {
@@ -168,6 +213,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
   }
 }
 
+/** Allocate and start a new worker thread to use state object state,
+ * and send responses to replyqueue. */
 static workerthread_t *
 workerthread_new(void *state, replyqueue_t *replyqueue)
 {
@@ -186,6 +233,10 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
   return thr;
 }
 
+/**
+ * Add an item of work to a single worker thread. See threadpool_queue_work()
+ * for arguments.
+ */
 static workqueue_entry_t *
 workerthread_queue_work(workerthread_t *worker,
                         int (*fn)(void *, void *),
@@ -206,6 +257,23 @@ workerthread_queue_work(workerthread_t *worker,
   return ent;
 }
 
+/**
+ * Queue an item of work for a thread in a thread pool. The function
+ * fn will be run in a worker thread, and will receive as arguments the
+ * thread's state object, and the provided object arg. It must return
+ * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
+ *
+ * Regardless of its return value, the function reply_fn will later be
+ * run in the main thread when it invokes replyqueue_process(), and will
+ * receive as its argument the same arg object. It's the reply
+ * function's responsibility to free the work object.
+ *
+ * On success, return a workqueue_entry_t object that can be passed to
+ * workqueue_entry_cancel(). On failure, return NULL.
+ *
+ * Note that because each thread has its own work queue, work items may not
+ * be executed strictly in order.
+ */
 workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
                       int (*fn)(void *, void *),
@@ -215,6 +283,7 @@ threadpool_queue_work(threadpool_t *pool,
   workerthread_t *worker;
 
   tor_mutex_acquire(&pool->lock);
+  /* Pick the next thread in round-robin order. */
   worker = pool->threads[pool->next_for_work++];
   if (!worker) {
     tor_mutex_release(&pool->lock);
@@ -227,9 +296,19 @@ threadpool_queue_work(threadpool_t *pool,
   return workerthread_queue_work(worker, fn, reply_fn, arg);
 }
 
+/**
+ * Queue a copy of a work item for every thread in a pool. This can be used,
+ * for example, to tell the threads to update some parameter in their states.
+ *
+ * Arguments are as for threadpool_queue_work, except that the
+ * arg value is passed to dup_fn once per thread to
+ * make a copy of it.
+ *
+ * Return 0 on success, -1 on failure.
+ */
 int
 threadpool_queue_for_all(threadpool_t *pool,
-                         void *(*dup_fn)(void *),
+                         void *(*dup_fn)(const void *),
                          int (*fn)(void *, void *),
                          void (*reply_fn)(void *),
                          void *arg)
 {
@@ -251,6 +330,7 @@ threadpool_queue_for_all(threadpool_t *pool,
   }
 }
 
+/** Launch threads until we have n. */
 static int
 threadpool_start_threads(threadpool_t *pool, int n)
 {
@@ -274,6 +354,13 @@ threadpool_start_threads(threadpool_t *pool, int n)
   return 0;
 }
 
+/**
+ * Construct a new thread pool with n worker threads, configured to
+ * send their output to replyqueue. The threads' states will be
+ * constructed with the new_thread_state_fn call, receiving arg
+ * as its argument. When the threads close, they will call
+ * free_thread_state_fn on their states.
+ */
 threadpool_t *
 threadpool_new(int n_threads,
                replyqueue_t *replyqueue,
@@ -298,12 +385,17 @@ threadpool_new(int n_threads,
   return pool;
 }
 
+/** Return the reply queue associated with a given thread pool. */
 replyqueue_t *
 threadpool_get_replyqueue(threadpool_t *tp)
 {
   return tp->reply_queue;
 }
 
+/** Allocate a new reply queue. Reply queues are used to pass results from
+ * worker threads to the main thread. Since the main thread is running an
+ * IO-centric event loop, it needs to get woken up by means other than a
+ * condition variable. */
 replyqueue_t *
 replyqueue_new(void)
 {
@@ -321,12 +413,22 @@ replyqueue_new(void)
   return rq;
 }
 
+/**
+ * Return the "read socket" for a given reply queue. The main thread should
+ * listen for read events on this socket, and call replyqueue_process() every
+ * time it triggers.
+ */
 tor_socket_t
 replyqueue_get_socket(replyqueue_t *rq)
 {
   return rq->alert.read_fd;
 }
 
+/**
+ * Process all pending replies on a reply queue. The main thread should call
+ * this function every time the socket returned by replyqueue_get_socket() is
+ * readable.
+ */
 void
 replyqueue_process(replyqueue_t *queue)
 {
@@ -336,7 +438,7 @@ replyqueue_process(replyqueue_t *queue)
 
   tor_mutex_acquire(&queue->lock);
   while (!TOR_TAILQ_EMPTY(&queue->answers)) {
-    /* lock held. */
+    /* lock must be held at this point. */
     workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
     TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
     tor_mutex_release(&queue->lock);
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index 684fb192ba..dca947e915 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -6,15 +6,21 @@
 
 #include "compat.h"
 
+/** A replyqueue is used to tell the main thread about the outcome of
+ * work that we queued for the workers. */
 typedef struct replyqueue_s replyqueue_t;
+/** A thread-pool manages starting threads and passing work to them. */
 typedef struct threadpool_s threadpool_t;
+/** A workqueue entry represents a request that has been passed to a thread
+ * pool. */
 typedef struct workqueue_entry_s workqueue_entry_t;
 
-#define WQ_CMD_RUN 0
-#define WQ_CMD_CANCEL 1
-
+/** Possible return value from a work function: indicates success. */
 #define WQ_RPL_REPLY 0
+/** Possible return value from a work function: indicates fatal error. */
 #define WQ_RPL_ERROR 1
+/** Possible return value from a work function: indicates thread is shutting
+ * down. */
 #define WQ_RPL_SHUTDOWN 2
 
 workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
@@ -22,7 +28,7 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          void (*reply_fn)(void *),
                                          void *arg);
 int threadpool_queue_for_all(threadpool_t *pool,
-                             void *(*dup_fn)(void *),
+                             void *(*dup_fn)(const void *),
                              int (*fn)(void *, void *),
                              void (*reply_fn)(void *),
                              void *arg);
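
Not part of the patch: the following is a minimal usage sketch of the API documented above, showing how the pieces are meant to fit together. It assumes libevent 2 and the interfaces exactly as declared in this version of workqueue.h (for example, replyqueue_new() still takes no arguments here); the example_* functions, run_example(), and the tor_malloc_zero() placeholder objects are hypothetical names invented for illustration only.

/* Hypothetical usage sketch -- not part of the patch.  Assumes libevent 2
 * and the workqueue/replyqueue API exactly as declared above. */
#include <event2/event.h>
#include "workqueue.h"
#include "util.h"                      /* tor_malloc_zero(), tor_free() */

/* Constructed once per worker thread by threadpool_new(). */
static void *
example_state_new(void *arg)
{
  (void)arg;
  return tor_malloc_zero(64);          /* placeholder per-thread state */
}

static void
example_state_free(void *state)
{
  tor_free(state);
}

/* Runs in a worker thread; must return one of the WQ_RPL_* codes. */
static int
example_work_fn(void *state, void *work_arg)
{
  (void)state;
  (void)work_arg;
  /* ... do the expensive computation here ... */
  return WQ_RPL_REPLY;
}

/* Runs in the main thread from replyqueue_process().  Per the new
 * documentation, the reply function is responsible for freeing the work
 * object. */
static void
example_reply_fn(void *work_arg)
{
  tor_free(work_arg);
}

/* Libevent callback: the reply queue's alert socket became readable, so
 * drain the queue and run the pending reply functions. */
static void
example_replies_readable_cb(evutil_socket_t sock, short what, void *arg)
{
  (void)sock;
  (void)what;
  replyqueue_process((replyqueue_t *)arg);
}

static int
run_example(struct event_base *base)
{
  replyqueue_t *rq = replyqueue_new();
  threadpool_t *pool;
  struct event *reply_event;
  void *job;

  if (!rq)
    return -1;
  pool = threadpool_new(4, rq, example_state_new, example_state_free, NULL);
  if (!pool)
    return -1;

  /* Wake the main loop whenever a worker posts an answer. */
  reply_event = event_new(base, replyqueue_get_socket(rq),
                          EV_READ|EV_PERSIST, example_replies_readable_cb, rq);
  if (!reply_event || event_add(reply_event, NULL) < 0)
    return -1;

  /* Hand one job to the pool.  The returned entry could later be passed to
   * workqueue_entry_cancel(), provided its reply function has not run yet. */
  job = tor_malloc_zero(16);            /* placeholder work object */
  if (!threadpool_queue_work(pool, example_work_fn, example_reply_fn, job)) {
    tor_free(job);
    return -1;
  }
  return 0;
}

The point the new comments keep making is visible here: results only reach the main thread when it calls replyqueue_process(), and it learns when to do so by watching the reply queue's alert socket from its libevent loop rather than by waiting on a condition variable.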