Compare commits
7 Commits
concurrent
...
main
Author | SHA1 | Date | |
---|---|---|---|
671e46ce66 | |||
140902185f | |||
e2b847665f | |||
d898a60cc1 | |||
c991d4b3ef | |||
af819dc758 | |||
b4b476bd7f |
@@ -9,7 +9,15 @@ file(GLOB_RECURSE HEADERS "include/*.h" "include/*.hpp")
|
||||
file(GLOB_RECURSE SOURCES "src/*.c" "src/*.cpp")
|
||||
set(CMAKE_CXX_STANDARD 20)
|
||||
|
||||
add_library(MultiThreading SHARED ${SOURCES})
|
||||
if (UNIX AND NOT APPLE)
|
||||
add_library(MultiThreading SHARED ${SOURCES} "include/MultiThreading/CollisionGuard.h")
|
||||
endif()
|
||||
|
||||
if(WIN32)
|
||||
add_library(MultiThreading STATIC ${SOURCES} "include/MultiThreading/CollisionGuard.h")
|
||||
endif()
|
||||
|
||||
target_include_directories(MultiThreading PUBLIC ${PROJECT_SOURCE_DIR}/include)
|
||||
|
||||
add_executable(MultiThreadingDemo main.cpp)
|
||||
target_link_libraries(MultiThreadingDemo PUBLIC MultiThreading)
|
||||
|
46
README.md
Normal file
46
README.md
Normal file
@@ -0,0 +1,46 @@
|
||||
# Redacted Software MultiThreading
|
||||
|
||||
Yet Another C++ Threading Library
|
||||
|
||||
[](http://unlicense.org/) 
|
||||
|
||||
## Features
|
||||
|
||||
* Easily utilize maximum parallelization of the hardware.
|
||||
* Overhead < 1% compared to running functions directly.
|
||||
* Thread-safe wrappers for common STL data structures.
|
||||
* Public Domain Source Code.
|
||||
|
||||
## API Overview
|
||||
|
||||
## Types
|
||||
* CollisionGuard
|
||||
* ConcurrentQueue
|
||||
* Task
|
||||
* Thread
|
||||
* ThreadPool
|
||||
|
||||
## Usage
|
||||
|
||||
Using a thread-pool to execute a large set of tasks, which will utilize as many hardware threads as possible.
|
||||
|
||||
## Requirements
|
||||
|
||||
* C++ 20
|
||||
|
||||
## Compatibility
|
||||
|
||||
* Linux (gcc 13.x or newer)
|
||||
* Windows 10 (1909 or newer)
|
||||
|
||||
## Documentation
|
||||
|
||||
Documentation is currently limited to source-code annotations, and the demo program.
|
||||
|
||||
## Contributing
|
||||
|
||||
Contributions to MultiThreading are welcome! If you find a bug, have a feature request, or would like to contribute code, please submit an issue or pull request.
|
||||
|
||||
## Acknowledgements
|
||||
|
||||
Developed & Maintained by William @ Redacted Software and contributors.
|
113
include/MultiThreading/CollisionGuard.h
Normal file
113
include/MultiThreading/CollisionGuard.h
Normal file
@@ -0,0 +1,113 @@
|
||||
#pragma once
|
||||
|
||||
#include <concepts>
|
||||
#include <shared_mutex>
|
||||
|
||||
namespace MultiThreading {
|
||||
template <typename T>
|
||||
class CollisionGuard;
|
||||
}
|
||||
template <typename T>
|
||||
class MultiThreading::CollisionGuard {
|
||||
private:
|
||||
mutable std::shared_mutex mutex;
|
||||
T value;
|
||||
public:
|
||||
|
||||
class Read {
|
||||
private:
|
||||
std::shared_lock<std::shared_mutex> lock;
|
||||
const T* t_ptr;
|
||||
public:
|
||||
Read(const std::shared_mutex& mtx, const T& val) : t_ptr(&val), lock(mtx) {}
|
||||
const T* operator->() const { return t_ptr; }
|
||||
};
|
||||
|
||||
class Write {
|
||||
private:
|
||||
std::unique_lock<std::shared_mutex> lock;
|
||||
T* t_ptr;
|
||||
public:
|
||||
Write(std::shared_mutex& mtx, T& val) : t_ptr(&val), lock(mtx) {}
|
||||
T* operator->() { return t_ptr; }
|
||||
};
|
||||
public:
|
||||
Read operator->() const { return Read(mutex, value); }
|
||||
Write operator->() { return Write(mutex, value); }
|
||||
public:
|
||||
template <typename U = T>
|
||||
requires requires(const U& a, const U& b) { a + b; }
|
||||
T operator +(const T& rhs) const {
|
||||
std::shared_lock lock(mutex);
|
||||
return value + rhs;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(U& a, const U& b) { a = b; }
|
||||
CollisionGuard& operator =(const T& rhs) {
|
||||
std::unique_lock lock(mutex);
|
||||
value = rhs;
|
||||
return *this;
|
||||
}
|
||||
|
||||
explicit operator T() const requires std::copy_constructible<T> {
|
||||
std::shared_lock lock(mutex);
|
||||
return value;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(U& a, const U& b) { a += b; }
|
||||
CollisionGuard& operator +=(const T& rhs) {
|
||||
std::unique_lock lock(mutex);
|
||||
value += rhs;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(U& a, const U& b) { a -= b; }
|
||||
CollisionGuard& operator -=(const T& rhs) {
|
||||
std::unique_lock lock(mutex);
|
||||
value -= rhs;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(U& a, const U& b) { a *= b; }
|
||||
CollisionGuard& operator *=(const T& rhs) {
|
||||
std::unique_lock lock(mutex);
|
||||
value *= rhs;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(U& a, const U& b) { a /= b; }
|
||||
CollisionGuard& operator /=(const T& rhs) {
|
||||
std::unique_lock lock(mutex);
|
||||
value /= rhs;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(const U& a, const U& b) { a - b; }
|
||||
T operator -(const T& rhs) const {
|
||||
std::shared_lock lock(mutex);
|
||||
return value - rhs;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(const U& a, const U& b) { a * b; }
|
||||
T operator *(const T& rhs) const {
|
||||
std::shared_lock lock(mutex);
|
||||
return value * rhs;
|
||||
}
|
||||
|
||||
template <typename U = T>
|
||||
requires requires(const U& a, const U& b) { a / b; }
|
||||
T operator /(const T& rhs) const {
|
||||
std::shared_lock lock(mutex);
|
||||
return value / rhs;
|
||||
}
|
||||
public:
|
||||
template <typename... Args>
|
||||
explicit CollisionGuard(Args&&... args) : value(std::forward<Args>(args)...) {}
|
||||
};
|
@@ -1,16 +1,3 @@
|
||||
/// CaveGame - A procedural 2D platformer sandbox.
|
||||
/// Created by Josh O'Leary @ Redacted Software, 2020-2024
|
||||
/// Contact: josh@redacted.cc
|
||||
/// Contributors: william@redacted.cc maxi@redacted.cc
|
||||
/// This work is dedicated to the public domain.
|
||||
|
||||
/// @file ConcurrentQueue.hpp
|
||||
/// @desc A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// @ref https://github.com/jlim262/concurrent_queue
|
||||
/// @edit 11/11/2024
|
||||
/// @auth JongYoon Lim
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
@@ -22,116 +9,115 @@
|
||||
/// A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// Supports waiting operations for retrieving an element when it's empty.
|
||||
/// It's interrupted by calling interrupt.
|
||||
template <typename T, typename Container = std::deque<T> >
|
||||
class ConcurrentQueue
|
||||
{
|
||||
public:
|
||||
typedef typename std::queue<T>::size_type size_type;
|
||||
typedef typename std::queue<T>::reference reference;
|
||||
typedef typename std::queue<T>::const_reference const_reference;
|
||||
|
||||
~ConcurrentQueue() {
|
||||
interrupt();
|
||||
}
|
||||
namespace MultiThreading {
|
||||
|
||||
void interrupt()
|
||||
{
|
||||
interrupted = true;
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
template<typename T, typename Container = std::deque<T> >
|
||||
class ConcurrentQueue {
|
||||
public:
|
||||
typedef typename std::queue<T>::size_type size_type;
|
||||
typedef typename std::queue<T>::reference reference;
|
||||
typedef typename std::queue<T>::const_reference const_reference;
|
||||
|
||||
bool contains(const T& value)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
std::queue<T, Container> copy = queue;
|
||||
while(!copy.empty())
|
||||
if (copy.front() == value)
|
||||
return true;
|
||||
else
|
||||
copy.pop();
|
||||
return false;
|
||||
}
|
||||
|
||||
void push(const T& e)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
queue.push(e);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void emplace(Args&&... args)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
queue.emplace(std::forward<Args>(args)...);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
void pop() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
if (!queue.empty())
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
void front_pop(T& ret) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
ret = queue.front();
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
size_type size() const {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
reference front() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/*const_reference front() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}*/
|
||||
|
||||
reference back() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}
|
||||
|
||||
/*const_reference back() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}*/
|
||||
|
||||
void swap(ConcurrentQueue& q) {
|
||||
throw std::runtime_error("Not supported");
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue<T, Container> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition_variable;
|
||||
std::atomic_bool interrupted;
|
||||
|
||||
|
||||
private:
|
||||
void wait(std::unique_lock<std::mutex>& lock) {
|
||||
interrupted = false;
|
||||
while (queue.empty()) {
|
||||
condition_variable.wait(lock);
|
||||
if (interrupted)
|
||||
throw std::runtime_error("Interrupted");
|
||||
~ConcurrentQueue() {
|
||||
interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void interrupt() {
|
||||
interrupted = true;
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool contains(const T &value) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
std::queue<T, Container> copy = queue;
|
||||
while (!copy.empty())
|
||||
if (copy.front() == value)
|
||||
return true;
|
||||
else
|
||||
copy.pop();
|
||||
return false;
|
||||
}
|
||||
|
||||
void push(const T &e) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
queue.push(e);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
void emplace(Args &&... args) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
queue.emplace(std::forward<Args>(args)...);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
void pop() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
if (!queue.empty())
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
void front_pop(T &ret) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
ret = queue.front();
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
size_type size() const {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
reference front() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/*const_reference front() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}*/
|
||||
|
||||
reference back() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}
|
||||
|
||||
/*const_reference back() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}*/
|
||||
|
||||
void swap(ConcurrentQueue &q) {
|
||||
throw std::runtime_error("Not supported");
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue<T, Container> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition_variable;
|
||||
std::atomic_bool interrupted;
|
||||
|
||||
|
||||
private:
|
||||
void wait(std::unique_lock<std::mutex> &lock) {
|
||||
interrupted = false;
|
||||
while (queue.empty()) {
|
||||
condition_variable.wait(lock);
|
||||
if (interrupted)
|
||||
throw std::runtime_error("Interrupted");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <atomic>
|
||||
@@ -14,13 +15,22 @@ namespace MultiThreading {
|
||||
|
||||
class MultiThreading::TaskBase {
|
||||
protected:
|
||||
std::function<void()> complete_callback = nullptr;
|
||||
std::condition_variable completed_condition;
|
||||
std::atomic<bool> complete = false;
|
||||
std::mutex mtx;
|
||||
void MarkTaskComplete() { std::lock_guard<std::mutex> lock(mtx); complete = true; completed_condition.notify_all(); }
|
||||
protected:
|
||||
explicit TaskBase(std::function<void()> task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {};
|
||||
public:
|
||||
/// @returns whether the task is finished.
|
||||
[[nodiscard]] bool Complete() const { return complete; }
|
||||
|
||||
/// Condition variable waits for the task to be complete
|
||||
/// @note There is no way to know if the task has been assigned, If you do this and that never happens the calling thread will wait forever.
|
||||
void WaitComplete() { std::unique_lock<std::mutex> lock(mtx); completed_condition.wait(lock, [this]() { return complete.load(); }); }
|
||||
|
||||
virtual void Run() = 0;
|
||||
public:
|
||||
TaskBase() = default;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@@ -29,15 +39,17 @@ private:
|
||||
T* result = nullptr;
|
||||
std::function<T()> callable = nullptr;
|
||||
private:
|
||||
explicit Task(std::function<T()> callable, T* result = nullptr) : TaskBase(), result(result), callable(callable) {}
|
||||
explicit Task(std::function<T()> callable, std::function<void()> complete_callback = nullptr, T* result = nullptr) : TaskBase(complete_callback), result(result), callable(std::move(callable)) {}
|
||||
public:
|
||||
void Run() final { result ? *result = callable() : callable(); complete = true; }
|
||||
void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
|
||||
public:
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @param complete_callback A void callable to be run after the task.
|
||||
/// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it.
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, result)); };
|
||||
/// @note If result is stack allocated your program will probably explode.
|
||||
/// @note this is shared_ptr so that you don't have to delete it.
|
||||
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, const std::function<void()>& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, complete_callback, result)); };
|
||||
~Task() = default;
|
||||
};
|
||||
|
||||
@@ -46,13 +58,14 @@ template <>
|
||||
class MultiThreading::Task<void> : public TaskBase {
|
||||
private:
|
||||
std::function<void()> callable = nullptr;
|
||||
private:
|
||||
explicit Task(std::function<void()> callable) : TaskBase(), callable(std::move(callable)) {}
|
||||
explicit Task(std::function<void()> callable, std::function<void()> complete_callback = nullptr) : TaskBase(std::move(complete_callback)), callable(std::move(callable)) {}
|
||||
public:
|
||||
void Run() final { callable(); complete = true; }
|
||||
void Run() final { callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
|
||||
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable) { return std::shared_ptr<Task<void>>(new Task<void>(callable)); }
|
||||
/// @param complete_callback A void callable to be run after the task.
|
||||
/// @note this is shared_ptr so that you don't have to delete it.
|
||||
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable, const std::function<void()>& complete_callback = nullptr) { return std::shared_ptr<Task<void>>(new Task<void>(callable, complete_callback)); }
|
||||
~Task() = default;
|
||||
};
|
@@ -1,11 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <thread>
|
||||
#include <functional>
|
||||
#include <stdexcept>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
#include <MultiThreading/Task.h>
|
||||
|
||||
namespace MultiThreading {
|
||||
@@ -18,18 +13,35 @@ private:
|
||||
std::atomic<bool> stop = false;
|
||||
std::condition_variable cv;
|
||||
std::thread worker;
|
||||
std::mutex mtx;
|
||||
bool busy = false;
|
||||
std::mutex mtx;
|
||||
private:
|
||||
void Runner();
|
||||
public:
|
||||
[[nodiscard]] bool SetTask(const std::function<void()>& task);
|
||||
/// @returns false if the thread is busy.
|
||||
/// @param task The task to run.
|
||||
/// @note Task is passed by value on purpose so that the original shared_ptr to the task is still good.
|
||||
[[nodiscard]] bool SetTask(std::shared_ptr<TaskBase> task);
|
||||
|
||||
/// @returns false if the thread is busy.
|
||||
/// @param task The task to run.
|
||||
/// @note Task is passed by reference because the frame after next is going to copy it anyway.
|
||||
[[nodiscard]] bool SetTask(const std::function<void()>& task);
|
||||
|
||||
/// @returns true if the thread is currently doing work.
|
||||
/// @note SetTask will return false if the thread is busy.
|
||||
[[nodiscard]] bool Busy() const { return busy; }
|
||||
void Join() { if (worker.joinable()) worker.join(); };
|
||||
|
||||
/// Blocks the thread join is called from until execution of this thread exits.
|
||||
/// @note Joining this thread from this thread will raise an exception.
|
||||
void Join();
|
||||
public:
|
||||
/// Create a thread which will initialize and then wait for a task.
|
||||
Thread() { worker = std::thread([this] { this->Runner(); }); }
|
||||
/// Crete a thread which will immediately run a task and then wait for another.
|
||||
explicit Thread(std::shared_ptr<TaskBase> task);
|
||||
/// Crete a thread which will immediately run a task and then wait for another.
|
||||
explicit Thread(const std::function<void()>& task);
|
||||
/// Waits for the current task to finish (if there is one) and destroys the thread.
|
||||
~Thread();
|
||||
};
|
@@ -3,28 +3,51 @@
|
||||
#include <MultiThreading/Thread.h>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
|
||||
namespace MultiThreading {
|
||||
class ThreadPool;
|
||||
}
|
||||
|
||||
/// A group of threads to run tasks on.
|
||||
class MultiThreading::ThreadPool {
|
||||
private:
|
||||
std::vector<MultiThreading::Thread*> threads;
|
||||
std::queue<std::shared_ptr<TaskBase>> queue;
|
||||
std::condition_variable queue_condition;
|
||||
std::mutex queue_mutex;
|
||||
bool destructing = false;
|
||||
private:
|
||||
/// @returns nullptr if the queue is empty.
|
||||
std::shared_ptr<TaskBase> Dequeue();
|
||||
void Runner(const std::shared_ptr<TaskBase>& task);
|
||||
public:
|
||||
void Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
|
||||
void Enqueue(const std::function<void()>& task);
|
||||
/// Set a task to be run on the thread-pool.
|
||||
/// @param task The task to run.
|
||||
bool Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
|
||||
|
||||
/// Set a task to be run on the thread-pool.
|
||||
/// @param task The task to run.
|
||||
bool Enqueue(const std::function<void()>& task);
|
||||
public:
|
||||
/// @returns The number of threads in the thread pool.
|
||||
[[nodiscard]] unsigned int ThreadCount() const { return threads.size(); }
|
||||
|
||||
/// @returns the number of tasks in the queue
|
||||
/// @note this excludes the tasks currently being run.
|
||||
[[nodiscard]] unsigned int PendingTasks();
|
||||
/// @returns Whether a task you enqueue would have to wait.
|
||||
|
||||
/// @returns Whether a task you enqueue would have to wait for something else to finish before running.
|
||||
[[nodiscard]] bool Busy();
|
||||
|
||||
/// Uses a condition variable to wait the calling thread until the queue is empty.
|
||||
void WaitQueueEmpty();
|
||||
public:
|
||||
/// Constructs a thread-pool with a given number of threads (or hardware_concurrency as the default)
|
||||
/// @param thread_count The number of threads.
|
||||
/// @note If you do a *lot* of work on the main thread and hardware_concurrency >= 2, You should use hardware_concurrency -1.
|
||||
explicit ThreadPool(unsigned int thread_count = std::thread::hardware_concurrency());
|
||||
|
||||
/// Waits for the threads to empty the queue and destroys the thread pool.
|
||||
/// @note This should be one of the very last things your program does before exiting.
|
||||
~ThreadPool();
|
||||
};
|
23
main.cpp
23
main.cpp
@@ -1,19 +1,15 @@
|
||||
#include <MultiThreading/Task.h>
|
||||
#include <MultiThreading/Thread.h>
|
||||
#include <MultiThreading/ThreadPool.h>
|
||||
#include <MultiThreading/CollisionGuard.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
using namespace MultiThreading;
|
||||
int32_t some_test_func(int32_t hello) {
|
||||
for (unsigned int i = 0; i < 1000000000; i++) {}
|
||||
std::cout << "task " << hello << " finishes." << std::endl;
|
||||
return rand();
|
||||
}
|
||||
|
||||
void cb(Thread* thread, std::shared_ptr<TaskBase> task) {
|
||||
std::cout << thread->Busy() << std::endl;
|
||||
std::cout << task->Complete() << std::endl;
|
||||
//void cb() { std::cout << "hi" << std::endl; }
|
||||
void some_test_func() {
|
||||
for (unsigned int i = 0; i < 1000000000; i++) {}
|
||||
std::cout << "task finishes." << std::endl;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -45,19 +41,16 @@ int main() {
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
|
||||
int main() {
|
||||
srand(time(nullptr));
|
||||
|
||||
int32_t task_completion_count = 0;
|
||||
CollisionGuard<int> task_completion_count(0);
|
||||
auto* thread_pool = new ThreadPool();
|
||||
|
||||
for (unsigned int i = 0; i < 128; i++) {
|
||||
auto some_task = Task<int32_t>::Create(([i] { return some_test_func(i + 1); }), &task_completion_count);
|
||||
auto some_task = Task<void>::Create([&task_completion_count]() { task_completion_count += 1; }, [&task_completion_count]() { task_completion_count += 1; });
|
||||
thread_pool->Enqueue(some_task);
|
||||
}
|
||||
|
||||
delete thread_pool;
|
||||
std::cout << "The returned random value was: " << task_completion_count << std::endl;
|
||||
std::cout << "The number of tasks run was " << (int) task_completion_count << std::endl;
|
||||
}
|
@@ -1,16 +1,23 @@
|
||||
#include <MultiThreading/Thread.h>
|
||||
#include <utility>
|
||||
#include <MultiThreading/Thread.h>
|
||||
|
||||
|
||||
using namespace MultiThreading;
|
||||
|
||||
Thread::~Thread() {
|
||||
if (current_task)
|
||||
while (!current_task->Complete()) {}
|
||||
stop = true;
|
||||
std::shared_ptr<TaskBase> task;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mtx);
|
||||
task = current_task;
|
||||
}
|
||||
|
||||
if (task)
|
||||
task->WaitComplete();
|
||||
|
||||
stop = true;
|
||||
cv.notify_one();
|
||||
Join();
|
||||
// Thread exits gracefully.
|
||||
}
|
||||
|
||||
bool Thread::SetTask(std::shared_ptr<TaskBase> task) {
|
||||
@@ -42,9 +49,10 @@ void Thread::Runner() {
|
||||
if (stop)
|
||||
break;
|
||||
|
||||
auto task = current_task;
|
||||
lock.unlock();
|
||||
|
||||
current_task->Run();
|
||||
task->Run();
|
||||
|
||||
lock.lock();
|
||||
|
||||
@@ -66,3 +74,8 @@ Thread::Thread(const std::function<void()>& task) {
|
||||
if (!SetTask(task))
|
||||
throw std::runtime_error("Thread constructor failure.");
|
||||
}
|
||||
|
||||
void Thread::Join() {
|
||||
if (worker.joinable())
|
||||
worker.join();
|
||||
}
|
||||
|
@@ -3,11 +3,15 @@
|
||||
using namespace MultiThreading;
|
||||
|
||||
ThreadPool::ThreadPool(unsigned int thread_count) {
|
||||
|
||||
for (unsigned int i = 0; i < thread_count; i++)
|
||||
threads.push_back(new Thread());
|
||||
}
|
||||
|
||||
void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
|
||||
bool ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
|
||||
if (destructing)
|
||||
return false;
|
||||
|
||||
std::lock_guard<std::mutex> lock(queue_mutex);
|
||||
|
||||
// Assign it immediately if there's no wait and a thread open.
|
||||
@@ -18,12 +22,13 @@ void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task)
|
||||
|
||||
if (!t->SetTask( [this, task] (){ Runner(task); } ))
|
||||
throw std::runtime_error("There was a collision while putting the task to the thread.");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Alternatively it goes in-to the queue.
|
||||
queue.push(task);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
|
||||
@@ -33,11 +38,15 @@ std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
|
||||
|
||||
auto task = queue.front();
|
||||
queue.pop();
|
||||
|
||||
if (queue.empty())
|
||||
queue_condition.notify_all();
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
void ThreadPool::Enqueue(const std::function<void()>& task) {
|
||||
Enqueue(Task<void>::Create(task));
|
||||
bool ThreadPool::Enqueue(const std::function<void()>& task) {
|
||||
return Enqueue(Task<void>::Create(task));
|
||||
}
|
||||
|
||||
void ThreadPool::Runner(const std::shared_ptr<TaskBase>& task) {
|
||||
@@ -57,9 +66,9 @@ unsigned int ThreadPool::PendingTasks() {
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool() {
|
||||
destructing = true;
|
||||
// Wait for all queued tasks to be running.
|
||||
// TODO avoid spin-loop here.
|
||||
while (PendingTasks() != 0) {}
|
||||
WaitQueueEmpty();
|
||||
|
||||
// delete t waits for the thread to exit gracefully.
|
||||
for (auto* t: threads)
|
||||
@@ -76,3 +85,8 @@ bool ThreadPool::Busy() {
|
||||
|
||||
return all_busy;
|
||||
}
|
||||
|
||||
void ThreadPool::WaitQueueEmpty() {
|
||||
std::unique_lock<std::mutex> lock(queue_mutex);
|
||||
queue_condition.wait(lock, [this]{ return queue.empty(); });
|
||||
}
|
||||
|
Reference in New Issue
Block a user