Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
2dc26fc862 |
@@ -26,4 +26,69 @@ public:
|
||||
ThreadPool();
|
||||
explicit ThreadPool(unsigned int thread_count);
|
||||
~ThreadPool();
|
||||
};
|
||||
|
||||
/// Canonical C++ ThreadPool object
|
||||
/// Ref. https://stackoverflow.com/questions/15752659/thread-pooling-in-c11
|
||||
class ThreadPool {
|
||||
public:
|
||||
void Start()
|
||||
{
|
||||
const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports.
|
||||
for (uint32_t ii = 0; ii < num_threads; ++ii) {
|
||||
threads.emplace_back(&ThreadPool::ThreadLoop, this);
|
||||
}
|
||||
}
|
||||
void QueueJob(const std::function<void()>& job)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex);
|
||||
jobs.push(job);
|
||||
}
|
||||
mutex_condition.notify_one();
|
||||
}
|
||||
void Stop() {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex);
|
||||
should_terminate = true;
|
||||
}
|
||||
mutex_condition.notify_all();
|
||||
for (std::thread& active_thread : threads) {
|
||||
active_thread.join();
|
||||
}
|
||||
threads.clear();
|
||||
}
|
||||
bool busy() {
|
||||
bool bussy;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex);
|
||||
bussy = !jobs.empty();
|
||||
}
|
||||
return bussy;
|
||||
}
|
||||
private:
|
||||
void ThreadLoop()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
std::function<void()> job;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex);
|
||||
mutex_condition.wait(lock, [this] {
|
||||
return !jobs.empty() || should_terminate;
|
||||
});
|
||||
|
||||
if (should_terminate) { return; }
|
||||
job = jobs.front();
|
||||
jobs.pop();
|
||||
}
|
||||
job();
|
||||
}
|
||||
}
|
||||
|
||||
bool should_terminate = false;
|
||||
std::mutex queue_mutex;
|
||||
std::condition_variable mutex_condition;
|
||||
std::vector<std::thread> threads;
|
||||
std::queue<std::function<void()>> jobs;
|
||||
};
|
16
main.cpp
16
main.cpp
@@ -5,13 +5,15 @@
|
||||
#include <iostream>
|
||||
|
||||
using namespace MultiThreading;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
int32_t some_test_func(int32_t hello) {
|
||||
for (unsigned int i = 0; i < 100; i++)
|
||||
std::cout << i << std::endl;
|
||||
for (unsigned int i = 0; i < 1000; i++)
|
||||
std::this_thread::sleep_for(1ms);
|
||||
return hello;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
int main() {
|
||||
// Each task you create can be run by a thread only once. It's marked as complete after.
|
||||
// If you're running a lambda or std::function directly on the thread, It can be used multiple times.
|
||||
@@ -31,11 +33,14 @@ int main() {
|
||||
|
||||
|
||||
//std::cout << a << std::endl;
|
||||
std::cout << "Primarch Thread Complete" << std::endl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
|
||||
/*
|
||||
int main() {
|
||||
ThreadPool thread_pool(1);
|
||||
|
||||
@@ -50,8 +55,7 @@ int main() {
|
||||
thread_pool.Enqueue(some_task_4);
|
||||
|
||||
while (!some_task_4->Complete()) {}
|
||||
std::cout << thread_pool.ThreadCount() << std::endl;
|
||||
std::cout << "qty" << thread_pool.ThreadCount() << std::endl;
|
||||
|
||||
//delete thread_pool;
|
||||
}
|
||||
*/
|
@@ -6,9 +6,11 @@
|
||||
using namespace MultiThreading;
|
||||
|
||||
Thread::~Thread() {
|
||||
|
||||
if (current_task)
|
||||
while (!current_task->Complete()) {}
|
||||
stop = true;
|
||||
cv.notify_all();
|
||||
Join();
|
||||
// Thread exits gracefully.
|
||||
}
|
||||
|
@@ -1,4 +1,5 @@
|
||||
#include <MultiThreading/ThreadPool.h>
|
||||
#include <iostream>
|
||||
|
||||
using namespace MultiThreading;
|
||||
|
||||
@@ -16,16 +17,20 @@ void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task)
|
||||
std::lock_guard<std::mutex> lock(queue_mutex);
|
||||
|
||||
// Assign it immediately if there's no wait and a thread open.
|
||||
std::cout << "qsize (A) " << queue.size() << std::endl;
|
||||
if (queue.empty()) {
|
||||
for (auto *t: threads) {
|
||||
|
||||
if (t->Busy())
|
||||
continue;
|
||||
|
||||
if (!t->SetTask( [this, task] (){ Runner(task); } ))
|
||||
throw std::runtime_error("There was an error while setting up the task to run on the thread.");
|
||||
return;
|
||||
|
||||
}
|
||||
return;
|
||||
}
|
||||
std::cout << "qsize (B) " << queue.size() << std::endl;
|
||||
|
||||
// Alternatively it goes in-to the queue.
|
||||
queue.push(task);
|
||||
@@ -61,10 +66,20 @@ unsigned int ThreadPool::QueueSize() {
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool() {
|
||||
std::cout << "ThreadPool Destruction Started" << std::endl;
|
||||
|
||||
|
||||
// Wait for all tasks to be running.
|
||||
while (QueueSize() != 0) {}
|
||||
|
||||
std::cout << "All Tasks Complete" << std::endl;
|
||||
|
||||
// delete t waits for the thread to exit gracefully.
|
||||
for (auto* t: threads)
|
||||
delete t;
|
||||
|
||||
std::cout << "Threads deallocated" << std::endl;
|
||||
|
||||
|
||||
std::cout << "ThreadPool Destruction Complete" << std::endl;
|
||||
}
|
||||
|
Reference in New Issue
Block a user