initial commit

This commit is contained in:
2025-04-08 16:52:41 -04:00
commit 09a8bd0868
8 changed files with 332 additions and 0 deletions

15
CMakeLists.txt Normal file
View File

@@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.18..3.28)
project(MultiThreading VERSION 1.0 LANGUAGES CXX)
if (PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR)
message(FATAL_ERROR "In-source builds are not allowed")
endif()
file(GLOB_RECURSE HEADERS "include/*.h" "include/*.hpp")
file(GLOB_RECURSE SOURCES "src/*.c" "src/*.cpp")
set(CMAKE_CXX_STANDARD 20)
add_library(MultiThreading SHARED ${SOURCES})
target_include_directories(MultiThreading PUBLIC ${PROJECT_SOURCE_DIR}/include)
add_executable(MultiThreadingDemo main.cpp)
target_link_libraries(MultiThreadingDemo PUBLIC MultiThreading)

View File

@@ -0,0 +1,58 @@
#pragma once
#include <functional>
#include <memory>
#include <atomic>
#include <utility>
namespace MultiThreading {
class TaskBase;
template <typename T>
class Task;
}
class MultiThreading::TaskBase {
protected:
std::atomic<bool> complete = false;
public:
/// @returns whether the task is finished.
[[nodiscard]] bool Complete() const { return complete; }
virtual void Run() = 0;
public:
TaskBase() = default;
};
template <typename T>
class MultiThreading::Task : public TaskBase {
private:
T* result = nullptr;
std::function<T()> callable = nullptr;
private:
explicit Task(std::function<T()> callable, T* result = nullptr) : TaskBase(), result(result), callable(callable) {}
public:
void Run() final { result ? *result = callable() : callable(); complete = true; }
public:
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
/// @param callable The function to run, *usually a lambda or std::bind*
/// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it.
/// @note this is shared_ptr so you don't have to delete it.
static std::shared_ptr<Task<T>> Create(std::function<T()> callable, T* result = nullptr) { return std::shared_ptr<Task<T>>(new Task<T>(callable, result)); };
~Task() = default;
};
// Special case for if the task is void return type because templates are weird.
template <>
class MultiThreading::Task<void> : public TaskBase {
private:
std::function<void()> callable = nullptr;
private:
explicit Task(std::function<void()> callable) : TaskBase(), callable(std::move(callable)) {}
public:
void Run() final { callable(); complete = true; }
/// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs.
/// @param callable The function to run, *usually a lambda or std::bind*
/// @note this is shared_ptr so you don't have to delete it.
static std::shared_ptr<Task<void>> Create(const std::function<void()>& callable) { return std::shared_ptr<Task<void>>(new Task<void>(callable)); }
};

View File

@@ -0,0 +1,34 @@
#pragma once
#include <thread>
#include <functional>
#include <stdexcept>
#include <condition_variable>
#include <mutex>
#include <MultiThreading/Task.h>
namespace MultiThreading {
class Thread;
}
class MultiThreading::Thread {
private:
std::thread worker;
std::function<void()> current_task;
std::atomic<bool> busy{ false };
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
private:
void Runner();
public:
[[nodiscard]] bool SetTask(std::function<void()> task);
[[nodiscard]] bool SetTask(std::shared_ptr<TaskBase> task);
[[nodiscard]] bool Busy() const { return busy; }
void Join() { if (worker.joinable()) worker.join(); };
public:
Thread() { worker = std::thread([this] { this->Runner(); }); }
explicit Thread(std::shared_ptr<TaskBase> task);
explicit Thread(std::function<void()> task);
~Thread();
};

View File

@@ -0,0 +1,29 @@
#pragma once
#include <MultiThreading/Thread.h>
#include <vector>
#include <queue>
namespace MultiThreading {
class ThreadPool;
}
class MultiThreading::ThreadPool {
private:
std::vector<MultiThreading::Thread*> threads;
std::queue<std::shared_ptr<TaskBase>> queue;
std::mutex queue_mutex;
private:
/// @returns nullptr if the queue is empty.
std::shared_ptr<TaskBase> Dequeue();
void Runner(const std::shared_ptr<TaskBase>& task);
public:
void Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task);
void Enqueue(const std::function<void()>& task);
public:
[[nodiscard]] unsigned int ThreadCount() const { return threads.size(); }
[[nodiscard]] unsigned int QueueSize();
public:
ThreadPool();
explicit ThreadPool(unsigned int thread_count);
~ThreadPool();
};

52
main.cpp Normal file
View File

@@ -0,0 +1,52 @@
#include <MultiThreading/Task.h>
#include <MultiThreading/Thread.h>
#include <MultiThreading/ThreadPool.h>
#include <iostream>
using namespace MultiThreading;
void some_test_func(int32_t hello) {
for (unsigned int i = 0; i < 4000000000; i++)
std::cout << "test" << std::endl;
}
/*
int main() {
// Each task you create can be run by a thread only once. It's marked as complete after.
// If you're running a lambda or std::function directly on the thread, It can be used multiple times.
int32_t a = 0;
auto some_task = Task<void>::Create([] { return some_test_func(4); });
// You can start threads in place like this, but you have to wait for the amount of time
// it takes for the thread to start up which in many cases is longer than the job. Use thread pool.
Thread some_thread(some_task);
// Some work running concurrently with the worker thread.
for (unsigned int i = 0; i < 1000000000; i++) {}
// When we get to the point in our code where we need the result from the task wait until it's finished.
while (!some_task->Complete()) {}
// At this point, the thread is sleeping and a new task can be pushed on.
auto some_other_task = Task<void>::Create(std::bind(some_test_func, 5));
if (!some_thread.SetTask(some_other_task))
throw std::runtime_error("Error while pushing the task on-to the thread");
std::cout << some_thread.Busy() << std::endl;
// Because the thread exists already and is waiting for jobs, We don't get the penalty of the thread start-up time.
while (!some_other_task->Complete()) { std::cout << some_thread.Busy() << std::endl; }
std::cout << a << std::endl;
}
*/
int main() {
ThreadPool thread_pool(1);
auto some_task_1 = Task<void>::Create(([] { return some_test_func(1); }));
thread_pool.Enqueue(some_task_1);
std::cout << thread_pool.ThreadCount() << std::endl;
//delete thread_pool;
}

1
src/Task.cpp Normal file
View File

@@ -0,0 +1 @@
#include <MultiThreading/Task.h>

73
src/Thread.cpp Normal file
View File

@@ -0,0 +1,73 @@
#include <MultiThreading/Thread.h>
#include <sstream>
#include <iostream>
#include <utility>
using namespace MultiThreading;
Thread::~Thread() {
{
std::lock_guard<std::mutex> lock(mtx);
stop = true;
current_task = nullptr;
}
if (Busy())
Join();
cv.notify_all();
// Thread exits gracefully. If it does not, your program will likely freeze forever.
}
bool Thread::SetTask(std::shared_ptr<TaskBase> task) {
return Thread::SetTask([task]{ task->Run(); });
}
bool Thread::SetTask(std::function<void()> task) {
if (busy)
return false;
{
std::lock_guard<std::mutex> lock(mtx);
current_task = std::move(task);
}
cv.notify_all();
return true;
}
void Thread::Runner() {
std::unique_lock<std::mutex> lock(mtx);
while (!stop) {
cv.wait(lock, [this] { return stop || current_task != nullptr; });
if (stop)
break;
auto this_task = std::move(current_task);
current_task = nullptr;
busy = true;
lock.unlock();
if (this_task)
this_task();
lock.lock();
busy = false;
}
}
Thread::Thread(std::shared_ptr<TaskBase> task) {
worker = std::thread([this] { this->Runner(); });
if (!SetTask(task))
throw std::runtime_error("Thread constructor failure.");
}
Thread::Thread(std::function<void()> task) {
worker = std::thread([this] { this->Runner(); });
if (!SetTask(std::move(task)))
throw std::runtime_error("Thread constructor failure.");
}

70
src/ThreadPool.cpp Normal file
View File

@@ -0,0 +1,70 @@
#include <MultiThreading/ThreadPool.h>
using namespace MultiThreading;
ThreadPool::ThreadPool(unsigned int thread_count) {
for (unsigned int i = 0; i < thread_count; i++)
threads.push_back(new Thread());
}
ThreadPool::ThreadPool() {
for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++)
threads.push_back(new Thread());
}
void ThreadPool::Enqueue(const std::shared_ptr<MultiThreading::TaskBase>& task) {
std::lock_guard<std::mutex> lock(queue_mutex);
// Assign it immediately if there's no wait and a thread open.
if (queue.empty()) {
for (auto *t: threads) {
if (t->Busy())
continue;
if (!t->SetTask( [this, task] (){ Runner(task); } ))
throw std::runtime_error("There was an error while setting up the task to run on the thread.");
return;
}
}
// Alternatively it goes in-to the queue.
queue.push(task);
}
std::shared_ptr<TaskBase> ThreadPool::Dequeue() {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty())
return nullptr;
auto task = queue.front();
queue.pop();
return task;
}
void ThreadPool::Enqueue(const std::function<void()>& task) {
Enqueue(Task<void>::Create(task));
}
void ThreadPool::Runner(const std::shared_ptr<TaskBase>& task) {
if (!task->Complete())
task->Run();
auto next_task = Dequeue();
if (!next_task)
return;
Runner(next_task);
}
unsigned int ThreadPool::QueueSize() {
std::lock_guard<std::mutex> lock(queue_mutex);
return queue.size();
}
ThreadPool::~ThreadPool() {
// Wait for all tasks to be running.
while (QueueSize() != 0) {}
// delete t waits for the thread to exit gracefully.
for (auto* t: threads)
delete t;
}