Add a timeout to tor_cond_wait; add tor_cond impl from libevent

The windows code may need some tweaks for it to compile; I've not
tested it yet.
This commit is contained in:
Nick Mathewson 2013-09-22 22:08:41 -04:00
parent c2f0d52b7f
commit e865248156
3 changed files with 126 additions and 69 deletions

View File

@ -148,10 +148,6 @@ tor_get_thread_id(void)
/* Conditions. */ /* Conditions. */
/** Cross-platform condition implementation. */
struct tor_cond_t {
pthread_cond_t cond;
};
/** Return a newly allocated condition, with nobody waiting on it. */ /** Return a newly allocated condition, with nobody waiting on it. */
tor_cond_t * tor_cond_t *
tor_cond_new(void) tor_cond_new(void)
@ -177,11 +173,25 @@ tor_cond_free(tor_cond_t *cond)
} }
/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>. /** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
* All waiters on the condition must wait holding the same <b>mutex</b>. * All waiters on the condition must wait holding the same <b>mutex</b>.
* Returns 0 on success, negative on failure. */ * Returns 0 on success, -1 on failure, 1 on timeout. */
int int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex) tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
{ {
if (tv == NULL) {
return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0; return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
} else {
struct timespec ts;
int r;
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000;
r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
if (r == 0)
return 0;
else if (r == ETIMEDOUT)
return 1;
else
return -1;
}
} }
/** Wake up one of the waiters on <b>cond</b>. */ /** Wake up one of the waiters on <b>cond</b>. */
void void

View File

@ -57,10 +57,25 @@ void tor_threads_init(void);
void set_main_thread(void); void set_main_thread(void);
int in_main_thread(void); int in_main_thread(void);
typedef struct tor_cond_t tor_cond_t; typedef struct tor_cond_t {
#ifdef USE_PTHREADS
pthread_cond_t cond;
#elif defined(USE_WIN32_THREADS)
HANDLE event;
CRITICAL_SECTION lock;
int n_waiting;
int n_to_wake;
int generation;
#else
#error no known condition implementation.
#endif
} tor_cond_t;
tor_cond_t *tor_cond_new(void); tor_cond_t *tor_cond_new(void);
void tor_cond_free(tor_cond_t *cond); void tor_cond_free(tor_cond_t *cond);
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex); int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
const struct timeval *tv);
void tor_cond_signal_one(tor_cond_t *cond); void tor_cond_signal_one(tor_cond_t *cond);
void tor_cond_signal_all(tor_cond_t *cond); void tor_cond_signal_all(tor_cond_t *cond);

View File

@ -70,17 +70,20 @@ tor_get_thread_id(void)
return (unsigned long)GetCurrentThreadId(); return (unsigned long)GetCurrentThreadId();
} }
static DWORD cond_event_tls_index;
struct tor_cond_t {
CRITICAL_SECTION mutex;
smartlist_t *events;
};
tor_cond_t * tor_cond_t *
tor_cond_new(void) tor_cond_new(void)
{ {
tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t)); tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
InitializeCriticalSection(&cond->mutex); if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
cond->events = smartlist_new(); tor_free(cond);
return NULL;
}
if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
DeleteCriticalSection(&cond->lock);
tor_free(cond);
return NULL;
}
cond->n_waiting = cond->n_to_wake = cond->generation = 0;
return cond; return cond;
} }
void void
@ -88,74 +91,103 @@ tor_cond_free(tor_cond_t *cond)
{ {
if (!cond) if (!cond)
return; return;
DeleteCriticalSection(&cond->mutex); DeleteCriticalSection(&cond->lock);
/* XXXX notify? */ CloseHandle(cond->event);
smartlist_free(cond->events); mm_free(cond);
tor_free(cond);
} }
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex) static void
tor_cond_signal_impl(tor_cond_t *cond, int broadcast)
{ {
HANDLE event; EnterCriticalSection(&cond->lock);
int r; if (broadcast)
tor_assert(cond); cond->n_to_wake = cond->n_waiting;
tor_assert(mutex); else
event = TlsGetValue(cond_event_tls_index); ++cond->n_to_wake;
if (!event) { cond->generation++;
event = CreateEvent(0, FALSE, FALSE, NULL); SetEvent(cond->event);
TlsSetValue(cond_event_tls_index, event); LeaveCriticalSection(&cond->lock);
}
EnterCriticalSection(&cond->mutex);
tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
tor_assert(!smartlist_contains(cond->events, event));
smartlist_add(cond->events, event);
LeaveCriticalSection(&cond->mutex);
tor_mutex_release(mutex);
r = WaitForSingleObject(event, INFINITE);
tor_mutex_acquire(mutex);
switch (r) {
case WAIT_OBJECT_0: /* we got the mutex normally. */
break;
case WAIT_ABANDONED: /* holding thread exited. */
case WAIT_TIMEOUT: /* Should never happen. */
tor_assert(0);
break;
case WAIT_FAILED:
log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
}
return 0; return 0;
} }
void void
tor_cond_signal_one(tor_cond_t *cond) tor_cond_signal_one(tor_cond_t *cond)
{ {
HANDLE event; tor_cond_signal_impl(cond, 0);
tor_assert(cond);
EnterCriticalSection(&cond->mutex);
if ((event = smartlist_pop_last(cond->events)))
SetEvent(event);
LeaveCriticalSection(&cond->mutex);
} }
void void
tor_cond_signal_all(tor_cond_t *cond) tor_cond_signal_all(tor_cond_t *cond)
{ {
tor_assert(cond); tor_cond_signal_impl(cond, 1);
}
EnterCriticalSection(&cond->mutex); int
SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event)); tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock, const struct timeval *tv)
smartlist_clear(cond->events); {
LeaveCriticalSection(&cond->mutex); CRITICAL_SECTION *lock = &lock->mutex;
int generation_at_start;
int waiting = 1;
int result = -1;
DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime;
if (tv)
ms_orig = ms = evutil_tv_to_msec_(tv);
EnterCriticalSection(&cond->lock);
++cond->n_waiting;
generation_at_start = cond->generation;
LeaveCriticalSection(&cond->lock);
LeaveCriticalSection(lock);
startTime = GetTickCount();
do {
DWORD res;
res = WaitForSingleObject(cond->event, ms);
EnterCriticalSection(&cond->lock);
if (cond->n_to_wake &&
cond->generation != generation_at_start) {
--cond->n_to_wake;
--cond->n_waiting;
result = 0;
waiting = 0;
goto out;
} else if (res != WAIT_OBJECT_0) {
result = (res==WAIT_TIMEOUT) ? 1 : -1;
--cond->n_waiting;
waiting = 0;
goto out;
} else if (ms != INFINITE) {
endTime = GetTickCount();
if (startTime + ms_orig <= endTime) {
result = 1; /* Timeout */
--cond->n_waiting;
waiting = 0;
goto out;
} else {
ms = startTime + ms_orig - endTime;
}
}
/* If we make it here, we are still waiting. */
if (cond->n_to_wake == 0) {
/* There is nobody else who should wake up; reset
* the event. */
ResetEvent(cond->event);
}
out:
LeaveCriticalSection(&cond->lock);
} while (waiting);
EnterCriticalSection(lock);
EnterCriticalSection(&cond->lock);
if (!cond->n_waiting)
ResetEvent(cond->event);
LeaveCriticalSection(&cond->lock);
return result;
} }
void void
tor_threads_init(void) tor_threads_init(void)
{ {
cond_event_tls_index = TlsAlloc();
set_main_thread(); set_main_thread();
} }