Convert thread stop handling into a publish/subscribe model
[openssl.git] / crypto / async / async.c
1 /*
2  * Copyright 2015-2018 The OpenSSL Project Authors. All Rights Reserved.
3  *
4  * Licensed under the Apache License 2.0 (the "License").  You may not use
5  * this file except in compliance with the License.  You can obtain a copy
6  * in the file LICENSE in the source distribution or at
7  * https://www.openssl.org/source/license.html
8  */
9
10 /*
11  * Without this we start getting longjmp crashes because it thinks we're jumping
12  * up the stack when in fact we are jumping to an entirely different stack. The
13  * cost of this is not having certain buffer overrun/underrun checks etc for
14  * this source file :-(
15  */
16 #undef _FORTIFY_SOURCE
17
18 /* This must be the first #include file */
19 #include "async_locl.h"
20
21 #include <openssl/err.h>
22 #include "internal/cryptlib_int.h"
23 #include <string.h>
24
25 #define ASYNC_JOB_RUNNING   0
26 #define ASYNC_JOB_PAUSING   1
27 #define ASYNC_JOB_PAUSED    2
28 #define ASYNC_JOB_STOPPING  3
29
30 static CRYPTO_THREAD_LOCAL ctxkey;
31 static CRYPTO_THREAD_LOCAL poolkey;
32
33 static void async_delete_thread_state(OPENSSL_CTX *ctx);
34
35 static async_ctx *async_ctx_new(void)
36 {
37     async_ctx *nctx;
38
39     if (!ossl_init_thread_start(NULL, async_delete_thread_state))
40         return NULL;
41
42     nctx = OPENSSL_malloc(sizeof(*nctx));
43     if (nctx == NULL) {
44         ASYNCerr(ASYNC_F_ASYNC_CTX_NEW, ERR_R_MALLOC_FAILURE);
45         goto err;
46     }
47
48     async_fibre_init_dispatcher(&nctx->dispatcher);
49     nctx->currjob = NULL;
50     nctx->blocked = 0;
51     if (!CRYPTO_THREAD_set_local(&ctxkey, nctx))
52         goto err;
53
54     return nctx;
55 err:
56     OPENSSL_free(nctx);
57
58     return NULL;
59 }
60
61 async_ctx *async_get_ctx(void)
62 {
63     return (async_ctx *)CRYPTO_THREAD_get_local(&ctxkey);
64 }
65
66 static int async_ctx_free(void)
67 {
68     async_ctx *ctx;
69
70     ctx = async_get_ctx();
71
72     if (!CRYPTO_THREAD_set_local(&ctxkey, NULL))
73         return 0;
74
75     OPENSSL_free(ctx);
76
77     return 1;
78 }
79
80 static ASYNC_JOB *async_job_new(void)
81 {
82     ASYNC_JOB *job = NULL;
83
84     job = OPENSSL_zalloc(sizeof(*job));
85     if (job == NULL) {
86         ASYNCerr(ASYNC_F_ASYNC_JOB_NEW, ERR_R_MALLOC_FAILURE);
87         return NULL;
88     }
89
90     job->status = ASYNC_JOB_RUNNING;
91
92     return job;
93 }
94
95 static void async_job_free(ASYNC_JOB *job)
96 {
97     if (job != NULL) {
98         OPENSSL_free(job->funcargs);
99         async_fibre_free(&job->fibrectx);
100         OPENSSL_free(job);
101     }
102 }
103
104 static ASYNC_JOB *async_get_pool_job(void) {
105     ASYNC_JOB *job;
106     async_pool *pool;
107
108     pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
109     if (pool == NULL) {
110         /*
111          * Pool has not been initialised, so init with the defaults, i.e.
112          * no max size and no pre-created jobs
113          */
114         if (ASYNC_init_thread(0, 0) == 0)
115             return NULL;
116         pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
117     }
118
119     job = sk_ASYNC_JOB_pop(pool->jobs);
120     if (job == NULL) {
121         /* Pool is empty */
122         if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
123             return NULL;
124
125         job = async_job_new();
126         if (job != NULL) {
127             if (! async_fibre_makecontext(&job->fibrectx)) {
128                 async_job_free(job);
129                 return NULL;
130             }
131             pool->curr_size++;
132         }
133     }
134     return job;
135 }
136
137 static void async_release_job(ASYNC_JOB *job) {
138     async_pool *pool;
139
140     pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
141     OPENSSL_free(job->funcargs);
142     job->funcargs = NULL;
143     sk_ASYNC_JOB_push(pool->jobs, job);
144 }
145
146 void async_start_func(void)
147 {
148     ASYNC_JOB *job;
149     async_ctx *ctx = async_get_ctx();
150
151     while (1) {
152         /* Run the job */
153         job = ctx->currjob;
154         job->ret = job->func(job->funcargs);
155
156         /* Stop the job */
157         job->status = ASYNC_JOB_STOPPING;
158         if (!async_fibre_swapcontext(&job->fibrectx,
159                                      &ctx->dispatcher, 1)) {
160             /*
161              * Should not happen. Getting here will close the thread...can't do
162              * much about it
163              */
164             ASYNCerr(ASYNC_F_ASYNC_START_FUNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
165         }
166     }
167 }
168
169 int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *wctx, int *ret,
170                     int (*func)(void *), void *args, size_t size)
171 {
172     async_ctx *ctx;
173
174     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
175         return ASYNC_ERR;
176
177     ctx = async_get_ctx();
178     if (ctx == NULL)
179         ctx = async_ctx_new();
180     if (ctx == NULL)
181         return ASYNC_ERR;
182
183     if (*job)
184         ctx->currjob = *job;
185
186     for (;;) {
187         if (ctx->currjob != NULL) {
188             if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
189                 *ret = ctx->currjob->ret;
190                 ctx->currjob->waitctx = NULL;
191                 async_release_job(ctx->currjob);
192                 ctx->currjob = NULL;
193                 *job = NULL;
194                 return ASYNC_FINISH;
195             }
196
197             if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
198                 *job = ctx->currjob;
199                 ctx->currjob->status = ASYNC_JOB_PAUSED;
200                 ctx->currjob = NULL;
201                 return ASYNC_PAUSE;
202             }
203
204             if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
205                 ctx->currjob = *job;
206                 /* Resume previous job */
207                 if (!async_fibre_swapcontext(&ctx->dispatcher,
208                         &ctx->currjob->fibrectx, 1)) {
209                     ASYNCerr(ASYNC_F_ASYNC_START_JOB,
210                              ASYNC_R_FAILED_TO_SWAP_CONTEXT);
211                     goto err;
212                 }
213                 continue;
214             }
215
216             /* Should not happen */
217             ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_INTERNAL_ERROR);
218             async_release_job(ctx->currjob);
219             ctx->currjob = NULL;
220             *job = NULL;
221             return ASYNC_ERR;
222         }
223
224         /* Start a new job */
225         if ((ctx->currjob = async_get_pool_job()) == NULL)
226             return ASYNC_NO_JOBS;
227
228         if (args != NULL) {
229             ctx->currjob->funcargs = OPENSSL_malloc(size);
230             if (ctx->currjob->funcargs == NULL) {
231                 ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_MALLOC_FAILURE);
232                 async_release_job(ctx->currjob);
233                 ctx->currjob = NULL;
234                 return ASYNC_ERR;
235             }
236             memcpy(ctx->currjob->funcargs, args, size);
237         } else {
238             ctx->currjob->funcargs = NULL;
239         }
240
241         ctx->currjob->func = func;
242         ctx->currjob->waitctx = wctx;
243         if (!async_fibre_swapcontext(&ctx->dispatcher,
244                 &ctx->currjob->fibrectx, 1)) {
245             ASYNCerr(ASYNC_F_ASYNC_START_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
246             goto err;
247         }
248     }
249
250 err:
251     async_release_job(ctx->currjob);
252     ctx->currjob = NULL;
253     *job = NULL;
254     return ASYNC_ERR;
255 }
256
257 int ASYNC_pause_job(void)
258 {
259     ASYNC_JOB *job;
260     async_ctx *ctx = async_get_ctx();
261
262     if (ctx == NULL
263             || ctx->currjob == NULL
264             || ctx->blocked) {
265         /*
266          * Could be we've deliberately not been started within a job so this is
267          * counted as success.
268          */
269         return 1;
270     }
271
272     job = ctx->currjob;
273     job->status = ASYNC_JOB_PAUSING;
274
275     if (!async_fibre_swapcontext(&job->fibrectx,
276                                  &ctx->dispatcher, 1)) {
277         ASYNCerr(ASYNC_F_ASYNC_PAUSE_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
278         return 0;
279     }
280     /* Reset counts of added and deleted fds */
281     async_wait_ctx_reset_counts(job->waitctx);
282
283     return 1;
284 }
285
286 static void async_empty_pool(async_pool *pool)
287 {
288     ASYNC_JOB *job;
289
290     if (!pool || !pool->jobs)
291         return;
292
293     do {
294         job = sk_ASYNC_JOB_pop(pool->jobs);
295         async_job_free(job);
296     } while (job);
297 }
298
299 int async_init(void)
300 {
301     if (!CRYPTO_THREAD_init_local(&ctxkey, NULL))
302         return 0;
303
304     if (!CRYPTO_THREAD_init_local(&poolkey, NULL)) {
305         CRYPTO_THREAD_cleanup_local(&ctxkey);
306         return 0;
307     }
308
309     return 1;
310 }
311
312 void async_deinit(void)
313 {
314     CRYPTO_THREAD_cleanup_local(&ctxkey);
315     CRYPTO_THREAD_cleanup_local(&poolkey);
316 }
317
318 int ASYNC_init_thread(size_t max_size, size_t init_size)
319 {
320     async_pool *pool;
321     size_t curr_size = 0;
322
323     if (init_size > max_size) {
324         ASYNCerr(ASYNC_F_ASYNC_INIT_THREAD, ASYNC_R_INVALID_POOL_SIZE);
325         return 0;
326     }
327
328     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
329         return 0;
330
331     if (!ossl_init_thread_start(NULL, async_delete_thread_state))
332         return 0;
333
334     pool = OPENSSL_zalloc(sizeof(*pool));
335     if (pool == NULL) {
336         ASYNCerr(ASYNC_F_ASYNC_INIT_THREAD, ERR_R_MALLOC_FAILURE);
337         return 0;
338     }
339
340     pool->jobs = sk_ASYNC_JOB_new_reserve(NULL, init_size);
341     if (pool->jobs == NULL) {
342         ASYNCerr(ASYNC_F_ASYNC_INIT_THREAD, ERR_R_MALLOC_FAILURE);
343         OPENSSL_free(pool);
344         return 0;
345     }
346
347     pool->max_size = max_size;
348
349     /* Pre-create jobs as required */
350     while (init_size--) {
351         ASYNC_JOB *job;
352         job = async_job_new();
353         if (job == NULL || !async_fibre_makecontext(&job->fibrectx)) {
354             /*
355              * Not actually fatal because we already created the pool, just
356              * skip creation of any more jobs
357              */
358             async_job_free(job);
359             break;
360         }
361         job->funcargs = NULL;
362         sk_ASYNC_JOB_push(pool->jobs, job); /* Cannot fail due to reserve */
363         curr_size++;
364     }
365     pool->curr_size = curr_size;
366     if (!CRYPTO_THREAD_set_local(&poolkey, pool)) {
367         ASYNCerr(ASYNC_F_ASYNC_INIT_THREAD, ASYNC_R_FAILED_TO_SET_POOL);
368         goto err;
369     }
370
371     return 1;
372 err:
373     async_empty_pool(pool);
374     sk_ASYNC_JOB_free(pool->jobs);
375     OPENSSL_free(pool);
376     return 0;
377 }
378
379 /* OPENSSL_CTX ignored for now */
380 static void async_delete_thread_state(OPENSSL_CTX *ctx)
381 {
382     async_pool *pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
383
384     if (pool != NULL) {
385         async_empty_pool(pool);
386         sk_ASYNC_JOB_free(pool->jobs);
387         OPENSSL_free(pool);
388         CRYPTO_THREAD_set_local(&poolkey, NULL);
389     }
390     async_local_cleanup();
391     async_ctx_free();
392 }
393
394 void ASYNC_cleanup_thread(void)
395 {
396     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
397         return;
398
399     async_delete_thread_state(NULL);
400 }
401
402 ASYNC_JOB *ASYNC_get_current_job(void)
403 {
404     async_ctx *ctx;
405
406     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
407         return NULL;
408
409     ctx = async_get_ctx();
410     if (ctx == NULL)
411         return NULL;
412
413     return ctx->currjob;
414 }
415
416 ASYNC_WAIT_CTX *ASYNC_get_wait_ctx(ASYNC_JOB *job)
417 {
418     return job->waitctx;
419 }
420
421 void ASYNC_block_pause(void)
422 {
423     async_ctx *ctx;
424
425     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
426         return;
427
428     ctx = async_get_ctx();
429     if (ctx == NULL || ctx->currjob == NULL) {
430         /*
431          * We're not in a job anyway so ignore this
432          */
433         return;
434     }
435     ctx->blocked++;
436 }
437
438 void ASYNC_unblock_pause(void)
439 {
440     async_ctx *ctx;
441
442     if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
443         return;
444
445     ctx = async_get_ctx();
446     if (ctx == NULL || ctx->currjob == NULL) {
447         /*
448          * We're not in a job anyway so ignore this
449          */
450         return;
451     }
452     if (ctx->blocked > 0)
453         ctx->blocked--;
454 }