diff --git a/include/MultiThreading/ConcurrentQueue.h b/include/MultiThreading/ConcurrentQueue.h index 06bcaca..a85b8cb 100644 --- a/include/MultiThreading/ConcurrentQueue.h +++ b/include/MultiThreading/ConcurrentQueue.h @@ -1,16 +1,3 @@ -/// CaveGame - A procedural 2D platformer sandbox. -/// Created by Josh O'Leary @ Redacted Software, 2020-2024 -/// Contact: josh@redacted.cc -/// Contributors: william@redacted.cc maxi@redacted.cc -/// This work is dedicated to the public domain. - -/// @file ConcurrentQueue.hpp -/// @desc A simple C++11 Concurrent Queue based on std::queue. -/// @ref https://github.com/jlim262/concurrent_queue -/// @edit 11/11/2024 -/// @auth JongYoon Lim - - #pragma once #include @@ -22,116 +9,115 @@ /// A simple C++11 Concurrent Queue based on std::queue. /// Supports waiting operations for retrieving an element when it's empty. /// It's interrupted by calling interrupt. -template > -class ConcurrentQueue -{ -public: - typedef typename std::queue::size_type size_type; - typedef typename std::queue::reference reference; - typedef typename std::queue::const_reference const_reference; - ~ConcurrentQueue() { - interrupt(); - } +namespace MultiThreading { - void interrupt() - { - interrupted = true; - condition_variable.notify_one(); - } + template > + class ConcurrentQueue { + public: + typedef typename std::queue::size_type size_type; + typedef typename std::queue::reference reference; + typedef typename std::queue::const_reference const_reference; - bool contains(const T& value) - { - std::unique_lock lock(this->mutex); - std::queue copy = queue; - while(!copy.empty()) - if (copy.front() == value) - return true; - else - copy.pop(); - return false; - } - - void push(const T& e) - { - std::unique_lock lock(mutex); - queue.push(e); - condition_variable.notify_one(); - } - - template - void emplace(Args&&... args) - { - std::unique_lock lock(this->mutex); - queue.emplace(std::forward(args)...); - condition_variable.notify_one(); - } - - bool empty() { - //std::unique_lock lock(this->mutex); - return queue.empty(); - } - - void pop() { - std::unique_lock lock(this->mutex); - if (!queue.empty()) - queue.pop(); - } - - void front_pop(T& ret) { - std::unique_lock lock(this->mutex); - wait(lock); - ret = queue.front(); - queue.pop(); - } - - size_type size() const { - //std::unique_lock lock(this->mutex); - return queue.size(); - } - - reference front() { - std::unique_lock lock(this->mutex); - wait(lock); - return queue.front(); - } - - /*const_reference front() const { - std::unique_lock lock(this->mutex); - wait(lock); - return queue.front(); - }*/ - - reference back() { - std::unique_lock lock(this->mutex); - wait(lock); - return queue.back(); - } - - /*const_reference back() const { - std::unique_lock lock(this->mutex); - wait(lock); - return queue.back(); - }*/ - - void swap(ConcurrentQueue& q) { - throw std::runtime_error("Not supported"); - } - -protected: - std::queue queue; - std::mutex mutex; - std::condition_variable condition_variable; - std::atomic_bool interrupted; - - -private: - void wait(std::unique_lock& lock) { - interrupted = false; - while (queue.empty()) { - condition_variable.wait(lock); - if (interrupted) - throw std::runtime_error("Interrupted"); + ~ConcurrentQueue() { + interrupt(); } - } -}; \ No newline at end of file + + void interrupt() { + interrupted = true; + condition_variable.notify_one(); + } + + bool contains(const T &value) { + std::unique_lock lock(this->mutex); + std::queue copy = queue; + while (!copy.empty()) + if (copy.front() == value) + return true; + else + copy.pop(); + return false; + } + + void push(const T &e) { + std::unique_lock lock(mutex); + queue.push(e); + condition_variable.notify_one(); + } + + template + void emplace(Args &&... args) { + std::unique_lock lock(this->mutex); + queue.emplace(std::forward(args)...); + condition_variable.notify_one(); + } + + bool empty() { + //std::unique_lock lock(this->mutex); + return queue.empty(); + } + + void pop() { + std::unique_lock lock(this->mutex); + if (!queue.empty()) + queue.pop(); + } + + void front_pop(T &ret) { + std::unique_lock lock(this->mutex); + wait(lock); + ret = queue.front(); + queue.pop(); + } + + size_type size() const { + //std::unique_lock lock(this->mutex); + return queue.size(); + } + + reference front() { + std::unique_lock lock(this->mutex); + wait(lock); + return queue.front(); + } + + /*const_reference front() const { + std::unique_lock lock(this->mutex); + wait(lock); + return queue.front(); + }*/ + + reference back() { + std::unique_lock lock(this->mutex); + wait(lock); + return queue.back(); + } + + /*const_reference back() const { + std::unique_lock lock(this->mutex); + wait(lock); + return queue.back(); + }*/ + + void swap(ConcurrentQueue &q) { + throw std::runtime_error("Not supported"); + } + + protected: + std::queue queue; + std::mutex mutex; + std::condition_variable condition_variable; + std::atomic_bool interrupted; + + + private: + void wait(std::unique_lock &lock) { + interrupted = false; + while (queue.empty()) { + condition_variable.wait(lock); + if (interrupted) + throw std::runtime_error("Interrupted"); + } + } + }; +} \ No newline at end of file diff --git a/include/MultiThreading/Task.h b/include/MultiThreading/Task.h index 5f9de74..93d733e 100644 --- a/include/MultiThreading/Task.h +++ b/include/MultiThreading/Task.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace MultiThreading { class TaskBase; @@ -14,6 +15,7 @@ namespace MultiThreading { class MultiThreading::TaskBase { protected: + std::function complete_callback = nullptr; std::condition_variable completed_condition; std::atomic complete = false; std::mutex mtx; @@ -28,7 +30,7 @@ public: virtual void Run() = 0; public: - TaskBase() = default; + explicit TaskBase(std::function task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {}; }; template @@ -37,15 +39,15 @@ private: T* result = nullptr; std::function callable = nullptr; private: - explicit Task(std::function callable, T* result = nullptr) : TaskBase(), result(result), callable(std::move(callable)) {} + explicit Task(std::function callable, std::function complete_callback = nullptr, T* result = nullptr) : TaskBase(complete_callback), result(result), callable(std::move(callable)) {} public: - void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); } + void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); } public: /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. /// @param callable The function to run, *usually a lambda or std::bind* /// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it. /// @note this is shared_ptr so you don't have to delete it. - static std::shared_ptr> Create(std::function callable, T* result = nullptr) { return std::shared_ptr>(new Task(callable, result)); }; + static std::shared_ptr> Create(std::function callable, const std::function& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr>(new Task(callable, complete_callback, result)); }; ~Task() = default; }; @@ -54,13 +56,13 @@ template <> class MultiThreading::Task : public TaskBase { private: std::function callable = nullptr; - explicit Task(std::function callable) : TaskBase(), callable(std::move(callable)) {} + explicit Task(std::function callable, std::function complete_callback = nullptr) : TaskBase(std::move(complete_callback)), callable(std::move(callable)) {} public: - void Run() final { callable(); MarkTaskComplete(); } + void Run() final { callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); } /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. /// @param callable The function to run, *usually a lambda or std::bind* /// @note this is shared_ptr so you don't have to delete it. - static std::shared_ptr> Create(const std::function& callable) { return std::shared_ptr>(new Task(callable)); } + static std::shared_ptr> Create(const std::function& callable, const std::function& complete_callback = nullptr) { return std::shared_ptr>(new Task(callable, complete_callback)); } ~Task() = default; }; \ No newline at end of file diff --git a/main.cpp b/main.cpp index b8d4584..f782ec0 100644 --- a/main.cpp +++ b/main.cpp @@ -6,6 +6,7 @@ using namespace MultiThreading; +void cb() { std::cout << "hi" << std::endl; } int32_t some_test_func(int32_t hello) { for (unsigned int i = 0; i < 1000000000; i++) {} std::cout << "task " << hello << " finishes." << std::endl; @@ -47,13 +48,15 @@ int main() { srand(time(nullptr)); int32_t task_completion_count = 0; - auto* thread_pool = new ThreadPool(4); + auto* thread_pool = new ThreadPool(16); + for (unsigned int i = 0; i < 128; i++) { - auto some_task = Task::Create(([i] { return some_test_func(i + 1); }), &task_completion_count); + auto some_task = Task::Create(([i] { return some_test_func(i); }), cb, &task_completion_count); thread_pool->Enqueue(some_task); } + /// do stuff after the job. delete thread_pool; std::cout << "The returned random value was: " << task_completion_count << std::endl; } \ No newline at end of file