mirror of
https://gitlab.torproject.org/tpo/core/tor.git
synced 2024-11-10 21:23:58 +01:00
Make pending work cancellable.
This commit is contained in:
parent
a82604b526
commit
c7eebe237d
@ -31,16 +31,18 @@ keep array of threads; round-robin between them.
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
typedef struct workqueue_entry_s {
|
struct workqueue_entry_s {
|
||||||
TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work;
|
TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
|
||||||
int (*fn)(int status, void *state, void *arg);
|
struct workerthread_s *on_thread;
|
||||||
|
uint8_t pending;
|
||||||
|
int (*fn)(void *state, void *arg);
|
||||||
void (*reply_fn)(void *arg);
|
void (*reply_fn)(void *arg);
|
||||||
void *arg;
|
void *arg;
|
||||||
} workqueue_entry_t;
|
};
|
||||||
|
|
||||||
struct replyqueue_s {
|
struct replyqueue_s {
|
||||||
tor_mutex_t lock;
|
tor_mutex_t lock;
|
||||||
TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers;
|
TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
|
||||||
|
|
||||||
void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2.
|
void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2.
|
||||||
tor_socket_t write_sock;
|
tor_socket_t write_sock;
|
||||||
@ -50,7 +52,7 @@ struct replyqueue_s {
|
|||||||
typedef struct workerthread_s {
|
typedef struct workerthread_s {
|
||||||
tor_mutex_t lock;
|
tor_mutex_t lock;
|
||||||
tor_cond_t condition;
|
tor_cond_t condition;
|
||||||
TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work;
|
TOR_TAILQ_HEAD(, workqueue_entry_s) work;
|
||||||
unsigned is_running;
|
unsigned is_running;
|
||||||
unsigned is_shut_down;
|
unsigned is_shut_down;
|
||||||
unsigned waiting;
|
unsigned waiting;
|
||||||
@ -76,7 +78,7 @@ struct threadpool_s {
|
|||||||
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
|
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
|
||||||
|
|
||||||
static workqueue_entry_t *
|
static workqueue_entry_t *
|
||||||
workqueue_entry_new(int (*fn)(int, void*, void*),
|
workqueue_entry_new(int (*fn)(void*, void*),
|
||||||
void (*reply_fn)(void*),
|
void (*reply_fn)(void*),
|
||||||
void *arg)
|
void *arg)
|
||||||
{
|
{
|
||||||
@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent)
|
|||||||
tor_free(ent);
|
tor_free(ent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
workqueue_entry_cancel(workqueue_entry_t *ent)
|
||||||
|
{
|
||||||
|
int cancelled = 0;
|
||||||
|
tor_mutex_acquire(&ent->on_thread->lock);
|
||||||
|
if (ent->pending) {
|
||||||
|
TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
|
||||||
|
cancelled = 1;
|
||||||
|
}
|
||||||
|
tor_mutex_release(&ent->on_thread->lock);
|
||||||
|
|
||||||
|
if (cancelled) {
|
||||||
|
tor_free(ent);
|
||||||
|
}
|
||||||
|
return cancelled;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
worker_thread_main(void *thread_)
|
worker_thread_main(void *thread_)
|
||||||
{
|
{
|
||||||
@ -107,20 +126,17 @@ worker_thread_main(void *thread_)
|
|||||||
thread->is_running = 1;
|
thread->is_running = 1;
|
||||||
while (1) {
|
while (1) {
|
||||||
/* lock held. */
|
/* lock held. */
|
||||||
while (!TOR_SIMPLEQ_EMPTY(&thread->work)) {
|
while (!TOR_TAILQ_EMPTY(&thread->work)) {
|
||||||
/* lock held. */
|
/* lock held. */
|
||||||
|
|
||||||
work = TOR_SIMPLEQ_FIRST(&thread->work);
|
work = TOR_TAILQ_FIRST(&thread->work);
|
||||||
TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work);
|
TOR_TAILQ_REMOVE(&thread->work, work, next_work);
|
||||||
|
work->pending = 0;
|
||||||
tor_mutex_release(&thread->lock);
|
tor_mutex_release(&thread->lock);
|
||||||
|
|
||||||
result = work->fn(WQ_CMD_RUN, thread->state, work->arg);
|
result = work->fn(thread->state, work->arg);
|
||||||
|
|
||||||
if (result == WQ_RPL_QUEUE) {
|
queue_reply(thread->reply_queue, work);
|
||||||
queue_reply(thread->reply_queue, work);
|
|
||||||
} else {
|
|
||||||
workqueue_entry_free(work);
|
|
||||||
}
|
|
||||||
|
|
||||||
tor_mutex_acquire(&thread->lock);
|
tor_mutex_acquire(&thread->lock);
|
||||||
if (result >= WQ_RPL_ERROR) {
|
if (result >= WQ_RPL_ERROR) {
|
||||||
@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
|
|||||||
{
|
{
|
||||||
int was_empty;
|
int was_empty;
|
||||||
tor_mutex_acquire(&queue->lock);
|
tor_mutex_acquire(&queue->lock);
|
||||||
was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers);
|
was_empty = TOR_TAILQ_EMPTY(&queue->answers);
|
||||||
TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work);
|
TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
|
||||||
tor_mutex_release(&queue->lock);
|
tor_mutex_release(&queue->lock);
|
||||||
|
|
||||||
if (was_empty) {
|
if (was_empty) {
|
||||||
@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
|
|||||||
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
|
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
|
||||||
tor_mutex_init_for_cond(&thr->lock);
|
tor_mutex_init_for_cond(&thr->lock);
|
||||||
tor_cond_init(&thr->condition);
|
tor_cond_init(&thr->condition);
|
||||||
TOR_SIMPLEQ_INIT(&thr->work);
|
TOR_TAILQ_INIT(&thr->work);
|
||||||
thr->state = state;
|
thr->state = state;
|
||||||
thr->reply_queue = replyqueue;
|
thr->reply_queue = replyqueue;
|
||||||
|
|
||||||
@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
|
|||||||
return thr;
|
return thr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *
|
workqueue_entry_t *
|
||||||
threadpool_queue_work(threadpool_t *pool,
|
threadpool_queue_work(threadpool_t *pool,
|
||||||
int (*fn)(int, void *, void *),
|
int (*fn)(void *, void *),
|
||||||
void (*reply_fn)(void *),
|
void (*reply_fn)(void *),
|
||||||
void *arg)
|
void *arg)
|
||||||
{
|
{
|
||||||
@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool,
|
|||||||
pool->next_for_work = 0;
|
pool->next_for_work = 0;
|
||||||
tor_mutex_release(&pool->lock);
|
tor_mutex_release(&pool->lock);
|
||||||
|
|
||||||
|
|
||||||
ent = workqueue_entry_new(fn, reply_fn, arg);
|
ent = workqueue_entry_new(fn, reply_fn, arg);
|
||||||
|
|
||||||
tor_mutex_acquire(&worker->lock);
|
tor_mutex_acquire(&worker->lock);
|
||||||
TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work);
|
ent->on_thread = worker;
|
||||||
|
ent->pending = 1;
|
||||||
|
TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
|
||||||
|
|
||||||
if (worker->waiting) /* XXXX inside or outside of lock?? */
|
if (worker->waiting) /* XXXX inside or outside of lock?? */
|
||||||
tor_cond_signal_one(&worker->condition);
|
tor_cond_signal_one(&worker->condition);
|
||||||
@ -298,7 +315,7 @@ replyqueue_new(void)
|
|||||||
rq = tor_malloc_zero(sizeof(replyqueue_t));
|
rq = tor_malloc_zero(sizeof(replyqueue_t));
|
||||||
|
|
||||||
tor_mutex_init(&rq->lock);
|
tor_mutex_init(&rq->lock);
|
||||||
TOR_SIMPLEQ_INIT(&rq->answers);
|
TOR_TAILQ_INIT(&rq->answers);
|
||||||
|
|
||||||
rq->read_sock = pair[0];
|
rq->read_sock = pair[0];
|
||||||
rq->write_sock = pair[1];
|
rq->write_sock = pair[1];
|
||||||
@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue)
|
|||||||
/* XXXX freak out on r == 0, or r == "error, not retryable". */
|
/* XXXX freak out on r == 0, or r == "error, not retryable". */
|
||||||
|
|
||||||
tor_mutex_acquire(&queue->lock);
|
tor_mutex_acquire(&queue->lock);
|
||||||
while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) {
|
while (!TOR_TAILQ_EMPTY(&queue->answers)) {
|
||||||
/* lock held. */
|
/* lock held. */
|
||||||
workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers);
|
workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
|
||||||
TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work);
|
TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
|
||||||
tor_mutex_release(&queue->lock);
|
tor_mutex_release(&queue->lock);
|
||||||
|
|
||||||
work->reply_fn(work->arg);
|
work->reply_fn(work->arg);
|
||||||
@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue)
|
|||||||
|
|
||||||
tor_mutex_release(&queue->lock);
|
tor_mutex_release(&queue->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,20 +8,20 @@
|
|||||||
|
|
||||||
typedef struct replyqueue_s replyqueue_t;
|
typedef struct replyqueue_s replyqueue_t;
|
||||||
typedef struct threadpool_s threadpool_t;
|
typedef struct threadpool_s threadpool_t;
|
||||||
|
typedef struct workqueue_entry_s workqueue_entry_t;
|
||||||
|
|
||||||
#define WQ_CMD_RUN 0
|
#define WQ_CMD_RUN 0
|
||||||
#define WQ_CMD_CANCEL 1
|
#define WQ_CMD_CANCEL 1
|
||||||
|
|
||||||
#define WQ_RPL_QUEUE 0
|
#define WQ_RPL_REPLY 0
|
||||||
#define WQ_RPL_NOQUEUE 1
|
#define WQ_RPL_ERROR 1
|
||||||
#define WQ_RPL_ERROR 2
|
#define WQ_RPL_SHUTDOWN 2
|
||||||
#define WQ_RPL_SHUTDOWN 3
|
|
||||||
|
|
||||||
void *threadpool_queue_work(threadpool_t *pool,
|
workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
|
||||||
int (*fn)(int, void *, void *),
|
int (*fn)(void *, void *),
|
||||||
void (*reply_fn)(void *),
|
void (*reply_fn)(void *),
|
||||||
void *arg);
|
void *arg);
|
||||||
|
int workqueue_entry_cancel(workqueue_entry_t *pending_work);
|
||||||
int threadpool_start_threads(threadpool_t *pool, int n);
|
int threadpool_start_threads(threadpool_t *pool, int n);
|
||||||
threadpool_t *threadpool_new(int n_threads,
|
threadpool_t *threadpool_new(int n_threads,
|
||||||
replyqueue_t *replyqueue,
|
replyqueue_t *replyqueue,
|
||||||
|
@ -64,7 +64,7 @@ mark_handled(int serial)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
workqueue_do_rsa(int cmd, void *state, void *work)
|
workqueue_do_rsa(void *state, void *work)
|
||||||
{
|
{
|
||||||
rsa_work_t *rw = work;
|
rsa_work_t *rw = work;
|
||||||
state_t *st = state;
|
state_t *st = state;
|
||||||
@ -74,16 +74,11 @@ workqueue_do_rsa(int cmd, void *state, void *work)
|
|||||||
|
|
||||||
tor_assert(st->magic == 13371337);
|
tor_assert(st->magic == 13371337);
|
||||||
|
|
||||||
if (cmd == WQ_CMD_CANCEL) {
|
|
||||||
tor_free(work);
|
|
||||||
return WQ_RPL_NOQUEUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
len = crypto_pk_private_sign(rsa, (char*)sig, 256,
|
len = crypto_pk_private_sign(rsa, (char*)sig, 256,
|
||||||
(char*)rw->msg, rw->msglen);
|
(char*)rw->msg, rw->msglen);
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
tor_free(work);
|
rw->msglen = 0;
|
||||||
return WQ_RPL_NOQUEUE;
|
return WQ_RPL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(rw->msg, 0, sizeof(rw->msg));
|
memset(rw->msg, 0, sizeof(rw->msg));
|
||||||
@ -93,12 +88,12 @@ workqueue_do_rsa(int cmd, void *state, void *work)
|
|||||||
|
|
||||||
mark_handled(rw->serial);
|
mark_handled(rw->serial);
|
||||||
|
|
||||||
return WQ_RPL_QUEUE;
|
return WQ_RPL_REPLY;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
static int
|
static int
|
||||||
workqueue_do_shutdown(int cmd, void *state, void *work)
|
workqueue_do_shutdown(void *state, void *work)
|
||||||
{
|
{
|
||||||
(void)state;
|
(void)state;
|
||||||
(void)work;
|
(void)work;
|
||||||
@ -110,7 +105,7 @@ workqueue_do_shutdown(int cmd, void *state, void *work)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int
|
static int
|
||||||
workqueue_do_ecdh(int cmd, void *state, void *work)
|
workqueue_do_ecdh(void *state, void *work)
|
||||||
{
|
{
|
||||||
ecdh_work_t *ew = work;
|
ecdh_work_t *ew = work;
|
||||||
uint8_t output[CURVE25519_OUTPUT_LEN];
|
uint8_t output[CURVE25519_OUTPUT_LEN];
|
||||||
@ -118,16 +113,11 @@ workqueue_do_ecdh(int cmd, void *state, void *work)
|
|||||||
|
|
||||||
tor_assert(st->magic == 13371337);
|
tor_assert(st->magic == 13371337);
|
||||||
|
|
||||||
if (cmd == WQ_CMD_CANCEL) {
|
|
||||||
tor_free(work);
|
|
||||||
return WQ_RPL_NOQUEUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
curve25519_handshake(output, &st->ecdh, &ew->u.pk);
|
curve25519_handshake(output, &st->ecdh, &ew->u.pk);
|
||||||
memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
|
memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
|
||||||
++st->n_handled;
|
++st->n_handled;
|
||||||
mark_handled(ew->serial);
|
mark_handled(ew->serial);
|
||||||
return WQ_RPL_QUEUE;
|
return WQ_RPL_REPLY;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
|
Loading…
Reference in New Issue
Block a user