updated threadpool API

This commit is contained in:
Yann Collet
2024-07-07 13:22:46 -07:00
parent 50d541f9e3
commit c688b4bd58
3 changed files with 113 additions and 112 deletions

View File

@@ -633,7 +633,7 @@ typedef size_t (*compress_f)(
size_t prefixSize);
typedef struct {
TPOOL_ctx* wpool;
TPool* wpool;
void* buffer;
size_t prefixSize;
size_t inSize;
@@ -666,7 +666,7 @@ static void LZ4IO_compressChunk(void* arg)
wjd->blockNb = cjd->blockNb;
wjd->out = cjd->fout;
wjd->wr = cjd->wr;
TPOOL_submitJob(cjd->wpool, LZ4IO_checkWriteOrder, wjd);
TPool_submitJob(cjd->wpool, LZ4IO_checkWriteOrder, wjd);
} }
}
@@ -681,8 +681,8 @@ static void LZ4IO_compressAndFreeChunk(void* arg)
/* one ReadTracker per file to compress */
typedef struct {
TPOOL_ctx* tpool;
TPOOL_ctx* wpool;
TPool* tPool;
TPool* wpool;
FILE* fin;
size_t chunkSize;
unsigned long long totalReadSize;
@@ -742,11 +742,11 @@ static void LZ4IO_readAndProcess(void* arg)
cjd->wr = rjd->wr;
cjd->maxCBlockSize = rjd->maxCBlockSize;
cjd->lastBlock = inSize < chunkSize;
TPOOL_submitJob(rjd->tpool, LZ4IO_compressAndFreeChunk, cjd);
TPool_submitJob(rjd->tPool, LZ4IO_compressAndFreeChunk, cjd);
if (inSize == chunkSize) {
/* probably more ? read another chunk */
rjd->blockNb++;
TPOOL_submitJob(rjd->tpool, LZ4IO_readAndProcess, rjd);
TPool_submitJob(rjd->tPool, LZ4IO_readAndProcess, rjd);
} } }
}
@@ -826,8 +826,8 @@ static int LZ4IO_compressLegacy_internal(unsigned long long* readSize,
compress_f const compressionFunction = (compressionlevel < 3) ? LZ4IO_compressBlockLegacy_fast : LZ4IO_compressBlockLegacy_HC;
FILE* const finput = LZ4IO_openSrcFile(input_filename);
FILE* foutput = NULL;
TPOOL_ctx* const tPool = TPOOL_create(prefs->nbWorkers, 4);
TPOOL_ctx* const wPool = TPOOL_create(1, 4);
TPool* const tPool = TPool_create(prefs->nbWorkers, 4);
TPool* const wPool = TPool_create(1, 4);
WriteRegister wr = WR_init(LEGACY_BLOCKSIZE);
/* Init & checks */
@@ -860,7 +860,7 @@ static int LZ4IO_compressLegacy_internal(unsigned long long* readSize,
{ CompressLegacyState cls;
ReadTracker rjd;
cls.cLevel = compressionlevel;
rjd.tpool = tPool;
rjd.tPool = tPool;
rjd.wpool = wPool;
rjd.fin = finput;
rjd.chunkSize = LEGACY_BLOCKSIZE;
@@ -874,10 +874,10 @@ static int LZ4IO_compressLegacy_internal(unsigned long long* readSize,
rjd.wr = &wr;
rjd.maxCBlockSize = (size_t)LZ4_compressBound(LEGACY_BLOCKSIZE) + LZ4IO_LEGACY_BLOCK_HEADER_SIZE;
/* Ignite the job chain */
TPOOL_submitJob(tPool, LZ4IO_readAndProcess, &rjd);
TPool_submitJob(tPool, LZ4IO_readAndProcess, &rjd);
/* Wait for all completion */
TPOOL_completeJobs(tPool);
TPOOL_completeJobs(wPool);
TPool_jobsCompleted(tPool);
TPool_jobsCompleted(wPool);
/* Status */
DISPLAYLEVEL(2, "\r%79s\r", ""); /* blank line */
@@ -889,8 +889,8 @@ static int LZ4IO_compressLegacy_internal(unsigned long long* readSize,
/* Close & Free */
_cfl_clean:
WR_destroy(&wr);
TPOOL_free(wPool);
TPOOL_free(tPool);
TPool_free(wPool);
TPool_free(tPool);
if (finput) fclose(finput);
if (foutput && !LZ4IO_isStdout(output_filename)) fclose(foutput); /* do not close stdout */
@@ -981,14 +981,14 @@ typedef struct {
LZ4F_compressionContext_t ctx;
LZ4F_preferences_t preparedPrefs;
LZ4F_CDict* cdict;
TPOOL_ctx* tpool;
TPOOL_ctx* wpool; /* writer thread */
TPool* tPool;
TPool* wPool; /* writer thread */
} cRess_t;
static void LZ4IO_freeCResources(cRess_t ress)
{
TPOOL_free(ress.tpool);
TPOOL_free(ress.wpool);
TPool_free(ress.tPool);
TPool_free(ress.wPool);
free(ress.srcBuffer);
free(ress.dstBuffer);
@@ -1104,8 +1104,8 @@ static cRess_t LZ4IO_createCResources(const LZ4IO_prefs_t* io_prefs)
ress.cdict = LZ4IO_createCDict(io_prefs);
/* will be created it needed */
ress.tpool = NULL;
ress.wpool = NULL;
ress.tPool = NULL;
ress.wPool = NULL;
return ress;
}
@@ -1215,17 +1215,17 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
LZ4IO_CfcParameters cfcp;
ReadTracker rjd;
if (ress.tpool == NULL) {
ress.tpool = TPOOL_create(io_prefs->nbWorkers, 4);
assert(ress.wpool == NULL);
ress.wpool = TPOOL_create(1, 4);
if (ress.tpool == NULL || ress.wpool == NULL)
if (ress.tPool == NULL) {
ress.tPool = TPool_create(io_prefs->nbWorkers, 4);
assert(ress.wPool == NULL);
ress.wPool = TPool_create(1, 4);
if (ress.tPool == NULL || ress.wPool == NULL)
END_PROCESS(43, "can't create threadpools");
}
cfcp.prefs = &prefs;
cfcp.cdict = ress.cdict;
rjd.tpool = ress.tpool;
rjd.wpool = ress.wpool;
rjd.tPool = ress.tPool;
rjd.wpool = ress.wPool;
rjd.fin = srcFile;
rjd.chunkSize = chunkSize;
rjd.totalReadSize = 0;
@@ -1271,7 +1271,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
/* process first block */
{ CompressJobDesc cjd;
cjd.wpool = ress.wpool;
cjd.wpool = ress.wPool;
cjd.buffer = srcBuffer;
cjd.prefixSize = 0;
cjd.inSize = readSize;
@@ -1282,7 +1282,7 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
cjd.wr = &wr;
cjd.maxCBlockSize = rjd.maxCBlockSize;
cjd.lastBlock = 0;
TPOOL_submitJob(ress.tpool, LZ4IO_compressChunk, &cjd);
TPool_submitJob(ress.tPool, LZ4IO_compressChunk, &cjd);
rjd.totalReadSize = readSize;
rjd.blockNb = 1;
if (prefixBuffer) {
@@ -1291,11 +1291,11 @@ LZ4IO_compressFilename_extRess_MT(unsigned long long* inStreamSize,
}
/* Start the job chain */
TPOOL_submitJob(ress.tpool, LZ4IO_readAndProcess, &rjd);
TPool_submitJob(ress.tPool, LZ4IO_readAndProcess, &rjd);
/* Wait for all completion */
TPOOL_completeJobs(ress.tpool);
TPOOL_completeJobs(ress.wpool);
TPool_jobsCompleted(ress.tPool);
TPool_jobsCompleted(ress.wPool);
compressedfilesize += wr.totalCSize;
}
@@ -1704,7 +1704,7 @@ typedef struct {
size_t inSize;
void* outBuffer;
unsigned long long* totalSize;
TPOOL_ctx* wPool;
TPool* wPool;
FILE* foutput;
int sparseEnable;
unsigned* storedSkips;
@@ -1730,7 +1730,7 @@ static void LZ4IO_decompressBlockLegacy(void* arg)
ctw->sparseEnable = lbi->sparseEnable;
ctw->storedSkips = lbi->storedSkips;
ctw->totalSize = lbi->totalSize;
TPOOL_submitJob(lbi->wPool, LZ4IO_writeDecodedChunk, ctw);
TPool_submitJob(lbi->wPool, LZ4IO_writeDecodedChunk, ctw);
}
/* clean up */
@@ -1743,8 +1743,8 @@ LZ4IO_decodeLegacyStream(FILE* finput, FILE* foutput, const LZ4IO_prefs_t* prefs
unsigned long long streamSize = 0;
unsigned storedSkips = 0;
TPOOL_ctx* const tPool = TPOOL_create(1, 1);
TPOOL_ctx* const wPool = TPOOL_create(1, 1);
TPool* const tPool = TPool_create(1, 1);
TPool* const wPool = TPool_create(1, 1);
#define NB_BUFFSETS 4 /* 1 being read, 1 being processed, 1 being written, 1 being queued */
void* inBuffs[NB_BUFFSETS];
void* outBuffs[NB_BUFFSETS];
@@ -1795,22 +1795,22 @@ LZ4IO_decodeLegacyStream(FILE* finput, FILE* foutput, const LZ4IO_prefs_t* prefs
lbi->foutput = foutput;
lbi->sparseEnable = prefs->sparseFileSupport;
lbi->storedSkips = &storedSkips;
TPOOL_submitJob(tPool, LZ4IO_decompressBlockLegacy, lbi);
TPool_submitJob(tPool, LZ4IO_decompressBlockLegacy, lbi);
}
}
}
if (ferror(finput)) END_PROCESS(65, "Read error : ferror");
/* Wait for all completion */
TPOOL_completeJobs(tPool);
TPOOL_completeJobs(wPool);
TPool_jobsCompleted(tPool);
TPool_jobsCompleted(wPool);
/* flush last zeroes */
LZ4IO_fwriteSparseEnd(foutput, storedSkips);
/* Free */
TPOOL_free(wPool);
TPOOL_free(tPool);
TPool_free(wPool);
TPool_free(tPool);
for (bSetNb=0; bSetNb<NB_BUFFSETS; bSetNb++) {
free(inBuffs[bSetNb]);
free(outBuffs[bSetNb]);
@@ -2044,7 +2044,7 @@ typedef struct {
BufferPool* bp;
unsigned long long* totalSize;
LZ4F_errorCode_t* lastStatus;
TPOOL_ctx* wPool;
TPool* wPool;
FILE* foutput;
int sparseEnable;
unsigned* storedSkips;
@@ -2087,7 +2087,7 @@ static void LZ4IO_decompressLZ4FChunk(void* arg)
ctw->sparseEnable = lz4fc->sparseEnable;
ctw->storedSkips = lz4fc->storedSkips;
ctw->totalSize = lz4fc->totalSize;
TPOOL_submitJob(lz4fc->wPool, LZ4IO_writeDecodedLZ4FChunk, ctw);
TPool_submitJob(lz4fc->wPool, LZ4IO_writeDecodedLZ4FChunk, ctw);
}
}
@@ -2108,8 +2108,8 @@ LZ4IO_decompressLZ4F(dRess_t ress,
const LZ4F_decompressOptions_t* const dOptPtr =
((prefs->blockChecksum==0) && (prefs->streamChecksum==0)) ?
&dOpt_skipCrc : NULL;
TPOOL_ctx* const tPool = TPOOL_create(1, 1);
TPOOL_ctx* const wPool = TPOOL_create(1, 1);
TPool* const tPool = TPool_create(1, 1);
TPool* const wPool = TPool_create(1, 1);
BufferPool* const bp = LZ4IO_createBufferPool(OUTBUFF_SIZE);
#define NB_BUFFSETS 4 /* 1 being read, 1 being processed, 1 being written, 1 being queued */
void* inBuffs[NB_BUFFSETS];
@@ -2164,18 +2164,18 @@ LZ4IO_decompressLZ4F(dRess_t ress,
lbi->foutput = dstFile;
lbi->sparseEnable = prefs->sparseFileSupport;
lbi->storedSkips = &storedSkips;
TPOOL_submitJob(tPool, LZ4IO_decompressLZ4FChunk, lbi);
TPool_submitJob(tPool, LZ4IO_decompressLZ4FChunk, lbi);
}
if (readSize < INBUFF_SIZE) break; /* likely reached end of stream */
}
assert(feof(srcFile));
/* Wait for all decompression completion */
TPOOL_completeJobs(tPool);
TPool_jobsCompleted(tPool);
/* flush */
assert(lastStatus == 0);
TPOOL_completeJobs(wPool);
TPool_jobsCompleted(wPool);
if (!prefs->testMode) LZ4IO_fwriteSparseEnd(dstFile, storedSkips);
/* Clean */
@@ -2183,8 +2183,8 @@ LZ4IO_decompressLZ4F(dRess_t ress,
free(inBuffs[bSetNb]);
}
LZ4IO_freeBufferPool(bp);
TPOOL_free(wPool);
TPOOL_free(tPool);
TPool_free(wPool);
TPool_free(tPool);
return filesize;
}

View File

@@ -41,28 +41,28 @@
/* ===================================================== */
/* Non-zero size, to ensure g_poolCtx != NULL */
struct TPOOL_ctx_s {
struct TPool_s {
int dummy;
};
static TPOOL_ctx g_poolCtx;
static TPool g_poolCtx;
TPOOL_ctx* TPOOL_create(int numThreads, int queueSize) {
TPool* TPool_create(int numThreads, int queueSize) {
(void)numThreads;
(void)queueSize;
return &g_poolCtx;
}
void TPOOL_free(TPOOL_ctx* ctx) {
void TPool_free(TPool* ctx) {
assert(!ctx || ctx == &g_poolCtx);
(void)ctx;
}
void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg) {
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg) {
(void)ctx;
job_function(arg);
}
void TPOOL_completeJobs(TPOOL_ctx* ctx) {
void TPool_jobsCompleted(TPool* ctx) {
assert(!ctx || ctx == &g_poolCtx);
(void)ctx;
}
@@ -73,7 +73,7 @@ void TPOOL_completeJobs(TPOOL_ctx* ctx) {
/* Window TPool implementation using Completion Ports */
#include <windows.h>
typedef struct TPOOL_ctx_s {
typedef struct TPool_s {
HANDLE completionPort;
HANDLE* workerThreads;
int nbWorkers;
@@ -81,9 +81,9 @@ typedef struct TPOOL_ctx_s {
LONG nbPendingJobs;
HANDLE jobSlotAvail; /* For queue size control */
HANDLE allJobsCompleted; /* Event */
} TPOOL_ctx;
} TPool;
void TPOOL_free(TPOOL_ctx* ctx)
void TPool_free(TPool* ctx)
{
if (!ctx) return;
@@ -111,7 +111,7 @@ void TPOOL_free(TPOOL_ctx* ctx)
static DWORD WINAPI WorkerThread(LPVOID lpParameter)
{
TPOOL_ctx* ctx = (TPOOL_ctx*)lpParameter;
TPool* ctx = (TPool*)lpParameter;
DWORD bytesTransferred;
ULONG_PTR completionKey;
LPOVERLAPPED overlapped;
@@ -136,9 +136,9 @@ static DWORD WINAPI WorkerThread(LPVOID lpParameter)
return 0;
}
TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize)
TPool* TPool_create(int nbWorkers, int queueSize)
{
TPOOL_ctx* const ctx = calloc(1, sizeof(TPOOL_ctx));
TPool* const ctx = calloc(1, sizeof(TPool));
if (!ctx) return NULL;
/* parameters sanitization */
@@ -159,13 +159,13 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize)
ctx->nbWorkers = nbWorkers;
ctx->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
if (ctx->workerThreads == NULL) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
for (int i = 0; i < nbWorkers; i++) {
ctx->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, ctx, 0, NULL);
if (!ctx->workerThreads[i]) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
}
@@ -175,19 +175,19 @@ TPOOL_ctx* TPOOL_create(int nbWorkers, int queueSize)
ctx->nbPendingJobs = 0;
ctx->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
if (!ctx->jobSlotAvail) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
ctx->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!ctx->allJobsCompleted) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
return ctx;
}
void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
if (!ctx || !job_function) return;
@@ -203,7 +203,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
(LPOVERLAPPED)arg); /* Store argument in overlapped */
}
void TPOOL_completeJobs(TPOOL_ctx* ctx)
void TPool_jobsCompleted(TPool* ctx)
{
if (!ctx) return;
WaitForSingleObject(ctx->allJobsCompleted, INFINITE);
@@ -216,18 +216,18 @@ void TPOOL_completeJobs(TPOOL_ctx* ctx)
#include <pthread.h> /* pthread_* */
/* A job is just a function with an opaque argument */
typedef struct TPOOL_job_s {
typedef struct TPool_job_s {
void (*job_function)(void*);
void *arg;
} TPOOL_job;
} TPool_job;
struct TPOOL_ctx_s {
struct TPool_s {
pthread_t* threads;
size_t threadCapacity;
size_t threadLimit;
/* The queue is a circular buffer */
TPOOL_job* queue;
TPool_job* queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
@@ -247,11 +247,11 @@ struct TPOOL_ctx_s {
int shutdown;
};
static void TPOOL_shutdown(TPOOL_ctx* ctx);
static void TPool_shutdown(TPool* ctx);
void TPOOL_free(TPOOL_ctx* ctx) {
void TPool_free(TPool* ctx) {
if (!ctx) { return; }
TPOOL_shutdown(ctx);
TPool_shutdown(ctx);
pthread_mutex_destroy(&ctx->queueMutex);
pthread_cond_destroy(&ctx->queuePushCond);
pthread_cond_destroy(&ctx->queuePopCond);
@@ -260,31 +260,31 @@ void TPOOL_free(TPOOL_ctx* ctx) {
free(ctx);
}
static void* TPOOL_thread(void* opaque);
static void* TPool_thread(void* opaque);
TPOOL_ctx* TPOOL_create(int nbThreads, int queueSize)
TPool* TPool_create(int nbThreads, int queueSize)
{
TPOOL_ctx* ctx;
TPool* ctx;
/* Check parameters */
if (nbThreads<1 || queueSize<1) { return NULL; }
/* Allocate the context and zero initialize */
ctx = (TPOOL_ctx*)calloc(1, sizeof(TPOOL_ctx));
ctx = (TPool*)calloc(1, sizeof(TPool));
if (!ctx) { return NULL; }
/* init pthread variables */
{ int error = 0;
error |= pthread_mutex_init(&ctx->queueMutex, NULL);
error |= pthread_cond_init(&ctx->queuePushCond, NULL);
error |= pthread_cond_init(&ctx->queuePopCond, NULL);
if (error) { TPOOL_free(ctx); return NULL; }
if (error) { TPool_free(ctx); return NULL; }
}
/* Initialize the job queue.
* It needs one extra space since one space is wasted to differentiate
* empty and full queues.
*/
ctx->queueSize = (size_t)queueSize + 1;
ctx->queue = (TPOOL_job*)calloc(1, ctx->queueSize * sizeof(TPOOL_job));
ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job));
if (ctx->queue == NULL) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
ctx->queueHead = 0;
@@ -295,16 +295,16 @@ TPOOL_ctx* TPOOL_create(int nbThreads, int queueSize)
/* Allocate space for the thread handles */
ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t));
if (ctx->threads == NULL) {
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
}
ctx->threadCapacity = 0;
/* Initialize the threads */
{ int i;
for (i = 0; i < nbThreads; ++i) {
if (pthread_create(&ctx->threads[i], NULL, &TPOOL_thread, ctx)) {
if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
ctx->threadCapacity = (size_t)i;
TPOOL_free(ctx);
TPool_free(ctx);
return NULL;
} }
ctx->threadCapacity = (size_t)nbThreads;
@@ -313,13 +313,13 @@ TPOOL_ctx* TPOOL_create(int nbThreads, int queueSize)
return ctx;
}
/* TPOOL_thread() :
/* TPool_thread() :
* Work thread for the thread pool.
* Waits for jobs and executes them.
* @returns : NULL on failure else non-null.
*/
static void* TPOOL_thread(void* opaque) {
TPOOL_ctx* const ctx = (TPOOL_ctx*)opaque;
static void* TPool_thread(void* opaque) {
TPool* const ctx = (TPool*)opaque;
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
@@ -337,7 +337,7 @@ static void* TPOOL_thread(void* opaque) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* Pop a job off the queue */
{ TPOOL_job const job = ctx->queue[ctx->queueHead];
{ TPool_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->numThreadsBusy++;
ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
@@ -357,10 +357,10 @@ static void* TPOOL_thread(void* opaque) {
assert(0); /* Unreachable */
}
/*! TPOOL_shutdown() :
/*! TPool_shutdown() :
Shutdown the queue, wake any sleeping threads, and join all of the threads.
*/
static void TPOOL_shutdown(TPOOL_ctx* ctx) {
static void TPool_shutdown(TPool* ctx) {
/* Shut down the queue */
pthread_mutex_lock(&ctx->queueMutex);
ctx->shutdown = 1;
@@ -376,10 +376,10 @@ static void TPOOL_shutdown(TPOOL_ctx* ctx) {
}
/*! TPOOL_completeJobs() :
/*! TPool_jobsCompleted() :
* Waits for all queued jobs to finish executing.
*/
void TPOOL_completeJobs(TPOOL_ctx* ctx){
void TPool_jobsCompleted(TPool* ctx){
pthread_mutex_lock(&ctx->queueMutex);
while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
@@ -393,7 +393,7 @@ void TPOOL_completeJobs(TPOOL_ctx* ctx){
* When queueSize is 1 (pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free _and_ no job is waiting.
*/
static int isQueueFull(TPOOL_ctx const* ctx) {
static int isQueueFull(TPool const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
@@ -403,9 +403,9 @@ static int isQueueFull(TPOOL_ctx const* ctx) {
}
static void
TPOOL_submitJob_internal(TPOOL_ctx* ctx, void (*job_function)(void*), void *arg)
TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg)
{
TPOOL_job job;
TPool_job job;
job.job_function = job_function;
job.arg = arg;
assert(ctx != NULL);
@@ -417,7 +417,7 @@ TPOOL_submitJob_internal(TPOOL_ctx* ctx, void (*job_function)(void*), void *arg)
pthread_cond_signal(&ctx->queuePopCond);
}
void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
assert(ctx != NULL);
pthread_mutex_lock(&ctx->queueMutex);
@@ -425,7 +425,7 @@ void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg)
while (isQueueFull(ctx) && (!ctx->shutdown)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
TPOOL_submitJob_internal(ctx, job_function, arg);
TPool_submitJob_internal(ctx, job_function, arg);
pthread_mutex_unlock(&ctx->queueMutex);
}

View File

@@ -29,34 +29,35 @@
extern "C" {
#endif
typedef struct TPOOL_ctx_s TPOOL_ctx;
typedef struct TPool_s TPool;
/*! TPOOL_create() :
/*! TPool_create() :
* Create a thread pool with at most @nbThreads.
* @nbThreads must be at least 1.
* @queueSize is the maximum number of pending jobs before blocking.
* @return : POOL_ctx pointer on success, else NULL.
* @return : TPool* pointer on success, else NULL.
*/
TPOOL_ctx* TPOOL_create(int nbThreads, int queueSize);
TPool* TPool_create(int nbThreads, int queueSize);
/*! TPOOL_free() :
* Free a thread pool returned by TPOOL_create().
* Note: if jobs are already running, @free first waits for their completion
/*! TPool_free() :
* Free a thread pool returned by TPool_create().
* Waits for the completion of running jobs before freeing resources.
*/
void TPOOL_free(TPOOL_ctx* ctx);
void TPool_free(TPool* ctx);
/*! TPOOL_submitJob() :
/*! TPool_submitJob() :
* Add @job_function(arg) to the thread pool.
* @ctx must be valid.
* Invocation can block if queue is full.
* Note : pay attention to @arg lifetime, which is now owned by @job_function
* Note: Ensure @arg's lifetime extends until @job_function completes.
* Alternatively, @arg's lifetime must be managed by @job_function.
*/
void TPOOL_submitJob(TPOOL_ctx* ctx, void (*job_function)(void*), void* arg);
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg);
/*! TPOOL_completeJobs() :
* Blocks, waiting for all queued jobs to be completed
/*! TPool_jobsCompleted() :
* Blocks until all queued jobs are completed.
*/
void TPOOL_completeJobs(TPOOL_ctx* ctx);
void TPool_jobsCompleted(TPool* ctx);