Implement ConcurrentQueue.h from CaveGame
This commit is contained in:
137
include/MultiThreading/ConcurrentQueue.h
Normal file
137
include/MultiThreading/ConcurrentQueue.h
Normal file
@@ -0,0 +1,137 @@
|
||||
/// CaveGame - A procedural 2D platformer sandbox.
|
||||
/// Created by Josh O'Leary @ Redacted Software, 2020-2024
|
||||
/// Contact: josh@redacted.cc
|
||||
/// Contributors: william@redacted.cc maxi@redacted.cc
|
||||
/// This work is dedicated to the public domain.
|
||||
|
||||
/// @file ConcurrentQueue.hpp
|
||||
/// @desc A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// @ref https://github.com/jlim262/concurrent_queue
|
||||
/// @edit 11/11/2024
|
||||
/// @auth JongYoon Lim
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <queue>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <type_traits>
|
||||
|
||||
/// A simple C++11 Concurrent Queue based on std::queue.
|
||||
/// Supports waiting operations for retrieving an element when it's empty.
|
||||
/// It's interrupted by calling interrupt.
|
||||
template <typename T, typename Container = std::deque<T> >
|
||||
class ConcurrentQueue
|
||||
{
|
||||
public:
|
||||
typedef typename std::queue<T>::size_type size_type;
|
||||
typedef typename std::queue<T>::reference reference;
|
||||
typedef typename std::queue<T>::const_reference const_reference;
|
||||
|
||||
~ConcurrentQueue() {
|
||||
interrupt();
|
||||
}
|
||||
|
||||
void interrupt()
|
||||
{
|
||||
interrupted = true;
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool contains(const T& value)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
std::queue<T, Container> copy = queue;
|
||||
while(!copy.empty())
|
||||
if (copy.front() == value)
|
||||
return true;
|
||||
else
|
||||
copy.pop();
|
||||
return false;
|
||||
}
|
||||
|
||||
void push(const T& e)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
queue.push(e);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void emplace(Args&&... args)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
queue.emplace(std::forward<Args>(args)...);
|
||||
condition_variable.notify_one();
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.empty();
|
||||
}
|
||||
|
||||
void pop() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
if (!queue.empty())
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
void front_pop(T& ret) {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
ret = queue.front();
|
||||
queue.pop();
|
||||
}
|
||||
|
||||
size_type size() const {
|
||||
//std::unique_lock<std::mutex> lock(this->mutex);
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
reference front() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/*const_reference front() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.front();
|
||||
}*/
|
||||
|
||||
reference back() {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}
|
||||
|
||||
/*const_reference back() const {
|
||||
std::unique_lock<std::mutex> lock(this->mutex);
|
||||
wait(lock);
|
||||
return queue.back();
|
||||
}*/
|
||||
|
||||
void swap(ConcurrentQueue& q) {
|
||||
throw std::runtime_error("Not supported");
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue<T, Container> queue;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition_variable;
|
||||
std::atomic_bool interrupted;
|
||||
|
||||
|
||||
private:
|
||||
void wait(std::unique_lock<std::mutex>& lock) {
|
||||
interrupted = false;
|
||||
while (queue.empty()) {
|
||||
condition_variable.wait(lock);
|
||||
if (interrupted)
|
||||
throw std::runtime_error("Interrupted");
|
||||
}
|
||||
}
|
||||
};
|
Reference in New Issue
Block a user