Added an explicit yield (OP_SLEEP) to QUIC testing for cooperative threading.
[openssl.git] / crypto / async / async.c
1 /*
2  * Copyright 2015-2022 The OpenSSL Project Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License 2.0 (the "License").  You may not use
5  * this file except in compliance with the License.  You can obtain a copy
6  * in the file LICENSE in the source distribution or at
7  * https://www.openssl.org/source/license.html
8  */
9
10 /*
11  * Without this we start getting longjmp crashes because it thinks we're jumping
12  * up the stack when in fact we are jumping to an entirely different stack. The
13  * cost of this is not having certain buffer overrun/underrun checks etc for
14  * this source file :-(
15  */
16 #undef _FORTIFY_SOURCE
17
18 /* This must be the first #include file */
19 #include "async_local.h"
20
21 #include <openssl/err.h>
22 #include "crypto/cryptlib.h"
23 #include <string.h>
24
25 #define ASYNC_JOB_RUNNING   0
26 #define ASYNC_JOB_PAUSING   1
27 #define ASYNC_JOB_PAUSED    2
28 #define ASYNC_JOB_STOPPING  3
29
30 static CRYPTO_THREAD_LOCAL ctxkey;
31 static CRYPTO_THREAD_LOCAL poolkey;
32
33 static void async_delete_thread_state(void *arg);
34
35 static async_ctx *async_ctx_new(void)
36 {
37     async_ctx *nctx;
38
39     if (!ossl_init_thread_start(NULL, NULL, async_delete_thread_state))
40         return NULL;
41
42     nctx = OPENSSL_malloc(sizeof(*nctx));
43     if (nctx == NULL)
44         goto err;
45
46     async_fibre_init_dispatcher(&nctx->dispatcher);
47     nctx->currjob = NULL;
48     nctx->blocked = 0;
49     if (!CRYPTO_THREAD_set_local(&ctxkey, nctx))
50         goto err;
51
52     return nctx;
53 err:
54     OPENSSL_free(nctx);
55
56     return NULL;
57 }
58
59 async_ctx *async_get_ctx(void)
60 {
61     return (async_ctx *)CRYPTO_THREAD_get_local(&ctxkey);
62 }
63
64 static int async_ctx_free(void)
65 {
66     async_ctx *ctx;
67
68     ctx = async_get_ctx();
69
70     if (!CRYPTO_THREAD_set_local(&ctxkey, NULL))
71         return 0;
72
73     OPENSSL_free(ctx);
74
75     return 1;
76 }
77
78 static ASYNC_JOB *async_job_new(void)
79 {
80     ASYNC_JOB *job = NULL;
81
82     job = OPENSSL_zalloc(sizeof(*job));
83     if (job == NULL)
84         return NULL;
85
86     job->status = ASYNC_JOB_RUNNING;
87
88     return job;
89 }
90
91 static void async_job_free(ASYNC_JOB *job)
92 {
93     if (job != NULL) {
94         OPENSSL_free(job->funcargs);
95         async_fibre_free(&job->fibrectx);
96         OPENSSL_free(job);
97     }
98 }
99
100 static ASYNC_JOB *async_get_pool_job(void) {
101     ASYNC_JOB *job;
102     async_pool *pool;
103
104     pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
105     if (pool == NULL) {
106         /*
107          * Pool has not been initialised, so init with the defaults, i.e.
108          * no max size and no pre-created jobs
109          */
110         if (ASYNC_init_thread(0, 0) == 0)
111             return NULL;
112         pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
113     }
114
115     job = sk_ASYNC_JOB_pop(pool->jobs);
116     if (job == NULL) {
117         /* Pool is empty */
118         if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
119             return NULL;
120
121         job = async_job_new();
122         if (job != NULL) {
123             if (! async_fibre_makecontext(&job->fibrectx)) {
124                 async_job_free(job);
125                 return NULL;
126             }
127             pool->curr_size++;
128         }
129     }
130     return job;
131 }
132
133 static void async_release_job(ASYNC_JOB *job) {
134     async_pool *pool;
135
136     pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
137     if (pool == NULL) {
138         ERR_raise(ERR_LIB_ASYNC, ERR_R_INTERNAL_ERROR);
139         return;
140     }
141     OPENSSL_free(job->funcargs);
142     job->funcargs = NULL;
143     sk_ASYNC_JOB_push(pool->jobs, job);
144 }
145
146 void async_start_func(void)
147 {
148     ASYNC_JOB *job;
149     async_ctx *ctx = async_get_ctx();
150
151     if (ctx == NULL) {
152         ERR_raise(ERR_LIB_ASYNC, ERR_R_INTERNAL_ERROR);
153         return;
154     }
155     while (1) {
156         /* Run the job */
157         job = ctx->currjob;
158         job->ret = job->func(job->funcargs);
159
160         /* Stop the job */
161         job->status = ASYNC_JOB_STOPPING;
162         if (!async_fibre_swapcontext(&job->fibrectx,
163                                      &ctx->dispatcher, 1)) {
164             /*
165              * Should not happen. Getting here will close the thread...can't do
166              * much about it
167              */
168             ERR_raise(ERR_LIB_ASYNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
169         }
170     }
171 }
172
173 int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *wctx, int *ret,
174                     int (*func)(void *), void *args, size_t size)
175 {
176     async_ctx *ctx;
177     OSSL_LIB_CTX *libctx;
178
179     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
180         return ASYNC_ERR;
181
182     ctx = async_get_ctx();
183     if (ctx == NULL)
184         ctx = async_ctx_new();
185     if (ctx == NULL)
186         return ASYNC_ERR;
187
188     if (*job != NULL)
189         ctx->currjob = *job;
190
191     for (;;) {
192         if (ctx->currjob != NULL) {
193             if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
194                 *ret = ctx->currjob->ret;
195                 ctx->currjob->waitctx = NULL;
196                 async_release_job(ctx->currjob);
197                 ctx->currjob = NULL;
198                 *job = NULL;
199                 return ASYNC_FINISH;
200             }
201
202             if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
203                 *job = ctx->currjob;
204                 ctx->currjob->status = ASYNC_JOB_PAUSED;
205                 ctx->currjob = NULL;
206                 return ASYNC_PAUSE;
207             }
208
209             if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
210                 if (*job == NULL)
211                     return ASYNC_ERR;
212                 ctx->currjob = *job;
213
214                 /*
215                  * Restore the default libctx to what it was the last time the
216                  * fibre ran
217                  */
218                 libctx = OSSL_LIB_CTX_set0_default(ctx->currjob->libctx);
219                 if (libctx == NULL) {
220                     /* Failed to set the default context */
221                     ERR_raise(ERR_LIB_ASYNC, ERR_R_INTERNAL_ERROR);
222                     goto err;
223                 }
224                 /* Resume previous job */
225                 if (!async_fibre_swapcontext(&ctx->dispatcher,
226                         &ctx->currjob->fibrectx, 1)) {
227                     ctx->currjob->libctx = OSSL_LIB_CTX_set0_default(libctx);
228                     ERR_raise(ERR_LIB_ASYNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
229                     goto err;
230                 }
231                 /*
232                  * In case the fibre changed the default libctx we set it back
233                  * again to what it was originally, and remember what it had
234                  * been changed to.
235                  */
236                 ctx->currjob->libctx = OSSL_LIB_CTX_set0_default(libctx);
237                 continue;
238             }
239
240             /* Should not happen */
241             ERR_raise(ERR_LIB_ASYNC, ERR_R_INTERNAL_ERROR);
242             async_release_job(ctx->currjob);
243             ctx->currjob = NULL;
244             *job = NULL;
245             return ASYNC_ERR;
246         }
247
248         /* Start a new job */
249         if ((ctx->currjob = async_get_pool_job()) == NULL)
250             return ASYNC_NO_JOBS;
251
252         if (args != NULL) {
253             ctx->currjob->funcargs = OPENSSL_malloc(size);
254             if (ctx->currjob->funcargs == NULL) {
255                 async_release_job(ctx->currjob);
256                 ctx->currjob = NULL;
257                 return ASYNC_ERR;
258             }
259             memcpy(ctx->currjob->funcargs, args, size);
260         } else {
261             ctx->currjob->funcargs = NULL;
262         }
263
264         ctx->currjob->func = func;
265         ctx->currjob->waitctx = wctx;
266         libctx = ossl_lib_ctx_get_concrete(NULL);
267         if (!async_fibre_swapcontext(&ctx->dispatcher,
268                 &ctx->currjob->fibrectx, 1)) {
269             ERR_raise(ERR_LIB_ASYNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
270             goto err;
271         }
272         /*
273          * In case the fibre changed the default libctx we set it back again
274          * to what it was, and remember what it had been changed to.
275          */
276         ctx->currjob->libctx = OSSL_LIB_CTX_set0_default(libctx);
277     }
278
279 err:
280     async_release_job(ctx->currjob);
281     ctx->currjob = NULL;
282     *job = NULL;
283     return ASYNC_ERR;
284 }
285
286 int ASYNC_pause_job(void)
287 {
288     ASYNC_JOB *job;
289     async_ctx *ctx = async_get_ctx();
290
291     if (ctx == NULL
292             || ctx->currjob == NULL
293             || ctx->blocked) {
294         /*
295          * Could be we've deliberately not been started within a job so this is
296          * counted as success.
297          */
298         return 1;
299     }
300
301     job = ctx->currjob;
302     job->status = ASYNC_JOB_PAUSING;
303
304     if (!async_fibre_swapcontext(&job->fibrectx,
305                                  &ctx->dispatcher, 1)) {
306         ERR_raise(ERR_LIB_ASYNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
307         return 0;
308     }
309     /* Reset counts of added and deleted fds */
310     async_wait_ctx_reset_counts(job->waitctx);
311
312     return 1;
313 }
314
315 static void async_empty_pool(async_pool *pool)
316 {
317     ASYNC_JOB *job;
318
319     if (pool == NULL || pool->jobs == NULL)
320         return;
321
322     do {
323         job = sk_ASYNC_JOB_pop(pool->jobs);
324         async_job_free(job);
325     } while (job);
326 }
327
328 int async_init(void)
329 {
330     if (!CRYPTO_THREAD_init_local(&ctxkey, NULL))
331         return 0;
332
333     if (!CRYPTO_THREAD_init_local(&poolkey, NULL)) {
334         CRYPTO_THREAD_cleanup_local(&ctxkey);
335         return 0;
336     }
337
338     return async_local_init();
339 }
340
341 void async_deinit(void)
342 {
343     CRYPTO_THREAD_cleanup_local(&ctxkey);
344     CRYPTO_THREAD_cleanup_local(&poolkey);
345     async_local_deinit();
346 }
347
348 int ASYNC_init_thread(size_t max_size, size_t init_size)
349 {
350     async_pool *pool;
351     size_t curr_size = 0;
352
353     if (init_size > max_size) {
354         ERR_raise(ERR_LIB_ASYNC, ASYNC_R_INVALID_POOL_SIZE);
355         return 0;
356     }
357
358     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
359         return 0;
360
361     if (!ossl_init_thread_start(NULL, NULL, async_delete_thread_state))
362         return 0;
363
364     pool = OPENSSL_zalloc(sizeof(*pool));
365     if (pool == NULL)
366         return 0;
367
368     pool->jobs = sk_ASYNC_JOB_new_reserve(NULL, init_size);
369     if (pool->jobs == NULL) {
370         ERR_raise(ERR_LIB_ASYNC, ERR_R_CRYPTO_LIB);
371         OPENSSL_free(pool);
372         return 0;
373     }
374
375     pool->max_size = max_size;
376
377     /* Pre-create jobs as required */
378     while (init_size--) {
379         ASYNC_JOB *job;
380         job = async_job_new();
381         if (job == NULL || !async_fibre_makecontext(&job->fibrectx)) {
382             /*
383              * Not actually fatal because we already created the pool, just
384              * skip creation of any more jobs
385              */
386             async_job_free(job);
387             break;
388         }
389         job->funcargs = NULL;
390         sk_ASYNC_JOB_push(pool->jobs, job); /* Cannot fail due to reserve */
391         curr_size++;
392     }
393     pool->curr_size = curr_size;
394     if (!CRYPTO_THREAD_set_local(&poolkey, pool)) {
395         ERR_raise(ERR_LIB_ASYNC, ASYNC_R_FAILED_TO_SET_POOL);
396         goto err;
397     }
398
399     return 1;
400 err:
401     async_empty_pool(pool);
402     sk_ASYNC_JOB_free(pool->jobs);
403     OPENSSL_free(pool);
404     return 0;
405 }
406
407 static void async_delete_thread_state(void *arg)
408 {
409     async_pool *pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
410
411     if (pool != NULL) {
412         async_empty_pool(pool);
413         sk_ASYNC_JOB_free(pool->jobs);
414         OPENSSL_free(pool);
415         CRYPTO_THREAD_set_local(&poolkey, NULL);
416     }
417     async_local_cleanup();
418     async_ctx_free();
419 }
420
421 void ASYNC_cleanup_thread(void)
422 {
423     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
424         return;
425
426     async_delete_thread_state(NULL);
427 }
428
429 ASYNC_JOB *ASYNC_get_current_job(void)
430 {
431     async_ctx *ctx;
432
433     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
434         return NULL;
435
436     ctx = async_get_ctx();
437     if (ctx == NULL)
438         return NULL;
439
440     return ctx->currjob;
441 }
442
443 ASYNC_WAIT_CTX *ASYNC_get_wait_ctx(ASYNC_JOB *job)
444 {
445     return job->waitctx;
446 }
447
448 void ASYNC_block_pause(void)
449 {
450     async_ctx *ctx;
451
452     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
453         return;
454
455     ctx = async_get_ctx();
456     if (ctx == NULL || ctx->currjob == NULL) {
457         /*
458          * We're not in a job anyway so ignore this
459          */
460         return;
461     }
462     ctx->blocked++;
463 }
464
465 void ASYNC_unblock_pause(void)
466 {
467     async_ctx *ctx;
468
469     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
470         return;
471
472     ctx = async_get_ctx();
473     if (ctx == NULL || ctx->currjob == NULL) {
474         /*
475          * We're not in a job anyway so ignore this
476          */
477         return;
478     }
479     if (ctx->blocked > 0)
480         ctx->blocked--;
481 }