diff --git a/README.md b/README.md new file mode 100644 index 0000000..c7b9ba8 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# Redacted Software MultiThreading + +Yet Another C++ Threading Library + +[![License: Unlicense](https://img.shields.io/badge/license-Unlicense-blue.svg)](http://unlicense.org/) ![Static Badge](https://img.shields.io/badge/Lit-Based-%20) + +## Features + +* Easily utilize maximum parallelization of the hardware. +* Overhead < 1% compared to running functions directly. +* Thread-safe wrappers for common STL data structures. +* Public Domain Source Code. + +## API Overview + +## Types +* CollisionGuard +* ConcurrentQueue +* Task +* Thread +* ThreadPool + +## Usage + +Using a thread-pool to execute a large set of tasks, which will utilize as many hardware threads as possible. + +## Requirements + +* C++ 20 + +## Compatibility + +* Linux (gcc 13.x or newer) +* Windows 10 (1909 or newer) + +## Documentation + +Documentation is currently limited to source-code annotations, and the demo program. + +## Contributing + +Contributions to MultiThreading are welcome! If you find a bug, have a feature request, or would like to contribute code, please submit an issue or pull request. + +## Acknowledgements + +Developed & Maintained by William @ Redacted Software and contributors. \ No newline at end of file diff --git a/include/MultiThreading/CollisionGuard.h b/include/MultiThreading/CollisionGuard.h new file mode 100644 index 0000000..f627cd9 --- /dev/null +++ b/include/MultiThreading/CollisionGuard.h @@ -0,0 +1,39 @@ +#pragma once + +#include + +namespace MultiThreading { + template + class CollisionGuard; +} +template +class MultiThreading::CollisionGuard { +private: + mutable std::shared_mutex mutex; + T value; +public: + + class Read { + private: + std::shared_lock lock; + const T* t_ptr; + public: + Read(const std::shared_mutex& mtx, const T& val) : t_ptr(&val), lock(mtx) {} + const T* operator->() const { return t_ptr; } + }; + + class Write { + private: + std::unique_lock lock; + T* t_ptr; + public: + Write(std::shared_mutex& mtx, T& val) : t_ptr(&val), lock(mtx) {} + T* operator->() { return t_ptr; } + }; +public: + Read operator->() const { return Read(mutex, value); } + Write operator->() { return Write(mutex, value); } + + template + explicit CollisionGuard(Args&&... args) : value(std::forward(args)...) {} +}; \ No newline at end of file diff --git a/include/MultiThreading/Task.h b/include/MultiThreading/Task.h index 93d733e..b74f1d6 100644 --- a/include/MultiThreading/Task.h +++ b/include/MultiThreading/Task.h @@ -20,6 +20,8 @@ protected: std::atomic complete = false; std::mutex mtx; void MarkTaskComplete() { std::lock_guard lock(mtx); complete = true; completed_condition.notify_all(); } +protected: + explicit TaskBase(std::function task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {}; public: /// @returns whether the task is finished. [[nodiscard]] bool Complete() const { return complete; } @@ -29,8 +31,6 @@ public: void WaitComplete() { std::unique_lock lock(mtx); completed_condition.wait(lock, [this]() { return complete.load(); }); } virtual void Run() = 0; -public: - explicit TaskBase(std::function task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {}; }; template @@ -45,8 +45,10 @@ public: public: /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. /// @param callable The function to run, *usually a lambda or std::bind* + /// @param complete_callback A void callable to be run after the task. /// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it. - /// @note this is shared_ptr so you don't have to delete it. + /// @note If result is stack allocated your program will probably explode. + /// @note this is shared_ptr so that you don't have to delete it. static std::shared_ptr> Create(std::function callable, const std::function& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr>(new Task(callable, complete_callback, result)); }; ~Task() = default; }; @@ -62,7 +64,8 @@ public: /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. /// @param callable The function to run, *usually a lambda or std::bind* - /// @note this is shared_ptr so you don't have to delete it. + /// @param complete_callback A void callable to be run after the task. + /// @note this is shared_ptr so that you don't have to delete it. static std::shared_ptr> Create(const std::function& callable, const std::function& complete_callback = nullptr) { return std::shared_ptr>(new Task(callable, complete_callback)); } ~Task() = default; }; \ No newline at end of file diff --git a/include/MultiThreading/ThreadPool.h b/include/MultiThreading/ThreadPool.h index 64c739b..b11dd51 100644 --- a/include/MultiThreading/ThreadPool.h +++ b/include/MultiThreading/ThreadPool.h @@ -15,6 +15,7 @@ private: std::queue> queue; std::condition_variable queue_condition; std::mutex queue_mutex; + bool destructing = false; private: /// @returns nullptr if the queue is empty. std::shared_ptr Dequeue(); @@ -22,11 +23,11 @@ private: public: /// Set a task to be run on the thread-pool. /// @param task The task to run. - void Enqueue(const std::shared_ptr& task); + bool Enqueue(const std::shared_ptr& task); /// Set a task to be run on the thread-pool. /// @param task The task to run. - void Enqueue(const std::function& task); + bool Enqueue(const std::function& task); public: /// @returns The number of threads in the thread pool. [[nodiscard]] unsigned int ThreadCount() const { return threads.size(); } @@ -35,7 +36,7 @@ public: /// @note this excludes the tasks currently being run. [[nodiscard]] unsigned int PendingTasks(); - /// @returns Whether a task you enqueue would have to wait for something else to finish. + /// @returns Whether a task you enqueue would have to wait for something else to finish before running. [[nodiscard]] bool Busy(); /// Uses a condition variable to wait the calling thread until the queue is empty. diff --git a/main.cpp b/main.cpp index f782ec0..10b441c 100644 --- a/main.cpp +++ b/main.cpp @@ -1,12 +1,12 @@ #include -#include #include +#include #include using namespace MultiThreading; -void cb() { std::cout << "hi" << std::endl; } +//void cb() { std::cout << "hi" << std::endl; } int32_t some_test_func(int32_t hello) { for (unsigned int i = 0; i < 1000000000; i++) {} std::cout << "task " << hello << " finishes." << std::endl; @@ -42,17 +42,14 @@ int main() { } */ - - int main() { srand(time(nullptr)); int32_t task_completion_count = 0; - auto* thread_pool = new ThreadPool(16); - + auto* thread_pool = new ThreadPool(); for (unsigned int i = 0; i < 128; i++) { - auto some_task = Task::Create(([i] { return some_test_func(i); }), cb, &task_completion_count); + auto some_task = Task::Create(([i] { return some_test_func(i); }), nullptr, &task_completion_count); thread_pool->Enqueue(some_task); } diff --git a/src/Thread.cpp b/src/Thread.cpp index ab84fd6..d30ff90 100644 --- a/src/Thread.cpp +++ b/src/Thread.cpp @@ -1,5 +1,6 @@ -#include #include +#include + using namespace MultiThreading; diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index ba56fe4..299a181 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -8,7 +8,10 @@ ThreadPool::ThreadPool(unsigned int thread_count) { threads.push_back(new Thread()); } -void ThreadPool::Enqueue(const std::shared_ptr& task) { +bool ThreadPool::Enqueue(const std::shared_ptr& task) { + if (destructing) + return false; + std::lock_guard lock(queue_mutex); // Assign it immediately if there's no wait and a thread open. @@ -19,12 +22,13 @@ void ThreadPool::Enqueue(const std::shared_ptr& task) if (!t->SetTask( [this, task] (){ Runner(task); } )) throw std::runtime_error("There was a collision while putting the task to the thread."); - return; + return true; } } // Alternatively it goes in-to the queue. queue.push(task); + return true; } std::shared_ptr ThreadPool::Dequeue() { @@ -41,8 +45,8 @@ std::shared_ptr ThreadPool::Dequeue() { return task; } -void ThreadPool::Enqueue(const std::function& task) { - Enqueue(Task::Create(task)); +bool ThreadPool::Enqueue(const std::function& task) { + return Enqueue(Task::Create(task)); } void ThreadPool::Runner(const std::shared_ptr& task) { @@ -62,6 +66,7 @@ unsigned int ThreadPool::PendingTasks() { } ThreadPool::~ThreadPool() { + destructing = true; // Wait for all queued tasks to be running. WaitQueueEmpty();