r12652@Kushana: nickm | 2007-03-25 15:01:48 -0400

A surprisingly simple patch to stop reading on edge connections when their circuits get too full, and start again when they empty out.  This lets us remove the logic to block begin_dir conns when the corresponding or conns get full: it was already broken by cell queues anyway.


svn:r9905
This commit is contained in:
Nick Mathewson 2007-03-26 14:08:18 +00:00
parent 38c0bb3a99
commit d1381aef82
8 changed files with 78 additions and 212 deletions

View File

@ -6,6 +6,8 @@ Changes in version 0.2.0.1-alpha - 2007-??-??
queue for each circuit. This lets us use less slack memory, and queue for each circuit. This lets us use less slack memory, and
will eventually let us be smarter about prioritizing different kinds will eventually let us be smarter about prioritizing different kinds
of traffic. of traffic.
- Stop reading on edge connections when their corresponding circuit
buffers are full; start again as the circuits empty out.
o Security fixes: o Security fixes:
- Directory authorities now call routers stable if they have an - Directory authorities now call routers stable if they have an
@ -67,6 +69,9 @@ Changes in version 0.2.0.1-alpha - 2007-??-??
o Code simplifications and refactoring o Code simplifications and refactoring
- Stop passing around circuit_t and crypt_path_t pointers that are - Stop passing around circuit_t and crypt_path_t pointers that are
implicit in other procedure arguments. implicit in other procedure arguments.
- Drop the old code to choke directory connections when the corresponding
or connections got full: thanks to the cell queue feature, or conns
don't get full any more.
Changes in version 0.1.2.12-rc - 2007-03-16 Changes in version 0.1.2.12-rc - 2007-03-16

View File

@ -74,7 +74,7 @@ Things we'd like to do in 0.2.0.x:
is full, not when the orconn is "too full". is full, not when the orconn is "too full".
- Do we switch to arena-allocation for cells? - Do we switch to arena-allocation for cells?
- Can we stop doing so many memcpys on cells? - Can we stop doing so many memcpys on cells?
- Also, only package data from exitconns when there is space on the o Also, only package data from exitconns when there is space on the
target OR conn's outbuf? or when the circuit is not too full. target OR conn's outbuf? or when the circuit is not too full.
- MAYBE kill stalled circuits rather than stalled connections; consider - MAYBE kill stalled circuits rather than stalled connections; consider
anonymity implications. anonymity implications.

View File

@ -416,10 +416,6 @@ connection_about_to_close_connection(connection_t *conn)
} }
if (conn->purpose == DIR_PURPOSE_FETCH_RENDDESC) if (conn->purpose == DIR_PURPOSE_FETCH_RENDDESC)
rend_client_desc_here(dir_conn->rend_query); /* give it a try */ rend_client_desc_here(dir_conn->rend_query); /* give it a try */
/* If this is from BEGIN_DIR, unlink it from the edge_conn and
* the or_conn. */
if (dir_conn->bridge_conn)
connection_dirserv_unlink_from_bridge(dir_conn);
break; break;
case CONN_TYPE_OR: case CONN_TYPE_OR:
or_conn = TO_OR_CONN(conn); or_conn = TO_OR_CONN(conn);
@ -446,13 +442,6 @@ connection_about_to_close_connection(connection_t *conn)
control_event_or_conn_status(or_conn, OR_CONN_EVENT_CLOSED, control_event_or_conn_status(or_conn, OR_CONN_EVENT_CLOSED,
control_tls_error_to_reason(or_conn->tls_error)); control_tls_error_to_reason(or_conn->tls_error));
} }
/* Remove any dir_conns that are blocked on this one. Non-blocked
* ones will die when the circuits do. */
while (or_conn->blocked_dir_connections) {
dir_connection_t *dir_conn = or_conn->blocked_dir_connections;
connection_dirserv_unlink_from_bridge(dir_conn);
tor_assert(or_conn->blocked_dir_connections != dir_conn);
}
/* Now close all the attached circuits on it. */ /* Now close all the attached circuits on it. */
circuit_unlink_all_from_or_conn(TO_OR_CONN(conn), circuit_unlink_all_from_or_conn(TO_OR_CONN(conn),
END_CIRC_REASON_OR_CONN_CLOSED); END_CIRC_REASON_OR_CONN_CLOSED);
@ -485,9 +474,6 @@ connection_about_to_close_connection(connection_t *conn)
if (conn->state == EXIT_CONN_STATE_RESOLVING) { if (conn->state == EXIT_CONN_STATE_RESOLVING) {
connection_dns_remove(edge_conn); connection_dns_remove(edge_conn);
} }
/* If we're relaying a dirserv connection, clean up any pointers */
if (edge_conn->bridge_for_conn)
connection_dirserv_unlink_from_bridge(edge_conn->bridge_for_conn);
break; break;
} }
} }
@ -2431,8 +2417,7 @@ assert_connection_ok(connection_t *conn, time_t now)
if (conn->outbuf_flushlen > 0) { if (conn->outbuf_flushlen > 0) {
tor_assert(connection_is_writing(conn) || conn->wants_to_write || tor_assert(connection_is_writing(conn) || conn->wants_to_write ||
(conn->type == CONN_TYPE_DIR && conn->edge_blocked_on_circ);
TO_DIR_CONN(conn)->is_blocked_on_or_conn));
} }
if (conn->hold_open_until_flushed) if (conn->hold_open_until_flushed)
@ -2496,34 +2481,7 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->purpose == EXIT_PURPOSE_CONNECT || tor_assert(conn->purpose == EXIT_PURPOSE_CONNECT ||
conn->purpose == EXIT_PURPOSE_RESOLVE); conn->purpose == EXIT_PURPOSE_RESOLVE);
} }
if (edge_conn->bridge_for_conn) {
tor_assert(conn->type == CONN_TYPE_EXIT);
tor_assert(edge_conn->bridge_for_conn->bridge_conn == edge_conn);
}
} else if (conn->type == CONN_TYPE_DIR) { } else if (conn->type == CONN_TYPE_DIR) {
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
if (dir_conn->bridge_conn) {
tor_assert(DIR_CONN_IS_SERVER(conn));
tor_assert(dir_conn->bridge_conn->bridge_for_conn == dir_conn);
if (dir_conn->bridge_conn->on_circuit) {
dir_connection_t *d;
or_connection_t *or_conn;
tor_assert(!CIRCUIT_IS_ORIGIN(dir_conn->bridge_conn->on_circuit));
or_conn = TO_OR_CIRCUIT(dir_conn->bridge_conn->on_circuit)->p_conn;
if (dir_conn->is_blocked_on_or_conn)
tor_assert(or_conn);
for (d = or_conn->blocked_dir_connections; d;
d = d->next_blocked_on_same_or_conn) {
if (d == dir_conn) {
tor_assert(dir_conn->is_blocked_on_or_conn == 1);
break;
}
}
if (!d)
tor_assert(!dir_conn->is_blocked_on_or_conn);
}
}
} else { } else {
/* Purpose is only used for dir and exit types currently */ /* Purpose is only used for dir and exit types currently */
tor_assert(!conn->purpose); tor_assert(!conn->purpose);

View File

@ -2469,9 +2469,6 @@ connection_exit_connect_dir(edge_connection_t *exit_conn)
return 0; return 0;
} }
dir_conn->bridge_conn = exit_conn;
exit_conn->bridge_for_conn = dir_conn;
connection_start_reading(TO_CONN(dir_conn)); connection_start_reading(TO_CONN(dir_conn));
connection_start_reading(TO_CONN(exit_conn)); connection_start_reading(TO_CONN(exit_conn));

View File

@ -16,7 +16,6 @@ const char connection_or_c_id[] =
static int connection_tls_finish_handshake(or_connection_t *conn); static int connection_tls_finish_handshake(or_connection_t *conn);
static int connection_or_process_cells_from_inbuf(or_connection_t *conn); static int connection_or_process_cells_from_inbuf(or_connection_t *conn);
static int connection_or_empty_enough_for_dirserv_data(or_connection_t *conn);
/**************************************************************/ /**************************************************************/
@ -232,21 +231,22 @@ connection_or_process_inbuf(or_connection_t *conn)
} }
} }
/**DOCDOC*/
#define OR_CONN_HIGHWATER (32*1024)
/**DOCDOC*/
#define OR_CONN_LOWWATER (16*1024)
/** Called whenever we have flushed some data on an or_conn: add more data /** Called whenever we have flushed some data on an or_conn: add more data
* from active circuits. */ * from active circuits. */
int int
connection_or_flushed_some(or_connection_t *conn) connection_or_flushed_some(or_connection_t *conn)
{ {
if (conn->blocked_dir_connections && size_t datalen = buf_datalen(conn->_base.outbuf);
connection_or_empty_enough_for_dirserv_data(conn)) { if (datalen < OR_CONN_LOWWATER) {
connection_dirserv_stop_blocking_all_on_or_conn(conn); int n = (OR_CONN_HIGHWATER - datalen) / CELL_NETWORK_SIZE;
}
if (buf_datalen(conn->_base.outbuf) < 16*1024) {
int n = (32*1024 - buf_datalen(conn->_base.outbuf)) / CELL_NETWORK_SIZE;
while (conn->active_circuits && n > 0) { while (conn->active_circuits && n > 0) {
int flushed; int flushed = connection_or_flush_from_first_active_circuit(conn, 1);
log_info(LD_GENERAL, "Loop, n==%d",n);
flushed = connection_or_flush_from_first_active_circuit(conn, 1);
n -= flushed; n -= flushed;
} }
} }
@ -799,32 +799,3 @@ connection_or_send_destroy(uint16_t circ_id, or_connection_t *conn, int reason)
return 0; return 0;
} }
/** A high waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. */
#define DIR_BUF_FULLNESS_THRESHOLD (128*1024)
/** A bottom waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. We don't want
* to make this too low, since we already run the risk of starving
* the pending dir connections if the OR conn is frequently busy with
* other things. */
#define DIR_BUF_EMPTINESS_THRESHOLD (96*1024)
/** Return true iff there is so much data waiting to be flushed on <b>conn</b>
* that we should stop writing directory data to it. */
int
connection_or_too_full_for_dirserv_data(or_connection_t *conn)
{
return buf_datalen(conn->_base.outbuf) > DIR_BUF_FULLNESS_THRESHOLD;
}
/** Return true iff there is no longer so much data waiting to be flushed on
* <b>conn</b> that we should not write directory data to it. */
static int
connection_or_empty_enough_for_dirserv_data(or_connection_t *conn)
{
/* Note that the threshold to stop writing is a bit higher than the
* threshold to start again: this should (with any luck) keep us from
* flapping about indefinitely. */
return buf_datalen(conn->_base.outbuf) < DIR_BUF_EMPTINESS_THRESHOLD;
}

View File

@ -2012,99 +2012,6 @@ dirserv_test_reachability(int try_all)
ctr = (ctr + 1) % 128; ctr = (ctr + 1) % 128;
} }
/** If <b>conn</b> is a dirserv connection tunneled over an or_connection,
* return that connection. Otherwise, return NULL. */
static INLINE or_connection_t *
connection_dirserv_get_target_or_conn(dir_connection_t *conn)
{
if (conn->bridge_conn &&
conn->bridge_conn->on_circuit &&
!CIRCUIT_IS_ORIGIN(conn->bridge_conn->on_circuit)) {
or_circuit_t *circ = TO_OR_CIRCUIT(conn->bridge_conn->on_circuit);
return circ->p_conn;
} else {
return NULL;
}
}
/** Remove <b>dir_conn</b> from the list of bridged dirserv connections
* blocking on <b>or_conn</b>, and set its status to nonblocked. */
static INLINE void
connection_dirserv_remove_from_blocked_list(or_connection_t *or_conn,
dir_connection_t *dir_conn)
{
dir_connection_t **c;
for (c = &or_conn->blocked_dir_connections; *c;
c = &(*c)->next_blocked_on_same_or_conn) {
if (*c == dir_conn) {
tor_assert(dir_conn->is_blocked_on_or_conn == 1);
*c = dir_conn->next_blocked_on_same_or_conn;
dir_conn->next_blocked_on_same_or_conn = NULL;
dir_conn->is_blocked_on_or_conn = 0;
return;
}
}
tor_assert(!dir_conn->is_blocked_on_or_conn);
}
/** If <b>dir_conn</b> is a dirserv connection that's bridged over an edge_conn
* onto an or_conn, remove it from the blocked list (if it's blocked) and
* unlink it and the edge_conn from one another. */
void
connection_dirserv_unlink_from_bridge(dir_connection_t *dir_conn)
{
edge_connection_t *edge_conn;
or_connection_t *or_conn;
tor_assert(dir_conn);
edge_conn = dir_conn->bridge_conn;
or_conn = connection_dirserv_get_target_or_conn(dir_conn);
if (or_conn) {
/* XXXX Really, this is only necessary if dir_conn->is_blocked_on_or_conn.
* But for now, let's leave it in, so the assert can catch */
connection_dirserv_remove_from_blocked_list(or_conn, dir_conn);
}
dir_conn->is_blocked_on_or_conn = 0; /* Probably redundant. */
edge_conn->bridge_for_conn = NULL;
dir_conn->bridge_conn = NULL;
}
/** Stop writing on a bridged dir_conn, and remember that it's blocked because
* its or_conn was too full. */
static void
connection_dirserv_mark_as_blocked(dir_connection_t *dir_conn)
{
or_connection_t *or_conn;
if (dir_conn->is_blocked_on_or_conn)
return;
tor_assert(! dir_conn->next_blocked_on_same_or_conn);
or_conn = connection_dirserv_get_target_or_conn(dir_conn);
if (!or_conn)
return;
dir_conn->next_blocked_on_same_or_conn = or_conn->blocked_dir_connections;
or_conn->blocked_dir_connections = dir_conn;
dir_conn->is_blocked_on_or_conn = 1;
connection_stop_writing(TO_CONN(dir_conn));
}
/** Tell all bridged dir_conns that were blocked because or_conn's outbuf was
* too full that they can write again. */
void
connection_dirserv_stop_blocking_all_on_or_conn(or_connection_t *or_conn)
{
dir_connection_t *dir_conn, *next;
dir_conn = or_conn->blocked_dir_connections;
while (dir_conn) {
next = dir_conn->next_blocked_on_same_or_conn;
dir_conn->is_blocked_on_or_conn = 0;
dir_conn->next_blocked_on_same_or_conn = NULL;
connection_start_writing(TO_CONN(dir_conn));
dir_conn = next;
}
or_conn->blocked_dir_connections = NULL;
}
/** Return an approximate estimate of the number of bytes that will /** Return an approximate estimate of the number of bytes that will
* be needed to transmit the server descriptors (if is_serverdescs -- * be needed to transmit the server descriptors (if is_serverdescs --
* they can be either d/ or fp/ queries) or networkstatus objects (if * they can be either d/ or fp/ queries) or networkstatus objects (if
@ -2309,18 +2216,11 @@ connection_dirserv_add_networkstatus_bytes_to_outbuf(dir_connection_t *conn)
int int
connection_dirserv_flushed_some(dir_connection_t *conn) connection_dirserv_flushed_some(dir_connection_t *conn)
{ {
or_connection_t *or_conn;
tor_assert(conn->_base.state == DIR_CONN_STATE_SERVER_WRITING); tor_assert(conn->_base.state == DIR_CONN_STATE_SERVER_WRITING);
if (buf_datalen(conn->_base.outbuf) >= DIRSERV_BUFFER_MIN) if (buf_datalen(conn->_base.outbuf) >= DIRSERV_BUFFER_MIN)
return 0; return 0;
if ((or_conn = connection_dirserv_get_target_or_conn(conn)) &&
connection_or_too_full_for_dirserv_data(or_conn)) {
connection_dirserv_mark_as_blocked(conn);
return 0;
}
switch (conn->dir_spool_src) { switch (conn->dir_spool_src) {
case DIR_SPOOL_SERVER_BY_DIGEST: case DIR_SPOOL_SERVER_BY_DIGEST:
case DIR_SPOOL_SERVER_BY_FP: case DIR_SPOOL_SERVER_BY_FP:

View File

@ -750,10 +750,11 @@ typedef struct connection_t {
* before closing it? */ * before closing it? */
unsigned int inbuf_reached_eof:1; /**< Boolean: did read() return 0 on this unsigned int inbuf_reached_eof:1; /**< Boolean: did read() return 0 on this
* conn? */ * conn? */
unsigned edge_has_sent_end:1; /**< For debugging; only used on edge unsigned int edge_has_sent_end:1; /**< For debugging; only used on edge
* connections. Set once we've set the stream end, * connections. Set once we've set the stream end,
* and check in connection_about_to_close_connection(). * and check in connection_about_to_close_connection().
*/ */
unsigned int edge_blocked_on_circ:1; /**< DOCDOC */
/** Used for OR conns that shouldn't get any new circs attached to them. */ /** Used for OR conns that shouldn't get any new circs attached to them. */
unsigned int or_is_obsolete:1; unsigned int or_is_obsolete:1;
/** For AP connections only. If 1, and we fail to reach the chosen exit, /** For AP connections only. If 1, and we fail to reach the chosen exit,
@ -818,9 +819,7 @@ typedef struct or_connection_t {
struct circuit_t *active_circuits; /**< DOCDOC */ struct circuit_t *active_circuits; /**< DOCDOC */
struct or_connection_t *next_with_same_id; /**< Next connection with same struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */ * identity digest as this one. */
/** Linked list of bridged dirserver connections that can't write until
* this connection's outbuf is less full. */
struct dir_connection_t *blocked_dir_connections;
circ_id_type_t circ_id_type:2; /**< When we send CREATE cells along this circ_id_type_t circ_id_type:2; /**< When we send CREATE cells along this
* connection, which half of the space should * connection, which half of the space should
* we use? */ * we use? */
@ -869,10 +868,6 @@ typedef struct edge_connection_t {
/** Bytes written since last call to control_event_stream_bandwidth_used() */ /** Bytes written since last call to control_event_stream_bandwidth_used() */
uint32_t n_written; uint32_t n_written;
/** Exit only: a dirserv connection that is tunneled over this connection
* using a socketpair. */
struct dir_connection_t *bridge_for_conn;
char rend_query[REND_SERVICE_ID_LEN+1]; /**< What rendezvous service are we char rend_query[REND_SERVICE_ID_LEN+1]; /**< What rendezvous service are we
* querying for? (AP only) */ * querying for? (AP only) */
@ -891,10 +886,6 @@ typedef struct dir_connection_t {
char *requested_resource; /**< Which 'resource' did we ask the directory char *requested_resource; /**< Which 'resource' did we ask the directory
* for? */ * for? */
unsigned int dirconn_direct:1; /**< Is this dirconn direct, or via Tor? */ unsigned int dirconn_direct:1; /**< Is this dirconn direct, or via Tor? */
/** True iff this is a dirserv conn, and it's tunneled over an or_conn,
* and we've stopped writing because the or_conn had too much pending
* data to write */
unsigned int is_blocked_on_or_conn : 1;
/* Used only for server sides of some dir connections, to implement /* Used only for server sides of some dir connections, to implement
* "spooling" of directory material to the outbuf. Otherwise, we'd have * "spooling" of directory material to the outbuf. Otherwise, we'd have
@ -919,15 +910,6 @@ typedef struct dir_connection_t {
char identity_digest[DIGEST_LEN]; /**< Hash of the public RSA key for char identity_digest[DIGEST_LEN]; /**< Hash of the public RSA key for
* the directory server's signing key. */ * the directory server's signing key. */
/** If this is a dirserv conn created with a BEGIN_DIR (a "bridged" dirserv
* connection), a pointer to the edge_conn at the other end of its
* socketpair. */
edge_connection_t *bridge_conn;
/** Next connection in linked list of dirserv connections blocked until
* the or_conns over which they're bridged have enough space in their
* outbufs. */
struct dir_connection_t *next_blocked_on_same_or_conn;
} dir_connection_t; } dir_connection_t;
/** Subtype of connection_t for an connection to a controller. */ /** Subtype of connection_t for an connection to a controller. */
@ -1396,6 +1378,11 @@ typedef struct circuit_t {
uint16_t n_port; uint16_t n_port;
/** The IPv4 address of the OR that is next in this circuit. */ /** The IPv4 address of the OR that is next in this circuit. */
uint32_t n_addr; uint32_t n_addr;
/** DOCDOC */
unsigned int streams_blocked_on_n_conn : 1;
unsigned int streams_blocked_on_p_conn : 1;
/** How many relay data cells can we package (read from edge streams) /** How many relay data cells can we package (read from edge streams)
* on this circuit before we receive a circuit-level sendme cell asking * on this circuit before we receive a circuit-level sendme cell asking
* for more? */ * for more? */
@ -2196,7 +2183,6 @@ char *alloc_http_authenticator(const char *authenticator);
void assert_connection_ok(connection_t *conn, time_t now); void assert_connection_ok(connection_t *conn, time_t now);
int connection_or_nonopen_was_started_here(or_connection_t *conn); int connection_or_nonopen_was_started_here(or_connection_t *conn);
int connection_or_too_full_for_dirserv_data(or_connection_t *conn);
/********************************* connection_edge.c *************************/ /********************************* connection_edge.c *************************/

View File

@ -1462,6 +1462,11 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
} }
} }
/** DOCDOC */
#define CELL_QUEUE_HIGHWATER_SIZE 256
/** DOCDOC */
#define CELL_QUEUE_LOWWATER_SIZE 64
/** DOCDOC */ /** DOCDOC */
static INLINE void static INLINE void
cell_free(cell_t *cell) cell_free(cell_t *cell)
@ -1532,6 +1537,7 @@ cell_queue_pop(cell_queue_t *queue)
return cell; return cell;
} }
/** DOCDOC */
static INLINE circuit_t ** static INLINE circuit_t **
next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
{ {
@ -1611,6 +1617,36 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
orconn->active_circuits = NULL; orconn->active_circuits = NULL;
} }
/** DOCDOC */
static void
block_streams_on_circ(circuit_t *circ, or_connection_t *orconn, int block)
{
edge_connection_t *edge = NULL;
if (circ->n_conn == orconn) {
circ->streams_blocked_on_n_conn = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
circ->streams_blocked_on_p_conn = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
for (; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
conn->edge_blocked_on_circ = block;
if (block) {
if (connection_is_reading(conn))
connection_stop_reading(conn);
} else {
/* Is this right? */
if (!connection_is_reading(conn))
connection_start_reading(conn);
}
}
}
/** DOCDOC */ /** DOCDOC */
int int
connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max) connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max)
@ -1618,22 +1654,29 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max)
int n_flushed; int n_flushed;
cell_queue_t *queue; cell_queue_t *queue;
circuit_t *circ; circuit_t *circ;
int streams_blocked;
circ = conn->active_circuits; circ = conn->active_circuits;
if (!circ) return 0; if (!circ) return 0;
if (circ->n_conn == conn) if (circ->n_conn == conn) {
queue = &circ->n_conn_cells; queue = &circ->n_conn_cells;
else streams_blocked = circ->streams_blocked_on_n_conn;
} else {
queue = &TO_OR_CIRCUIT(circ)->p_conn_cells; queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
streams_blocked = circ->streams_blocked_on_p_conn;
}
for (n_flushed = 0; n_flushed < max && queue->head; ++n_flushed) { for (n_flushed = 0; n_flushed < max && queue->head; ++n_flushed) {
cell_t *cell = cell_queue_pop(queue); cell_t *cell = cell_queue_pop(queue);
connection_or_write_cell_to_buf(cell, conn); connection_or_write_cell_to_buf(cell, conn);
cell_free(cell); cell_free(cell);
log_info(LD_GENERAL, "flushed a cell; n ==%d", queue->n);
++n_flushed; ++n_flushed;
} }
conn->active_circuits = *next_circ_on_conn_p(circ, conn); conn->active_circuits = *next_circ_on_conn_p(circ, conn);
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
block_streams_on_circ(circ, conn, 0); /* unblock streams */
if (queue->n == 0) { if (queue->n == 0) {
log_info(LD_GENERAL, "Made a circuit inactive."); log_info(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn); make_circuit_inactive_on_conn(circ, conn);
@ -1647,16 +1690,20 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
cell_t *cell, int direction) cell_t *cell, int direction)
{ {
cell_queue_t *queue; cell_queue_t *queue;
int streams_blocked;
if (direction == CELL_DIRECTION_OUT) { if (direction == CELL_DIRECTION_OUT) {
queue = &circ->n_conn_cells; queue = &circ->n_conn_cells;
streams_blocked = circ->streams_blocked_on_n_conn;
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_conn_cells; queue = &orcirc->p_conn_cells;
streams_blocked = circ->streams_blocked_on_p_conn;
} }
cell_queue_append_copy(queue, cell); cell_queue_append_copy(queue, cell);
log_info(LD_GENERAL, "Added a cell; n ==%d", queue->n); if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
block_streams_on_circ(circ, orconn, 1); /* block streams */
if (queue->n == 1) { if (queue->n == 1) {
/* This was the first cell added to the queue. We need to make this /* This was the first cell added to the queue. We need to make this
@ -1676,6 +1723,7 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
} }
} }
/** DOCDOC */
void void
assert_active_circuits_ok(or_connection_t *orconn) assert_active_circuits_ok(or_connection_t *orconn)
{ {
@ -1693,3 +1741,4 @@ assert_active_circuits_ok(or_connection_t *orconn)
cur = next; cur = next;
} while (cur != head); } while (cur != head);
} }