Ensure worker threads actually exit when it is time

This includes a small refactoring to use a new enum (workqueue_reply_t)
for the return values instead of just ints.
This commit is contained in:
Sebastian Hahn 2015-08-20 16:48:13 +02:00 committed by Nick Mathewson
parent 2657ea802b
commit 32220d38c0
5 changed files with 33 additions and 25 deletions

View File

@ -0,0 +1,6 @@
o Minor bugfixes:
- Ensure that worker threads actually exit when a fatal error or
shutdown is indicated. This doesn't currently affect the behaviour
of Tor, because Tor never indicates fatal error or shutdown except
in its unit tests. Fixes bug 16868; bugfix on 0.2.6.3-alpha.

View File

@ -25,7 +25,7 @@ struct threadpool_s {
unsigned generation; unsigned generation;
/** Function that should be run for updates on each thread. */ /** Function that should be run for updates on each thread. */
int (*update_fn)(void *, void *); workqueue_reply_t (*update_fn)(void *, void *);
/** Function to free update arguments if they can't be run. */ /** Function to free update arguments if they can't be run. */
void (*free_update_arg_fn)(void *); void (*free_update_arg_fn)(void *);
/** Array of n_threads update arguments. */ /** Array of n_threads update arguments. */
@ -56,7 +56,7 @@ struct workqueue_entry_s {
/** True iff this entry is waiting for a worker to start processing it. */ /** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending; uint8_t pending;
/** Function to run in the worker thread. */ /** Function to run in the worker thread. */
int (*fn)(void *state, void *arg); workqueue_reply_t (*fn)(void *state, void *arg);
/** Function to run while processing the reply queue. */ /** Function to run while processing the reply queue. */
void (*reply_fn)(void *arg); void (*reply_fn)(void *arg);
/** Argument for the above functions. */ /** Argument for the above functions. */
@ -96,7 +96,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
* <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
* thread. See threadpool_queue_work() for full documentation. */ * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t * static workqueue_entry_t *
workqueue_entry_new(int (*fn)(void*, void*), workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
void (*reply_fn)(void*), void (*reply_fn)(void*),
void *arg) void *arg)
{ {
@ -172,7 +172,7 @@ worker_thread_main(void *thread_)
workerthread_t *thread = thread_; workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool; threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work; workqueue_entry_t *work;
int result; workqueue_reply_t result;
tor_mutex_acquire(&pool->lock); tor_mutex_acquire(&pool->lock);
while (1) { while (1) {
@ -182,13 +182,14 @@ worker_thread_main(void *thread_)
if (thread->in_pool->generation != thread->generation) { if (thread->in_pool->generation != thread->generation) {
void *arg = thread->in_pool->update_args[thread->index]; void *arg = thread->in_pool->update_args[thread->index];
thread->in_pool->update_args[thread->index] = NULL; thread->in_pool->update_args[thread->index] = NULL;
int (*update_fn)(void*,void*) = thread->in_pool->update_fn; workqueue_reply_t (*update_fn)(void*,void*) =
thread->in_pool->update_fn;
thread->generation = thread->in_pool->generation; thread->generation = thread->in_pool->generation;
tor_mutex_release(&pool->lock); tor_mutex_release(&pool->lock);
int r = update_fn(thread->state, arg); workqueue_reply_t r = update_fn(thread->state, arg);
if (r < 0) { if (r != WQ_RPL_REPLY) {
return; return;
} }
@ -208,7 +209,7 @@ worker_thread_main(void *thread_)
queue_reply(thread->reply_queue, work); queue_reply(thread->reply_queue, work);
/* We may need to exit the thread. */ /* We may need to exit the thread. */
if (result >= WQ_RPL_ERROR) { if (result != WQ_RPL_REPLY) {
return; return;
} }
tor_mutex_acquire(&pool->lock); tor_mutex_acquire(&pool->lock);
@ -281,7 +282,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
*/ */
workqueue_entry_t * workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool, threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *), workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *), void (*reply_fn)(void *),
void *arg) void *arg)
{ {
@ -318,7 +319,7 @@ threadpool_queue_work(threadpool_t *pool,
int int
threadpool_queue_update(threadpool_t *pool, threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *), void *(*dup_fn)(void *),
int (*fn)(void *, void *), workqueue_reply_t (*fn)(void *, void *),
void (*free_fn)(void *), void (*free_fn)(void *),
void *arg) void *arg)
{ {

View File

@ -15,21 +15,22 @@ typedef struct threadpool_s threadpool_t;
* pool. */ * pool. */
typedef struct workqueue_entry_s workqueue_entry_t; typedef struct workqueue_entry_s workqueue_entry_t;
/** Possible return value from a work function: indicates success. */ /** Possible return value from a work function: */
#define WQ_RPL_REPLY 0 typedef enum {
/** Possible return value from a work function: indicates fatal error */ WQ_RPL_REPLY = 0, /** indicates success */
#define WQ_RPL_ERROR 1 WQ_RPL_ERROR = 1, /** indicates fatal error */
/** Possible return value from a work function: indicates thread is shutting WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */
* down. */ } workqueue_reply_t;
#define WQ_RPL_SHUTDOWN 2
workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *), workqueue_reply_t (*fn)(void *,
void *),
void (*reply_fn)(void *), void (*reply_fn)(void *),
void *arg); void *arg);
int threadpool_queue_update(threadpool_t *pool, int threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *), void *(*dup_fn)(void *),
int (*fn)(void *, void *), workqueue_reply_t (*fn)(void *, void *),
void (*free_fn)(void *), void (*free_fn)(void *),
void *arg); void *arg);
void *workqueue_entry_cancel(workqueue_entry_t *pending_work); void *workqueue_entry_cancel(workqueue_entry_t *pending_work);

View File

@ -160,7 +160,7 @@ typedef struct cpuworker_job_u {
} u; } u;
} cpuworker_job_t; } cpuworker_job_t;
static int static workqueue_reply_t
update_state_threadfn(void *state_, void *work_) update_state_threadfn(void *state_, void *work_)
{ {
worker_state_t *state = state_; worker_state_t *state = state_;
@ -387,7 +387,7 @@ cpuworker_onion_handshake_replyfn(void *work_)
} }
/** Implementation function for onion handshake requests. */ /** Implementation function for onion handshake requests. */
static int static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_) cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{ {
worker_state_t *state = state_; worker_state_t *state = state_;

View File

@ -70,7 +70,7 @@ mark_handled(int serial)
#endif #endif
} }
static int static workqueue_reply_t
workqueue_do_rsa(void *state, void *work) workqueue_do_rsa(void *state, void *work)
{ {
rsa_work_t *rw = work; rsa_work_t *rw = work;
@ -98,7 +98,7 @@ workqueue_do_rsa(void *state, void *work)
return WQ_RPL_REPLY; return WQ_RPL_REPLY;
} }
static int static workqueue_reply_t
workqueue_do_shutdown(void *state, void *work) workqueue_do_shutdown(void *state, void *work)
{ {
(void)state; (void)state;
@ -108,7 +108,7 @@ workqueue_do_shutdown(void *state, void *work)
return WQ_RPL_SHUTDOWN; return WQ_RPL_SHUTDOWN;
} }
static int static workqueue_reply_t
workqueue_do_ecdh(void *state, void *work) workqueue_do_ecdh(void *state, void *work)
{ {
ecdh_work_t *ew = work; ecdh_work_t *ew = work;
@ -124,7 +124,7 @@ workqueue_do_ecdh(void *state, void *work)
return WQ_RPL_REPLY; return WQ_RPL_REPLY;
} }
static int static workqueue_reply_t
workqueue_shutdown_error(void *state, void *work) workqueue_shutdown_error(void *state, void *work)
{ {
(void)state; (void)state;