From 6deed60bb5b5f495b4812f15c0e7a3b21fc440e4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 17 Mar 2003 02:42:45 +0000 Subject: [PATCH] Add code for end-to-end zlib compression. Still needs flow-control svn:r187 --- TODO | 6 +-- src/or/buffers.c | 68 ++++++++++++++++++++++++++ src/or/connection.c | 100 +++++++++++++++++++++++++++++++++++++-- src/or/connection_ap.c | 16 +++++++ src/or/connection_exit.c | 17 +++++++ src/or/or.h | 46 +++++++++++++++++- 6 files changed, 246 insertions(+), 7 deletions(-) diff --git a/TODO b/TODO index 79f1b3be58..5b11ee8398 100644 --- a/TODO +++ b/TODO @@ -19,8 +19,8 @@ ARMA - arma claims o Implement topics - Rotate circuits after N minutes? - Circuits should expire when circuit->expire triggers -NICK - Handle half-open connections -NICK - On the fly compression of each stream +NICK . Handle half-open connections +NICK . On the fly compression of each stream o Clean up the event loop (optimize and sanitize) - Exit policies - Path selection algorithms @@ -88,7 +88,7 @@ SPEC!! - Handle socks commands other than connect, eg, bind? - Keep track of load over links/nodes, to know who's hosed NICK - Daemonize and package - - Teach it to fork and background + o Teach it to fork and background - Red Hat spec file - Debian spec file equivalent diff --git a/src/or/buffers.c b/src/or/buffers.c index 424e2fe9e6..61706fcad7 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -136,6 +136,74 @@ int write_to_buf(char *string, int string_len, } +#ifdef USE_ZLIB +int compress_from_buf(char *string, int string_len, + char **buf_in, int *buflen_in, int *buf_datalen_in, + z_stream *zstream, int flush) { + int err; + + if (!*buf_datalen_in) + return 0; + + zstream->next_in = *buf_in; + zstream->avail_in = *buf_datalen_in; + zstream->next_out = string; + zstream->avail_out = string_len; + + err = deflate(zstream, flush); + + switch (err) + { + case Z_OK: + case Z_STREAM_END: + memmove(*buf_in, zstream->next_in, zstream->avail_in); + *buf_datalen_in = zstream->avail_in; + return string_len - zstream->avail_out; + case Z_STREAM_ERROR: + case Z_BUF_ERROR: + log(LOG_ERR, "Error processing compression: %s", zstream->msg); + return -1; + default: + log(LOG_ERR, "Unknown return value from deflate: %d", err); + return -1; + } +} + +int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in, + char **buf_out, int *buflen_out, int *buf_datalen_out, + z_stream *zstream, int flush) +{ + int err; + + zstream->next_in = *buf_in; + zstream->avail_in = *buf_datalen_in; + zstream->next_out = *buf_out + *buf_datalen_out; + zstream->avail_out = *buflen_out - *buf_datalen_out; + + if (!zstream->avail_in && !zstream->avail_out) + return 0; + + err = inflate(zstream, flush); + + switch (err) + { + case Z_OK: + case Z_STREAM_END: + memmove(*buf_in, zstream->next_in, zstream->avail_in); + *buf_datalen_in = zstream->avail_in; + *buf_datalen_out = *buflen_out - zstream->avail_out; + return 1; + case Z_STREAM_ERROR: + case Z_BUF_ERROR: + log(LOG_ERR, "Error processing compression: %s", zstream->msg); + return 1; + default: + log(LOG_ERR, "Unknown return value from deflate: %d", err); + return -1; + } +} +#endif + int fetch_from_buf(char *string, int string_len, char **buf, int *buflen, int *buf_datalen) { diff --git a/src/or/connection.c b/src/or/connection.c index 4108aa2a23..23743cecdf 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -126,6 +126,29 @@ connection_t *connection_new(int type) { if(type == CONN_TYPE_OR) { directory_set_dirty(); } +#ifdef USE_ZLIB + if (type == CONN_TYPE_AP || type == CONN_TYPE_EXIT) { + if (buf_new(&conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0) + return NULL; + if (! (conn->compression = malloc(sizeof(z_stream)))) + return NULL; + if (! (conn->decompression = malloc(sizeof(z_stream)))) + return NULL; + memset(conn->compression, 0, sizeof(z_stream)); + memset(conn->decompression, 0, sizeof(z_stream)); + if (deflateInit(conn->compression, Z_DEFAULT_COMPRESSION) != Z_OK) { + log(LOG_ERR, "Error initializing zlib: %s", conn->compression->msg); + return NULL; + } + if (inflateInit(conn->decompression) != Z_OK) { + log(LOG_ERR, "Error initializing zlib: %s", conn->decompression->msg); + return NULL; + } + } else { + conn->compression = conn->decompression = NULL; + } + conn->done_sending = conn->done_receiving = 0 +#endif return conn; } @@ -156,6 +179,19 @@ void connection_free(connection_t *conn) { if(conn->type == CONN_TYPE_OR) { directory_set_dirty(); } +#ifdef USE_ZLIB + if (conn->compression) { + if (inflateEnd(conn->decompression) != Z_OK) + log(LOG_ERR,"connection_free(): while closing zlib: %s", + conn->decompression->msg); + if (deflateEnd(conn->compression) != Z_OK) + log(LOG_ERR,"connection_free(): while closing zlib: %s", + conn->compression->msg); + free(conn->compression); + free(conn->decompression); + buf_free(conn->z_outbuf); + } +#endif free(conn); } @@ -337,6 +373,49 @@ int connection_fetch_from_buf(char *string, int len, connection_t *conn) { return fetch_from_buf(string, len, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen); } +#ifdef USE_ZLIB +int connection_compress_from_buf(char *string, int len, connection_t *conn, + int flush) { + return compress_from_buf(string, len, + &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen, + conn->compression, flush); +} + +int connection_decompress_to_buf(char *string, int len, connection_t *conn, + int flush) { + /* This is not sane with respect to flow control; we want to spool out to + * z_outbuf, but only decompress and write as needed. + */ + int n; + struct timeval now; + + if (write_to_buf(string, len, + &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen) < 0) + return -1; + + n = decompress_buf_to_buf( + &conn->z_outbuf, &conn->z_outbuflen, &conn->z_outbuf_datalen, + &conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen, + conn->decompression, flush); + + if (n < 0) + return -1; + + if(gettimeofday(&now,NULL) < 0) + return -1; + + if(!n) + return 0; + + if(conn->marked_for_close) + return 0; + + conn->timestamp_lastwritten = now.tv_sec; + + return n; +} +#endif + int connection_find_on_inbuf(char *string, int len, connection_t *conn) { return find_on_inbuf(string, len, conn->inbuf, conn->inbuf_datalen); } @@ -607,7 +686,7 @@ int connection_process_inbuf(connection_t *conn) { } int connection_package_raw_inbuf(connection_t *conn) { - int amount_to_process; + int amount_to_process, len; cell_t cell; circuit_t *circ; @@ -618,13 +697,27 @@ int connection_package_raw_inbuf(connection_t *conn) { repeat_connection_package_raw_inbuf: amount_to_process = conn->inbuf_datalen; - + if(!amount_to_process) return 0; /* Initialize the cell with 0's */ memset(&cell, 0, sizeof(cell_t)); +#ifdef USE_ZLIB + /* This compression logic is not necessarily optimal: + * 1) Maybe we should try to read as much as we can onto the inbuf before + * compressing. + * 2) + */ + len = connection_compress_from_buf(cell.payload + TOPIC_HEADER_SIZE, + CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE, + conn, Z_SYNC_FLUSH); + if (len < 0) + return -1; + + cell.length = len; +#else if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) { cell.length = CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE; } else { @@ -633,6 +726,7 @@ repeat_connection_package_raw_inbuf: if(connection_fetch_from_buf(cell.payload+TOPIC_HEADER_SIZE, cell.length, conn) < 0) return -1; +#endif circ = circuit_get_by_conn(conn); if(!circ) { @@ -677,7 +771,7 @@ repeat_connection_package_raw_inbuf: } log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at AP is %d",conn->p_receive_topicwindow); } - if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) { + if (conn->inbuf_datalen) { log(LOG_DEBUG,"connection_package_raw_inbuf(): recursing."); goto repeat_connection_package_raw_inbuf; } diff --git a/src/or/connection_ap.c b/src/or/connection_ap.c index 9d28d5a4a9..339c141378 100644 --- a/src/or/connection_ap.c +++ b/src/or/connection_ap.c @@ -418,11 +418,21 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) { } log(LOG_DEBUG,"connection_ap_process_data_cell(): willing to receive %d more cells from circ",conn->n_receive_topicwindow); +#ifdef USE_ZLIB + if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE, + cell->length - TOPIC_HEADER_SIZE, + conn, Z_SYNC_FLUSH) < 0) { + log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close."); + conn->marked_for_close = 1; + return 0; + } +#else if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE, cell->length - TOPIC_HEADER_SIZE, conn) < 0) { conn->marked_for_close = 1; return 0; } +#endif if(connection_consider_sending_sendme(conn, EDGE_AP) < 0) conn->marked_for_close = 1; return 0; @@ -440,6 +450,12 @@ int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) { } for(prevconn = circ->p_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ; prevconn->next_topic = conn->next_topic; +#endif +#if 0 + conn->done_sending = 1; + shutdown(conn->s, 1); /* XXX check return; refactor NM */ + if (conn->done_receiving) + conn->marked_for_close = 1; #endif conn->marked_for_close = 1; break; diff --git a/src/or/connection_exit.c b/src/or/connection_exit.c index fb0daadc49..857dfe8845 100644 --- a/src/or/connection_exit.c +++ b/src/or/connection_exit.c @@ -217,12 +217,22 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) { log(LOG_DEBUG,"connection_exit_process_data_cell(): data received while resolving/connecting. Queueing."); } log(LOG_DEBUG,"connection_exit_process_data_cell(): put %d bytes on outbuf.",cell->length - TOPIC_HEADER_SIZE); +#ifdef USE_ZLIB + if(connection_decompress_to_buf(cell->payload + TOPIC_HEADER_SIZE, + cell->length - TOPIC_HEADER_SIZE, + conn, Z_SYNC_FLUSH) < 0) { + log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close."); + conn->marked_for_close = 1; + return 0; + } +#else if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE, cell->length - TOPIC_HEADER_SIZE, conn) < 0) { log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close."); conn->marked_for_close = 1; return 0; } +#endif if(connection_consider_sending_sendme(conn, EDGE_EXIT) < 0) conn->marked_for_close = 1; return 0; @@ -241,6 +251,13 @@ int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) { for(prevconn = circ->n_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ; prevconn->next_topic = conn->next_topic; #endif +#if 0 + conn->done_sending = 1; + shutdown(conn->s, 1); /* XXX check return; refactor NM */ + if (conn->done_receiving) + conn->marked_for_close = 1; +#endif + conn->marked_for_close = 1; break; case TOPIC_COMMAND_CONNECTED: diff --git a/src/or/or.h b/src/or/or.h index 13a24203aa..a35fb26b27 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -36,6 +36,9 @@ #include #include #include +#ifdef USE_ZLIB +#include +#endif #include "../common/crypto.h" #include "../common/log.h" @@ -171,6 +174,7 @@ #define CONFIG_TYPE_INT 2 #define CONFIG_TYPE_LONG 3 #define CONFIG_TYPE_DOUBLE 4 +#define CONFIG_TYPE_BOOL 5 #define CONFIG_LINE_MAXLEN 1024 @@ -254,24 +258,39 @@ struct connection_t { uint16_t port; /* used by exit and ap: */ - uint16_t topic_id; struct connection_t *next_topic; int n_receive_topicwindow; int p_receive_topicwindow; + int done_sending; + int done_receiving; +#ifdef USE_ZLIB + char *z_outbuf; + int z_outbuflen; + int z_outbuf_datalen; + z_stream *compression; + z_stream *decompression; +#endif + +/* Used by ap: */ char socks_version; char read_username; +/* Used by exit and ap: */ char *dest_addr; uint16_t dest_port; /* host order */ +/* Used by ap: */ char dest_tmp[512]; int dest_tmplen; +/* Used by everyone */ char *address; /* strdup into this, because free_connection frees it */ +/* Used for cell connections */ crypto_pk_env_t *pkey; /* public RSA key for the other side */ +/* Used while negotiating OR/OR connections */ char nonce[8]; }; @@ -383,6 +402,7 @@ typedef struct { char *RouterFile; char *PrivateKeyFile; double CoinWeight; + int Daemon; int ORPort; int OPPort; int APPort; @@ -421,12 +441,29 @@ int write_to_buf(char *string, int string_len, * return total number of bytes on the buf */ + int fetch_from_buf(char *string, int string_len, char **buf, int *buflen, int *buf_datalen); /* if there is string_len bytes in buf, write them onto string, * then memmove buf back (that is, remove them from buf) */ +#ifdef USE_ZLIB +int compress_from_buf(char *string, int string_len, + char **buf_in, int *buflen_in, int *buf_datalen_in, + z_stream *zstream, int flush); + /* read and compress as many characters as possible from buf, writing up to + * string_len of them onto string, then memmove buf back. Return number of + * characters written. + */ + +int decompress_buf_to_buf(char **buf_in, int *buflen_in, int *buf_datalen_in, + char **buf_out, int *buflen_out, int *buf_datalen_out, + z_stream *zstream, int flush); + /* XXX document this NM + */ +#endif + int find_on_inbuf(char *string, int string_len, char *buf, int buf_datalen); /* find first instance of needle 'string' on haystack 'buf'. return how @@ -529,6 +566,13 @@ int connection_read_to_buf(connection_t *conn); int connection_fetch_from_buf(char *string, int len, connection_t *conn); +#ifdef USE_ZLIB +int connection_compress_from_buf(char *string, int len, connection_t *conn, + int flush); +int connection_decompress_to_buf(char *string, int len, connection_t *conn, + int flush); +#endif + int connection_outbuf_too_full(connection_t *conn); int connection_find_on_inbuf(char *string, int len, connection_t *conn); int connection_wants_to_flush(connection_t *conn);