win32: Support condition variable broadcasting on XP
authorHugo Landau <hlandau@openssl.org>
Tue, 29 Aug 2023 13:33:44 +0000 (14:33 +0100)
committerHugo Landau <hlandau@openssl.org>
Wed, 6 Sep 2023 09:32:14 +0000 (10:32 +0100)
Reviewed-by: Tomas Mraz <tomas@openssl.org>
Reviewed-by: Matt Caswell <matt@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/21827)

crypto/thread/arch/thread_win.c

index ce7ff4931134e265b3460ddac175f12668f7c874..b877211143375adb6c76ee600359b07c0b125c7f 100644 (file)
@@ -173,58 +173,360 @@ static int determine_timeout(OSSL_TIME deadline, DWORD *w_timeout_p)
 }
 
 # if defined(OPENSSL_THREADS_WINNT_LEGACY)
+#  include <assert.h>
+
+/*
+ * Win32, before Vista, did not have an OS-provided condition variable
+ * construct. This leads to the need to construct our own condition variable
+ * construct in order to support Windows XP.
+ *
+ * It is difficult to construct a condition variable construct using the
+ * OS-provided primitives in a way that is both correct (avoiding race
+ * conditions where broadcasts get lost) and fair.
+ *
+ * CORRECTNESS:
+ *   A blocked thread is a thread which is calling wait(), between the
+ *   precise instants at which the external mutex passed to wait() is
+ *   unlocked and the instant at which it is relocked.
+ *
+ *   a)
+ *     - If broadcast() is called, ALL blocked threads MUST be unblocked.
+ *     - If signal() is called, at least one blocked thread MUST be unblocked.
+ *
+ *     (i.e.: a signal or broadcast must never get 'lost')
+ *
+ *   b)
+ *     - If broadcast() or signal() is called, this must not cause a thread
+ *       which is not blocked to return immediately from a subsequent
+ *       call to wait().
+ *
+ * FAIRNESS:
+ *   If broadcast() is called at time T1, all blocked threads must be unblocked
+ *   before any thread which subsequently calls wait() at time T2 > T1 is
+ *   unblocked.
+ *
+ *   An example of an implementation which lacks fairness is as follows:
+ *
+ *     t1 enters wait()
+ *     t2 enters wait()
+ *
+ *     tZ calls broadcast()
+ *
+ *     t1 exits wait()
+ *     t1 enters wait()
+ *
+ *     tZ calls broadcast()
+ *
+ *     t1 exits wait()
+ *
+ * IMPLEMENTATION:
+ *
+ *   The most suitable primitives available to us in Windows XP are semaphores,
+ *   auto-reset events and manual-reset events. A solution based on semaphores
+ *   is chosen.
+ *
+ *   PROBLEM. Designing a solution based on semaphores is non-trivial because,
+ *   while it is easy to track the number of waiters in an interlocked data
+ *   structure and then add that number to the semaphore, this does not
+ *   guarantee fairness or correctness. Consider the following situation:
+ *
+ *     - t1 enters wait(), adding 1 to the wait counter & blocks on the semaphore
+ *     - t2 enters wait(), adding 1 to the wait counter & blocks on the semaphore
+ *     - tZ calls broadcast(), finds the wait counter is 2, adds 2 to the semaphore
+ *
+ *     - t1 exits wait()
+ *     - t1 immediately reenters wait() and blocks on the semaphore
+ *     - The semaphore is still positive due to also having been signalled
+ *       for t2, therefore it is decremented
+ *     - t1 exits wait() immediately; t2 is never woken
+ *
+ *   GENERATION COUNTERS. One naive solution to this is to use a generation
+ *   counter. Each broadcast() invocation increments a generation counter. If
+ *   the generation counter has not changed during a semaphore wait operation
+ *   inside wait(), this indicates that no broadcast() call has been made in
+ *   the meantime; therefore, the successful semaphore decrement must have
+ *   'stolen' a wakeup from another thread which was waiting to wakeup from the
+ *   prior broadcast() call but which had not yet had a chance to do so. The
+ *   semaphore can then be reincremented and the wait() operation repeated.
+ *
+ *   However, this suffers from the obvious problem that without OS guarantees
+ *   as to how semaphore readiness events are distributed amongst threads,
+ *   there is no particular guarantee that the semaphore readiness event will
+ *   not be immediately redistributed back to the same thread t1.
+ *
+ *   SOLUTION. A solution is chosen as follows. In its initial state, a
+ *   condition variable can accept waiters, who wait for the semaphore
+ *   normally. However, once broadcast() is called, the condition
+ *   variable becomes 'closed'. Any existing blocked threads are unblocked,
+ *   but any new calls to wait() will instead enter a blocking pre-wait stage.
+ *   Pre-wait threads are not considered to be waiting (and the external
+ *   mutex remains held). A call to wait() in pre-wait cannot progress
+ *   to waiting until all threads due to be unblocked by the prior broadcast()
+ *   call have returned and had a chance to execute.
+ *
+ *   This pre-wait does not affect a thread if it does not call wait()
+ *   again until after all threads have had a chance to execute.
+ *
+ *   RESOURCE USAGE. Aside from an allocation for the condition variable
+ *   structure, this solution uses two Win32 semaphores.
+ *
+ * FUTURE OPTIMISATIONS:
+ *
+ *   An optimised multi-generation implementation is possible at the cost of
+ *   higher Win32 resource usage. Multiple 'buckets' could be defined, with
+ *   usage rotating between buckets internally as buckets become closed.
+ *   This would avoid the need for the prewait in more cases, depending
+ *   on intensity of usage.
+ *
+ */
+typedef struct legacy_condvar_st {
+    CRYPTO_MUTEX    *int_m;       /* internal mutex */
+    HANDLE          sema;         /* main wait semaphore */
+    HANDLE          prewait_sema; /* prewait semaphore */
+    /*
+     * All of the following fields are protected by int_m.
+     *
+     * num_wake only ever increases by virtue of a corresponding decrease in
+     * num_wait. num_wait can decrease for other reasons (for example due to a
+     * wait operation timing out).
+     */
+    size_t          num_wait;     /* Num. threads currently blocked */
+    size_t          num_wake;     /* Num. threads due to wake up */
+    size_t          num_prewait;  /* Num. threads in prewait */
+    size_t          gen;          /* Prewait generation */
+    int             closed;       /* Is closed? */
+} LEGACY_CONDVAR;
 
 CRYPTO_CONDVAR *ossl_crypto_condvar_new(void)
 {
-    HANDLE h;
+    LEGACY_CONDVAR *cv;
+
+    if ((cv = OPENSSL_malloc(sizeof(LEGACY_CONDVAR))) == NULL)
+        return NULL;
 
-    if ((h = CreateEventA(NULL, FALSE, FALSE, NULL)) == NULL)
+    if ((cv->int_m = ossl_crypto_mutex_new()) == NULL) {
+        OPENSSL_free(cv);
         return NULL;
+    }
 
-    return (CRYPTO_CONDVAR *)h;
+    if ((cv->sema = CreateSemaphoreA(NULL, 0, LONG_MAX, NULL)) == NULL) {
+        ossl_crypto_mutex_free(&cv->int_m);
+        OPENSSL_free(cv);
+        return NULL;
+    }
+
+    if ((cv->prewait_sema = CreateSemaphoreA(NULL, 0, LONG_MAX, NULL)) == NULL) {
+        CloseHandle(cv->sema);
+        ossl_crypto_mutex_free(&cv->int_m);
+        OPENSSL_free(cv);
+        return NULL;
+    }
+
+    cv->num_wait      = 0;
+    cv->num_wake      = 0;
+    cv->num_prewait   = 0;
+    cv->closed        = 0;
+
+    return (CRYPTO_CONDVAR *)cv;
 }
 
-void ossl_crypto_condvar_wait(CRYPTO_CONDVAR *cv, CRYPTO_MUTEX *mutex)
+void ossl_crypto_condvar_free(CRYPTO_CONDVAR **cv_p)
 {
-    ossl_crypto_mutex_unlock(mutex);
-    WaitForSingleObject((HANDLE)cv, INFINITE);
-    ossl_crypto_mutex_lock(mutex);
+    if (*cv_p != NULL) {
+        LEGACY_CONDVAR *cv = *(LEGACY_CONDVAR **)cv_p;
+
+        CloseHandle(cv->sema);
+        CloseHandle(cv->prewait_sema);
+        ossl_crypto_mutex_free(&cv->int_m);
+        OPENSSL_free(cv);
+    }
+
+    *cv_p = NULL;
 }
 
-void ossl_crypto_condvar_wait_timeout(CRYPTO_CONDVAR *cv, CRYPTO_MUTEX *mutex,
-                                      OSSL_TIME deadline)
+static uint32_t obj_wait(HANDLE h, OSSL_TIME deadline)
 {
     DWORD timeout;
 
     if (!determine_timeout(deadline, &timeout))
         timeout = 1;
 
-    ossl_crypto_mutex_unlock(mutex);
-    WaitForSingleObject((HANDLE)cv, timeout);
-    ossl_crypto_mutex_lock(mutex);
+    return WaitForSingleObject(h, timeout);
 }
 
-void ossl_crypto_condvar_broadcast(CRYPTO_CONDVAR *cv)
+void ossl_crypto_condvar_wait_timeout(CRYPTO_CONDVAR *cv_, CRYPTO_MUTEX *ext_m,
+                                      OSSL_TIME deadline)
+{
+    LEGACY_CONDVAR *cv = (LEGACY_CONDVAR *)cv_;
+    int closed, set_prewait = 0, have_orig_gen = 0;
+    uint32_t rc;
+    size_t orig_gen;
+
+    /* Admission control - prewait until we can enter our actual wait phase. */
+    do {
+        ossl_crypto_mutex_lock(cv->int_m);
+
+        closed = cv->closed;
+
+        /*
+         * Once prewait is over the prewait semaphore is signalled and
+         * num_prewait is set to 0. Use a generation counter to track if we need
+         * to remove a value we added to num_prewait when exiting (e.g. due to
+         * timeout or failure of WaitForSingleObject).
+         */
+        if (!have_orig_gen) {
+            orig_gen = cv->gen;
+            have_orig_gen = 1;
+        } else if (cv->gen != orig_gen) {
+            set_prewait = 0;
+            orig_gen = cv->gen;
+        }
+
+        if (!closed) {
+            /* We can now be admitted. */
+            ++cv->num_wait;
+            if (set_prewait) {
+                --cv->num_prewait;
+                set_prewait = 0;
+            }
+        } else if (!set_prewait) {
+            ++cv->num_prewait;
+            set_prewait = 1;
+        }
+
+        ossl_crypto_mutex_unlock(cv->int_m);
+
+        if (closed)
+            if (obj_wait(cv->prewait_sema, deadline) != WAIT_OBJECT_0) {
+                /*
+                 * If we got WAIT_OBJECT_0 we are safe - num_prewait has been
+                 * set to 0 and the semaphore has been consumed. On the other
+                 * hand if we timed out, there may be a residual posting that
+                 * was made just after we timed out. However in the worst case
+                 * this will just cause an internal spurious wakeup here in the
+                 * future, so we do not care too much about this. We treat
+                 * failure and timeout cases as the same, and simply exit in
+                 * this case.
+                 */
+                ossl_crypto_mutex_lock(cv->int_m);
+                if (set_prewait && cv->gen == orig_gen)
+                    --cv->num_prewait;
+                ossl_crypto_mutex_unlock(cv->int_m);
+                return;
+            }
+    } while (closed);
+
+    /*
+     * Unlock external mutex. Do not do this until we have been admitted, as we
+     * must guarantee we wake if broadcast is called at any time after ext_m is
+     * unlocked.
+     */
+    ossl_crypto_mutex_unlock(ext_m);
+
+    for (;;) {
+        /* Wait. */
+        rc = obj_wait(cv->sema, deadline);
+
+        /* Reacquire internal mutex and probe state. */
+        ossl_crypto_mutex_lock(cv->int_m);
+
+        if (cv->num_wake > 0) {
+            /*
+             * A wake token is available, so we can wake up. Consume the token
+             * and get out of here. We don't care what WaitForSingleObject
+             * returned here (e.g. if it timed out coincidentally). In the
+             * latter case a signal might be left in the semaphore which causes
+             * a future WaitForSingleObject call to return immediately, but in
+             * this case we will just loop again.
+             */
+            --cv->num_wake;
+            if (cv->num_wake == 0 && cv->closed) {
+                /*
+                 * We consumed the last wake token, so we can now open the
+                 * condition variable for new admissions.
+                 */
+                cv->closed = 0;
+                if (cv->num_prewait > 0) {
+                    ReleaseSemaphore(cv->prewait_sema, (LONG)cv->num_prewait, NULL);
+                    cv->num_prewait = 0;
+                    ++cv->gen;
+                }
+            }
+        } else if (rc == WAIT_OBJECT_0) {
+            /*
+             * We got a wakeup from the semaphore but we did not have any wake
+             * tokens. This ideally does not happen, but might if during a
+             * previous wait() call the semaphore is posted just after
+             * WaitForSingleObject returns due to a timeout (such that the
+             * num_wake > 0 case is taken above). Just spin again. (It is worth
+             * noting that repeated WaitForSingleObject calls is the only method
+             * documented for decrementing a Win32 semaphore, so this is
+             * basically the best possible strategy.)
+             */
+            ossl_crypto_mutex_unlock(cv->int_m);
+            continue;
+        } else {
+            /*
+             * Assume we timed out. The WaitForSingleObject call may also have
+             * failed for some other reason, which we treat as a timeout.
+             */
+            assert(cv->num_wait > 0);
+            --cv->num_wait;
+        }
+
+        break;
+    }
+
+    ossl_crypto_mutex_unlock(cv->int_m);
+    ossl_crypto_mutex_lock(ext_m);
+}
+
+void ossl_crypto_condvar_wait(CRYPTO_CONDVAR *cv, CRYPTO_MUTEX *ext_m)
 {
-    /* Not supported */
+    ossl_crypto_condvar_wait_timeout(cv, ext_m, ossl_time_infinite());
 }
 
-void ossl_crypto_condvar_signal(CRYPTO_CONDVAR *cv)
+void ossl_crypto_condvar_broadcast(CRYPTO_CONDVAR *cv_)
 {
-    HANDLE *cv_p = (HANDLE *)cv;
+    LEGACY_CONDVAR *cv = (LEGACY_CONDVAR *)cv_;
+    size_t num_wake;
+
+    ossl_crypto_mutex_lock(cv->int_m);
+
+    num_wake = cv->num_wait;
+    if (num_wake == 0) {
+        ossl_crypto_mutex_unlock(cv->int_m);
+        return;
+    }
 
-    SetEvent(cv_p);
+    cv->num_wake  += num_wake;
+    cv->num_wait  -= num_wake;
+    cv->closed     = 1;
+
+    ossl_crypto_mutex_unlock(cv->int_m);
+    ReleaseSemaphore(cv->sema, num_wake, NULL);
 }
 
-void ossl_crypto_condvar_free(CRYPTO_CONDVAR **cv)
+void ossl_crypto_condvar_signal(CRYPTO_CONDVAR *cv_)
 {
-    HANDLE **cv_p;
+    LEGACY_CONDVAR *cv = (LEGACY_CONDVAR *)cv_;
 
-    cv_p = (HANDLE **)cv;
-    if (*cv_p != NULL)
-        CloseHandle(*cv_p);
+    ossl_crypto_mutex_lock(cv->int_m);
 
-    *cv_p = NULL;
+    if (cv->num_wait == 0) {
+        ossl_crypto_mutex_unlock(cv->int_m);
+        return;
+    }
+
+    /*
+     * We do not close the condition variable when merely signalling, as there
+     * are no guaranteed fairness semantics here, unlike for a broadcast.
+     */
+    --cv->num_wait;
+    ++cv->num_wake;
+
+    ossl_crypto_mutex_unlock(cv->int_m);
+    ReleaseSemaphore(cv->sema, 1, NULL);
 }
 
 # else