simplified threapool pthread implementation

This commit is contained in:
Yann Collet
2024-10-07 09:01:48 -07:00
parent c11bf7733b
commit 21b24c6680

View File

@@ -210,221 +210,122 @@ void TPool_jobsCompleted(TPool* pool)
#else
/* pthread availability assumed */
#include <stdlib.h> /* malloc, free */
#include <pthread.h> /* pthread_* */
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
/* A job is just a function with an opaque argument */
typedef struct TPool_job_s {
void (*job_function)(void*);
void (*function)(void*);
void *arg;
} TPool_job;
struct TPool_s {
pthread_t* threads;
size_t threadCapacity;
size_t threadLimit;
/* The queue is a circular buffer */
TPool_job* queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
/* The number of threads working on jobs */
size_t numThreadsBusy;
/* Indicates if the queue is empty */
int queueEmpty;
/* The mutex protects the queue */
pthread_mutex_t queueMutex;
/* Condition variable for pushers to wait on when the queue is full */
pthread_cond_t queuePushCond;
/* Condition variables for poppers to wait on when the queue is empty */
pthread_cond_t queuePopCond;
/* Indicates if the queue is shutting down */
typedef struct TPool_s {
pthread_t *threads;
TPool_job *queue;
size_t thread_count, thread_limit, active_threads;
size_t queue_size, queue_head, queue_tail, jobs_in_queue;
pthread_mutex_t mutex;
pthread_cond_t cond;
int shutdown;
};
} TPool;
static void TPool_shutdown(TPool* ctx);
void TPool_free(TPool* ctx) {
if (!ctx) { return; }
TPool_shutdown(ctx);
pthread_mutex_destroy(&ctx->queueMutex);
pthread_cond_destroy(&ctx->queuePushCond);
pthread_cond_destroy(&ctx->queuePopCond);
free(ctx->queue);
free(ctx->threads);
free(ctx);
}
static void* TPool_thread(void* opaque);
TPool* TPool_create(int nbThreads, int queueSize)
{
TPool* ctx;
/* Check parameters */
if (nbThreads<1 || queueSize<1) { return NULL; }
/* Allocate the context and zero initialize */
ctx = (TPool*)calloc(1, sizeof(TPool));
if (!ctx) { return NULL; }
/* init pthread variables */
{ int error = 0;
error |= pthread_mutex_init(&ctx->queueMutex, NULL);
error |= pthread_cond_init(&ctx->queuePushCond, NULL);
error |= pthread_cond_init(&ctx->queuePopCond, NULL);
if (error) { TPool_free(ctx); return NULL; }
}
/* Initialize the job queue.
* It needs one extra space since one space is wasted to differentiate
* empty and full queues.
*/
ctx->queueSize = (size_t)queueSize + 1;
ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job));
if (ctx->queue == NULL) {
TPool_free(ctx);
return NULL;
}
ctx->queueHead = 0;
ctx->queueTail = 0;
ctx->numThreadsBusy = 0;
ctx->queueEmpty = 1;
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t));
if (ctx->threads == NULL) {
TPool_free(ctx);
return NULL;
}
ctx->threadCapacity = 0;
/* Initialize the threads */
{ int i;
for (i = 0; i < nbThreads; ++i) {
if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
ctx->threadCapacity = (size_t)i;
TPool_free(ctx);
static void* TPool_thread(void* arg) {
TPool *pool = arg;
while (1) {
pthread_mutex_lock(&pool->mutex);
while (pool->jobs_in_queue == 0 && !pool->shutdown) {
if (pool->active_threads > pool->thread_limit) {
pool->active_threads--;
pthread_mutex_unlock(&pool->mutex);
return NULL;
} }
ctx->threadCapacity = (size_t)nbThreads;
ctx->threadLimit = (size_t)nbThreads;
}
return ctx;
}
/* TPool_thread() :
* Work thread for the thread pool.
* Waits for jobs and executes them.
* @returns : NULL on failure else non-null.
*/
static void* TPool_thread(void* opaque) {
TPool* const ctx = (TPool*)opaque;
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex);
while ( ctx->queueEmpty
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
if (ctx->shutdown) {
/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
* a few threads will be shutdown while !queueEmpty,
* but enough threads will remain active to finish the queue */
pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
pthread_cond_wait(&pool->cond, &pool->mutex);
}
/* Pop a job off the queue */
{ TPool_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->numThreadsBusy++;
ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
/* Unlock the mutex, signal a pusher, and run the job */
pthread_cond_signal(&ctx->queuePushCond);
pthread_mutex_unlock(&ctx->queueMutex);
if (pool->shutdown && pool->jobs_in_queue == 0) break;
job.job_function(job.arg);
TPool_job job = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_size;
pool->jobs_in_queue--;
pool->active_threads++;
/* If the intended queue size was 0, signal after finishing job */
pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
pthread_cond_signal(&ctx->queuePushCond);
pthread_mutex_unlock(&ctx->queueMutex);
pthread_mutex_unlock(&pool->mutex);
job.function(job.arg);
pthread_mutex_lock(&pool->mutex);
pool->active_threads--;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
pool->active_threads--;
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
TPool* TPool_create(int thread_count, int queue_size) {
TPool* pool = calloc(1, sizeof(TPool));
if (!pool) return NULL;
pool->thread_count = pool->thread_limit = thread_count;
pool->queue_size = queue_size;
pool->queue = calloc(queue_size, sizeof(TPool_job));
pool->threads = calloc(thread_count, sizeof(pthread_t));
if (!pool->queue || !pool->threads ||
pthread_mutex_init(&pool->mutex, NULL) != 0 ||
pthread_cond_init(&pool->cond, NULL) != 0) {
TPool_free(pool);
return NULL;
}
for (int i = 0; i < thread_count; i++) {
if (pthread_create(&pool->threads[i], NULL, TPool_thread, pool) != 0) {
pool->thread_count = i;
TPool_free(pool);
return NULL;
}
} /* for (;;) */
assert(0); /* Unreachable */
}
/*! TPool_shutdown() :
Shutdown the queue, wake any sleeping threads, and join all of the threads.
*/
static void TPool_shutdown(TPool* ctx) {
/* Shut down the queue */
pthread_mutex_lock(&ctx->queueMutex);
ctx->shutdown = 1;
pthread_mutex_unlock(&ctx->queueMutex);
/* Wake up sleeping threads */
pthread_cond_broadcast(&ctx->queuePushCond);
pthread_cond_broadcast(&ctx->queuePopCond);
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->threadCapacity; ++i) {
pthread_join(ctx->threads[i], NULL); /* note : could fail */
} }
}
/*! TPool_jobsCompleted() :
* Waits for all queued jobs to finish executing.
*/
void TPool_jobsCompleted(TPool* ctx){
pthread_mutex_lock(&ctx->queueMutex);
while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
pthread_mutex_unlock(&ctx->queueMutex);
return pool;
}
/**
* Returns 1 if the queue is full and 0 otherwise.
*
* When queueSize is 1 (pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free _and_ no job is waiting.
*/
static int isQueueFull(TPool const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
return (ctx->numThreadsBusy == ctx->threadLimit) ||
!ctx->queueEmpty;
void TPool_free(TPool* pool) {
if (!pool) return;
pthread_mutex_lock(&pool->mutex);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
for (size_t i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
free(pool->queue);
free(pool->threads);
free(pool);
}
static void
TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg)
{
TPool_job job;
job.job_function = job_function;
job.arg = arg;
assert(ctx != NULL);
if (ctx->shutdown) return;
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
pthread_cond_signal(&ctx->queuePopCond);
}
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
assert(ctx != NULL);
pthread_mutex_lock(&ctx->queueMutex);
/* Wait until there is space in the queue for the new job */
while (isQueueFull(ctx) && (!ctx->shutdown)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
void TPool_submitJob(TPool* pool, void (*function)(void*), void* arg) {
if (!pool || !function) return;
pthread_mutex_lock(&pool->mutex);
while (pool->jobs_in_queue == pool->queue_size && !pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->mutex);
}
TPool_submitJob_internal(ctx, job_function, arg);
pthread_mutex_unlock(&ctx->queueMutex);
if (!pool->shutdown) {
pool->queue[pool->queue_tail] = (TPool_job){function, arg};
pool->queue_tail = (pool->queue_tail + 1) % pool->queue_size;
pool->jobs_in_queue++;
pthread_cond_signal(&pool->cond);
}
pthread_mutex_unlock(&pool->mutex);
}
void TPool_jobsCompleted(TPool* pool) {
if (!pool) return;
pthread_mutex_lock(&pool->mutex);
while (pool->jobs_in_queue > 0 || pool->active_threads > 0) {
pthread_cond_wait(&pool->cond, &pool->mutex);
}
pthread_mutex_unlock(&pool->mutex);
}
#endif /* LZ4IO_NO_MT */