Refactor cpuworker to use workqueue/threadpool code.

Nick Mathewson 2013-10-02 12:32:09 -04:00
parent cc6529e9bb
commit 1e896214e7
12 changed files with 243 additions and 399 deletions

changes/better_workqueues Normal file
View File

@ -0,0 +1,10 @@
o Major features:
- Refactor the CPU worker implementation for better performance by
avoiding the kernel and lengthening pipelines. The original
implementation used sockets to transfer data from the main thread
to the worker threads, and didn't allow any thread to be assigned
more than a single piece of work at once. The new implementation
avoids communications overhead by making requests in shared
memory, avoiding kernel IO where possible, and keeping more
requests in flight at once. Resolves issue #9682.
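
The flow this entry describes — requests posted in shared memory, answers drained from a reply queue on the main thread — looks like this in miniature. This is a hedged sketch assembled from the signatures that appear later in this diff; my_job_t, compute(), consume(), and pool are illustrative names, not code from the commit:

#include "workqueue.h"

typedef struct my_job_t { int input; int output; } my_job_t;

/* Runs on a worker thread; "state" is that thread's private state. */
static int
my_threadfn(void *state, void *work)
{
  my_job_t *job = work;
  (void) state;
  job->output = compute(job->input);  /* the CPU-heavy part (hypothetical) */
  return WQ_RPL_REPLY;                /* hand the job to the replyqueue */
}

/* Runs on the main thread when replyqueue_process() drains answers. */
static void
my_replyfn(void *work)
{
  my_job_t *job = work;
  consume(job->output);               /* hypothetical consumer */
  tor_free(job);
}

/* Main thread ("pool" is a threadpool_t from threadpool_new()). The job
 * stays in memory shared with the workers, so queueing it needs no kernel
 * I/O, and many jobs can be in flight at once. */
my_job_t *job = tor_malloc_zero(sizeof(*job));
job->input = 42;
if (!threadpool_queue_work(pool, my_threadfn, my_replyfn, job))
  tor_free(job);                      /* queueing failed */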

src/common/workqueue.c
View File

@ -108,6 +108,7 @@ workqueue_entry_free(workqueue_entry_t *ent)
{
if (!ent)
return;
memset(ent, 0xf0, sizeof(*ent));
tor_free(ent);
}
@ -310,7 +311,7 @@ threadpool_queue_work(threadpool_t *pool,
*/
int
threadpool_queue_for_all(threadpool_t *pool,
void *(*dup_fn)(const void *),
void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
@ -444,6 +445,7 @@ replyqueue_process(replyqueue_t *queue)
workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
work->on_thread = NULL;
work->reply_fn(work->arg);
workqueue_entry_free(work);

src/common/workqueue.h
View File

@ -28,7 +28,7 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
void (*reply_fn)(void *),
void *arg);
int threadpool_queue_for_all(threadpool_t *pool,
void *(*dup_fn)(const void *),
void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg);

src/or/command.c
View File

@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
/* hand it off to the cpuworkers, and then return. */
if (connection_or_digest_is_known_relay(chan->identity_digest))
rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT);
return;

src/or/config.c
View File

@ -1729,7 +1729,7 @@ options_act(const or_options_t *old_options)
if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
inform_testing_reachability();
}
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
if (dns_reset())
return -1;
} else {

src/or/connection.c
View File

@ -29,7 +29,6 @@
#include "connection_edge.h"
#include "connection_or.h"
#include "control.h"
#include "cpuworker.h"
#include "directory.h"
#include "dirserv.h"
#include "dns.h"
@ -130,7 +129,6 @@ conn_type_to_string(int type)
case CONN_TYPE_AP: return "Socks";
case CONN_TYPE_DIR_LISTENER: return "Directory listener";
case CONN_TYPE_DIR: return "Directory";
case CONN_TYPE_CPUWORKER: return "CPU worker";
case CONN_TYPE_CONTROL_LISTENER: return "Control listener";
case CONN_TYPE_CONTROL: return "Control";
case CONN_TYPE_EXT_OR: return "Extended OR";
@ -213,12 +211,6 @@ conn_state_to_string(int type, int state)
case DIR_CONN_STATE_SERVER_WRITING: return "writing";
}
break;
case CONN_TYPE_CPUWORKER:
switch (state) {
case CPUWORKER_STATE_IDLE: return "idle";
case CPUWORKER_STATE_BUSY_ONION: return "busy with onion";
}
break;
case CONN_TYPE_CONTROL:
switch (state) {
case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)";
@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_CONTROL:
case CONN_TYPE_OR:
case CONN_TYPE_EXT_OR:
case CONN_TYPE_CPUWORKER:
return 1;
default:
return 0;
@ -2436,7 +2427,6 @@ connection_mark_all_noncontrol_connections(void)
if (conn->marked_for_close)
continue;
switch (conn->type) {
case CONN_TYPE_CPUWORKER:
case CONN_TYPE_CONTROL_LISTENER:
case CONN_TYPE_CONTROL:
break;
@ -4530,8 +4520,6 @@ connection_process_inbuf(connection_t *conn, int package_partial)
package_partial);
case CONN_TYPE_DIR:
return connection_dir_process_inbuf(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_process_inbuf(conn);
case CONN_TYPE_CONTROL:
return connection_control_process_inbuf(TO_CONTROL_CONN(conn));
default:
@ -4591,8 +4579,6 @@ connection_finished_flushing(connection_t *conn)
return connection_edge_finished_flushing(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_finished_flushing(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_finished_flushing(conn);
case CONN_TYPE_CONTROL:
return connection_control_finished_flushing(TO_CONTROL_CONN(conn));
default:
@ -4648,8 +4634,6 @@ connection_reached_eof(connection_t *conn)
return connection_edge_reached_eof(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_reached_eof(TO_DIR_CONN(conn));
case CONN_TYPE_CPUWORKER:
return connection_cpu_reached_eof(conn);
case CONN_TYPE_CONTROL:
return connection_control_reached_eof(TO_CONTROL_CONN(conn));
default:
@ -4855,10 +4839,6 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->purpose >= DIR_PURPOSE_MIN_);
tor_assert(conn->purpose <= DIR_PURPOSE_MAX_);
break;
case CONN_TYPE_CPUWORKER:
tor_assert(conn->state >= CPUWORKER_STATE_MIN_);
tor_assert(conn->state <= CPUWORKER_STATE_MAX_);
break;
case CONN_TYPE_CONTROL:
tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_);
tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_);
@ -4959,9 +4939,7 @@ proxy_type_to_string(int proxy_type)
}
/** Call connection_free_() on every connection in our array, and release all
* storage held by connection.c. This is used by cpuworkers and dnsworkers
* when they fork, so they don't keep resources held open (especially
* sockets).
* storage held by connection.c.
*
* Don't do the checks in connection_free(), because they will
* fail.

src/or/cpuworker.c
View File

@ -5,84 +5,98 @@
/**
* \file cpuworker.c
* \brief Implements a farm of 'CPU worker' processes to perform
* CPU-intensive tasks in another thread or process, to not
* interrupt the main thread.
* \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
* out to worker threads.
*
* Right now, we only use this for processing onionskins.
**/
#include "or.h"
#include "buffers.h"
#include "channel.h"
#include "channeltls.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "config.h"
#include "connection.h"
#include "connection_or.h"
#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
#include "workqueue.h"
/** The maximum number of cpuworker processes we will keep around. */
#define MAX_CPUWORKERS 16
/** The minimum number of cpuworker processes we will keep around. */
#define MIN_CPUWORKERS 1
#ifdef HAVE_EVENT2_EVENT_H
#include <event2/event.h>
#else
#include <event.h>
#endif
/** The tag specifies which circuit this onionskin was from. */
#define TAG_LEN 12
static void queue_pending_tasks(void);
/** How many cpuworkers we have running right now. */
static int num_cpuworkers=0;
/** How many of the running cpuworkers have an assigned task right now. */
static int num_cpuworkers_busy=0;
/** We need to spawn new cpuworkers whenever we rotate the onion keys
* on platforms where execution contexts==processes. This variable stores
* the last time we got a key rotation event. */
static time_t last_rotation_time=0;
typedef struct worker_state_s {
int generation;
server_onion_keys_t *onion_keys;
} worker_state_t;
static void cpuworker_main(void *data) ATTR_NORETURN;
static int spawn_cpuworker(void);
static void spawn_enough_cpuworkers(void);
static void process_pending_task(connection_t *cpuworker);
static void *
worker_state_new(void *arg)
{
worker_state_t *ws;
(void)arg;
ws = tor_malloc_zero(sizeof(worker_state_t));
ws->onion_keys = server_onion_keys_new();
return ws;
}
static void
worker_state_free(void *arg)
{
worker_state_t *ws = arg;
server_onion_keys_free(ws->onion_keys);
tor_free(ws);
}
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;
static struct event *reply_event = NULL;
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
static int total_pending_tasks = 0;
static int max_pending_tasks = 128;
static void
replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
replyqueue_t *rq = arg;
(void) sock;
(void) events;
replyqueue_process(rq);
}
/** Initialize the cpuworker subsystem.
*/
void
cpu_init(void)
{
cpuworkers_rotate();
}
/** Called when we're done sending a request to a cpuworker. */
int
connection_cpu_finished_flushing(connection_t *conn)
{
tor_assert(conn);
tor_assert(conn->type == CONN_TYPE_CPUWORKER);
return 0;
}
/** Pack global_id and circ_id; set *tag to the result. (See note on
* cpuworker_main for wire format.) */
static void
tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
{
/*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
/*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
set_uint64(tag, chan_id);
set_uint32(tag+8, circ_id);
}
/** Unpack <b>tag</b> into addr, port, and circ_id.
*/
static void
tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
{
*chan_id = get_uint64(tag);
*circ_id = get_uint32(tag+8);
if (!replyqueue) {
replyqueue = replyqueue_new(0);
}
if (!reply_event) {
reply_event = tor_event_new(tor_libevent_get_base(),
replyqueue_get_socket(replyqueue),
EV_READ|EV_PERSIST,
replyqueue_process_cb,
replyqueue);
event_add(reply_event, NULL);
}
if (!threadpool) {
threadpool = threadpool_new(get_num_cpus(get_options()),
replyqueue,
worker_state_new,
worker_state_free,
NULL);
}
/* Total voodoo. Can we make this more sensible? */
max_pending_tasks = get_num_cpus(get_options()) * 64;
crypto_seed_weak_rng(&request_sample_rng);
}
/** Magic numbers to make sure our cpuworker_requests don't grow any
@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
typedef struct cpuworker_request_t {
/** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
uint32_t magic;
/** Opaque tag to identify the job */
uint8_t tag[TAG_LEN];
/** Task code. Must be one of CPUWORKER_TASK_* */
uint8_t task;
/** Flag: Are we timing this request? */
unsigned timed : 1;
@ -114,8 +124,7 @@ typedef struct cpuworker_request_t {
typedef struct cpuworker_reply_t {
/** Magic number; must be CPUWORKER_REPLY_MAGIC. */
uint32_t magic;
/** Opaque tag to identify the job; matches the request's tag.*/
uint8_t tag[TAG_LEN];
/** True iff we got a successful request. */
uint8_t success;
@ -142,42 +151,46 @@ typedef struct cpuworker_reply_t {
uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;
typedef struct cpuworker_job_u {
uint64_t chan_id;
uint32_t circ_id;
union {
cpuworker_request_t request;
cpuworker_reply_t reply;
} u;
} cpuworker_job_t;
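One allocation makes the whole round trip: because request and reply share a union, the worker overwrites the request with the reply in place, and no second buffer or socket write is needed. A sketch of that lifecycle as the functions below implement it (annotation, not code from the commit; req is a cpuworker_request_t and rpl a cpuworker_reply_t):

/* main thread, when queueing (assign_onionskin_to_cpuworker): */
cpuworker_job_t *job = tor_malloc_zero(sizeof(cpuworker_job_t));
memcpy(&job->u.request, &req, sizeof(req));

/* worker thread (cpuworker_onion_handshake_threadfn): read the request
 * out, compute the answer, then overwrite the same union in place: */
memcpy(&req, &job->u.request, sizeof(req));
memcpy(&job->u.reply, &rpl, sizeof(rpl));

/* main thread again (cpuworker_onion_handshake_replyfn): */
memcpy(&rpl, &job->u.reply, sizeof(rpl));
tor_free(job);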
static int
update_state_threadfn(void *state_, void *work_)
{
worker_state_t *state = state_;
worker_state_t *update = work_;
server_onion_keys_free(state->onion_keys);
state->onion_keys = update->onion_keys;
update->onion_keys = NULL;
++state->generation;
return WQ_RPL_REPLY;
}
static void
update_state_replyfn(void *work_)
{
tor_free(work_);
}
/** Called when the onion key has changed: queue an update for every
 * worker thread so that each one replaces its copy of the onion keys.
 */
void
cpuworkers_rotate(void)
cpuworkers_rotate_keyinfo(void)
{
connection_t *cpuworker;
while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
CPUWORKER_STATE_IDLE))) {
connection_mark_for_close(cpuworker);
--num_cpuworkers;
if (threadpool_queue_for_all(threadpool,
worker_state_new,
update_state_threadfn,
update_state_replyfn,
NULL)) {
log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
last_rotation_time = time(NULL);
if (server_mode(get_options()))
spawn_enough_cpuworkers();
}
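Reading this together with the dup_fn signature from the header hunk above: threadpool_queue_for_all appears to call dup_fn once per worker thread to make a thread-private copy of arg, run fn with that thread's state and the copy on the thread itself, and then pass the copy to reply_fn on the main thread. Here worker_state_new doubles as the dup_fn, so each thread gets a fresh set of onion keys with no locking. The same pattern for a simpler broadcast, as a hedged sketch (the gen_* names are illustrative):

/* Tell every worker thread about a new generation number. */
static void *
gen_dup(void *arg)
{
  int *copy = tor_malloc(sizeof(int));
  *copy = *(int *)arg;                /* one private copy per thread */
  return copy;
}
static int
gen_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  state->generation = *(int *)work_;  /* applied on the worker thread */
  return WQ_RPL_REPLY;
}
static void
gen_replyfn(void *work_)
{
  tor_free(work_);                    /* main thread reclaims the copy */
}

int new_gen = 7;
if (threadpool_queue_for_all(threadpool, gen_dup, gen_threadfn,
                             gen_replyfn, &new_gen))
  log_warn(LD_OR, "Failed to queue generation update.");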
/** If the cpuworker closes the connection,
* mark it as closed and spawn a new one as needed. */
int
connection_cpu_reached_eof(connection_t *conn)
{
log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
if (conn->state != CPUWORKER_STATE_IDLE) {
/* the circ associated with this cpuworker will have to wait until
* it gets culled in run_connection_housekeeping(), since we have
* no way to find out which circ it was. */
log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
num_cpuworkers_busy--;
}
num_cpuworkers--;
spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
spinning. */
connection_mark_for_close(conn);
return 0;
}
/** Indexed by handshake type: how many onionskins have we processed and
@ -197,8 +210,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
* time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
/** Return true iff we'd like to measure a handshake of type
* <b>onionskin_type</b>. Call only from the main thread. */
static int
@ -286,31 +297,22 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}
/** Called when we get data from a cpuworker. If the answer is not complete,
* wait for a complete answer. If the answer is complete,
* process it as appropriate.
*/
int
connection_cpu_process_inbuf(connection_t *conn)
/** Handle a reply from one of the worker threads: deliver the handshake
 * answer to the right circuit, free the job, and queue more work. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
cpuworker_job_t *job = work_;
cpuworker_reply_t rpl;
uint64_t chan_id;
circid_t circ_id;
channel_t *p_chan = NULL;
circuit_t *circ;
circuit_t *circ = NULL;
tor_assert(conn);
tor_assert(conn->type == CONN_TYPE_CPUWORKER);
--total_pending_tasks;
if (!connection_get_inbuf_len(conn))
return 0;
if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
cpuworker_reply_t rpl;
if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
return 0; /* not yet */
tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
if (1) {
/* Could avoid this, but doesn't matter. */
memcpy(&rpl, &job->u.reply, sizeof(rpl));
tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
@ -337,18 +339,21 @@ connection_cpu_process_inbuf(connection_t *conn)
}
}
}
/* parse out the circ it was talking about */
tag_unpack(rpl.tag, &chan_id, &circ_id);
circ = NULL;
log_debug(LD_OR,
"Unpacking cpuworker reply, chan_id is " U64_FORMAT
", circ_id is %u",
U64_PRINTF_ARG(chan_id), (unsigned)circ_id);
/* Find the circ it was talking about */
chan_id = job->chan_id;
circ_id = job->circ_id;
p_chan = channel_find_by_global_id(chan_id);
if (p_chan)
circ = circuit_get_by_circid_channel(circ_id, p_chan);
log_debug(LD_OR,
"Unpacking cpuworker reply %p, chan_id is " U64_FORMAT
", circ_id is %u, p_chan=%p, circ=%p, success=%d",
job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id,
p_chan, circ, rpl.success);
if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
@ -367,6 +372,7 @@ connection_cpu_process_inbuf(connection_t *conn)
goto done_processing;
}
tor_assert(! CIRCUIT_IS_ORIGIN(circ));
TO_OR_CIRCUIT(circ)->workqueue_entry = NULL;
if (onionskin_answer(TO_OR_CIRCUIT(circ),
&rpl.created_cell,
(const char*)rpl.keys,
@ -376,58 +382,33 @@ connection_cpu_process_inbuf(connection_t *conn)
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
} else {
tor_assert(0); /* don't ask me to do handshakes yet */
}
done_processing:
conn->state = CPUWORKER_STATE_IDLE;
num_cpuworkers_busy--;
if (conn->timestamp_created < last_rotation_time) {
connection_mark_for_close(conn);
num_cpuworkers--;
spawn_enough_cpuworkers();
} else {
process_pending_task(conn);
}
return 0;
memwipe(&rpl, 0, sizeof(rpl));
memwipe(job, 0, sizeof(*job));
tor_free(job);
queue_pending_tasks();
}
/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair.
* Read and writes from fdarray[1]. Reads requests, writes answers.
*
* Request format:
* cpuworker_request_t.
* Response format:
* cpuworker_reply_t
*/
static void
cpuworker_main(void *data)
/** Implementation function for onion handshake requests. */
static int
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
/* For talking to the parent thread/process */
tor_socket_t *fdarray = data;
tor_socket_t fd;
worker_state_t *state = state_;
cpuworker_job_t *job = work_;
/* variables for onion processing */
server_onion_keys_t onion_keys;
server_onion_keys_t *onion_keys = state->onion_keys;
cpuworker_request_t req;
cpuworker_reply_t rpl;
fd = fdarray[1]; /* this side is ours */
tor_free(data);
memcpy(&req, &job->u.request, sizeof(req));
setup_server_onion_keys(&onion_keys);
tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
memset(&rpl, 0, sizeof(rpl));
for (;;) {
if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
log_info(LD_OR, "read request failed. Exiting.");
goto end;
}
tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
memset(&rpl, 0, sizeof(rpl));
if (req.task == CPUWORKER_TASK_ONION) {
if (1) {
const create_cell_t *cc = &req.create_cell;
created_cell_t *cell_out = &rpl.created_cell;
struct timeval tv_start = {0,0}, tv_end;
@ -439,7 +420,7 @@ cpuworker_main(void *data)
tor_gettimeofday(&tv_start);
n = onion_skin_server_handshake(cc->handshake_type,
cc->onionskin, cc->handshake_len,
&onion_keys,
onion_keys,
cell_out->reply,
rpl.keys, CPATH_KEY_MATERIAL_LEN,
rpl.rend_auth_material);
@ -447,12 +428,10 @@ cpuworker_main(void *data)
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
memset(&rpl, 0, sizeof(rpl));
memcpy(rpl.tag, req.tag, TAG_LEN);
rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
memcpy(rpl.tag, req.tag, TAG_LEN);
cell_out->handshake_len = n;
switch (cc->cell_type) {
case CELL_CREATE:
@ -463,7 +442,7 @@ cpuworker_main(void *data)
cell_out->cell_type = CELL_CREATED_FAST; break;
default:
tor_assert(0);
goto end;
return WQ_RPL_SHUTDOWN;
}
rpl.success = 1;
}
@ -479,187 +458,55 @@ cpuworker_main(void *data)
else
rpl.n_usec = (uint32_t) usec;
}
if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
log_err(LD_BUG,"writing response buf failed. Exiting.");
goto end;
}
log_debug(LD_OR,"finished writing response.");
} else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
log_info(LD_OR,"Clean shutdown: exiting");
goto end;
}
memwipe(&req, 0, sizeof(req));
memwipe(&rpl, 0, sizeof(req));
}
end:
memcpy(&job->u.reply, &rpl, sizeof(rpl));
memwipe(&req, 0, sizeof(req));
memwipe(&rpl, 0, sizeof(rpl));
release_server_onion_keys(&onion_keys);
tor_close_socket(fd);
crypto_thread_cleanup();
spawn_exit();
return WQ_RPL_REPLY;
}
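Two details of the contract above, inferred from how the values are used in this diff: state_ is the per-thread worker_state_t built by worker_state_new, so each thread reads its own copy of the onion keys without locking; and the return code steers the pool — WQ_RPL_REPLY sends the job to the replyqueue, while WQ_RPL_SHUTDOWN (returned above for an impossible cell type) presumably retires the worker thread. A minimal skeleton under those assumptions:

static int
example_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;   /* per-thread, from worker_state_new */
  cpuworker_job_t *job = work_;     /* per-job, from the queueing caller */
  if (do_the_work(state, job) < 0)  /* hypothetical helper */
    return WQ_RPL_SHUTDOWN;         /* unrecoverable: stop this thread */
  return WQ_RPL_REPLY;              /* normal case: deliver to replyqueue */
}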
/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed.
*/
static int
spawn_cpuworker(void)
{
tor_socket_t *fdarray;
tor_socket_t fd;
connection_t *conn;
int err;
fdarray = tor_calloc(2, sizeof(tor_socket_t));
if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
tor_socket_strerror(-err));
tor_free(fdarray);
return -1;
}
tor_assert(SOCKET_OK(fdarray[0]));
tor_assert(SOCKET_OK(fdarray[1]));
fd = fdarray[0];
if (spawn_func(cpuworker_main, (void*)fdarray) < 0) {
tor_close_socket(fdarray[0]);
tor_close_socket(fdarray[1]);
tor_free(fdarray);
return -1;
}
log_debug(LD_OR,"just spawned a cpu worker.");
conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
/* set up conn so it's got all the data we need to remember */
conn->s = fd;
conn->address = tor_strdup("localhost");
tor_addr_make_unspec(&conn->addr);
if (set_socket_nonblocking(fd) == -1) {
connection_free(conn); /* this closes fd */
return -1;
}
if (connection_add(conn) < 0) { /* no space, forget it */
log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
connection_free(conn); /* this closes fd */
return -1;
}
conn->state = CPUWORKER_STATE_IDLE;
connection_start_reading(conn);
return 0; /* success */
}
/** If we have too few or too many active cpuworkers, try to spawn new ones
* or kill idle ones.
*/
/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
spawn_enough_cpuworkers(void)
{
int num_cpuworkers_needed = get_num_cpus(get_options());
int reseed = 0;
if (num_cpuworkers_needed < MIN_CPUWORKERS)
num_cpuworkers_needed = MIN_CPUWORKERS;
if (num_cpuworkers_needed > MAX_CPUWORKERS)
num_cpuworkers_needed = MAX_CPUWORKERS;
while (num_cpuworkers < num_cpuworkers_needed) {
if (spawn_cpuworker() < 0) {
log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
return;
}
num_cpuworkers++;
reseed++;
}
if (reseed)
crypto_seed_weak_rng(&request_sample_rng);
}
/** Take a pending task from the queue and assign it to 'cpuworker'. */
static void
process_pending_task(connection_t *cpuworker)
queue_pending_tasks(void)
{
or_circuit_t *circ;
create_cell_t *onionskin = NULL;
tor_assert(cpuworker);
while (total_pending_tasks < max_pending_tasks) {
circ = onion_next_task(&onionskin);
/* for now only process onion tasks */
if (!circ)
return;
circ = onion_next_task(&onionskin);
if (!circ)
return;
if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
}
/** How long should we let a cpuworker stay busy before we give
* up on it and decide that we have a bug or infinite loop?
* This value is high because some servers with low memory/cpu
* sometimes spend an hour or more swapping, and Tor starves. */
#define CPUWORKER_BUSY_TIMEOUT (60*60*12)
/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get
* stuck in the 'busy' state, even though the cpuworker process thinks of
* itself as idle. I don't know why. But here's a workaround to kill any
* cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT.
*/
static void
cull_wedged_cpuworkers(void)
{
time_t now = time(NULL);
smartlist_t *conns = get_connection_array();
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
if (!conn->marked_for_close &&
conn->type == CONN_TYPE_CPUWORKER &&
conn->state == CPUWORKER_STATE_BUSY_ONION &&
conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
log_notice(LD_BUG,
"closing wedged cpuworker. Can somebody find the bug?");
num_cpuworkers_busy--;
num_cpuworkers--;
connection_mark_for_close(conn);
}
} SMARTLIST_FOREACH_END(conn);
if (assign_onionskin_to_cpuworker(circ, onionskin))
log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
}
}
/** Try to tell a cpuworker to perform the public key operations necessary to
* respond to <b>onionskin</b> for the circuit <b>circ</b>.
*
* If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else,
* look for an idle cpuworker and use him. If none idle, queue task onto the
* pending onion list and return. Return 0 if we successfully assign the
* task, or -1 on failure.
* Return 0 if we successfully assign the task, or -1 on failure.
*/
int
assign_onionskin_to_cpuworker(connection_t *cpuworker,
or_circuit_t *circ,
assign_onionskin_to_cpuworker(or_circuit_t *circ,
create_cell_t *onionskin)
{
workqueue_entry_t *queue_entry;
cpuworker_job_t *job;
cpuworker_request_t req;
time_t now = approx_time();
static time_t last_culled_cpuworkers = 0;
int should_time;
/* Checking for wedged cpuworkers requires a linear search over all
* connections, so let's do it only once a minute.
*/
#define CULL_CPUWORKERS_INTERVAL 60
if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
cull_wedged_cpuworkers();
spawn_enough_cpuworkers();
last_culled_cpuworkers = now;
}
if (1) {
if (num_cpuworkers_busy == num_cpuworkers) {
if (!circ->p_chan) {
log_info(LD_OR,"circ->p_chan gone. Failing circ.");
tor_free(onionskin);
return -1;
}
if (total_pending_tasks >= max_pending_tasks) {
log_debug(LD_OR,"No idle cpuworkers. Queuing.");
if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
@ -668,36 +515,14 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
return 0;
}
if (!cpuworker)
cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
CPUWORKER_STATE_IDLE);
tor_assert(cpuworker);
if (!circ->p_chan) {
log_info(LD_OR,"circ->p_chan gone. Failing circ.");
tor_free(onionskin);
return -1;
}
if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
should_time = should_time_request(onionskin->handshake_type);
memset(&req, 0, sizeof(req));
req.magic = CPUWORKER_REQUEST_MAGIC;
tag_pack(req.tag, circ->p_chan->global_identifier,
circ->p_circ_id);
req.timed = should_time;
cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
/* touch the lastwritten timestamp, since that's how we check to
* see how long it's been since we asked the question, and sometimes
* we check before the first call to connection_handle_write(). */
cpuworker->timestamp_lastwritten = now;
num_cpuworkers_busy++;
req.task = CPUWORKER_TASK_ONION;
memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
tor_free(onionskin);
@ -705,9 +530,46 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
if (should_time)
tor_gettimeofday(&req.started_at);
connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
job = tor_malloc_zero(sizeof(cpuworker_job_t));
job->chan_id = circ->p_chan->global_identifier;
job->circ_id = circ->p_circ_id;
memcpy(&job->u.request, &req, sizeof(req));
memwipe(&req, 0, sizeof(req));
++total_pending_tasks;
queue_entry = threadpool_queue_work(threadpool,
cpuworker_onion_handshake_threadfn,
cpuworker_onion_handshake_replyfn,
job);
if (!queue_entry) {
log_warn(LD_BUG, "Couldn't queue work on threadpool");
tor_free(job);
return -1;
}
log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)",
job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id);
circ->workqueue_entry = queue_entry;
}
return 0;
}
/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
* remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
cpuworker_job_t *job;
if (circ->workqueue_entry == NULL)
return;
job = workqueue_entry_cancel(circ->workqueue_entry);
if (job) {
/* It successfully cancelled. */
memwipe(job, 0xe0, sizeof(*job));
tor_free(job);
}
circ->workqueue_entry = NULL;
}

src/or/cpuworker.h
View File

@ -13,19 +13,17 @@
#define TOR_CPUWORKER_H
void cpu_init(void);
void cpuworkers_rotate(void);
int connection_cpu_finished_flushing(connection_t *conn);
int connection_cpu_reached_eof(connection_t *conn);
int connection_cpu_process_inbuf(connection_t *conn);
void cpuworkers_rotate_keyinfo(void);
struct create_cell_t;
int assign_onionskin_to_cpuworker(connection_t *cpuworker,
or_circuit_t *circ,
int assign_onionskin_to_cpuworker(or_circuit_t *circ,
struct create_cell_t *onionskin);
uint64_t estimated_usec_for_onionskins(uint32_t n_requests,
uint16_t onionskin_type);
void cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
const char *onionskin_type_name);
void cpuworker_cancel_circ_handshake(or_circuit_t *circ);
#endif

src/or/main.c
View File

@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now)
get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) {
log_info(LD_GENERAL,"Rotating onion key.");
rotate_onion_key();
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
if (router_rebuild_descriptor(1)<0) {
log_info(LD_CONFIG, "Couldn't rebuild router descriptor");
}
@ -1960,9 +1960,9 @@ do_hup(void)
* force a retry there. */
if (server_mode(options)) {
/* Restart cpuworker and dnsworker processes, so they get up-to-date
/* Update cpuworker and dnsworker processes, so they get up-to-date
* configuration options. */
cpuworkers_rotate();
cpuworkers_rotate_keyinfo();
dns_reset();
}
return 0;

src/or/onion.c
View File

@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ)
victim = circ->onionqueue_entry;
if (victim)
onion_queue_entry_remove(victim);
cpuworker_cancel_circ_handshake(circ);
}
/** Remove a queue entry <b>victim</b> from the queue, unlinking it from
@ -339,25 +341,25 @@ clear_pending_onions(void)
/* ============================================================ */
/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys
/** Return a new server_onion_keys_t object with all of the keys
* and other info we might need to do onion handshakes. (We make a copy of
* our keys for each cpuworker to avoid race conditions with the main thread,
* and to avoid locking) */
void
setup_server_onion_keys(server_onion_keys_t *keys)
server_onion_keys_t *
server_onion_keys_new(void)
{
memset(keys, 0, sizeof(server_onion_keys_t));
server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t));
memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN);
dup_onion_keys(&keys->onion_key, &keys->last_onion_key);
keys->curve25519_key_map = construct_ntor_key_map();
keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t));
curve25519_keypair_generate(keys->junk_keypair, 0);
return keys;
}
/** Release all storage held in <b>keys</b>, but do not free <b>keys</b>
* itself (as it's likely to be stack-allocated.) */
/** Release all storage held in <b>keys</b>. */
void
release_server_onion_keys(server_onion_keys_t *keys)
server_onion_keys_free(server_onion_keys_t *keys)
{
if (! keys)
return;
@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys)
crypto_pk_free(keys->last_onion_key);
ntor_key_map_free(keys->curve25519_key_map);
tor_free(keys->junk_keypair);
memset(keys, 0, sizeof(server_onion_keys_t));
memwipe(keys, 0, sizeof(server_onion_keys_t));
tor_free(keys);
}
/** Release whatever storage is held in <b>state</b>, depending on its

src/or/onion.h
View File

@ -30,8 +30,8 @@ typedef struct server_onion_keys_t {
#define MAX_ONIONSKIN_CHALLENGE_LEN 255
#define MAX_ONIONSKIN_REPLY_LEN 255
void setup_server_onion_keys(server_onion_keys_t *keys);
void release_server_onion_keys(server_onion_keys_t *keys);
server_onion_keys_t *server_onion_keys_new(void);
void server_onion_keys_free(server_onion_keys_t *keys);
void onion_handshake_state_release(onion_handshake_state_t *state);

src/or/or.h
View File

@ -213,8 +213,7 @@ typedef enum {
#define CONN_TYPE_DIR_LISTENER 8
/** Type for HTTP connections to the directory server. */
#define CONN_TYPE_DIR 9
/** Connection from the main process to a CPU worker process. */
#define CONN_TYPE_CPUWORKER 10
/* Type 10 is unused. */
/** Type for listening for connections from user interface process. */
#define CONN_TYPE_CONTROL_LISTENER 11
/** Type for connections from user interface process. */
@ -276,17 +275,6 @@ typedef enum {
/** State for any listener connection. */
#define LISTENER_STATE_READY 0
#define CPUWORKER_STATE_MIN_ 1
/** State for a connection to a cpuworker process that's idle. */
#define CPUWORKER_STATE_IDLE 1
/** State for a connection to a cpuworker process that's processing a
* handshake. */
#define CPUWORKER_STATE_BUSY_ONION 2
#define CPUWORKER_STATE_MAX_ 2
#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION
#define CPUWORKER_TASK_SHUTDOWN 255
#define OR_CONN_STATE_MIN_ 1
/** State for a connection to an OR: waiting for connect() to finish. */
#define OR_CONN_STATE_CONNECTING 1
@ -3158,6 +3146,9 @@ typedef struct or_circuit_t {
/** Pointer to an entry on the onion queue, if this circuit is waiting for a
* chance to give an onionskin to a cpuworker. Used only in onion.c */
struct onion_queue_t *onionqueue_entry;
/** Pointer to a workqueue entry, if this circuit has given an onionskin to
* a cpuworker and is waiting for a response. Used only in cpuworker.c */
struct workqueue_entry_s *workqueue_entry;
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;