10 Commits
dawsh ... main

Author SHA1 Message Date
671e46ce66 Add operator support to collision guard 2025-07-13 16:46:34 -04:00
140902185f Update Thread.cpp
Fixed a bug that caused a race between ~Thread and Thread::Runner receiving a new job from the threadpool
2025-07-13 13:35:46 -04:00
e2b847665f Update CMakeLists.txt 2025-07-13 12:30:23 -04:00
d898a60cc1 Cleanup & Collision Guard 2025-07-13 00:48:05 -04:00
c991d4b3ef task complete callback 2025-04-12 19:08:17 -04:00
af819dc758 Task::WaitComplete to avoid spinning
Mutex waiting for task complete. Waiting for a task to be finished, The destructor for the thread and thread pool now take less than 1% usage. 😎
2025-04-10 21:28:06 -04:00
b4b476bd7f Document & wait queue condition in threadpool destructor. 2025-04-10 12:29:10 -04:00
d20bda0d4e Small fixes. Make the demo nicer. 2025-04-09 21:10:28 -04:00
3b9ca2e144 Fixed a timing bug
That would cause the thread to not be reported as busy while putting the task on-to the thread.
2025-04-09 16:37:55 -04:00
a03eb39347 Implement ConcurrentQueue.h from CaveGame 2025-04-09 02:38:23 -05:00
10 changed files with 464 additions and 86 deletions

View File

@@ -9,7 +9,15 @@ file(GLOB_RECURSE HEADERS "include/*.h" "include/*.hpp")
file(GLOB_RECURSE SOURCES "src/*.c" "src/*.cpp")
set(CMAKE_CXX_STANDARD 20)
add_library(MultiThreading SHARED ${SOURCES})
if (UNIX AND NOT APPLE)
add_library(MultiThreading SHARED ${SOURCES} "include/MultiThreading/CollisionGuard.h")
endif()
if(WIN32)
add_library(MultiThreading STATIC ${SOURCES} "include/MultiThreading/CollisionGuard.h")
endif()
target_include_directories(MultiThreading PUBLIC ${PROJECT_SOURCE_DIR}/include)
add_executable(MultiThreadingDemo main.cpp)
target_link_libraries(MultiThreadingDemo PUBLIC MultiThreading)

46
README.md Normal file
View File

@@ -0,0 +1,46 @@
# Redacted Software MultiThreading
Yet Another C++ Threading Library
[![License: Unlicense](https://img.shields.io/badge/license-Unlicense-blue.svg)](http://unlicense.org/) ![Static Badge](https://img.shields.io/badge/Lit-Based-%20)
## Features
* Easily utilize maximum parallelization of the hardware.
* Overhead < 1% compared to running functions directly.
* Thread-safe wrappers for common STL data structures.
* Public Domain Source Code.
## API Overview
## Types
* CollisionGuard
* ConcurrentQueue
* Task
* Thread
* ThreadPool
## Usage
Using a thread-pool to execute a large set of tasks, which will utilize as many hardware threads as possible.
## Requirements
* C++ 20
## Compatibility
* Linux (gcc 13.x or newer)
* Windows 10 (1909 or newer)
## Documentation
Documentation is currently limited to source-code annotations, and the demo program.
## Contributing
Contributions to MultiThreading are welcome! If you find a bug, have a feature request, or would like to contribute code, please submit an issue or pull request.
## Acknowledgements
Developed & Maintained by William @ Redacted Software and contributors.

View File

@@ -0,0 +1,113 @@
#pragma once
#include <concepts>
#include <shared_mutex>
namespace MultiThreading {
template <typename T>
class CollisionGuard;
}
template <typename T>
class MultiThreading::CollisionGuard {
private:
mutable std::shared_mutex mutex;
T value;
public:
class Read {
private:
std::shared_lock<std::shared_mutex> lock;
const T* t_ptr;
public:
Read(const std::shared_mutex& mtx, const T& val) : t_ptr(&val), lock(mtx) {}
const T* operator->() const { return t_ptr; }
};
class Write {
private:
std::unique_lock<std::shared_mutex> lock;
T* t_ptr;
public:
Write(std::shared_mutex& mtx, T& val) : t_ptr(&val), lock(mtx) {}
T* operator->() { return t_ptr; }
};
public:
Read operator->() const { return Read(mutex, value); }
Write operator->() { return Write(mutex, value); }
public:
template <typename U = T>
requires requires(const U& a, const U& b) { a + b; }
T operator +(const T& rhs) const {
std::shared_lock lock(mutex);
return value + rhs;
}
template <typename U = T>
requires requires(U& a, const U& b) { a = b; }
CollisionGuard& operator =(const T& rhs) {
std::unique_lock lock(mutex);
value = rhs;
return *this;
}
explicit operator T() const requires std::copy_constructible<T> {
std::shared_lock lock(mutex);
return value;
}
template <typename U = T>
requires requires(U& a, const U& b) { a += b; }
CollisionGuard& operator +=(const T& rhs) {
std::unique_lock lock(mutex);
value += rhs;
return *this;
}
template <typename U = T>
requires requires(U& a, const U& b) { a -= b; }
CollisionGuard& operator -=(const T& rhs) {
std::unique_lock lock(mutex);
value -= rhs;
return *this;
}
template <typename U = T>
requires requires(U& a, const U& b) { a *= b; }
CollisionGuard& operator *=(const T& rhs) {
std::unique_lock lock(mutex);
value *= rhs;
return *this;
}
template <typename U = T>
requires requires(U& a, const U& b) { a /= b; }
CollisionGuard& operator /=(const T& rhs) {
std::unique_lock lock(mutex);
value /= rhs;
return *this;
}
template <typename U = T>
requires requires(const U& a, const U& b) { a - b; }
T operator -(const T& rhs) const {
std::shared_lock lock(mutex);
return value - rhs;
}
template <typename U = T>
requires requires(const U& a, const U& b) { a * b; }
T operator *(const T& rhs) const {
std::shared_lock lock(mutex);
return value * rhs;
}
template <typename U = T>
requires requires(const U& a, const U& b) { a / b; }
T operator /(const T& rhs) const {
std::shared_lock lock(mutex);
return value / rhs;
}
public:
template <typename... Args>
explicit CollisionGuard(Args&&... args) : value(std::forward<Args>(args)...) {}
};

View File

@@ -0,0 +1,123 @@
#pragma once
#include <queue>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <type_traits>
/// A simple C++11 Concurrent Queue based on std::queue.
/// Supports waiting operations for retrieving an element when it's empty.
/// It's interrupted by calling interrupt.
namespace MultiThreading {
template<typename T, typename Container = std::deque<T> >
class ConcurrentQueue {
public:
typedef typename std::queue<T>::size_type size_type;
typedef typename std::queue<T>::reference reference;
typedef typename std::queue<T>::const_reference const_reference;
~ConcurrentQueue() {
interrupt();
}
void interrupt() {
interrupted = true;
condition_variable.notify_one();
}
bool contains(const T &value) {
std::unique_lock<std::mutex> lock(this->mutex);
std::queue<T, Container> copy = queue;
while (!copy.empty())
if (copy.front() == value)
return true;
else
copy.pop();
return false;
}
void push(const T &e) {
std::unique_lock<std::mutex> lock(mutex);
queue.push(e);
condition_variable.notify_one();
}
template<typename... Args>
void emplace(Args &&... args) {
std::unique_lock<std::mutex> lock(this->mutex);
queue.emplace(std::forward<Args>(args)...);
condition_variable.notify_one();
}
bool empty() {
//std::unique_lock<std::mutex> lock(this->mutex);
return queue.empty();
}
void pop() {
std::unique_lock<std::mutex> lock(this->mutex);
if (!queue.empty())
queue.pop();
}
void front_pop(T &ret) {
std::unique_lock<std::mutex> lock(this->mutex);
wait(lock);
ret = queue.front();
queue.pop();
}
size_type size() const {
//std::unique_lock<std::mutex> lock(this->mutex);
return queue.size();
}
reference front() {
std::unique_lock<std::mutex> lock(this->mutex);
wait(lock);
return queue.front();
}
/*const_reference front() const {
std::unique_lock<std::mutex> lock(this->mutex);
wait(lock);
return queue.front();
}*/
reference back() {
std::unique_lock<std::mutex> lock(this->mutex);
wait(lock);
return queue.back();
}
/*const_reference back() const {
std::unique_lock<std::mutex> lock(this->mutex);
wait(lock);
return queue.back();
}*/
void swap(ConcurrentQueue &q) {
throw std::runtime_error("Not supported");
}
protected:
std::queue<T, Container> queue;
std::mutex mutex;
std::condition_variable condition_variable;
std::atomic_bool interrupted;
private:
void wait(std::unique_lock<std::mutex> &lock) {
interrupted = false;
while (queue.empty()) {
condition_variable.wait(lock);
if (interrupted)
throw std::runtime_error("Interrupted");
}
}
};
}

View File

@@ -1,5 +1,6 @@
#pragma once
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>
@@ -14,13 +15,22 @@ namespace MultiThreading {
class MultiThreading::TaskBase {
protected:
std::function<void()> complete_callback = nullptr;
std::condition_variable completed_condition;
std::atomic<bool> complete = false;
std::mutex mtx;
void MarkTaskComplete() { std::lock_guard<std::mutex> lock(mtx); complete = true; completed_condition.notify_all(); }
protected:
explicit TaskBase(std::function<void()> task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {};
public:
/// @returns whether the task is finished.
[[nodiscard]] bool Complete() const { return complete; }
/// Condition variable waits for the task to be complete
/// @note There is no way to know if the task has been assigned, If you do this and that never happens the calling thread will wait forever.
void WaitComplete() { std::unique_lock<std::mutex> lock(mtx); completed_condition.wait(lock, [this]() { return complete.load(); }); }
virtual void Run() = 0;
public:
TaskBase() = default;
};
template <typename T>
@@ -29,15 +39,17 @@ private:
T* result = nullptr;
std::function<T()> callable = nullptr;
private:
explicit Task(std::function<T()> callable, T* result = nullptr) : TaskBase(), result(result), callable(callable) {}
explicit Task(std::function<T()> callable, std::function<void()> complete_callback = nullptr, T* result = nullptr) : TaskBase(complete_callback), result(result), callable(std::move(callable)) {}
public:
void Run() final { result ? *result = callable() : callable(); complete = true; }
void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
public:
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
/// @param callable The function to run, *usually a lambda or std::bind*
/// @param complete_callback A void callable to be run after the task.
/// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it.
/// @note this is shared_ptr so you don't have to delete it.
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, result)); };
/// @note If result is stack allocated your program will probably explode.
/// @note this is shared_ptr so that you don't have to delete it.
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, const std::function<void()>& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, complete_callback, result)); };
~Task() = default;
};
@@ -46,13 +58,14 @@ template <>
class MultiThreading::Task<void> : public TaskBase {
private:
std::function<void()> callable = nullptr;
private:
explicit Task(std::function<void()> callable) : TaskBase(), callable(std::move(callable)) {}
explicit Task(std::function<void()> callable, std::function<void()> complete_callback = nullptr) : TaskBase(std::move(complete_callback)), callable(std::move(callable)) {}
public:
void Run() final { callable(); complete = true; }
void Run() final { callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
/// @param callable The function to run, *usually a lambda or std::bind*
/// @note this is shared_ptr so you don't have to delete it.
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable) { return std::shared_ptr<Task<void>>(new Task<void>(callable)); }
/// @param complete_callback A void callable to be run after the task.
/// @note this is shared_ptr so that you don't have to delete it.
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable, const std::function<void()>& complete_callback = nullptr) { return std::shared_ptr<Task<void>>(new Task<void>(callable, complete_callback)); }
~Task() = default;
};

View File

@@ -1,10 +1,6 @@
#pragma once
#include <thread>
#include <functional>
#include <stdexcept>
#include <condition_variable>
#include <mutex>
#include <MultiThreading/Task.h>
namespace MultiThreading {
@@ -13,22 +9,39 @@ namespace MultiThreading {
class MultiThreading::Thread {
private:
std::thread worker;
std::shared_ptr<TaskBase> current_task = nullptr;
std::atomic<bool> busy = false;
std::mutex mtx;
std::condition_variable cv;
std::atomic<bool> stop = false;
std::condition_variable cv;
std::thread worker;
bool busy = false;
std::mutex mtx;
private:
void Runner();
public:
[[nodiscard]] bool SetTask(const std::function<void()>& task);
/// @returns false if the thread is busy.
/// @param task The task to run.
/// @note Task is passed by value on purpose so that the original shared_ptr to the task is still good.
[[nodiscard]] bool SetTask(std::shared_ptr<TaskBase> task);
/// @returns false if the thread is busy.
/// @param task The task to run.
/// @note Task is passed by reference because the frame after next is going to copy it anyway.
[[nodiscard]] bool SetTask(const std::function<void()>& task);
/// @returns true if the thread is currently doing work.
/// @note SetTask will return false if the thread is busy.
[[nodiscard]] bool Busy() const { return busy; }
void Join() { if (worker.joinable()) worker.join(); };
/// Blocks the thread join is called from until execution of this thread exits.
/// @note Joining this thread from this thread will raise an exception.
void Join();
public:
/// Create a thread which will initialize and then wait for a task.
Thread() { worker = std::thread([this] { this->Runner(); }); }
/// Crete a thread which will immediately run a task and then wait for another.
explicit Thread(std::shared_ptr<TaskBase> task);
/// Crete a thread which will immediately run a task and then wait for another.
explicit Thread(const std::function<void()>& task);
/// Waits for the current task to finish (if there is one) and destroys the thread.
~Thread();
};

View File

@@ -3,27 +3,51 @@
#include <MultiThreading/Thread.h>
#include <vector>
#include <queue>
namespace MultiThreading {
class ThreadPool;
}
/// A group of threads to run tasks on.
class MultiThreading::ThreadPool {
private:
std::vector<MultiThreading::Thread*> threads;
std::queue<std::shared_ptr<TaskBase>> queue;
std::condition_variable queue_condition;
std::mutex queue_mutex;
bool destructing = false;
private:
/// @returns nullptr if the queue is empty.
std::shared_ptr<TaskBase> Dequeue();
void Runner(const std::shared_ptr<TaskBase>& task);
public:
void Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
void Enqueue(const std::function<void()>& task);
/// Set a task to be run on the thread-pool.
/// @param task The task to run.
bool Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
/// Set a task to be run on the thread-pool.
/// @param task The task to run.
bool Enqueue(const std::function<void()>& task);
public:
/// @returns The number of threads in the thread pool.
[[nodiscard]] unsigned int ThreadCount() const { return threads.size(); }
[[nodiscard]] unsigned int QueueSize();
/// @returns the number of tasks in the queue
/// @note this excludes the tasks currently being run.
[[nodiscard]] unsigned int PendingTasks();
/// @returns Whether a task you enqueue would have to wait for something else to finish before running.
[[nodiscard]] bool Busy();
/// Uses a condition variable to wait the calling thread until the queue is empty.
void WaitQueueEmpty();
public:
ThreadPool();
explicit ThreadPool(unsigned int thread_count);
/// Constructs a thread-pool with a given number of threads (or hardware_concurrency as the default)
/// @param thread_count The number of threads.
/// @note If you do a *lot* of work on the main thread and hardware_concurrency >= 2, You should use hardware_concurrency -1.
explicit ThreadPool(unsigned int thread_count = std::thread::hardware_concurrency());
/// Waits for the threads to empty the queue and destroys the thread pool.
/// @note This should be one of the very last things your program does before exiting.
~ThreadPool();
};

View File

@@ -1,17 +1,18 @@
#include <MultiThreading/Task.h>
#include <MultiThreading/Thread.h>
#include <MultiThreading/ThreadPool.h>
#include <MultiThreading/CollisionGuard.h>
#include <iostream>
using namespace MultiThreading;
int32_t some_test_func(int32_t hello) {
for (unsigned int i = 0; i < 100; i++)
std::cout << i << std::endl;
return hello;
//void cb() { std::cout << "hi" << std::endl; }
void some_test_func() {
for (unsigned int i = 0; i < 1000000000; i++) {}
std::cout << "task finishes." << std::endl;
}
/*
int main() {
// Each task you create can be run by a thread only once. It's marked as complete after.
// If you're running a lambda or std::function directly on the thread, It can be used multiple times.
@@ -21,8 +22,14 @@ int main() {
// You can start threads in place like this, but you have to wait for the amount of time
// it takes for the thread to start up which in many cases is longer than the job. Use thread pool.
Thread some_thread;
//some_thread.SetTaskCompletionCallback(cb);
some_thread.SetTask(some_task);
//while (!some_task->Complete()) {}
//std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::cout << some_thread.Busy() << std::endl;
// Some work running concurrently with the worker thread.
//for (unsigned int i = 0; i < 10; i++) {}
@@ -32,26 +39,18 @@ int main() {
//std::cout << a << std::endl;
}
*/
/*
int main() {
ThreadPool thread_pool(1);
srand(time(nullptr));
CollisionGuard<int> task_completion_count(0);
auto* thread_pool = new ThreadPool();
auto some_task_1 = Task<void>::Create(([] { return some_test_func(1); }));
auto some_task_2 = Task<void>::Create(([] { return some_test_func(1); }));
auto some_task_3 = Task<void>::Create(([] { return some_test_func(1); }));
auto some_task_4 = Task<void>::Create(([] { return some_test_func(1); }));
for (unsigned int i = 0; i < 128; i++) {
auto some_task = Task<void>::Create([&task_completion_count]() { task_completion_count += 1; }, [&task_completion_count]() { task_completion_count += 1; });
thread_pool->Enqueue(some_task);
}
thread_pool.Enqueue(some_task_1);
thread_pool.Enqueue(some_task_2);
thread_pool.Enqueue(some_task_3);
thread_pool.Enqueue(some_task_4);
while (!some_task_4->Complete()) {}
std::cout << thread_pool.ThreadCount() << std::endl;
//delete thread_pool;
delete thread_pool;
std::cout << "The number of tasks run was " << (int) task_completion_count << std::endl;
}
*/

View File

@@ -1,25 +1,37 @@
#include <MultiThreading/Thread.h>
#include <sstream>
#include <iostream>
#include <utility>
#include <MultiThreading/Thread.h>
using namespace MultiThreading;
Thread::~Thread() {
if (current_task)
while (!current_task->Complete()) {}
std::shared_ptr<TaskBase> task;
{
std::lock_guard<std::mutex> lock(mtx);
task = current_task;
}
if (task)
task->WaitComplete();
stop = true;
cv.notify_one();
Join();
// Thread exits gracefully.
}
bool Thread::SetTask(std::shared_ptr<TaskBase> task) {
if (busy)
std::lock_guard<std::mutex> lock(mtx);
if (Busy())
return false;
{
std::lock_guard<std::mutex> lock(mtx);
current_task = std::move(task);
}
if (current_task)
return false;
busy = true;
current_task = std::move(task);
cv.notify_all();
return true;
}
@@ -30,17 +42,17 @@ bool Thread::SetTask(const std::function<void()>& task) {
void Thread::Runner() {
std::unique_lock<std::mutex> lock(mtx);
while (!stop) {
cv.wait(lock, [this] { return stop || current_task != nullptr; });
if (stop)
break;
busy = true;
auto task = current_task;
lock.unlock();
if (current_task)
current_task->Run();
task->Run();
lock.lock();
@@ -62,3 +74,8 @@ Thread::Thread(const std::function<void()>& task) {
if (!SetTask(task))
throw std::runtime_error("Thread constructor failure.");
}
void Thread::Join() {
if (worker.joinable())
worker.join();
}

View File

@@ -3,32 +3,32 @@
using namespace MultiThreading;
ThreadPool::ThreadPool(unsigned int thread_count) {
for (unsigned int i = 0; i < thread_count; i++)
threads.push_back(new Thread());
}
ThreadPool::ThreadPool() {
for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++)
threads.push_back(new Thread());
}
bool ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
if (destructing)
return false;
void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
std::lock_guard<std::mutex> lock(queue_mutex);
// Assign it immediately if there's no wait and a thread open.
if (queue.empty()) {
for (auto *t: threads) {
for (auto* t: threads) {
if (t->Busy())
continue;
if (!t->SetTask( [this, task] (){ Runner(task); } ))
throw std::runtime_error("There was an error while setting up the task to run on the thread.");
return;
throw std::runtime_error("There was a collision while putting the task to the thread.");
return true;
}
}
// Alternatively it goes in-to the queue.
queue.push(task);
return true;
}
std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
@@ -38,33 +38,55 @@ std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
auto task = queue.front();
queue.pop();
if (queue.empty())
queue_condition.notify_all();
return task;
}
void ThreadPool::Enqueue(const std::function<void()>& task) {
Enqueue(Task<void>::Create(task));
bool ThreadPool::Enqueue(const std::function<void()>& task) {
return Enqueue(Task<void>::Create(task));
}
void ThreadPool::Runner(const std::shared_ptr<TaskBase>& task) {
if (!task->Complete())
task->Run();
auto running_task = task;
auto next_task = Dequeue();
if (!next_task)
return;
Runner(next_task);
while (running_task) {
if (!running_task->Complete())
running_task->Run();
running_task = Dequeue();
}
}
unsigned int ThreadPool::QueueSize() {
unsigned int ThreadPool::PendingTasks() {
std::lock_guard<std::mutex> lock(queue_mutex);
return queue.size();
}
ThreadPool::~ThreadPool() {
// Wait for all tasks to be running.
while (QueueSize() != 0) {}
destructing = true;
// Wait for all queued tasks to be running.
WaitQueueEmpty();
// delete t waits for the thread to exit gracefully.
for (auto* t: threads)
delete t;
}
bool ThreadPool::Busy() {
if (PendingTasks() != 0)
return true;
bool all_busy = true;
for (auto* t : threads)
if (!t->Busy()) { all_busy = false; break; }
return all_busy;
}
void ThreadPool::WaitQueueEmpty() {
std::unique_lock<std::mutex> lock(queue_mutex);
queue_condition.wait(lock, [this]{ return queue.empty(); });
}