task complete callback
This commit is contained in:
@@ -1,16 +1,3 @@
|
||||
/// CaveGame - A procedural 2D platformer sandbox.
|
||||
/// Created by Josh O'Leary @ Redacted Software, 2020-2024
|
||||
/// Contact: josh@redacted.cc
|
||||
/// Contributors: william@redacted.cc maxi@redacted.cc
|
||||
/// This work is dedicated to the public domain.
|
||||
|
||||
/// @file ConcurrentQueue.hpp
|
||||
/// @desc A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// @ref https://github.com/jlim262/concurrent_queue
|
||||
/// @edit 11/11/2024
|
||||
/// @auth JongYoon Lim
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
@@ -22,116 +9,115 @@
|
||||
/// A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// Supports waiting operations for retrieving an element when it's empty.
|
||||
/// It's interrupted by calling interrupt.
|
||||
template <typename T, typename Container = std::deque<T> >
|
||||
class ConcurrentQueue
|
||||
{
|
||||
public:
|
||||
typedef typename std::queue<T>::size_type size_type;
|
||||
typedef typename std::queue<T>::reference reference;
|
||||
typedef typename std::queue<T>::const_reference const_reference;
|
||||
|
||||
~ConcurrentQueue() {
|
||||
interrupt();
|
||||
}
|
||||
namespace MultiThreading {
|
||||
|
||||
void interrupt()
|
||||
{
|
||||
interrupted = true;
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
template<typename T, typename Container = std::deque<T> >
|
||||
class ConcurrentQueue {
|
||||
public:
|
||||
typedef typename std::queue<T>::size_type size_type;
|
||||
typedef typename std::queue<T>::reference reference;
|
||||
typedef typename std::queue<T>::const_reference const_reference;
|
||||
|
||||
bool contains(const T& value)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
std::queue<T, Container> copy = queue;
|
||||
while(!copy.empty())
|
||||
if (copy.front() == value)
|
||||
return true;
|
||||
else
|
||||
copy.pop();
|
||||
return false;
|
||||
}
|
||||
|
||||
void push(const T& e)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
queue.push(e);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void emplace(Args&&... args)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
queue.emplace(std::forward<Args>(args)...);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
void pop() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
if (!queue.empty())
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
void front_pop(T& ret) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
ret = queue.front();
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
size_type size() const {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
reference front() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/*const_reference front() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}*/
|
||||
|
||||
reference back() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}
|
||||
|
||||
/*const_reference back() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}*/
|
||||
|
||||
void swap(ConcurrentQueue& q) {
|
||||
throw std::runtime_error("Not supported");
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue<T, Container> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition_variable;
|
||||
std::atomic_bool interrupted;
|
||||
|
||||
|
||||
private:
|
||||
void wait(std::unique_lock<std::mutex>& lock) {
|
||||
interrupted = false;
|
||||
while (queue.empty()) {
|
||||
condition_variable.wait(lock);
|
||||
if (interrupted)
|
||||
throw std::runtime_error("Interrupted");
|
||||
~ConcurrentQueue() {
|
||||
interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void interrupt() {
|
||||
interrupted = true;
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool contains(const T &value) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
std::queue<T, Container> copy = queue;
|
||||
while (!copy.empty())
|
||||
if (copy.front() == value)
|
||||
return true;
|
||||
else
|
||||
copy.pop();
|
||||
return false;
|
||||
}
|
||||
|
||||
void push(const T &e) {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
queue.push(e);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
void emplace(Args &&... args) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
queue.emplace(std::forward<Args>(args)...);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
void pop() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
if (!queue.empty())
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
void front_pop(T &ret) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
ret = queue.front();
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
size_type size() const {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
reference front() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/*const_reference front() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}*/
|
||||
|
||||
reference back() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}
|
||||
|
||||
/*const_reference back() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}*/
|
||||
|
||||
void swap(ConcurrentQueue &q) {
|
||||
throw std::runtime_error("Not supported");
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue<T, Container> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition_variable;
|
||||
std::atomic_bool interrupted;
|
||||
|
||||
|
||||
private:
|
||||
void wait(std::unique_lock<std::mutex> &lock) {
|
||||
interrupted = false;
|
||||
while (queue.empty()) {
|
||||
condition_variable.wait(lock);
|
||||
if (interrupted)
|
||||
throw std::runtime_error("Interrupted");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
@@ -4,6 +4,7 @@
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
||||
namespace MultiThreading {
|
||||
class TaskBase;
|
||||
@@ -14,6 +15,7 @@ namespace MultiThreading {
|
||||
|
||||
class MultiThreading::TaskBase {
|
||||
protected:
|
||||
std::function<void()> complete_callback = nullptr;
|
||||
std::condition_variable completed_condition;
|
||||
std::atomic<bool> complete = false;
|
||||
std::mutex mtx;
|
||||
@@ -28,7 +30,7 @@ public:
|
||||
|
||||
virtual void Run() = 0;
|
||||
public:
|
||||
TaskBase() = default;
|
||||
explicit TaskBase(std::function<void()> task_complete_callback = nullptr) : complete_callback(std::move(task_complete_callback)) {};
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@@ -37,15 +39,15 @@ private:
|
||||
T* result = nullptr;
|
||||
std::function<T()> callable = nullptr;
|
||||
private:
|
||||
explicit Task(std::function<T()> callable, T* result = nullptr) : TaskBase(), result(result), callable(std::move(callable)) {}
|
||||
explicit Task(std::function<T()> callable, std::function<void()> complete_callback = nullptr, T* result = nullptr) : TaskBase(complete_callback), result(result), callable(std::move(callable)) {}
|
||||
public:
|
||||
void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); }
|
||||
void Run() final { result ? *result = callable() : callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
|
||||
public:
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it.
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, result)); };
|
||||
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, const std::function<void()>& complete_callback = nullptr, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, complete_callback, result)); };
|
||||
~Task() = default;
|
||||
};
|
||||
|
||||
@@ -54,13 +56,13 @@ template <>
|
||||
class MultiThreading::Task<void> : public TaskBase {
|
||||
private:
|
||||
std::function<void()> callable = nullptr;
|
||||
explicit Task(std::function<void()> callable) : TaskBase(), callable(std::move(callable)) {}
|
||||
explicit Task(std::function<void()> callable, std::function<void()> complete_callback = nullptr) : TaskBase(std::move(complete_callback)), callable(std::move(callable)) {}
|
||||
public:
|
||||
void Run() final { callable(); MarkTaskComplete(); }
|
||||
void Run() final { callable(); MarkTaskComplete(); if (complete_callback) complete_callback(); }
|
||||
|
||||
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
|
||||
/// @param callable The function to run, *usually a lambda or std::bind*
|
||||
/// @note this is shared_ptr so you don't have to delete it.
|
||||
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable) { return std::shared_ptr<Task<void>>(new Task<void>(callable)); }
|
||||
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable, const std::function<void()>& complete_callback = nullptr) { return std::shared_ptr<Task<void>>(new Task<void>(callable, complete_callback)); }
|
||||
~Task() = default;
|
||||
};
|
7
main.cpp
7
main.cpp
@@ -6,6 +6,7 @@
|
||||
|
||||
using namespace MultiThreading;
|
||||
|
||||
void cb() { std::cout << "hi" << std::endl; }
|
||||
int32_t some_test_func(int32_t hello) {
|
||||
for (unsigned int i = 0; i < 1000000000; i++) {}
|
||||
std::cout << "task " << hello << " finishes." << std::endl;
|
||||
@@ -47,13 +48,15 @@ int main() {
|
||||
srand(time(nullptr));
|
||||
|
||||
int32_t task_completion_count = 0;
|
||||
auto* thread_pool = new ThreadPool(4);
|
||||
auto* thread_pool = new ThreadPool(16);
|
||||
|
||||
|
||||
for (unsigned int i = 0; i < 128; i++) {
|
||||
auto some_task = Task<int32_t>::Create(([i] { return some_test_func(i + 1); }), &task_completion_count);
|
||||
auto some_task = Task<int32_t>::Create(([i] { return some_test_func(i); }), cb, &task_completion_count);
|
||||
thread_pool->Enqueue(some_task);
|
||||
}
|
||||
|
||||
/// do stuff after the job.
|
||||
delete thread_pool;
|
||||
std::cout << "The returned random value was: " << task_completion_count << std::endl;
|
||||
}
|
Reference in New Issue
Block a user