Make AP connections wait for a circuit if none exists.

Also:
  - Refactor socks request into a separate struct
  - Add a separate 'waiting for circuit' state to AP connections
    between 'waiting for socks' and 'open'.

Arma: can you check out the XXX's I've added to connection_edge? I may
be mishandling some async and close logic.


svn:r783
This commit is contained in:
Nick Mathewson 2003-11-11 02:41:31 +00:00
parent 894b1bc5d0
commit dafb0e6a6e
5 changed files with 135 additions and 54 deletions

View File

@ -395,19 +395,16 @@ int fetch_from_buf_http(buf_t *buf,
* socks4a: "socksheader username\0 destaddr\0"
* socks5 phase one: "version #methods methods"
* socks5 phase two: "version command 0 addresstype..."
* If it's a complete and valid handshake, and destaddr fits in addr_out,
* then pull the handshake off the buf, assign to addr_out and port_out,
* and return 1.
* If it's a complete and valid handshake, and destaddr fits in
* MAX_SOCKS_ADDR_LEN bytes, then pull the handshake off the buf,
* assign to *req, and return 1.
* If it's invalid or too big, return -1.
* Else it's not all there yet, leave buf alone and return 0.
* If you want to specify the socks reply, write it into *reply
* and set *replylen, else leave *replylen alone.
* If returning 0 or -1, *addr_out and *port_out are undefined.
*/
int fetch_from_buf_socks(buf_t *buf, char *socks_version,
char *reply, int *replylen,
char *addr_out, int max_addrlen,
uint16_t *port_out) {
int fetch_from_buf_socks(buf_t *buf, socks_request_t *req) {
unsigned char len;
char *tmpbuf=NULL;
uint32_t destip;
@ -421,25 +418,25 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
case 5: /* socks5 */
if(*socks_version != 5) { /* we need to negotiate a method */
if(req->socks_version != 5) { /* we need to negotiate a method */
unsigned char nummethods = (unsigned char)*(buf->mem+1);
assert(!*socks_version);
assert(!req->socks_version);
log_fn(LOG_DEBUG,"socks5: learning offered methods");
if(buf->datalen < 2+nummethods)
return 0;
if(!nummethods || !memchr(buf->mem+2, 0, nummethods)) {
log_fn(LOG_WARN,"socks5: offered methods don't include 'no auth'. Rejecting.");
*replylen = 2; /* 2 bytes of response */
*reply = 5; /* socks5 reply */
*(reply+1) = 0xFF; /* reject all methods */
req->replylen = 2; /* 2 bytes of response */
req->reply[0] = 5; /* socks5 reply */
req->reply[1] = 0xFF; /* reject all methods */
return -1;
}
buf_remove_from_front(buf,2+nummethods);/* remove packet from buf */
*replylen = 2; /* 2 bytes of response */
*reply = 5; /* socks5 reply */
*(reply+1) = 0; /* choose the 'no auth' method */
*socks_version = 5; /* remember that we've already negotiated auth */
req->replylen = 2; /* 2 bytes of response */
req->reply[0] = 5; /* socks5 reply */
req->reply[1] = 0; /* choose the 'no auth' method */
req->socks_version = 5; /* remember that we've already negotiated auth */
log_fn(LOG_DEBUG,"socks5: accepted method 0");
return 0;
}
@ -459,13 +456,13 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
destip = ntohl(*(uint32_t*)(buf->mem+4));
in.s_addr = htonl(destip);
tmpbuf = inet_ntoa(in);
if(strlen(tmpbuf)+1 > max_addrlen) {
if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks5 IP takes %d bytes, which doesn't fit in %d",
strlen(tmpbuf)+1,max_addrlen);
strlen(tmpbuf)+1,MAX_SOCKS_ADDR_LEN);
return -1;
}
strcpy(addr_out,tmpbuf);
*port_out = ntohs(*(uint16_t*)(buf->mem+8));
strcpy(req->addr,tmpbuf);
req->port = ntohs(*(uint16_t*)(buf->mem+8));
buf_remove_from_front(buf, 10);
return 1;
case 3: /* fqdn */
@ -473,14 +470,14 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
len = (unsigned char)*(buf->mem+4);
if(buf->datalen < 7+len) /* addr/port there? */
return 0; /* not yet */
if(len+1 > max_addrlen) {
if(len+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks5 hostname is %d bytes, which doesn't fit in %d",
len+1,max_addrlen);
len+1,MAX_SOCKS_ADDR_LEN);
return -1;
}
memcpy(addr_out,buf->mem+5,len);
addr_out[len] = 0;
*port_out = ntohs(*(uint16_t*)(buf->mem+5+len));
memcpy(req->addr,buf->mem+5,len);
req->addr[len] = 0;
req->port = ntohs(*(uint16_t*)(buf->mem+5+len));
buf_remove_from_front(buf, 5+len+2);
return 1;
default: /* unsupported */
@ -490,7 +487,7 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
assert(0);
case 4: /* socks4 */
*socks_version = 4;
req->socks_version = 4;
if(buf->datalen < SOCKS4_NETWORK_LEN) /* basic info available? */
return 0; /* not yet */
@ -499,9 +496,9 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
return -1;
}
*port_out = ntohs(*(uint16_t*)(buf->mem+2));
req->port = ntohs(*(uint16_t*)(buf->mem+2));
destip = ntohl(*(uint32_t*)(buf->mem+4));
if(!*port_out || !destip) {
if(!req->port || !destip) {
log_fn(LOG_WARN,"socks4: Port or DestIP is zero.");
return -1;
}
@ -509,7 +506,7 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
log_fn(LOG_DEBUG,"socks4: destip not in form 0.0.0.x.");
in.s_addr = htonl(destip);
tmpbuf = inet_ntoa(in);
if(strlen(tmpbuf)+1 > max_addrlen) {
if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks4 addr (%d bytes) too long.", strlen(tmpbuf));
return -1;
}
@ -530,13 +527,13 @@ int fetch_from_buf_socks(buf_t *buf, char *socks_version,
log_fn(LOG_DEBUG,"Destaddr not here yet.");
return 0;
}
if(max_addrlen <= next-startaddr) {
if(MAX_SOCKS_ADDR_LEN <= next-startaddr) {
log_fn(LOG_WARN,"Destaddr too long.");
return -1;
}
}
log_fn(LOG_DEBUG,"Everything is here. Success.");
strcpy(addr_out, socks4_prot == socks4 ? tmpbuf : startaddr);
strcpy(req->addr, socks4_prot == socks4 ? tmpbuf : startaddr);
buf_remove_from_front(buf, next-buf->mem+1); /* next points to the final \0 on inbuf */
return 1;

View File

@ -755,6 +755,9 @@ int circuit_send_next_onion_skin(circuit_t *circ) {
if(hop == circ->cpath) { /* done building the circuit. whew. */
circ->state = CIRCUIT_STATE_OPEN;
log_fn(LOG_INFO,"circuit built!");
/* Tell any AP connections that have been waiting for a new
* circuit that one is ready. */
connection_ap_attach_pending();
return 0;
}

View File

@ -85,6 +85,10 @@ connection_t *connection_new(int type) {
conn->inbuf = buf_new();
conn->outbuf = buf_new();
}
if (type == CONN_TYPE_AP) {
conn->socks_request = tor_malloc(sizeof(socks_request_t));
memset(conn->socks_request, 0, sizeof(socks_request_t));
}
conn->timestamp_created = now;
conn->timestamp_lastread = now;
@ -115,6 +119,7 @@ void connection_free(connection_t *conn) {
if (conn->identity_pkey)
crypto_free_pk_env(conn->identity_pkey);
tor_free(conn->nickname);
tor_free(conn->socks_request);
if(conn->s >= 0) {
log_fn(LOG_INFO,"closing fd %d.",conn->s);
@ -760,6 +765,9 @@ void assert_connection_ok(connection_t *conn, time_t now)
assert_cpath_layer_ok(conn->cpath_layer);
/* XXX unchecked, package window, deliver window. */
}
if (conn->type != CONN_TYPE_AP) {
assert(!conn->socks_request);
}
switch(conn->type)
{
@ -779,6 +787,12 @@ void assert_connection_ok(connection_t *conn, time_t now)
case CONN_TYPE_AP:
assert(conn->state >= _AP_CONN_STATE_MIN &&
conn->state <= _AP_CONN_STATE_MAX);
if (conn->state == AP_CONN_STATE_SOCKS_WAIT ||
conn->state == AP_CONN_STATE_CIRCUIT_WAIT) {
assert(conn->socks_request);
} else {
assert(!conn->socks_request);
}
break;
case CONN_TYPE_DIR:
assert(conn->state >= _DIR_CONN_STATE_MIN &&

View File

@ -7,8 +7,8 @@
extern or_options_t options; /* command-line and config-file options */
static int connection_ap_handshake_process_socks(connection_t *conn);
static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
char *destaddr, uint16_t destport);
static int connection_ap_handshake_attach_circuit(connection_t *conn);
static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ);
static int connection_ap_handshake_socks_reply(connection_t *conn, char *reply,
int replylen, char success);
@ -465,6 +465,31 @@ repeat_connection_edge_package_raw_inbuf:
goto repeat_connection_edge_package_raw_inbuf;
}
/* Tell any APs that are waiting for a new circuit that one is available */
void connection_ap_attach_pending(void)
{
connection_t *conn;
int r;
while ((conn = connection_get_by_type_state(CONN_TYPE_AP,
AP_CONN_STATE_CIRCUIT_WAIT))) {
r = connection_ap_handshake_attach_circuit(conn);
if (r == 0) {
/* r==0: We're attached; do nothing. */
} else if (r>0) {
/* r>0: There was no circuit to attach to: stop the loop. */
break;
} else {
/* r<0: There was an error sending the begin cell; other pending
* AP connections may succeed.
*/
/* XXX Is this right? How do we say that the connection failed?
* Should I close it? mark it for close? -NM */
connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
}
}
}
static void connection_edge_consider_sending_sendme(connection_t *conn) {
circuit_t *circ;
@ -491,23 +516,23 @@ static void connection_edge_consider_sending_sendme(connection_t *conn) {
}
static int connection_ap_handshake_process_socks(connection_t *conn) {
circuit_t *circ;
char destaddr[200]; /* XXX why 200? but not 256, because it won't fit in a cell */
char reply[256];
uint16_t destport;
int replylen=0;
socks_request_t *socks;
int sockshere;
assert(conn);
assert(conn->type == CONN_TYPE_AP);
assert(conn->state == AP_CONN_STATE_SOCKS_WAIT);
assert(conn->socks_request);
socks = conn->socks_request;
log_fn(LOG_DEBUG,"entered.");
sockshere = fetch_from_buf_socks(conn->inbuf, &conn->socks_version, reply, &replylen,
destaddr, sizeof(destaddr), &destport);
sockshere = fetch_from_buf_socks(conn->inbuf, socks);
conn->socks_version = socks->socks_version;
if(sockshere == -1 || sockshere == 0) {
if(replylen) { /* we should send reply back */
if(socks->replylen) { /* we should send reply back */
log_fn(LOG_DEBUG,"reply is already set for us. Using it.");
connection_ap_handshake_socks_reply(conn, reply, replylen, 0);
connection_ap_handshake_socks_reply(conn, socks->reply, socks->replylen, 0);
} else if(sockshere == -1) { /* send normal reject */
log_fn(LOG_WARN,"Fetching socks handshake failed. Closing.");
connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
@ -517,13 +542,35 @@ static int connection_ap_handshake_process_socks(connection_t *conn) {
return sockshere;
} /* else socks handshake is done, continue processing */
conn->state = AP_CONN_STATE_CIRCUIT_WAIT;
if (connection_ap_handshake_attach_circuit(conn)<0)
return -1;
return 0;
}
/* Try to find a live circuit. If we don't find one, tell 'conn' to
* stop reading and return 1. Otherwise, associate the CONN_TYPE_AP
* connection 'conn' with the newest live circuit, and start sending a
* BEGIN cell down the circuit. Returns 0 on success, and -1 on
* error.
*/
static int connection_ap_handshake_attach_circuit(connection_t *conn) {
circuit_t *circ;
assert(conn);
assert(conn->type == CONN_TYPE_AP);
assert(conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
assert(conn->socks_request);
/* find the circuit that we should use, if there is one. */
circ = circuit_get_newest_open();
if(!circ) {
log_fn(LOG_INFO,"No circuit ready. Closing.");
return -1;
log_fn(LOG_INFO,"No circuit ready for edge connection; delaying.");
connection_stop_reading(conn); /* XXX Is this correct? -NM */
return 1;
}
connection_start_reading(conn); /* XXX Is this correct? -NM */
circ->dirty = 1;
@ -536,7 +583,7 @@ static int connection_ap_handshake_process_socks(connection_t *conn) {
assert(circ->cpath->prev->state == CPATH_STATE_OPEN);
conn->cpath_layer = circ->cpath->prev;
if(connection_ap_handshake_send_begin(conn, circ, destaddr, destport) < 0) {
if(connection_ap_handshake_send_begin(conn, circ) < 0) {
circuit_close(circ);
return -1;
}
@ -545,11 +592,15 @@ static int connection_ap_handshake_process_socks(connection_t *conn) {
}
/* deliver the destaddr:destport in a relay cell */
static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
char *destaddr, uint16_t destport) {
static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ)
{
char payload[CELL_PAYLOAD_SIZE];
int payload_len;
assert(ap_conn->type == CONN_TYPE_AP);
assert(ap_conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
assert(ap_conn->socks_request);
if(crypto_pseudo_rand(STREAM_ID_SIZE, ap_conn->stream_id) < 0)
return -1;
/* FIXME check for collisions */
@ -557,7 +608,7 @@ static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *
memcpy(payload, ap_conn->stream_id, STREAM_ID_SIZE);
payload_len = STREAM_ID_SIZE + 1 +
snprintf(payload+STREAM_ID_SIZE,CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE-STREAM_ID_SIZE,
"%s:%d", destaddr, destport);
"%s:%d", ap_conn->socks_request->addr, ap_conn->socks_request->port);
log_fn(LOG_DEBUG,"Sending relay cell to begin stream %d.",*(int *)ap_conn->stream_id);
@ -568,6 +619,8 @@ static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *
ap_conn->package_window = STREAMWINDOW_START;
ap_conn->deliver_window = STREAMWINDOW_START;
ap_conn->state = AP_CONN_STATE_OPEN;
tor_free(ap_conn->socks_request);
ap_conn->socks_request = NULL;
log_fn(LOG_INFO,"Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
return 0;
}

View File

@ -160,7 +160,7 @@
/* the AP state values must be disjoint from the EXIT state values */
#define _AP_CONN_STATE_MIN 4
#define AP_CONN_STATE_SOCKS_WAIT 4
#define AP_CONN_STATE_OR_WAIT 5
#define AP_CONN_STATE_CIRCUIT_WAIT 5
#define AP_CONN_STATE_OPEN 6
#define _AP_CONN_STATE_MAX 6
@ -254,6 +254,7 @@ typedef struct {
#define ZERO_STREAM "\0\0\0\0\0\0\0\0"
typedef struct buf_t buf_t;
typedef struct socks_request_t socks_request_t;
struct connection_t {
@ -304,7 +305,6 @@ struct connection_t {
*/
/* Used only by edge connections: */
char socks_version;
char stream_id[STREAM_ID_SIZE];
struct connection_t *next_stream; /* points to the next stream at this edge, if any */
struct crypt_path_t *cpath_layer; /* a pointer to which node in the circ this conn exits at */
@ -315,6 +315,10 @@ struct connection_t {
int done_receiving;
char has_sent_end; /* for debugging: set once we've set the stream end,
and check in circuit_about_to_close_connection() */
/* Used only by AP connections */
char socks_version;
socks_request_t *socks_request;
};
typedef struct connection_t connection_t;
@ -455,6 +459,17 @@ typedef struct {
int loglevel;
} or_options_t;
#define MAX_SOCKS_REPLY_LEN 256
/* Not 256; addresses must fit in a begin cell. */
#define MAX_SOCKS_ADDR_LEN 200
struct socks_request_t {
char socks_version;
int replylen;
char reply[MAX_SOCKS_REPLY_LEN];
char addr[MAX_SOCKS_ADDR_LEN];
uint16_t port;
};
/* all the function prototypes go here */
/********************************* buffers.c ***************************/
@ -480,10 +495,7 @@ int fetch_from_buf(char *string, int string_len, buf_t *buf);
int fetch_from_buf_http(buf_t *buf,
char *headers_out, int max_headerlen,
char *body_out, int max_bodylen);
int fetch_from_buf_socks(buf_t *buf, char *socks_version,
char *reply, int *replylen,
char *addr_out, int max_addrlen,
uint16_t *port_out);
int fetch_from_buf_socks(buf_t *buf, socks_request_t *req);
/********************************* circuit.c ***************************/
@ -603,6 +615,8 @@ int connection_edge_package_raw_inbuf(connection_t *conn);
void connection_exit_connect(connection_t *conn);
void connection_ap_attach_pending(void);
extern uint64_t stats_n_data_cells_packaged;
extern uint64_t stats_n_data_bytes_packaged;
extern uint64_t stats_n_data_cells_received;