mirror of
https://gitlab.torproject.org/tpo/core/tor.git
synced 2024-09-21 05:26:20 +02:00
Move responsibility for threadpool reply-handler events to workqueue
This change makes cpuworker and test_workqueue no longer need to include event2/event.h. Now workqueue.c needs to include it, but that is at least somewhat logical here.
This commit is contained in:
parent
b3586629c9
commit
6a5f62f68f
@ -1,3 +1,4 @@
|
|||||||
|
|
||||||
/* copyright (c) 2013-2015, The Tor Project, Inc. */
|
/* copyright (c) 2013-2015, The Tor Project, Inc. */
|
||||||
/* See LICENSE for licensing information */
|
/* See LICENSE for licensing information */
|
||||||
|
|
||||||
@ -24,6 +25,7 @@
|
|||||||
|
|
||||||
#include "orconfig.h"
|
#include "orconfig.h"
|
||||||
#include "compat.h"
|
#include "compat.h"
|
||||||
|
#include "compat_libevent.h"
|
||||||
#include "compat_threads.h"
|
#include "compat_threads.h"
|
||||||
#include "crypto.h"
|
#include "crypto.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
@ -31,6 +33,8 @@
|
|||||||
#include "tor_queue.h"
|
#include "tor_queue.h"
|
||||||
#include "torlog.h"
|
#include "torlog.h"
|
||||||
|
|
||||||
|
#include <event2/event.h>
|
||||||
|
|
||||||
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
|
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
|
||||||
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
|
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
|
||||||
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
|
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
|
||||||
@ -63,6 +67,9 @@ struct threadpool_s {
|
|||||||
void (*free_update_arg_fn)(void *);
|
void (*free_update_arg_fn)(void *);
|
||||||
/** Array of n_threads update arguments. */
|
/** Array of n_threads update arguments. */
|
||||||
void **update_args;
|
void **update_args;
|
||||||
|
/** Event to notice when another thread has sent a reply. */
|
||||||
|
struct event *reply_event;
|
||||||
|
void (*reply_cb)(threadpool_t *);
|
||||||
|
|
||||||
/** Number of elements in threads. */
|
/** Number of elements in threads. */
|
||||||
int n_threads;
|
int n_threads;
|
||||||
@ -597,15 +604,41 @@ replyqueue_new(uint32_t alertsocks_flags)
|
|||||||
return rq;
|
return rq;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Internal: Run from the libevent mainloop when there is work to handle in
|
||||||
* Return the "read socket" for a given reply queue. The main thread should
|
* the reply queue handler. */
|
||||||
* listen for read events on this socket, and call replyqueue_process() every
|
static void
|
||||||
* time it triggers.
|
reply_event_cb(evutil_socket_t sock, short events, void *arg)
|
||||||
*/
|
|
||||||
tor_socket_t
|
|
||||||
replyqueue_get_socket(replyqueue_t *rq)
|
|
||||||
{
|
{
|
||||||
return rq->alert.read_fd;
|
threadpool_t *tp = arg;
|
||||||
|
(void) sock;
|
||||||
|
(void) events;
|
||||||
|
replyqueue_process(tp->reply_queue);
|
||||||
|
if (tp->reply_cb)
|
||||||
|
tp->reply_cb(tp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Register the threadpool <b>tp</b>'s reply queue with the libevent
|
||||||
|
* mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after
|
||||||
|
* each time there is work to process from the reply queue. Return 0 on
|
||||||
|
* success, -1 on failure.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
threadpool_register_reply_event(threadpool_t *tp,
|
||||||
|
void (*cb)(threadpool_t *tp))
|
||||||
|
{
|
||||||
|
struct event_base *base = tor_libevent_get_base();
|
||||||
|
|
||||||
|
if (tp->reply_event) {
|
||||||
|
tor_event_free(tp->reply_event);
|
||||||
|
}
|
||||||
|
tp->reply_event = tor_event_new(base,
|
||||||
|
tp->reply_queue->alert.read_fd,
|
||||||
|
EV_READ|EV_PERSIST,
|
||||||
|
reply_event_cb,
|
||||||
|
tp);
|
||||||
|
tor_assert(tp->reply_event);
|
||||||
|
tp->reply_cb = cb;
|
||||||
|
return event_add(tp->reply_event, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -56,8 +56,11 @@ threadpool_t *threadpool_new(int n_threads,
|
|||||||
replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
|
replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
|
||||||
|
|
||||||
replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
|
replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
|
||||||
tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
|
|
||||||
void replyqueue_process(replyqueue_t *queue);
|
void replyqueue_process(replyqueue_t *queue);
|
||||||
|
|
||||||
|
struct event_base;
|
||||||
|
int threadpool_register_reply_event(threadpool_t *tp,
|
||||||
|
void (*cb)(threadpool_t *tp));
|
||||||
|
|
||||||
#endif /* !defined(TOR_WORKQUEUE_H) */
|
#endif /* !defined(TOR_WORKQUEUE_H) */
|
||||||
|
|
||||||
|
@ -30,8 +30,6 @@
|
|||||||
#include "router.h"
|
#include "router.h"
|
||||||
#include "workqueue.h"
|
#include "workqueue.h"
|
||||||
|
|
||||||
#include <event2/event.h>
|
|
||||||
|
|
||||||
static void queue_pending_tasks(void);
|
static void queue_pending_tasks(void);
|
||||||
|
|
||||||
typedef struct worker_state_s {
|
typedef struct worker_state_s {
|
||||||
@ -69,22 +67,12 @@ worker_state_free_void(void *arg)
|
|||||||
|
|
||||||
static replyqueue_t *replyqueue = NULL;
|
static replyqueue_t *replyqueue = NULL;
|
||||||
static threadpool_t *threadpool = NULL;
|
static threadpool_t *threadpool = NULL;
|
||||||
static struct event *reply_event = NULL;
|
|
||||||
|
|
||||||
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
|
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
|
||||||
|
|
||||||
static int total_pending_tasks = 0;
|
static int total_pending_tasks = 0;
|
||||||
static int max_pending_tasks = 128;
|
static int max_pending_tasks = 128;
|
||||||
|
|
||||||
static void
|
|
||||||
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
|
|
||||||
{
|
|
||||||
replyqueue_t *rq = arg;
|
|
||||||
(void) sock;
|
|
||||||
(void) events;
|
|
||||||
replyqueue_process(rq);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Initialize the cpuworker subsystem. It is OK to call this more than once
|
/** Initialize the cpuworker subsystem. It is OK to call this more than once
|
||||||
* during Tor's lifetime.
|
* during Tor's lifetime.
|
||||||
*/
|
*/
|
||||||
@ -94,14 +82,6 @@ cpu_init(void)
|
|||||||
if (!replyqueue) {
|
if (!replyqueue) {
|
||||||
replyqueue = replyqueue_new(0);
|
replyqueue = replyqueue_new(0);
|
||||||
}
|
}
|
||||||
if (!reply_event) {
|
|
||||||
reply_event = tor_event_new(tor_libevent_get_base(),
|
|
||||||
replyqueue_get_socket(replyqueue),
|
|
||||||
EV_READ|EV_PERSIST,
|
|
||||||
replyqueue_process_cb,
|
|
||||||
replyqueue);
|
|
||||||
event_add(reply_event, NULL);
|
|
||||||
}
|
|
||||||
if (!threadpool) {
|
if (!threadpool) {
|
||||||
/*
|
/*
|
||||||
In our threadpool implementation, half the threads are permissive and
|
In our threadpool implementation, half the threads are permissive and
|
||||||
@ -115,7 +95,12 @@ cpu_init(void)
|
|||||||
worker_state_new,
|
worker_state_new,
|
||||||
worker_state_free_void,
|
worker_state_free_void,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
|
int r = threadpool_register_reply_event(threadpool, NULL);
|
||||||
|
|
||||||
|
tor_assert(r == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Total voodoo. Can we make this more sensible? */
|
/* Total voodoo. Can we make this more sensible? */
|
||||||
max_pending_tasks = get_num_cpus(get_options()) * 64;
|
max_pending_tasks = get_num_cpus(get_options()) * 64;
|
||||||
crypto_seed_weak_rng(&request_sample_rng);
|
crypto_seed_weak_rng(&request_sample_rng);
|
||||||
|
@ -12,7 +12,6 @@
|
|||||||
#include "compat_libevent.h"
|
#include "compat_libevent.h"
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <event2/event.h>
|
|
||||||
|
|
||||||
#define MAX_INFLIGHT (1<<16)
|
#define MAX_INFLIGHT (1<<16)
|
||||||
|
|
||||||
@ -159,6 +158,7 @@ static tor_weak_rng_t weak_rng;
|
|||||||
static int n_sent = 0;
|
static int n_sent = 0;
|
||||||
static int rsa_sent = 0;
|
static int rsa_sent = 0;
|
||||||
static int ecdh_sent = 0;
|
static int ecdh_sent = 0;
|
||||||
|
static int n_received_previously = 0;
|
||||||
static int n_received = 0;
|
static int n_received = 0;
|
||||||
static int no_shutdown = 0;
|
static int no_shutdown = 0;
|
||||||
|
|
||||||
@ -256,19 +256,13 @@ add_n_work_items(threadpool_t *tp, int n)
|
|||||||
static int shutting_down = 0;
|
static int shutting_down = 0;
|
||||||
|
|
||||||
static void
|
static void
|
||||||
replysock_readable_cb(tor_socket_t sock, short what, void *arg)
|
replysock_readable_cb(threadpool_t *tp)
|
||||||
{
|
{
|
||||||
threadpool_t *tp = arg;
|
if (n_received_previously == n_received)
|
||||||
replyqueue_t *rq = threadpool_get_replyqueue(tp);
|
|
||||||
|
|
||||||
int old_r = n_received;
|
|
||||||
(void) sock;
|
|
||||||
(void) what;
|
|
||||||
|
|
||||||
replyqueue_process(rq);
|
|
||||||
if (old_r == n_received)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
n_received_previously = n_received;
|
||||||
|
|
||||||
if (opt_verbose) {
|
if (opt_verbose) {
|
||||||
printf("%d / %d", n_received, n_sent);
|
printf("%d / %d", n_received, n_sent);
|
||||||
if (opt_n_cancel)
|
if (opt_n_cancel)
|
||||||
@ -337,7 +331,6 @@ main(int argc, char **argv)
|
|||||||
threadpool_t *tp;
|
threadpool_t *tp;
|
||||||
int i;
|
int i;
|
||||||
tor_libevent_cfg evcfg;
|
tor_libevent_cfg evcfg;
|
||||||
struct event *ev;
|
|
||||||
uint32_t as_flags = 0;
|
uint32_t as_flags = 0;
|
||||||
|
|
||||||
for (i = 1; i < argc; ++i) {
|
for (i = 1; i < argc; ++i) {
|
||||||
@ -411,11 +404,11 @@ main(int argc, char **argv)
|
|||||||
memset(&evcfg, 0, sizeof(evcfg));
|
memset(&evcfg, 0, sizeof(evcfg));
|
||||||
tor_libevent_initialize(&evcfg);
|
tor_libevent_initialize(&evcfg);
|
||||||
|
|
||||||
ev = tor_event_new(tor_libevent_get_base(),
|
{
|
||||||
replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
|
int r = threadpool_register_reply_event(tp,
|
||||||
replysock_readable_cb, tp);
|
replysock_readable_cb);
|
||||||
|
tor_assert(r == 0);
|
||||||
event_add(ev, NULL);
|
}
|
||||||
|
|
||||||
#ifdef TRACK_RESPONSES
|
#ifdef TRACK_RESPONSES
|
||||||
handled = bitarray_init_zero(opt_n_items);
|
handled = bitarray_init_zero(opt_n_items);
|
||||||
|
Loading…
Reference in New Issue
Block a user