From ecfb36823e2daadbf44fcb2c9f108acc2cfcd511 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 9 Oct 2003 18:45:14 +0000 Subject: [PATCH] Refactor, rename, and clarify svn:r569 --- src/or/circuit.c | 10 +- src/or/connection.c | 5 +- src/or/connection_edge.c | 18 ++-- src/or/connection_or.c | 13 ++- src/or/main.c | 218 ++++++++++++++++++++++----------------- src/or/onion.c | 2 +- src/or/or.h | 11 +- 7 files changed, 156 insertions(+), 121 deletions(-) diff --git a/src/or/circuit.c b/src/or/circuit.c index ea901030c6..0f825139a3 100644 --- a/src/or/circuit.c +++ b/src/or/circuit.c @@ -264,7 +264,7 @@ int circuit_deliver_relay_cell(cell_t *cell, circuit_t *circ, } log_fn(LOG_DEBUG,"Passing on unrecognized cell."); - connection_write_cell_to_buf(cell, conn); + connection_or_write_cell_to_buf(cell, conn); return 0; } @@ -409,11 +409,11 @@ void circuit_resume_edge_reading(circuit_t *circ, int edge_type, crypt_path_t *l if((edge_type == EDGE_EXIT && conn->package_window > 0) || (edge_type == EDGE_AP && conn->package_window > 0 && conn->cpath_layer == layer_hint)) { connection_start_reading(conn); - connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */ + connection_edge_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */ /* If the circuit won't accept any more data, return without looking * at any more of the streams. Any connections that should be stopped - * have already been stopped by connection_package_raw_inbuf. */ + * have already been stopped by connection_edge_package_raw_inbuf. */ if(circuit_consider_stop_edge_reading(circ, edge_type, layer_hint)) return; } @@ -733,7 +733,7 @@ int circuit_send_next_onion_skin(circuit_t *circ) { return -1; } - connection_write_cell_to_buf(&cell, circ->n_conn); + connection_or_write_cell_to_buf(&cell, circ->n_conn); circ->cpath->state = CPATH_STATE_AWAITING_KEYS; circ->state = CIRCUIT_STATE_BUILDING; @@ -837,7 +837,7 @@ int circuit_extend(cell_t *cell, circuit_t *circ) { memcpy(newcell.payload, cell->payload+RELAY_HEADER_SIZE+6, DH_ONIONSKIN_LEN); - connection_write_cell_to_buf(&newcell, circ->n_conn); + connection_or_write_cell_to_buf(&newcell, circ->n_conn); return 0; } diff --git a/src/or/connection.c b/src/or/connection.c index faa701b03c..20677c466c 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -68,6 +68,7 @@ char *conn_state_to_string[][_CONN_TYPE_MAX+1] = { /********* END VARIABLES ************/ static int connection_init_accepted_conn(connection_t *conn); +static int connection_handle_listener_read(connection_t *conn, int new_type); /**************************************************************/ @@ -162,7 +163,7 @@ int connection_create_listener(struct sockaddr_in *bindaddr, int type) { return 0; } -int connection_handle_listener_read(connection_t *conn, int new_type) { +static int connection_handle_listener_read(connection_t *conn, int new_type) { int news; /* the new socket */ connection_t *newconn; struct sockaddr_in remote; /* information about the remote peer when connecting to other routers */ @@ -645,7 +646,7 @@ int connection_send_destroy(aci_t aci, connection_t *conn) { cell.aci = aci; cell.command = CELL_DESTROY; log_fn(LOG_INFO,"Sending destroy (aci %d).",aci); - connection_write_cell_to_buf(&cell, conn); + connection_or_write_cell_to_buf(&cell, conn); return 0; } diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index db3159875b..b1fa4ef28f 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -13,6 +13,8 @@ static int connection_ap_handshake_socks_reply(connection_t *conn, char *reply, int replylen, char success); static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ); +static void connection_edge_consider_sending_sendme(connection_t *conn, + int edge_type); int connection_edge_process_inbuf(connection_t *conn) { @@ -43,7 +45,7 @@ int connection_edge_process_inbuf(connection_t *conn) { /*ENDCLOSE*/ return connection_ap_handshake_process_socks(conn); case AP_CONN_STATE_OPEN: case EXIT_CONN_STATE_OPEN: - if(connection_package_raw_inbuf(conn) < 0) + if(connection_edge_package_raw_inbuf(conn) < 0) /*ENDCLOSE*/ return -1; return 0; case EXIT_CONN_STATE_CONNECTING: @@ -167,7 +169,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection stats_n_data_bytes_received += (cell->length - RELAY_HEADER_SIZE); connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE, cell->length - RELAY_HEADER_SIZE, conn); - connection_consider_sending_sendme(conn, edge_type); + connection_edge_consider_sending_sendme(conn, edge_type); return 0; case RELAY_COMMAND_END: if(!conn) { @@ -252,7 +254,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection conn->package_window += STREAMWINDOW_INCREMENT; log_fn(LOG_DEBUG,"stream-level sendme, packagewindow now %d.", conn->package_window); connection_start_reading(conn); - connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */ + connection_edge_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */ break; default: log_fn(LOG_WARNING,"unknown relay command %d.",relay_command); @@ -295,7 +297,7 @@ int connection_edge_finished_flushing(connection_t *conn) { case AP_CONN_STATE_OPEN: case EXIT_CONN_STATE_OPEN: connection_stop_writing(conn); - connection_consider_sending_sendme(conn, conn->type); + connection_edge_consider_sending_sendme(conn, conn->type); return 0; case AP_CONN_STATE_SOCKS_WAIT: connection_stop_writing(conn); @@ -312,7 +314,7 @@ uint64_t stats_n_data_bytes_packaged = 0; uint64_t stats_n_data_cells_received = 0; uint64_t stats_n_data_bytes_received = 0; -int connection_package_raw_inbuf(connection_t *conn) { +int connection_edge_package_raw_inbuf(connection_t *conn) { int amount_to_process, length; char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; @@ -320,7 +322,7 @@ int connection_package_raw_inbuf(connection_t *conn) { assert(conn); assert(!connection_speaks_cells(conn)); -repeat_connection_package_raw_inbuf: +repeat_connection_edge_package_raw_inbuf: circ = circuit_get_by_conn(conn); if(!circ) { @@ -376,10 +378,10 @@ repeat_connection_package_raw_inbuf: log_fn(LOG_DEBUG,"conn->package_window is now %d",conn->package_window); /* handle more if there's more, or return 0 if there isn't */ - goto repeat_connection_package_raw_inbuf; + goto repeat_connection_edge_package_raw_inbuf; } -void connection_consider_sending_sendme(connection_t *conn, int edge_type) { +static void connection_edge_consider_sending_sendme(connection_t *conn, int edge_type) { circuit_t *circ; cell_t cell; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index c473340995..ece6c66e96 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -7,6 +7,7 @@ extern or_options_t options; /* command-line and config-file options */ static int connection_tls_finish_handshake(connection_t *conn); +static int connection_or_process_cell_from_inbuf(connection_t *conn); /**************************************************************/ @@ -39,7 +40,7 @@ int connection_or_process_inbuf(connection_t *conn) { if(conn->state != OR_CONN_STATE_OPEN) return 0; /* don't do anything */ - return connection_process_cell_from_inbuf(conn); + return connection_or_process_cell_from_inbuf(conn); } int connection_or_finished_flushing(connection_t *conn) { @@ -252,17 +253,19 @@ static int connection_tls_finish_handshake(connection_t *conn) { /* ********************************** */ -void connection_write_cell_to_buf(const cell_t *cellp, connection_t *conn) { +void connection_or_write_cell_to_buf(const cell_t *cellp, connection_t *conn) { char networkcell[CELL_NETWORK_SIZE]; char *n = networkcell; + assert(connection_speaks_cells(conn)); + cell_pack(n, cellp); connection_write_to_buf(n, CELL_NETWORK_SIZE, conn); } /* if there's a whole cell there, pull it off and process it. */ -int connection_process_cell_from_inbuf(connection_t *conn) { +static int connection_or_process_cell_from_inbuf(connection_t *conn) { char buf[CELL_NETWORK_SIZE]; cell_t cell; @@ -277,7 +280,9 @@ int connection_process_cell_from_inbuf(connection_t *conn) { cell_unpack(&cell, buf); command_process_cell(&cell, conn); - + + /* CLEAR Shouldn't this be connection_or_process_inbuf at least? Or maybe + just use a loop? If not, doc why not. */ return connection_process_inbuf(conn); /* process the remainder of the buffer */ } diff --git a/src/or/main.c b/src/or/main.c index 83a979e0c8..11bbe33962 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -103,6 +103,10 @@ void connection_set_poll_socket(connection_t *conn) { poll_array[conn->poll_index].fd = conn->s; } +/* Remove the current function from the global list, and remove the + * corresponding poll entry. Calling this function will shift the last + * connection (if any) into the position occupied by conn. + */ int connection_remove(connection_t *conn) { int current_index; @@ -185,10 +189,8 @@ static void conn_read(int i) { /* see http://www.greenend.org.uk/rjk/2001/06/poll.html for * discussion of POLLIN vs POLLHUP */ if(!(poll_array[i].revents & (POLLIN|POLLHUP|POLLERR))) - if(!connection_speaks_cells(conn) || - conn->state != OR_CONN_STATE_OPEN || - !connection_is_reading(conn) || - !tor_tls_get_pending_bytes(conn->tls)) + if(!connection_is_reading(conn) || + !connection_has_pending_tls_data(conn)) return; /* this conn should not read */ log_fn(LOG_DEBUG,"socket %d wants to read.",conn->s); @@ -233,7 +235,7 @@ static void conn_write(int i) { } else assert_connection_ok(conn, time(NULL)); } -static void check_conn_marked(int i) { +static void conn_close_if_marked(int i) { connection_t *conn; conn = connection_array[i]; @@ -255,115 +257,139 @@ static void check_conn_marked(int i) { connection_free(conn); if(ireceiver_bucket += conn->bandwidth; + // log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket); + } + + if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */ + && global_read_bucket > 0 /* and we're allowed to read */ + && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) { + /* and either a non-cell conn or a cell conn with non-empty bucket */ + conn->wants_to_read = 0; + connection_start_reading(conn); + if(conn->wants_to_write == 1) { + conn->wants_to_write = 0; + connection_start_writing(conn); + } + } + + /* check connections to see whether we should send a keepalive, expire, or wait */ + if(!connection_speaks_cells(conn)) + return; + + if(now >= conn->timestamp_lastwritten + options.KeepalivePeriod) { + if((!options.OnionRouter && !circuit_get_by_conn(conn)) || + (!connection_state_is_open(conn))) { + /* we're an onion proxy, with no circuits; or our handshake has expired. kill it. */ + log_fn(LOG_INFO,"Expiring connection to %d (%s:%d).", + i,conn->address, conn->port); + conn->marked_for_close = 1; + } else { + /* either a full router, or we've got a circuit. send a padding cell. */ + log_fn(LOG_DEBUG,"Sending keepalive to (%s:%d)", + conn->address, conn->port); + memset(&cell,0,sizeof(cell_t)); + cell.command = CELL_PADDING; + connection_or_write_cell_to_buf(&cell, conn); + } + } +} + +/* Perform regular maintenance tasks. This function gets run once per + * second by prepare_for_poll. + */ +static void run_scheduled_events(time_t now) { static long time_to_fetch_directory = 0; static long time_to_new_circuit = 0; - cell_t cell; circuit_t *circ; + int i; + + /* 1. Every DirFetchPostPeriod seconds, we get a new directory and upload + * our descriptor (if any). */ + if(time_to_fetch_directory < now) { + /* it's time to fetch a new directory and/or post our descriptor */ + if(options.OnionRouter) { + router_rebuild_descriptor(); + router_upload_desc_to_dirservers(); + } + if(!options.DirPort) { + /* NOTE directory servers do not currently fetch directories. + * Hope this doesn't bite us later. */ + directory_initiate_command(router_pick_directory_server(), + DIR_CONN_STATE_CONNECTING_FETCH); + } + time_to_fetch_directory = now + options.DirFetchPostPeriod; + } + + /* 2. Every NewCircuitPeriod seconds, we expire old ciruits and make a + * new one as needed. + */ + if(options.APPort && time_to_new_circuit < now) { + circuit_expire_unused_circuits(); + circuit_launch_new(-1); /* tell it to forget about previous failures */ + circ = circuit_get_newest_open(); + if(!circ || circ->dirty) { + log_fn(LOG_INFO,"Youngest circuit %s; launching replacement.", circ ? "dirty" : "missing"); + circuit_launch_new(0); /* make an onion and lay the circuit */ + } + time_to_new_circuit = now + options.NewCircuitPeriod; + } + + /* 3. Every second, we check how much bandwidth we've consumed and + * increment global_read_bucket. + */ + stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket; + if(global_read_bucket < 9*options.TotalBandwidth) { + global_read_bucket += options.TotalBandwidth; + log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket); + } + stats_prev_global_read_bucket = global_read_bucket; + + + /* 4. We do houskeeping for each connection... */ + for(i=0;i current_second) { /* the second has rolled over. check more stuff. */ ++stats_n_seconds_reading; - - if(time_to_fetch_directory < now.tv_sec) { - /* it's time to fetch a new directory and/or post our descriptor */ - if(options.OnionRouter) { - router_rebuild_descriptor(); - router_upload_desc_to_dirservers(); - } - if(!options.DirPort) { - /* NOTE directory servers do not currently fetch directories. - * Hope this doesn't bite us later. */ - directory_initiate_command(router_pick_directory_server(), - DIR_CONN_STATE_CONNECTING_FETCH); - } - time_to_fetch_directory = now.tv_sec + options.DirFetchPostPeriod; - } - - if(options.APPort && time_to_new_circuit < now.tv_sec) { - circuit_expire_unused_circuits(); - circuit_launch_new(-1); /* tell it to forget about previous failures */ - circ = circuit_get_newest_open(); - if(!circ || circ->dirty) { - log_fn(LOG_INFO,"Youngest circuit %s; launching replacement.", circ ? "dirty" : "missing"); - circuit_launch_new(0); /* make an onion and lay the circuit */ - } - time_to_new_circuit = now.tv_sec + options.NewCircuitPeriod; - } - - stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket; - if(global_read_bucket < 9*options.TotalBandwidth) { - global_read_bucket += options.TotalBandwidth; - log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket); - } - stats_prev_global_read_bucket = global_read_bucket; - - /* do housekeeping for each connection */ - for(i=0;ireceiver_bucket += conn->bandwidth; -// log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket); - } - - if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */ - && global_read_bucket > 0 /* and we're allowed to read */ - && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) { - /* and either a non-cell conn or a cell conn with non-empty bucket */ - conn->wants_to_read = 0; - connection_start_reading(conn); - if(conn->wants_to_write == 1) { - conn->wants_to_write = 0; - connection_start_writing(conn); - } - } - - /* check connections to see whether we should send a keepalive, expire, or wait */ - if(!connection_speaks_cells(conn)) - continue; /* this conn type doesn't send cells */ - if(now.tv_sec >= conn->timestamp_lastwritten + options.KeepalivePeriod) { - if((!options.OnionRouter && !circuit_get_by_conn(conn)) || - (!connection_state_is_open(conn))) { - /* we're an onion proxy, with no circuits; or our handshake has expired. kill it. */ - log_fn(LOG_INFO,"Expiring connection to %d (%s:%d).", - i,conn->address, conn->port); - conn->marked_for_close = 1; - } else { - /* either a full router, or we've got a circuit. send a padding cell. */ - log_fn(LOG_DEBUG,"Sending keepalive to (%s:%d)", - conn->address, conn->port); - memset(&cell,0,sizeof(cell_t)); - cell.command = CELL_PADDING; - connection_write_cell_to_buf(&cell, conn); - } - } - } - /* blow away any connections that need to die. can't do this later - * because we might open up a circuit and not realize we're about to cull - * the connection it's running over. - */ - for(i=0;itls)) { + if(connection_has_pending_tls_data(conn)) { log_fn(LOG_DEBUG,"sock %d has pending bytes.",conn->s); return 0; /* has pending bytes to read; don't let poll wait. */ } @@ -621,7 +647,7 @@ static int do_main_loop(void) { /* any of the conns need to be closed now? */ for(i=0;ip_conn); + connection_or_write_cell_to_buf(&cell, circ->p_conn); log_fn(LOG_DEBUG,"Finished sending 'created' cell."); return 0; diff --git a/src/or/or.h b/src/or/or.h index 5307a0b4ad..2ae89fc75f 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -537,7 +537,6 @@ connection_t *connection_new(int type); void connection_free(connection_t *conn); int connection_create_listener(struct sockaddr_in *bindaddr, int type); -int connection_handle_listener_read(connection_t *conn, int new_type); int connection_connect(connection_t *conn, char *address, uint32_t addr, uint16_t port); int retry_all_connections(uint16_t or_listenport, uint16_t ap_listenport, uint16_t dir_listenport); @@ -564,6 +563,10 @@ connection_t *connection_get_by_type_state_lastwritten(int type, int state); int connection_receiver_bucket_should_increase(connection_t *conn); #define connection_speaks_cells(conn) ((conn)->type == CONN_TYPE_OR) +#define connection_has_pending_tls_data(conn) \ + ((conn)->type == CONN_TYPE_OR && \ + (conn)->state == OR_CONN_STATE_OPEN && \ + tor_tls_get_pending_bytes(conn->tls)) int connection_is_listener(connection_t *conn); int connection_state_is_open(connection_t *conn); @@ -584,8 +587,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection int edge_type, crypt_path_t *layer_hint); int connection_edge_finished_flushing(connection_t *conn); -int connection_package_raw_inbuf(connection_t *conn); -void connection_consider_sending_sendme(connection_t *conn, int edge_type); +int connection_edge_package_raw_inbuf(connection_t *conn); int connection_exit_connect(connection_t *conn); @@ -605,8 +607,7 @@ connection_t *connection_or_connect(routerinfo_t *router); int connection_tls_start_handshake(connection_t *conn, int receiving); int connection_tls_continue_handshake(connection_t *conn); -void connection_write_cell_to_buf(const cell_t *cellp, connection_t *conn); -int connection_process_cell_from_inbuf(connection_t *conn); +void connection_or_write_cell_to_buf(const cell_t *cellp, connection_t *conn); /********************************* cpuworker.c *****************************/