Cleanup & Collision Guard
This commit is contained in:
46
README.md
Normal file
46
README.md
Normal file
@@ -0,0 +1,46 @@
|
||||
# Redacted Software MultiThreading
|
||||
|
||||
Yet Another C++ Threading Library
|
||||
|
||||
[](http://unlicense.org/) 
|
||||
|
||||
## Features
|
||||
|
||||
* Easily utilize maximum parallelization of the hardware.
|
||||
* Overhead < 1% compared to running functions directly.
|
||||
* Thread-safe wrappers for common STL data structures.
|
||||
* Public Domain Source Code.
|
||||
|
||||
## API Overview
|
||||
|
||||
## Types
|
||||
* CollisionGuard
|
||||
* ConcurrentQueue
|
||||
* Task
|
||||
* Thread
|
||||
* ThreadPool
|
||||
|
||||
## Usage
|
||||
|
||||
Using a thread-pool to execute a large set of tasks, which will utilize as many hardware threads as possible.
|
||||
|
||||
## Requirements
|
||||
|
||||
* C++ 20
|
||||
|
||||
## Compatibility
|
||||
|
||||
* Linux (gcc 13.x or newer)
|
||||
* Windows 10 (1909 or newer)
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation is currently limited to source-code annotations, and the demo program.
|
||||
|
||||
## Contributing
|
||||
|
||||
Contributions to MultiThreading are welcome! If you find a bug, have a feature request, or would like to contribute code, please submit an issue or pull request.
|
||||
|
||||
## Acknowledgements
|
||||
|
||||
Developed & Maintained by William @ Redacted Software and contributors.
|
39
include/MultiThreading/CollisionGuard.h
Normal file
39
include/MultiThreading/CollisionGuard.h
Normal file
@@ -0,0 +1,39 @@
|
||||
#pragma once
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
namespace MultiThreading {
|
||||
template <typename T>
|
||||
class CollisionGuard;
|
||||
}
|
||||
template <typename T>
|
||||
class MultiThreading::CollisionGuard {
|
||||
private:
|
||||
mutable std::shared_mutex mutex;
|
||||
T value;
|
||||
public:
|
||||
|
||||
class Read {
|
||||
private:
|
||||
std::shared_lock<std::shared_mutex> lock;
|
||||
const T* t_ptr;
|
||||
public:
|
||||
Read(const std::shared_mutex& mtx, const T& val) : t_ptr(&val), lock(mtx) {}
|
||||
const T* operator->() const { return t_ptr; }
|
||||
};
|
||||
|
||||
class Write {
|
||||
private:
|
||||
std::unique_lock<std::shared_mutex> lock;
|
||||
T* t_ptr;
|
||||
public:
|
||||
Write(std::shared_mutex& mtx, T& val) : t_ptr(&val), lock(mtx) {}
|
||||
T* operator->() { return t_ptr; }
|
||||
};
|
||||
public:
|
||||
Read operator->() const { return Read(mutex, value); }
|
||||
Write operator->() { return Write(mutex, value); }
|
||||
|
||||
template <typename... Args>
|
||||
explicit CollisionGuard(Args&&... args) : value(std::forward<Args>(args)...) {}
|
||||
};
|
@@ -20,6 +20,8 @@ protected:
|
||||
std::atomic<bool> complete = false;
|
||||
std::mutex mtx;
|
||||
void MarkTaskComplete() { std::lock_guard<std::mutex> lock(mtx); complete = true; completed_condition.notify_all(); }
|
||||
protected:
|
||||
explicit TaskBase(std::function<void()> task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {};
|
||||
public:
|
||||
/// @returns whether the task is finished.
|
||||
[[nodiscard]] bool Complete() const { return complete; }
|
||||
@@ -29,8 +31,6 @@ public:
|
||||
void WaitComplete() { std::unique_lock<std::mutex> lock(mtx); completed_condition.wait(lock, [this]() { return complete.load(); }); }
|
||||
|
||||
virtual void Run() = 0;
|
||||
public:
|
||||
explicit TaskBase(std::function<void()> task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {};
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@@ -45,8 +45,10 @@ public:
|
||||
public:
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @param complete_callback A void callable to be run after the task.
|
||||
/// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it.
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
/// @note If result is stack allocated your program will probably explode.
|
||||
/// @note this is shared_ptr so that you don't have to delete it.
|
||||
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, const std::function<void()>& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, complete_callback, result)); };
|
||||
~Task() = default;
|
||||
};
|
||||
@@ -62,7 +64,8 @@ public:
|
||||
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
/// @param complete_callback A void callable to be run after the task.
|
||||
/// @note this is shared_ptr so that you don't have to delete it.
|
||||
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable, const std::function<void()>& complete_callback = nullptr) { return std::shared_ptr<Task<void>>(new Task<void>(callable, complete_callback)); }
|
||||
~Task() = default;
|
||||
};
|
@@ -15,6 +15,7 @@ private:
|
||||
std::queue<std::shared_ptr<TaskBase>> queue;
|
||||
std::condition_variable queue_condition;
|
||||
std::mutex queue_mutex;
|
||||
bool destructing = false;
|
||||
private:
|
||||
/// @returns nullptr if the queue is empty.
|
||||
std::shared_ptr<TaskBase> Dequeue();
|
||||
@@ -22,11 +23,11 @@ private:
|
||||
public:
|
||||
/// Set a task to be run on the thread-pool.
|
||||
/// @param task The task to run.
|
||||
void Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
|
||||
bool Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
|
||||
|
||||
/// Set a task to be run on the thread-pool.
|
||||
/// @param task The task to run.
|
||||
void Enqueue(const std::function<void()>& task);
|
||||
bool Enqueue(const std::function<void()>& task);
|
||||
public:
|
||||
/// @returns The number of threads in the thread pool.
|
||||
[[nodiscard]] unsigned int ThreadCount() const { return threads.size(); }
|
||||
@@ -35,7 +36,7 @@ public:
|
||||
/// @note this excludes the tasks currently being run.
|
||||
[[nodiscard]] unsigned int PendingTasks();
|
||||
|
||||
/// @returns Whether a task you enqueue would have to wait for something else to finish.
|
||||
/// @returns Whether a task you enqueue would have to wait for something else to finish before running.
|
||||
[[nodiscard]] bool Busy();
|
||||
|
||||
/// Uses a condition variable to wait the calling thread until the queue is empty.
|
||||
|
11
main.cpp
11
main.cpp
@@ -1,12 +1,12 @@
|
||||
#include <MultiThreading/Task.h>
|
||||
#include <MultiThreading/Thread.h>
|
||||
#include <MultiThreading/ThreadPool.h>
|
||||
#include <MultiThreading/CollisionGuard.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
using namespace MultiThreading;
|
||||
|
||||
void cb() { std::cout << "hi" << std::endl; }
|
||||
//void cb() { std::cout << "hi" << std::endl; }
|
||||
int32_t some_test_func(int32_t hello) {
|
||||
for (unsigned int i = 0; i < 1000000000; i++) {}
|
||||
std::cout << "task " << hello << " finishes." << std::endl;
|
||||
@@ -42,17 +42,14 @@ int main() {
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
|
||||
int main() {
|
||||
srand(time(nullptr));
|
||||
|
||||
int32_t task_completion_count = 0;
|
||||
auto* thread_pool = new ThreadPool(16);
|
||||
|
||||
auto* thread_pool = new ThreadPool();
|
||||
|
||||
for (unsigned int i = 0; i < 128; i++) {
|
||||
auto some_task = Task<int32_t>::Create(([i] { return some_test_func(i); }), cb, &task_completion_count);
|
||||
auto some_task = Task<int32_t>::Create(([i] { return some_test_func(i); }), nullptr, &task_completion_count);
|
||||
thread_pool->Enqueue(some_task);
|
||||
}
|
||||
|
||||
|
@@ -1,5 +1,6 @@
|
||||
#include <MultiThreading/Thread.h>
|
||||
#include <utility>
|
||||
#include <MultiThreading/Thread.h>
|
||||
|
||||
|
||||
using namespace MultiThreading;
|
||||
|
||||
|
@@ -8,7 +8,10 @@ ThreadPool::ThreadPool(unsigned int thread_count) {
|
||||
threads.push_back(new Thread());
|
||||
}
|
||||
|
||||
void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
|
||||
bool ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
|
||||
if (destructing)
|
||||
return false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(queue_mutex);
|
||||
|
||||
// Assign it immediately if there's no wait and a thread open.
|
||||
@@ -19,12 +22,13 @@ void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task)
|
||||
|
||||
if (!t->SetTask( [this, task] (){ Runner(task); } ))
|
||||
throw std::runtime_error("There was a collision while putting the task to the thread.");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Alternatively it goes in-to the queue.
|
||||
queue.push(task);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
|
||||
@@ -41,8 +45,8 @@ std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
|
||||
return task;
|
||||
}
|
||||
|
||||
void ThreadPool::Enqueue(const std::function<void()>& task) {
|
||||
Enqueue(Task<void>::Create(task));
|
||||
bool ThreadPool::Enqueue(const std::function<void()>& task) {
|
||||
return Enqueue(Task<void>::Create(task));
|
||||
}
|
||||
|
||||
void ThreadPool::Runner(const std::shared_ptr<TaskBase>& task) {
|
||||
@@ -62,6 +66,7 @@ unsigned int ThreadPool::PendingTasks() {
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool() {
|
||||
destructing = true;
|
||||
// Wait for all queued tasks to be running.
|
||||
WaitQueueEmpty();
|
||||
|
||||
|
Reference in New Issue
Block a user