* ====================================================================
*/
-#include <openssl/crypto.h>
+/*
+ * Without this we start getting longjmp crashes because it thinks we're jumping
+ * up the stack when in fact we are jumping to an entirely different stack. The
+ * cost of this is not having certain buffer overrun/underrun checks etc for
+ * this source file :-(
+ */
+#undef _FORTIFY_SOURCE
+
+#include <openssl/err.h>
#include <openssl/async.h>
#include <string.h>
#include "async_locl.h"
#define ASYNC_JOB_PAUSED 2
#define ASYNC_JOB_STOPPING 3
-static size_t pool_max_size = 0;
-static size_t curr_size = 0;
-
-DECLARE_STACK_OF(ASYNC_JOB)
-static STACK_OF(ASYNC_JOB) *pool = NULL;
-
-
-static ASYNC_CTX *ASYNC_CTX_new(void)
+static async_ctx *async_ctx_new(void)
{
- ASYNC_CTX *nctx = NULL;
+ async_ctx *nctx = NULL;
- if(!(nctx = OPENSSL_malloc(sizeof (ASYNC_CTX)))) {
- /* Error here */
+ nctx = OPENSSL_malloc(sizeof (async_ctx));
+ if (nctx == NULL) {
+ ASYNCerr(ASYNC_F_ASYNC_CTX_NEW, ERR_R_MALLOC_FAILURE);
goto err;
}
- ASYNC_FIBRE_init_dispatcher(&nctx->dispatcher);
+ async_fibre_init_dispatcher(&nctx->dispatcher);
nctx->currjob = NULL;
- if(!ASYNC_set_ctx(nctx))
+ nctx->blocked = 0;
+ if (!async_set_ctx(nctx))
goto err;
return nctx;
err:
- if(nctx) {
- OPENSSL_free(nctx);
- }
+ OPENSSL_free(nctx);
return NULL;
}
-static int ASYNC_CTX_free(void)
+static int async_ctx_free(void)
{
- if(ASYNC_get_ctx()) {
- OPENSSL_free(ASYNC_get_ctx());
- }
+ async_ctx *ctx;
+
+ ctx = async_get_ctx();
- if(!ASYNC_set_ctx(NULL))
+ if (!async_set_ctx(NULL))
return 0;
+ OPENSSL_free(ctx);
+
return 1;
}
-static ASYNC_JOB *ASYNC_JOB_new(void)
+static ASYNC_JOB *async_job_new(void)
{
ASYNC_JOB *job = NULL;
+ int pipefds[2];
- if(!(job = OPENSSL_malloc(sizeof (ASYNC_JOB)))) {
+ job = OPENSSL_malloc(sizeof (ASYNC_JOB));
+ if (job == NULL) {
+ ASYNCerr(ASYNC_F_ASYNC_JOB_NEW, ERR_R_MALLOC_FAILURE);
return NULL;
}
+ if (!async_pipe(pipefds)) {
+ OPENSSL_free(job);
+ ASYNCerr(ASYNC_F_ASYNC_JOB_NEW, ASYNC_R_CANNOT_CREATE_WAIT_PIPE);
+ return NULL;
+ }
+
+ job->wake_set = 0;
+ job->wait_fd = pipefds[0];
+ job->wake_fd = pipefds[1];
+
job->status = ASYNC_JOB_RUNNING;
job->funcargs = NULL;
return job;
}
-static void ASYNC_JOB_free(ASYNC_JOB *job)
+static void async_job_free(ASYNC_JOB *job)
{
- if(job) {
- if(job->funcargs)
- OPENSSL_free(job->funcargs);
- ASYNC_FIBRE_free(&job->fibrectx);
+ if (job != NULL) {
+ OPENSSL_free(job->funcargs);
+ async_fibre_free(&job->fibrectx);
OPENSSL_free(job);
}
}
static ASYNC_JOB *async_get_pool_job(void) {
ASYNC_JOB *job;
+ STACK_OF(ASYNC_JOB) *pool;
+ pool = async_get_pool();
if (pool == NULL) {
/*
* Pool has not been initialised, so init with the defaults, i.e.
- * global pool, with no max size and no pre-created jobs
+ * no max size and no pre-created jobs
*/
- if (ASYNC_init_pool(0, 0, 0) == 0)
+ if (ASYNC_init_pool(0, 0) == 0)
return NULL;
+ pool = async_get_pool();
}
job = sk_ASYNC_JOB_pop(pool);
if (job == NULL) {
/* Pool is empty */
- if (pool_max_size && curr_size >= pool_max_size) {
- /* Pool is at max size. We cannot continue */
+ if (!async_pool_can_grow())
return NULL;
- }
- job = ASYNC_JOB_new();
+
+ job = async_job_new();
if (job) {
- ASYNC_FIBRE_makecontext(&job->fibrectx);
- curr_size++;
+ async_fibre_makecontext(&job->fibrectx);
+ async_increment_pool_size();
}
}
return job;
}
static void async_release_job(ASYNC_JOB *job) {
- if(job->funcargs)
- OPENSSL_free(job->funcargs);
+ OPENSSL_free(job->funcargs);
job->funcargs = NULL;
/* Ignore error return */
- sk_ASYNC_JOB_push(pool, job);
+ async_release_job_to_pool(job);
}
-void ASYNC_start_func(void)
+void async_start_func(void)
{
ASYNC_JOB *job;
while (1) {
/* Run the job */
- job = ASYNC_get_ctx()->currjob;
+ job = async_get_ctx()->currjob;
job->ret = job->func(job->funcargs);
/* Stop the job */
job->status = ASYNC_JOB_STOPPING;
- if(!ASYNC_FIBRE_swapcontext(&job->fibrectx,
- &ASYNC_get_ctx()->dispatcher, 1)) {
+ if (!async_fibre_swapcontext(&job->fibrectx,
+ &async_get_ctx()->dispatcher, 1)) {
/*
- * Should not happen. Getting here will close the thread...can't do much
- * about it
+ * Should not happen. Getting here will close the thread...can't do
+ * much about it
*/
+ ASYNCerr(ASYNC_F_ASYNC_START_FUNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
}
}
}
int ASYNC_start_job(ASYNC_JOB **job, int *ret, int (*func)(void *),
void *args, size_t size)
{
- if(ASYNC_get_ctx() || !ASYNC_CTX_new()) {
+ if (async_get_ctx() == NULL && async_ctx_new() == NULL) {
return ASYNC_ERR;
}
- if(*job) {
- ASYNC_get_ctx()->currjob = *job;
+ if (*job) {
+ async_get_ctx()->currjob = *job;
}
for (;;) {
- if(ASYNC_get_ctx()->currjob) {
- if(ASYNC_get_ctx()->currjob->status == ASYNC_JOB_STOPPING) {
- *ret = ASYNC_get_ctx()->currjob->ret;
- async_release_job(ASYNC_get_ctx()->currjob);
- ASYNC_get_ctx()->currjob = NULL;
+ if (async_get_ctx()->currjob != NULL) {
+ if (async_get_ctx()->currjob->status == ASYNC_JOB_STOPPING) {
+ *ret = async_get_ctx()->currjob->ret;
+ async_release_job(async_get_ctx()->currjob);
+ async_get_ctx()->currjob = NULL;
*job = NULL;
- ASYNC_CTX_free();
return ASYNC_FINISH;
}
- if(ASYNC_get_ctx()->currjob->status == ASYNC_JOB_PAUSING) {
- *job = ASYNC_get_ctx()->currjob;
- ASYNC_get_ctx()->currjob->status = ASYNC_JOB_PAUSED;
- ASYNC_CTX_free();
+ if (async_get_ctx()->currjob->status == ASYNC_JOB_PAUSING) {
+ *job = async_get_ctx()->currjob;
+ async_get_ctx()->currjob->status = ASYNC_JOB_PAUSED;
+ async_get_ctx()->currjob = NULL;
return ASYNC_PAUSE;
}
- if(ASYNC_get_ctx()->currjob->status == ASYNC_JOB_PAUSED) {
- ASYNC_get_ctx()->currjob = *job;
+ if (async_get_ctx()->currjob->status == ASYNC_JOB_PAUSED) {
+ async_get_ctx()->currjob = *job;
/* Resume previous job */
- if(!ASYNC_FIBRE_swapcontext(&ASYNC_get_ctx()->dispatcher,
- &ASYNC_get_ctx()->currjob->fibrectx, 1))
+ if (!async_fibre_swapcontext(&async_get_ctx()->dispatcher,
+ &async_get_ctx()->currjob->fibrectx, 1)) {
+ ASYNCerr(ASYNC_F_ASYNC_START_JOB,
+ ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
+ }
continue;
}
/* Should not happen */
- async_release_job(ASYNC_get_ctx()->currjob);
- ASYNC_get_ctx()->currjob = NULL;
+ ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_INTERNAL_ERROR);
+ async_release_job(async_get_ctx()->currjob);
+ async_get_ctx()->currjob = NULL;
*job = NULL;
- ASYNC_CTX_free();
return ASYNC_ERR;
}
/* Start a new job */
- if(!(ASYNC_get_ctx()->currjob = async_get_pool_job())) {
- ASYNC_CTX_free();
+ if ((async_get_ctx()->currjob = async_get_pool_job()) == NULL) {
return ASYNC_NO_JOBS;
}
- if(args != NULL) {
- ASYNC_get_ctx()->currjob->funcargs = OPENSSL_malloc(size);
- if(!ASYNC_get_ctx()->currjob->funcargs) {
- async_release_job(ASYNC_get_ctx()->currjob);
- ASYNC_get_ctx()->currjob = NULL;
- ASYNC_CTX_free();
+ if (args != NULL) {
+ async_get_ctx()->currjob->funcargs = OPENSSL_malloc(size);
+ if (async_get_ctx()->currjob->funcargs == NULL) {
+ ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_MALLOC_FAILURE);
+ async_release_job(async_get_ctx()->currjob);
+ async_get_ctx()->currjob = NULL;
return ASYNC_ERR;
}
- memcpy(ASYNC_get_ctx()->currjob->funcargs, args, size);
+ memcpy(async_get_ctx()->currjob->funcargs, args, size);
} else {
- ASYNC_get_ctx()->currjob->funcargs = NULL;
+ async_get_ctx()->currjob->funcargs = NULL;
}
- ASYNC_get_ctx()->currjob->func = func;
- if(!ASYNC_FIBRE_swapcontext(&ASYNC_get_ctx()->dispatcher,
- &ASYNC_get_ctx()->currjob->fibrectx, 1))
+ async_get_ctx()->currjob->func = func;
+ if (!async_fibre_swapcontext(&async_get_ctx()->dispatcher,
+ &async_get_ctx()->currjob->fibrectx, 1)) {
+ ASYNCerr(ASYNC_F_ASYNC_START_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
+ }
}
err:
- async_release_job(ASYNC_get_ctx()->currjob);
- ASYNC_get_ctx()->currjob = NULL;
+ async_release_job(async_get_ctx()->currjob);
+ async_get_ctx()->currjob = NULL;
*job = NULL;
- ASYNC_CTX_free();
return ASYNC_ERR;
}
{
ASYNC_JOB *job;
- if(!ASYNC_get_ctx() || !ASYNC_get_ctx()->currjob)
- return 0;
+ if (async_get_ctx() == NULL
+ || async_get_ctx()->currjob == NULL
+ || async_get_ctx()->blocked) {
+ /*
+ * Could be we've deliberately not been started within a job so this is
+ * counted as success.
+ */
+ return 1;
+ }
- job = ASYNC_get_ctx()->currjob;
+ job = async_get_ctx()->currjob;
job->status = ASYNC_JOB_PAUSING;
- if(!ASYNC_FIBRE_swapcontext(&job->fibrectx,
- &ASYNC_get_ctx()->dispatcher, 1)) {
- /* Error */
+ if (!async_fibre_swapcontext(&job->fibrectx,
+ &async_get_ctx()->dispatcher, 1)) {
+ ASYNCerr(ASYNC_F_ASYNC_PAUSE_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
return 0;
}
return 1;
}
-int ASYNC_in_job(void)
+static void async_empty_pool(STACK_OF(ASYNC_JOB) *pool)
{
- if(ASYNC_get_ctx())
- return 1;
+ ASYNC_JOB *job;
- return 0;
+ do {
+ job = sk_ASYNC_JOB_pop(pool);
+ async_job_free(job);
+ } while (job);
}
-int ASYNC_init_pool(unsigned int local, size_t max_size, size_t init_size)
+int ASYNC_init_pool(size_t max_size, size_t init_size)
{
- if (local != 0) {
- /* We only support a global pool so far */
+ STACK_OF(ASYNC_JOB) *pool;
+ size_t curr_size = 0;
+
+ if (init_size > max_size) {
+ ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_INVALID_POOL_SIZE);
return 0;
}
- pool_max_size = max_size;
pool = sk_ASYNC_JOB_new_null();
if (pool == NULL) {
+ ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ERR_R_MALLOC_FAILURE);
return 0;
}
/* Pre-create jobs as required */
while (init_size) {
ASYNC_JOB *job;
- job = ASYNC_JOB_new();
+ job = async_job_new();
if (job) {
- ASYNC_FIBRE_makecontext(&job->fibrectx);
+ async_fibre_makecontext(&job->fibrectx);
job->funcargs = NULL;
sk_ASYNC_JOB_push(pool, job);
curr_size++;
}
}
+ if (!async_set_pool(pool, curr_size, max_size)) {
+ ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_FAILED_TO_SET_POOL);
+ async_empty_pool(pool);
+ sk_ASYNC_JOB_free(pool);
+ return 0;
+ }
+
return 1;
}
void ASYNC_free_pool(void)
{
- ASYNC_JOB *job;
+ STACK_OF(ASYNC_JOB) *pool;
- do {
- job = sk_ASYNC_JOB_pop(pool);
- ASYNC_JOB_free(job);
- } while (job);
- sk_ASYNC_JOB_free(pool);
+ pool = async_get_pool();
+ if (pool == NULL)
+ return;
+
+ async_empty_pool(pool);
+ async_release_pool();
+ async_ctx_free();
+}
+
+ASYNC_JOB *ASYNC_get_current_job(void)
+{
+ async_ctx *ctx;
+
+ ctx = async_get_ctx();
+ if(ctx == NULL)
+ return NULL;
+
+ return ctx->currjob;
+}
+
+int ASYNC_get_wait_fd(ASYNC_JOB *job)
+{
+ return job->wait_fd;
+}
+
+void ASYNC_wake(ASYNC_JOB *job)
+{
+ char dummy = 0;
+
+ if (job->wake_set)
+ return;
+ async_write1(job->wake_fd, &dummy);
+ job->wake_set = 1;
+}
+
+void ASYNC_clear_wake(ASYNC_JOB *job)
+{
+ char dummy = 0;
+ if (!job->wake_set)
+ return;
+ async_read1(job->wait_fd, &dummy);
+ job->wake_set = 0;
+}
+
+void ASYNC_block_pause(void)
+{
+ if (async_get_ctx() == NULL
+ || async_get_ctx()->currjob == NULL) {
+ /*
+ * We're not in a job anyway so ignore this
+ */
+ return;
+ }
+ async_get_ctx()->blocked++;
+}
+
+void ASYNC_unblock_pause(void)
+{
+ if (async_get_ctx() == NULL
+ || async_get_ctx()->currjob == NULL) {
+ /*
+ * We're not in a job anyway so ignore this
+ */
+ return;
+ }
+ if(async_get_ctx()->blocked > 0)
+ async_get_ctx()->blocked--;
}