From 2dda97e8fd89875739b5f57f556c748ce7f5925f Mon Sep 17 00:00:00 2001 From: Roger Dingledine Date: Wed, 20 Aug 2003 23:05:22 +0000 Subject: [PATCH] implemented cpuworkers please poke at it and report bugs still needs polishing, and only handles onions now (should handle OR handshakes too) svn:r402 --- src/common/util.c | 15 +++ src/common/util.h | 2 + src/or/Makefile.am | 4 +- src/or/command.c | 11 +- src/or/connection.c | 13 ++- src/or/cpuworker.c | 255 ++++++++++++++++++++++++++++++++++++++++++++ src/or/dns.c | 71 ++++++------ src/or/main.c | 19 +--- src/or/onion.c | 52 +++------ src/or/or.h | 32 ++++-- 10 files changed, 370 insertions(+), 104 deletions(-) create mode 100644 src/or/cpuworker.c diff --git a/src/common/util.c b/src/common/util.c index 3a80b94da7..21584093ce 100644 --- a/src/common/util.c +++ b/src/common/util.c @@ -88,6 +88,21 @@ void tv_addms(struct timeval *a, long ms) { a->tv_usec %= 1000000; } +/* a wrapper for write(2) that makes sure to write all count bytes. + * Only use if fd is a blocking socket. */ +int write_all(int fd, const void *buf, size_t count) { + int written = 0; + int result; + + while(written != count) { + result = write(fd, buf+written, count-written); + if(result<0) + return -1; + written += result; + } + return count; +} + void set_socket_nonblocking(int socket) { #ifdef MS_WINDOWS diff --git a/src/common/util.h b/src/common/util.h index 614a5388a6..8552fb19a3 100644 --- a/src/common/util.h +++ b/src/common/util.h @@ -51,6 +51,8 @@ void tv_addms(struct timeval *a, long ms); void tv_add(struct timeval *a, struct timeval *b); int tv_cmp(struct timeval *a, struct timeval *b); +int write_all(int fd, const void *buf, size_t count); + void set_socket_nonblocking(int socket); /* Minimalist interface to run a void function in the background. On diff --git a/src/or/Makefile.am b/src/or/Makefile.am index 4eed574c9e..53dbd64a67 100644 --- a/src/or/Makefile.am +++ b/src/or/Makefile.am @@ -7,14 +7,14 @@ bin_PROGRAMS = or or_SOURCES = buffers.c circuit.c command.c connection.c \ connection_exit.c connection_ap.c connection_or.c config.c \ onion.c routers.c directory.c dns.c connection_edge.c \ - main.c tor_main.c + cpuworker.c main.c tor_main.c or_LDADD = ../common/libor.a test_SOURCES = buffers.c circuit.c command.c connection.c \ connection_exit.c connection_ap.c connection_or.c config.c \ onion.c routers.c directory.c dns.c connection_edge.c \ - main.c test.c + cpuworker.c main.c test.c test_LDADD = ../common/libor.a diff --git a/src/or/command.c b/src/or/command.c index ee14cce124..f38708e2f3 100644 --- a/src/or/command.c +++ b/src/or/command.c @@ -97,13 +97,13 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) { memcpy(circ->onionskin,cell->payload,cell->length); - /* add it to the pending onions queue, and then return */ - if(onion_pending_add(circ) < 0) { - log_fn(LOG_DEBUG,"Failed to queue onionskin. Closing."); + /* hand it off to the cpuworkers, and then return */ + if(assign_to_cpuworker(NULL, CPUWORKER_TASK_ONION, circ) < 0) { + log_fn(LOG_DEBUG,"Failed to hand off onionskin. Closing."); circuit_close(circ); + return; } - log_fn(LOG_DEBUG,"success: queued onionskin."); - return; + log_fn(LOG_DEBUG,"success: handed off onionskin."); } void command_process_created_cell(cell_t *cell, connection_t *conn) { @@ -153,7 +153,6 @@ void command_process_created_cell(cell_t *cell, connection_t *conn) { return; } } - return; } void command_process_relay_cell(cell_t *cell, connection_t *conn) { diff --git a/src/or/connection.c b/src/or/connection.c index 78bf0dca55..3b3ff97000 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -21,7 +21,8 @@ char *conn_type_to_string[] = { "App", /* 7 */ "Dir listener",/* 8 */ "Dir", /* 9 */ - "DNS master", /* 10 */ + "DNS worker", /* 10 */ + "CPU worker", /* 11 */ }; char *conn_state_to_string[][15] = { @@ -58,7 +59,11 @@ char *conn_state_to_string[][15] = { "reading", /* 2 */ "awaiting command", /* 3 */ "writing" }, /* 4 */ - { "open" }, /* dns master, 0 */ + { "idle", /* dns worker, 0 */ + "busy" }, /* 1 */ + { "idle", /* cpu worker, 0 */ + "busy with onion", /* 1 */ + "busy with handshake" }, /* 2 */ }; /********* END VARIABLES ************/ @@ -542,6 +547,8 @@ int connection_process_inbuf(connection_t *conn) { return connection_dir_process_inbuf(conn); case CONN_TYPE_DNSWORKER: return connection_dns_process_inbuf(conn); + case CONN_TYPE_CPUWORKER: + return connection_cpu_process_inbuf(conn); default: log_fn(LOG_DEBUG,"got unexpected conn->type."); return -1; @@ -679,6 +686,8 @@ int connection_finished_flushing(connection_t *conn) { return connection_dir_finished_flushing(conn); case CONN_TYPE_DNSWORKER: return connection_dns_finished_flushing(conn); + case CONN_TYPE_CPUWORKER: + return connection_cpu_finished_flushing(conn); default: log_fn(LOG_DEBUG,"got unexpected conn->type."); return -1; diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c new file mode 100644 index 0000000000..c2437e5a52 --- /dev/null +++ b/src/or/cpuworker.c @@ -0,0 +1,255 @@ +/* Copyright 2003 Roger Dingledine. */ +/* See LICENSE for licensing information */ +/* $Id$ */ + +#include "or.h" +extern or_options_t options; /* command-line and config-file options */ + +#define MAX_QUESTIONLEN 256 + +#define MAX_CPUWORKERS 17 +#define MIN_CPUWORKERS 2 + +#define LEN_ONION_RESPONSE (1+DH_KEY_LEN+32) +#define LEN_HANDSHAKE_RESPONSE (somethingelse) + +int num_cpuworkers=0; +int num_cpuworkers_busy=0; + +int cpuworker_main(void *data); +static int spawn_cpuworker(void); +static void spawn_enough_cpuworkers(void); +static int process_pending_task(connection_t *cpuworker); + +void cpu_init(void) { + spawn_enough_cpuworkers(); +} + +int connection_cpu_finished_flushing(connection_t *conn) { + assert(conn && conn->type == CONN_TYPE_CPUWORKER); + connection_stop_writing(conn); + return 0; +} + +int connection_cpu_process_inbuf(connection_t *conn) { + unsigned char buf[MAX_QUESTIONLEN]; + + assert(conn && conn->type == CONN_TYPE_CPUWORKER); + + if(conn->inbuf_reached_eof) { + log_fn(LOG_ERR,"Read eof. Worker dying."); + if(conn->state != CPUWORKER_STATE_IDLE) { + onion_pending_remove(conn->circ); + circuit_close(conn->circ); + conn->circ = NULL; + num_cpuworkers_busy--; + } + num_cpuworkers--; + return -1; + } + + if(conn->state == CPUWORKER_STATE_BUSY_ONION) { + assert(conn->circ); + if(conn->inbuf_datalen < LEN_ONION_RESPONSE) /* entire answer available? */ + return 0; /* not yet */ + assert(conn->inbuf_datalen == LEN_ONION_RESPONSE); + + connection_fetch_from_buf(buf,LEN_ONION_RESPONSE,conn); + + if(*buf == 0 || conn->circ->p_conn == NULL || + onionskin_process(conn->circ, buf+1, buf+1+DH_KEY_LEN) < 0) { + log_fn(LOG_DEBUG,"decoding onion, onionskin_process, or p_conn failed. Closing."); +// onion_pending_remove(conn->circ); + circuit_close(conn->circ); + } else { + log_fn(LOG_DEBUG,"onionskin_process succeeded. Yay."); +// onion_pending_remove(conn->circ); + } + conn->circ = NULL; + } else { + assert(conn->state == CPUWORKER_STATE_BUSY_HANDSHAKE); + + assert(0); /* don't ask me to do handshakes yet */ + } + + conn->state = CPUWORKER_STATE_IDLE; + num_cpuworkers_busy--; + process_pending_task(conn); /* discard return value */ + return 0; +} + +int cpuworker_main(void *data) { + unsigned char question[MAX_QUESTIONLEN]; + unsigned char question_type; + int *fdarray = data; + int fd; + int len; + + /* variables for onion processing */ + unsigned char keys[32]; + unsigned char response[DH_KEY_LEN]; + unsigned char buf[MAX_QUESTIONLEN]; + + close(fdarray[0]); /* this is the side of the socketpair the parent uses */ + fd = fdarray[1]; /* this side is ours */ + + for(;;) { + + if(read(fd, &question_type, 1) != 1) { + log_fn(LOG_INFO,"read type failed. Exiting."); + spawn_exit(); + } + assert(question_type == CPUWORKER_TASK_ONION || + question_type == CPUWORKER_TASK_HANDSHAKE); + + if(question_type == CPUWORKER_TASK_ONION) + len = DH_ONIONSKIN_LEN; + else + len = 0; /* XXX */ + + if(read(fd, question, len) != len) { + log(LOG_INFO,"cpuworker_main(): read question failed. Exiting."); + spawn_exit(); + } + + if(question_type == CPUWORKER_TASK_ONION) { + if(onion_skin_server_handshake(question, get_privatekey(), + response, keys, 32) < 0) { + /* failure */ + log_fn(LOG_ERR,"onion_skin_server_handshake failed."); + memset(buf,0,LEN_ONION_RESPONSE); /* send all zeros for failure */ + } else { + /* success */ + log_fn(LOG_DEBUG,"onion_skin_server_handshake succeeded."); + buf[0] = 1; /* 1 means success */ + memcpy(buf+1,response,DH_KEY_LEN); + memcpy(buf+1+DH_KEY_LEN,keys,32); + } + if(write_all(fd, buf, LEN_ONION_RESPONSE) != LEN_ONION_RESPONSE) { + log_fn(LOG_INFO,"writing response buf failed. Exiting."); + spawn_exit(); + } + log_fn(LOG_DEBUG,"finished writing response/keys."); + } else { /* we've been asked to do a handshake. not implemented yet. */ + spawn_exit(); + } + } + return 0; /* windows wants this function to return an int */ +} + +static int spawn_cpuworker(void) { + int fd[2]; + connection_t *conn; + + if(tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) { + perror("socketpair"); + exit(1); + } + + spawn_func(cpuworker_main, (void*)fd); + log_fn(LOG_DEBUG,"just spawned a worker."); + close(fd[1]); /* we don't need the worker's side of the pipe */ + + conn = connection_new(CONN_TYPE_CPUWORKER); + if(!conn) { + close(fd[0]); + return -1; + } + + set_socket_nonblocking(fd[0]); + + /* set up conn so it's got all the data we need to remember */ + conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */ + conn->bandwidth = -1; + conn->s = fd[0]; + + if(connection_add(conn) < 0) { /* no space, forget it */ + log_fn(LOG_INFO,"connection_add failed. Giving up."); + connection_free(conn); /* this closes fd[0] */ + return -1; + } + + conn->state = CPUWORKER_STATE_IDLE; + connection_start_reading(conn); + + return 0; /* success */ +} + +static void spawn_enough_cpuworkers(void) { + int num_cpuworkers_needed = options.NumCpus + 1; + + if(num_cpuworkers_needed < MIN_CPUWORKERS) + num_cpuworkers_needed = MIN_CPUWORKERS; + if(num_cpuworkers_needed > MAX_CPUWORKERS) + num_cpuworkers_needed = MAX_CPUWORKERS; + + while(num_cpuworkers < num_cpuworkers_needed) { + if(spawn_cpuworker() < 0) { + log_fn(LOG_ERR,"spawn failed!"); + return; + } + num_cpuworkers++; + } +} + + +static int process_pending_task(connection_t *cpuworker) { + circuit_t *circ; + + assert(cpuworker); + + /* for now only process onion tasks */ + + circ = onion_next_task(); + if(!circ) + return 0; + return assign_to_cpuworker(cpuworker, CPUWORKER_TASK_ONION, circ); +} + +/* if cpuworker is defined, assert that he's idle, and use him. else, + * look for an idle cpuworker and use him. if none idle, queue task onto + * the pending onion list and return. + * If question_type is CPUWORKER_TASK_ONION then task is a circ, else + * (something else) + */ +int assign_to_cpuworker(connection_t *cpuworker, unsigned char question_type, + void *task) { + circuit_t *circ; + + if(question_type == CPUWORKER_TASK_ONION) { + circ = task; + + if(num_cpuworkers_busy == num_cpuworkers) { + log_fn(LOG_DEBUG,"No idle cpuworkers. Queuing."); + if(onion_pending_add(circ) < 0) + return -1; + return 0; + } + + if(!cpuworker) + cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, CPUWORKER_STATE_IDLE); + + assert(cpuworker); + + cpuworker->circ = circ; + cpuworker->state = CPUWORKER_STATE_BUSY_ONION; + num_cpuworkers_busy++; + + if(connection_write_to_buf(&question_type, 1, cpuworker) < 0 || + connection_write_to_buf(circ->onionskin, DH_ONIONSKIN_LEN, cpuworker) < 0) { + log_fn(LOG_NOTICE,"Write failed. Closing worker and failing circ."); + cpuworker->marked_for_close = 1; + return -1; + } + } + return 0; +} + +/* + Local Variables: + mode:c + indent-tabs-mode:nil + c-basic-offset:2 + End: +*/ + diff --git a/src/or/dns.c b/src/or/dns.c index 704ffc5c28..eb57d037be 100644 --- a/src/or/dns.c +++ b/src/or/dns.c @@ -16,15 +16,15 @@ #define MIN_DNSWORKERS 3 #define MAX_IDLE_DNSWORKERS 10 -int num_workers=0; -int num_workers_busy=0; +int num_dnsworkers=0; +int num_dnsworkers_busy=0; static void purge_expired_resolves(uint32_t now); -static int dns_assign_to_worker(connection_t *exitconn); +static int assign_to_dnsworker(connection_t *exitconn); static void dns_found_answer(char *question, uint32_t answer); int dnsworker_main(void *data); -static int dns_spawn_worker(void); -static void spawn_enough_workers(void); +static int spawn_dnsworker(void); +static void spawn_enough_dnsworkers(void); struct pending_connection_t { struct connection_t *conn; @@ -60,7 +60,7 @@ static void init_cache_tree(void) { void dns_init(void) { init_cache_tree(); - spawn_enough_workers(); + spawn_enough_dnsworkers(); } static struct cached_resolve *oldest_cached_resolve = NULL; /* linked list, */ @@ -144,42 +144,42 @@ int dns_resolve(connection_t *exitconn) { newest_cached_resolve = resolve; SPLAY_INSERT(cache_tree, &cache_root, resolve); - return dns_assign_to_worker(exitconn); + return assign_to_dnsworker(exitconn); } assert(0); return 0; /* not reached; keep gcc happy */ } -static int dns_assign_to_worker(connection_t *exitconn) { +static int assign_to_dnsworker(connection_t *exitconn) { connection_t *dnsconn; unsigned char len; - spawn_enough_workers(); /* respawn here, to be sure there are enough */ + spawn_enough_dnsworkers(); /* respawn here, to be sure there are enough */ dnsconn = connection_get_by_type_state(CONN_TYPE_DNSWORKER, DNSWORKER_STATE_IDLE); if(!dnsconn) { - log(LOG_INFO,"dns_assign_to_worker(): no idle dns workers. Failing."); + log_fn(LOG_INFO,"no idle dns workers. Failing."); dns_cancel_pending_resolve(exitconn->address, NULL); return -1; } dnsconn->address = strdup(exitconn->address); dnsconn->state = DNSWORKER_STATE_BUSY; - num_workers_busy++; + num_dnsworkers_busy++; len = strlen(dnsconn->address); /* FFFF we should have it retry if the first worker bombs out */ if(connection_write_to_buf(&len, 1, dnsconn) < 0 || connection_write_to_buf(dnsconn->address, len, dnsconn) < 0) { - log(LOG_NOTICE,"dns_assign_to_worker(): Write failed. Closing worker and failing resolve."); + log_fn(LOG_NOTICE,"Write failed. Closing worker and failing resolve."); dnsconn->marked_for_close = 1; dns_cancel_pending_resolve(exitconn->address, NULL); return -1; } -// log(LOG_DEBUG,"dns_assign_to_worker(): submitted '%s'", exitconn->address); +// log_fn(LOG_DEBUG,"submitted '%s'", exitconn->address); return 0; } @@ -301,9 +301,9 @@ int connection_dns_process_inbuf(connection_t *conn) { log(LOG_ERR,"connection_dnsworker_process_inbuf(): Read eof. Worker dying."); if(conn->state == DNSWORKER_STATE_BUSY) { dns_cancel_pending_resolve(conn->address, NULL); - num_workers_busy--; + num_dnsworkers_busy--; } - num_workers--; + num_dnsworkers--; return -1; } @@ -319,7 +319,7 @@ int connection_dns_process_inbuf(connection_t *conn) { free(conn->address); conn->address = NULL; conn->state = DNSWORKER_STATE_IDLE; - num_workers_busy--; + num_dnsworkers_busy--; return 0; } @@ -328,8 +328,8 @@ int dnsworker_main(void *data) { char question[MAX_ADDRESSLEN]; unsigned char question_len; struct hostent *rent; - int fd; int *fdarray = data; + int fd; close(fdarray[0]); /* this is the side of the socketpair the parent uses */ fd = fdarray[1]; /* this side is ours */ @@ -351,14 +351,13 @@ int dnsworker_main(void *data) { rent = gethostbyname(question); if (!rent) { log(LOG_INFO,"dnsworker_main(): Could not resolve dest addr %s. Returning nulls.",question); - /* XXX it's conceivable write could return 1 through 3. but that's never gonna happen, right? */ - if(write(fd, "\0\0\0\0", 4) != 4) { + if(write_all(fd, "\0\0\0\0", 4) != 4) { log(LOG_INFO,"dnsworker_main(): writing nulls failed. Exiting."); spawn_exit(); } } else { assert(rent->h_length == 4); /* break to remind us if we move away from ipv4 */ - if(write(fd, rent->h_addr, 4) != 4) { + if(write_all(fd, rent->h_addr, 4) != 4) { log(LOG_INFO,"dnsworker_main(): writing answer failed. Exiting."); spawn_exit(); } @@ -368,7 +367,7 @@ int dnsworker_main(void *data) { return 0; /* windows wants this function to return an int */ } -static int dns_spawn_worker(void) { +static int spawn_dnsworker(void) { int fd[2]; connection_t *conn; @@ -378,7 +377,7 @@ static int dns_spawn_worker(void) { } spawn_func(dnsworker_main, (void*)fd); - log(LOG_DEBUG,"dns_spawn_worker(): just spawned a worker."); + log_fn(LOG_DEBUG,"just spawned a worker."); close(fd[1]); /* we don't need the worker's side of the pipe */ conn = connection_new(CONN_TYPE_DNSWORKER); @@ -395,7 +394,7 @@ static int dns_spawn_worker(void) { conn->s = fd[0]; if(connection_add(conn) < 0) { /* no space, forget it */ - log(LOG_INFO,"dns_spawn_worker(): connection_add failed. Giving up."); + log_fn(LOG_INFO,"connection_add failed. Giving up."); connection_free(conn); /* this closes fd[0] */ return -1; } @@ -406,12 +405,12 @@ static int dns_spawn_worker(void) { return 0; /* success */ } -static void spawn_enough_workers(void) { - int num_workers_needed; /* aim to have 1 more than needed, +static void spawn_enough_dnsworkers(void) { + int num_dnsworkers_needed; /* aim to have 1 more than needed, * but no less than min and no more than max */ connection_t *dnsconn; - if(num_workers_busy == MAX_DNSWORKERS) { + if(num_dnsworkers_busy == MAX_DNSWORKERS) { /* We always want at least one worker idle. * So find the oldest busy worker and kill it. */ @@ -422,28 +421,28 @@ static void spawn_enough_workers(void) { dns_cancel_pending_resolve(dnsconn->address, NULL); dnsconn->marked_for_close = 1; - num_workers_busy--; + num_dnsworkers_busy--; } - if(num_workers_busy >= MIN_DNSWORKERS) - num_workers_needed = num_workers_busy+1; + if(num_dnsworkers_busy >= MIN_DNSWORKERS) + num_dnsworkers_needed = num_dnsworkers_busy+1; else - num_workers_needed = MIN_DNSWORKERS; + num_dnsworkers_needed = MIN_DNSWORKERS; - while(num_workers < num_workers_needed) { - if(dns_spawn_worker() < 0) { - log(LOG_ERR,"spawn_enough_workers(): spawn failed!"); + while(num_dnsworkers < num_dnsworkers_needed) { + if(spawn_dnsworker() < 0) { + log(LOG_ERR,"spawn_enough_dnsworkers(): spawn failed!"); return; } - num_workers++; + num_dnsworkers++; } - while(num_workers > num_workers_needed+MAX_IDLE_DNSWORKERS) { /* too many idle? */ + while(num_dnsworkers > num_dnsworkers_needed+MAX_IDLE_DNSWORKERS) { /* too many idle? */ /* cull excess workers */ dnsconn = connection_get_by_type_state(CONN_TYPE_DNSWORKER, DNSWORKER_STATE_IDLE); assert(dnsconn); dnsconn->marked_for_close = 1; - num_workers--; + num_dnsworkers--; } } diff --git a/src/or/main.c b/src/or/main.c index c5206aebc4..0913c28ae1 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -434,12 +434,7 @@ static int prepare_for_poll(int *timeout) { current_second = now.tv_sec; /* remember which second it is, for next time */ } - if(onion_pending_check()) { - /* there's an onion pending. check for new things to do, but don't wait any time */ - *timeout = 0; - } else { - *timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */ - } + *timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */ return 0; } @@ -502,9 +497,10 @@ static int do_main_loop(void) { return -1; } set_privatekey(prkey); + cpu_init(); /* launch cpuworkers. Need to do this *after* we've read the private key. */ } - /* load the private key, if we're supposed to have one */ + /* load the directory private key, if we're supposed to have one */ if(options.DirPort) { prkey = crypto_new_pk_env(CRYPTO_PK_RSA); if (!prkey) { @@ -523,8 +519,8 @@ static int do_main_loop(void) { * and start the listeners */ retry_all_connections((uint16_t) options.ORPort, - (uint16_t) options.APPort, - (uint16_t) options.DirPort); + (uint16_t) options.APPort, + (uint16_t) options.DirPort); for(;;) { #ifndef MS_WIN32 /* do signal stuff only on unix */ @@ -568,11 +564,6 @@ static int do_main_loop(void) { } #endif - if(poll_result == 0) { - /* poll timed out without anything to do. process a pending onion, if any. */ - onion_pending_process_one(); - } - if(poll_result > 0) { /* we have at least one connection to deal with */ /* do all the reads and errors first, so we can detect closed sockets */ for(i=0;inext); if(ol_length >= options.MaxOnionsPending) { - log(LOG_INFO,"onion_pending_add(): Already have %d onions queued. Closing.", ol_length); + log_fn(LOG_INFO,"Already have %d onions queued. Closing.", ol_length); free(tmp); return -1; } @@ -59,38 +58,20 @@ int onion_pending_add(circuit_t *circ) { } -int onion_pending_check(void) { - if(ol_list) - return 1; - else - return 0; -} - -void onion_pending_process_one(void) { - circuit_t *circ; +circuit_t *onion_next_task(void) { if(!ol_list) - return; /* no onions pending, we're done */ + return NULL; /* no onions pending, we're done */ assert(ol_list->circ); if(!ol_list->circ->p_conn) { - log(LOG_INFO,"onion_pending_process_one(): ol_list->circ->p_conn null, must have died?"); + log_fn(LOG_INFO,"ol_list->circ->p_conn null, must have died?"); onion_pending_remove(ol_list->circ); - return; /* it died on us */ + return onion_next_task(); /* recurse: how about the next one? */ } assert(ol_length > 0); - circ = ol_list->circ; - - if(onionskin_process(circ) < 0) { - log(LOG_DEBUG,"onion_pending_process_one(): Failed. Closing."); - onion_pending_remove(circ); - circuit_close(circ); - } else { - log(LOG_DEBUG,"onion_pending_process_one(): Succeeded."); - onion_pending_remove(circ); - } - return; + return ol_list->circ; } /* go through ol_list, find the onion_queue_t element which points to @@ -130,10 +111,9 @@ void onion_pending_remove(circuit_t *circ) { free(victim); } -/* learn keys, initialize, then send a created cell back */ -static int onionskin_process(circuit_t *circ) { +/* given a response payload and keys, initialize, then send a created cell back */ +int onionskin_process(circuit_t *circ, unsigned char *payload, unsigned char *keys) { unsigned char iv[16]; - unsigned char keys[32]; cell_t cell; memset(iv, 0, 16); @@ -145,32 +125,28 @@ static int onionskin_process(circuit_t *circ) { circ->state = CIRCUIT_STATE_OPEN; - log(LOG_DEBUG,"onionskin_process(): Entering."); + log_fn(LOG_DEBUG,"Entering."); - if(onion_skin_server_handshake(circ->onionskin, get_privatekey(), - cell.payload, keys, 32) < 0) { - log(LOG_ERR,"onionskin_process(): onion_skin_server_handshake failed."); - return -1; - } + memcpy(cell.payload, payload, DH_KEY_LEN); - log(LOG_DEBUG,"onionskin_process: init cipher forward %d, backward %d.", *(int*)keys, *(int*)(keys+16)); + log_fn(LOG_DEBUG,"init cipher forward %d, backward %d.", *(int*)keys, *(int*)(keys+16)); if (!(circ->n_crypto = crypto_create_init_cipher(CIRCUIT_CIPHER,keys,iv,0))) { - log(LOG_ERR,"Cipher initialization failed."); + log_fn(LOG_ERR,"Cipher initialization failed (n)."); return -1; } if (!(circ->p_crypto = crypto_create_init_cipher(CIRCUIT_CIPHER,keys+16,iv,1))) { - log(LOG_ERR,"Cipher initialization failed."); + log_fn(LOG_ERR,"Cipher initialization failed (p)."); return -1; } if(connection_write_cell_to_buf(&cell, circ->p_conn) < 0) { return -1; } - log(LOG_DEBUG,"onionskin_process(): Finished sending 'created' cell."); + log_fn(LOG_DEBUG,"Finished sending 'created' cell."); return 0; } diff --git a/src/or/or.h b/src/or/or.h index 83c80a36b0..53033f0fcd 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -114,12 +114,20 @@ #define CONN_TYPE_DIR_LISTENER 8 #define CONN_TYPE_DIR 9 #define CONN_TYPE_DNSWORKER 10 +#define CONN_TYPE_CPUWORKER 11 #define LISTENER_STATE_READY 0 #define DNSWORKER_STATE_IDLE 0 #define DNSWORKER_STATE_BUSY 1 +#define CPUWORKER_STATE_IDLE 0 +#define CPUWORKER_STATE_BUSY_ONION 1 +#define CPUWORKER_STATE_BUSY_HANDSHAKE 2 + +#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION +#define CPUWORKER_TASK_HANDSHAKE CPUWORKER_STATE_BUSY_HANDSHAKE + /* how to read these states: * foo_CONN_STATE_bar_baz: * "I am acting as a bar, currently in stage baz of talking with a foo." @@ -328,8 +336,8 @@ struct connection_t { char nonce[8]; /* Used by worker connections */ - int num_processed; - + int num_processed; /* statistics kept by dns worker */ + struct circuit_t *circ; /* by cpu worker to know who he's working for */ }; typedef struct connection_t connection_t; @@ -399,7 +407,7 @@ struct crypt_path_t { typedef struct crypt_path_t crypt_path_t; /* struct for a path (circuit) through the network */ -typedef struct { +struct circuit_t { uint32_t n_addr; uint16_t n_port; connection_t *p_conn; @@ -428,7 +436,9 @@ typedef struct { // uint32_t recvlen; /* length of the onion so far */ void *next; -} circuit_t; +}; + +typedef struct circuit_t circuit_t; struct onion_queue_t { circuit_t *circ; @@ -678,6 +688,15 @@ connection_t *connection_or_connect(routerinfo_t *router); int connection_or_create_listener(struct sockaddr_in *bindaddr); int connection_or_handle_listener_read(connection_t *conn); +/********************************* cpuworker.c *****************************/ + +void cpu_init(void); +int connection_cpu_finished_flushing(connection_t *conn); +int connection_cpu_process_inbuf(connection_t *conn); +int cpuworker_main(void *data); +int assign_to_cpuworker(connection_t *cpuworker, unsigned char question_type, + void *task); + /********************************* directory.c ***************************/ void directory_initiate_fetch(routerinfo_t *router); @@ -737,10 +756,11 @@ int decide_aci_type(uint32_t local_addr, uint16_t local_port, uint32_t remote_addr, uint16_t remote_port); int onion_pending_add(circuit_t *circ); -int onion_pending_check(void); -void onion_pending_process_one(void); +circuit_t *onion_next_task(void); void onion_pending_remove(circuit_t *circ); +int onionskin_process(circuit_t *circ, unsigned char *payload, unsigned char *keys); + /* uses a weighted coin with weight cw to choose a route length */ int chooselen(double cw);