mirror of
https://gitlab.torproject.org/tpo/core/tor.git
synced 2024-11-10 21:23:58 +01:00
Test and fix workqueue_entry_cancel().
This commit is contained in:
parent
ebbc177005
commit
e5f8c772f4
@ -119,27 +119,29 @@ workqueue_entry_free(workqueue_entry_t *ent)
|
||||
* executed in the main thread; that will cause undefined behavior (probably,
|
||||
* a crash).
|
||||
*
|
||||
* If the work is cancelled, this function return 1. It is the caller's
|
||||
* responsibility to free any storage in the work function's arguments.
|
||||
* If the work is cancelled, this function return the argument passed to the
|
||||
* work function. It is the caller's responsibility to free this storage.
|
||||
*
|
||||
* This function will have no effect if the worker thread has already executed
|
||||
* or begun to execute the work item. In that case, it will return 0.
|
||||
* or begun to execute the work item. In that case, it will return NULL.
|
||||
*/
|
||||
int
|
||||
void *
|
||||
workqueue_entry_cancel(workqueue_entry_t *ent)
|
||||
{
|
||||
int cancelled = 0;
|
||||
void *result = NULL;
|
||||
tor_mutex_acquire(&ent->on_thread->lock);
|
||||
if (ent->pending) {
|
||||
TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
|
||||
cancelled = 1;
|
||||
result = ent->arg;
|
||||
}
|
||||
tor_mutex_release(&ent->on_thread->lock);
|
||||
|
||||
if (cancelled) {
|
||||
tor_free(ent);
|
||||
}
|
||||
return cancelled;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -32,7 +32,7 @@ int threadpool_queue_for_all(threadpool_t *pool,
|
||||
int (*fn)(void *, void *),
|
||||
void (*reply_fn)(void *),
|
||||
void *arg);
|
||||
int workqueue_entry_cancel(workqueue_entry_t *pending_work);
|
||||
void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
|
||||
threadpool_t *threadpool_new(int n_threads,
|
||||
replyqueue_t *replyqueue,
|
||||
void *(*new_thread_state_fn)(void*),
|
||||
|
@ -23,6 +23,7 @@ static int opt_n_threads = 8;
|
||||
static int opt_n_items = 10000;
|
||||
static int opt_n_inflight = 1000;
|
||||
static int opt_n_lowwater = 250;
|
||||
static int opt_n_cancel = 0;
|
||||
static int opt_ratio_rsa = 5;
|
||||
|
||||
#ifdef TRACK_RESPONSES
|
||||
@ -172,29 +173,70 @@ handle_reply(void *arg)
|
||||
++n_received;
|
||||
}
|
||||
|
||||
static int
|
||||
static workqueue_entry_t *
|
||||
add_work(threadpool_t *tp)
|
||||
{
|
||||
int add_rsa =
|
||||
opt_ratio_rsa == 0 ||
|
||||
tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
|
||||
|
||||
if (add_rsa) {
|
||||
rsa_work_t *w = tor_malloc_zero(sizeof(*w));
|
||||
w->serial = n_sent++;
|
||||
crypto_rand((char*)w->msg, 20);
|
||||
w->msglen = 20;
|
||||
++rsa_sent;
|
||||
return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL;
|
||||
return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
|
||||
} else {
|
||||
ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
|
||||
w->serial = n_sent++;
|
||||
/* Not strictly right, but this is just for benchmarks. */
|
||||
crypto_rand((char*)w->u.pk.public_key, 32);
|
||||
++ecdh_sent;
|
||||
return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL;
|
||||
return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
|
||||
}
|
||||
}
|
||||
|
||||
static int n_failed_cancel = 0;
|
||||
static int n_successful_cancel = 0;
|
||||
|
||||
static int
|
||||
add_n_work_items(threadpool_t *tp, int n)
|
||||
{
|
||||
int n_queued = 0;
|
||||
int n_try_cancel = 0, i;
|
||||
workqueue_entry_t **to_cancel;
|
||||
workqueue_entry_t *ent;
|
||||
|
||||
to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
|
||||
|
||||
while (n_queued++ < n) {
|
||||
ent = add_work(tp);
|
||||
if (! ent) {
|
||||
puts("Couldn't add work.");
|
||||
tor_event_base_loopexit(tor_libevent_get_base(), NULL);
|
||||
return -1;
|
||||
}
|
||||
if (n_try_cancel < opt_n_cancel &&
|
||||
tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
|
||||
to_cancel[n_try_cancel++] = ent;
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < n_try_cancel; ++i) {
|
||||
void *work = workqueue_entry_cancel(to_cancel[i]);
|
||||
if (! work) {
|
||||
n_failed_cancel++;
|
||||
} else {
|
||||
n_successful_cancel++;
|
||||
tor_free(work);
|
||||
}
|
||||
}
|
||||
|
||||
tor_free(to_cancel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int shutting_down = 0;
|
||||
static int n_shutdowns_done = 0;
|
||||
|
||||
@ -223,8 +265,13 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
|
||||
if (old_r == n_received)
|
||||
return;
|
||||
|
||||
if (opt_verbose)
|
||||
printf("%d / %d\n", n_received, n_sent);
|
||||
if (opt_verbose) {
|
||||
printf("%d / %d", n_received, n_sent);
|
||||
if (opt_n_cancel)
|
||||
printf(" (%d cancelled, %d uncancellable)",
|
||||
n_successful_cancel, n_failed_cancel);
|
||||
puts("");
|
||||
}
|
||||
#ifdef TRACK_RESPONSES
|
||||
tor_mutex_acquire(&bitmap_mutex);
|
||||
for (i = 0; i < opt_n_items; ++i) {
|
||||
@ -239,16 +286,14 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
|
||||
tor_mutex_release(&bitmap_mutex);
|
||||
#endif
|
||||
|
||||
if (n_sent - n_received < opt_n_lowwater) {
|
||||
while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) {
|
||||
if (! add_work(tp)) {
|
||||
puts("Couldn't add work.");
|
||||
tor_event_base_loopexit(tor_libevent_get_base(), NULL);
|
||||
}
|
||||
}
|
||||
if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
|
||||
int n_to_send = n_received + opt_n_inflight - n_sent;
|
||||
if (n_to_send > opt_n_items - n_sent)
|
||||
n_to_send = opt_n_items - n_sent;
|
||||
add_n_work_items(tp, n_to_send);
|
||||
}
|
||||
|
||||
if (shutting_down == 0 && n_received == n_sent && n_sent >= opt_n_items) {
|
||||
if (shutting_down == 0 && n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) {
|
||||
shutting_down = 1;
|
||||
threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL);
|
||||
}
|
||||
@ -263,6 +308,7 @@ help(void)
|
||||
" -T <threads> Use this many threads\n"
|
||||
" -I <inflight> Have no more than this many requests queued at once\n"
|
||||
" -L <lowwater> Add items whenever fewer than this many are pending\n"
|
||||
" -C <cancel> Try to cancel N items of every batch that we add\n"
|
||||
" -R <ratio> Make one out of this many items be a slow (RSA) one\n"
|
||||
" --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
|
||||
" Disable one of the alert_socket backends.");
|
||||
@ -291,6 +337,8 @@ main(int argc, char **argv)
|
||||
opt_n_lowwater = atoi(argv[++i]);
|
||||
} else if (!strcmp(argv[i], "-R") && i+1<argc) {
|
||||
opt_ratio_rsa = atoi(argv[++i]);
|
||||
} else if (!strcmp(argv[i], "-C") && i+1<argc) {
|
||||
opt_n_cancel = atoi(argv[++i]);
|
||||
} else if (!strcmp(argv[i], "--no-eventfd2")) {
|
||||
as_flags |= ASOCKS_NOEVENTFD2;
|
||||
} else if (!strcmp(argv[i], "--no-eventfd")) {
|
||||
@ -311,6 +359,7 @@ main(int argc, char **argv)
|
||||
}
|
||||
if (opt_n_threads < 1 ||
|
||||
opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
|
||||
opt_n_cancel > opt_n_inflight ||
|
||||
opt_ratio_rsa < 0) {
|
||||
help();
|
||||
return 1;
|
||||
@ -358,7 +407,7 @@ main(int argc, char **argv)
|
||||
|
||||
event_base_loop(tor_libevent_get_base(), 0);
|
||||
|
||||
if (n_sent != opt_n_items || n_received != n_sent ||
|
||||
if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent ||
|
||||
n_shutdowns_done != opt_n_threads) {
|
||||
puts("FAIL");
|
||||
return 1;
|
||||
|
Loading…
Reference in New Issue
Block a user