Add support for linked connections with bufferevent_pair.

Also, set directory connections (linked and otherwise) to use bufferevents.

Also, stop using outbuf_flushlen anywhere except for OR connections.
This commit is contained in:
Nick Mathewson 2009-08-11 15:16:16 -04:00
parent b63f6518cb
commit 4af6887d20
6 changed files with 102 additions and 30 deletions

View File

@ -191,8 +191,8 @@ connection_type_uses_bufferevent(connection_t *conn)
{ {
switch (conn->type) { switch (conn->type) {
case CONN_TYPE_AP: case CONN_TYPE_AP:
return 1;
case CONN_TYPE_EXIT: case CONN_TYPE_EXIT:
case CONN_TYPE_DIR:
return 1; return 1;
default: default:
return 0; return 0;

View File

@ -100,7 +100,7 @@ connection_get_inbuf_len(connection_t *conn)
IF_HAS_BUFFEREVENT(conn, { IF_HAS_BUFFEREVENT(conn, {
return evbuffer_get_length(bufferevent_get_input(conn->bufev)); return evbuffer_get_length(bufferevent_get_input(conn->bufev));
}) ELSE_IF_NO_BUFFEREVENT { }) ELSE_IF_NO_BUFFEREVENT {
return buf_datalen(conn->inbuf); return conn->inbuf ? buf_datalen(conn->inbuf) : 0;
} }
} }
@ -110,7 +110,7 @@ connection_get_outbuf_len(connection_t *conn)
IF_HAS_BUFFEREVENT(conn, { IF_HAS_BUFFEREVENT(conn, {
return evbuffer_get_length(bufferevent_get_output(conn->bufev)); return evbuffer_get_length(bufferevent_get_output(conn->bufev));
}) ELSE_IF_NO_BUFFEREVENT { }) ELSE_IF_NO_BUFFEREVENT {
return buf_datalen(conn->outbuf); return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
} }
} }

View File

@ -357,8 +357,9 @@ connection_edge_finished_connecting(edge_connection_t *edge_conn)
rep_hist_note_exit_stream_opened(conn->port); rep_hist_note_exit_stream_opened(conn->port);
conn->state = EXIT_CONN_STATE_OPEN; conn->state = EXIT_CONN_STATE_OPEN;
connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */ IF_HAS_NO_BUFFEREVENT(conn)
if (connection_wants_to_flush(conn)) /* in case there are any queued relay connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
if (connection_get_outbuf_len(conn)) /* in case there are any queued relay
* cells */ * cells */
connection_start_writing(conn); connection_start_writing(conn);
/* deliver a 'connected' relay cell back through the circuit. */ /* deliver a 'connected' relay cell back through the circuit. */
@ -2109,8 +2110,10 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn)
ap_conn->socks_request->port); ap_conn->socks_request->port);
payload_len = (int)strlen(payload)+1; payload_len = (int)strlen(payload)+1;
log_debug(LD_APP, log_info(LD_APP,
"Sending relay cell to begin stream %d.", ap_conn->stream_id); "Sending relay cell %d to begin stream %d.",
(int)ap_conn->use_begindir,
ap_conn->stream_id);
begin_type = ap_conn->use_begindir ? begin_type = ap_conn->use_begindir ?
RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN; RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN;
@ -2218,9 +2221,11 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
* and call connection_ap_handshake_attach_circuit(conn) on it. * and call connection_ap_handshake_attach_circuit(conn) on it.
* *
* Return the other end of the linked connection pair, or -1 if error. * Return the other end of the linked connection pair, or -1 if error.
* DOCDOC partner.
*/ */
edge_connection_t * edge_connection_t *
connection_ap_make_link(char *address, uint16_t port, connection_ap_make_link(connection_t *partner,
char *address, uint16_t port,
const char *digest, int use_begindir, int want_onehop) const char *digest, int use_begindir, int want_onehop)
{ {
edge_connection_t *conn; edge_connection_t *conn;
@ -2255,6 +2260,8 @@ connection_ap_make_link(char *address, uint16_t port,
tor_addr_make_unspec(&conn->_base.addr); tor_addr_make_unspec(&conn->_base.addr);
conn->_base.port = 0; conn->_base.port = 0;
connection_link_connections(partner, TO_CONN(conn));
if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */ if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
connection_free(TO_CONN(conn)); connection_free(TO_CONN(conn));
return NULL; return NULL;
@ -2772,12 +2779,13 @@ connection_exit_connect(edge_connection_t *edge_conn)
} }
conn->state = EXIT_CONN_STATE_OPEN; conn->state = EXIT_CONN_STATE_OPEN;
if (connection_wants_to_flush(conn)) { if (connection_get_outbuf_len(conn)) {
/* in case there are any queued data cells */ /* in case there are any queued data cells */
log_warn(LD_BUG,"newly connected conn had data waiting!"); log_warn(LD_BUG,"newly connected conn had data waiting!");
// connection_start_writing(conn); // connection_start_writing(conn);
} }
connection_watch_events(conn, READ_EVENT); IF_HAS_NO_BUFFEREVENT(conn)
connection_watch_events(conn, READ_EVENT);
/* also, deliver a 'connected' cell back through the circuit. */ /* also, deliver a 'connected' cell back through the circuit. */
if (connection_edge_is_rendezvous_stream(edge_conn)) { if (connection_edge_is_rendezvous_stream(edge_conn)) {

View File

@ -29,7 +29,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
int connection_ap_handshake_send_begin(edge_connection_t *ap_conn); int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn); int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
edge_connection_t *connection_ap_make_link(char *address, uint16_t port, edge_connection_t *connection_ap_make_link(connection_t *partner,
char *address, uint16_t port,
const char *digest, const char *digest,
int use_begindir, int want_onehop); int use_begindir, int want_onehop);
void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply, void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,

View File

@ -892,14 +892,14 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
* hook up both sides * hook up both sides
*/ */
linked_conn = linked_conn =
connection_ap_make_link(conn->_base.address, conn->_base.port, connection_ap_make_link(TO_CONN(conn),
conn->_base.address, conn->_base.port,
digest, use_begindir, conn->dirconn_direct); digest, use_begindir, conn->dirconn_direct);
if (!linked_conn) { if (!linked_conn) {
log_warn(LD_NET,"Making tunnel to dirserver failed."); log_warn(LD_NET,"Making tunnel to dirserver failed.");
connection_mark_for_close(TO_CONN(conn)); connection_mark_for_close(TO_CONN(conn));
return; return;
} }
connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
if (connection_add(TO_CONN(conn)) < 0) { if (connection_add(TO_CONN(conn)) < 0) {
log_warn(LD_NET,"Unable to add connection for link to dirserver."); log_warn(LD_NET,"Unable to add connection for link to dirserver.");
@ -912,8 +912,12 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
payload, payload_len, payload, payload_len,
supports_conditional_consensus, supports_conditional_consensus,
if_modified_since); if_modified_since);
connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT); connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
connection_start_reading(TO_CONN(linked_conn)); IF_HAS_BUFFEREVENT(TO_CONN(linked_conn), {
connection_watch_events(TO_CONN(linked_conn), READ_EVENT|WRITE_EVENT);
}) ELSE_IF_NO_BUFFEREVENT
connection_start_reading(TO_CONN(linked_conn));
} }
} }
@ -3352,6 +3356,7 @@ connection_dir_finished_flushing(dir_connection_t *conn)
DIRREQ_DIRECT, DIRREQ_DIRECT,
DIRREQ_FLUSHING_DIR_CONN_FINISHED); DIRREQ_FLUSHING_DIR_CONN_FINISHED);
switch (conn->_base.state) { switch (conn->_base.state) {
case DIR_CONN_STATE_CONNECTING:
case DIR_CONN_STATE_CLIENT_SENDING: case DIR_CONN_STATE_CLIENT_SENDING:
log_debug(LD_DIR,"client finished sending command."); log_debug(LD_DIR,"client finished sending command.");
conn->_base.state = DIR_CONN_STATE_CLIENT_READING; conn->_base.state = DIR_CONN_STATE_CLIENT_READING;

View File

@ -155,6 +155,32 @@ int can_complete_circuit=0;
* *
****************************************************************************/ ****************************************************************************/
#ifdef USE_BUFFEREVENTS
static void
free_old_inbuf(connection_t *conn)
{
if (! conn->inbuf)
return;
tor_assert(conn->outbuf);
tor_assert(buf_datalen(conn->inbuf) == 0);
tor_assert(buf_datalen(conn->outbuf) == 0);
buf_free(conn->inbuf);
buf_free(conn->outbuf);
conn->inbuf = conn->outbuf = NULL;
if (conn->read_event) {
event_del(conn->read_event);
tor_event_free(conn->read_event);
}
if (conn->write_event) {
event_del(conn->read_event);
tor_event_free(conn->write_event);
}
conn->read_event = conn->write_event = NULL;
}
#endif
/** Add <b>conn</b> to the array of connections that we can poll on. The /** Add <b>conn</b> to the array of connections that we can poll on. The
* connection's socket must be set; the connection starts out * connection's socket must be set; the connection starts out
* non-reading and non-writing. * non-reading and non-writing.
@ -173,28 +199,47 @@ connection_add_impl(connection_t *conn, int is_connecting)
smartlist_add(connection_array, conn); smartlist_add(connection_array, conn);
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
if (connection_type_uses_bufferevent(conn) && if (connection_type_uses_bufferevent(conn)) {
conn->s >= 0 && !conn->linked) { if (conn->s >= 0 && !conn->linked) {
conn->bufev = bufferevent_socket_new( conn->bufev = bufferevent_socket_new(
tor_libevent_get_base(), tor_libevent_get_base(),
conn->s, conn->s,
BEV_OPT_DEFER_CALLBACKS); BEV_OPT_DEFER_CALLBACKS);
if (conn->inbuf) { /* XXXX CHECK FOR NULL RETURN! */
if (is_connecting) {
/* Put the bufferevent into a "connecting" state so that we'll get
* a "connected" event callback on successful write. */
bufferevent_socket_connect(conn->bufev, NULL, 0);
}
connection_configure_bufferevent_callbacks(conn);
} else if (conn->linked && conn->linked_conn &&
connection_type_uses_bufferevent(conn->linked_conn)) {
tor_assert(conn->s < 0);
if (!conn->bufev) {
struct bufferevent *pair[2] = { NULL, NULL };
/* XXXX CHECK FOR ERROR RETURN! */
bufferevent_pair_new(tor_libevent_get_base(),
BEV_OPT_DEFER_CALLBACKS,
pair);
tor_assert(pair[0]);
conn->bufev = pair[0];
conn->linked_conn->bufev = pair[1];
} /* else the other side already was added, and got a bufferevent_pair */
connection_configure_bufferevent_callbacks(conn);
}
if (conn->bufev && conn->inbuf) {
/* XXX Instead we should assert that there is no inbuf, once we /* XXX Instead we should assert that there is no inbuf, once we
* have linked connections using bufferevents. */ * have linked connections using bufferevents. */
tor_assert(conn->outbuf); free_old_inbuf(conn);
tor_assert(buf_datalen(conn->inbuf) == 0);
tor_assert(buf_datalen(conn->outbuf) == 0);
buf_free(conn->inbuf);
buf_free(conn->outbuf);
conn->inbuf = conn->outbuf = NULL;
} }
if (is_connecting) {
/* Put the bufferevent into a "connecting" state so that we'll get if (conn->linked_conn && conn->linked_conn->bufev &&
* a "connected" event callback on successful write. */ conn->linked_conn->inbuf) {
bufferevent_socket_connect(conn->bufev, NULL, 0); /* XXX Instead we should assert that there is no inbuf, once we
* have linked connections using bufferevents. */
free_old_inbuf(conn->linked_conn);
} }
connection_configure_bufferevent_callbacks(conn);
} }
#else #else
(void) is_connecting; (void) is_connecting;
@ -205,6 +250,7 @@ connection_add_impl(connection_t *conn, int is_connecting)
conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn); conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
conn->write_event = tor_event_new(tor_libevent_get_base(), conn->write_event = tor_event_new(tor_libevent_get_base(),
conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn); conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
/* XXXX CHECK FOR NULL RETURN! */
} }
log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.", log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
@ -671,11 +717,19 @@ conn_close_if_marked(int i)
/* assert_all_pending_dns_resolves_ok(); */ /* assert_all_pending_dns_resolves_ok(); */
#ifdef USE_BUFFEREVENTS #ifdef USE_BUFFEREVENTS
if (conn->bufev && conn->hold_open_until_flushed) if (conn->bufev && conn->hold_open_until_flushed) {
if (conn->linked) {
/* We need to do this explicitly so that the linked connection
* notices that there was an EOF. */
bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
/* XXXX Now can we free it? */
}
return 0; return 0;
}
#endif #endif
log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s); log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
IF_HAS_BUFFEREVENT(conn, goto unlink);
if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) { if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
/* s == -1 means it's an incomplete edge connection, or that the socket /* s == -1 means it's an incomplete edge connection, or that the socket
* has already been closed as unflushable. */ * has already been closed as unflushable. */
@ -743,6 +797,10 @@ conn_close_if_marked(int i)
conn->marked_for_close); conn->marked_for_close);
} }
} }
#ifdef USE_BUFFEREVENTS
unlink:
#endif
connection_unlink(conn); /* unlink, remove, free */ connection_unlink(conn); /* unlink, remove, free */
return 1; return 1;
} }