commit 09a8bd086822927e1fd2cd43ab257fdf52933dfe Author: Redacted Date: Tue Apr 8 16:52:41 2025 -0400 initial commit diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..72941f8 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 3.18..3.28) +project(MultiThreading VERSION 1.0 LANGUAGES CXX) + +if (PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR) + message(FATAL_ERROR "In-source builds are not allowed") +endif() + +file(GLOB_RECURSE HEADERS "include/*.h" "include/*.hpp") +file(GLOB_RECURSE SOURCES "src/*.c" "src/*.cpp") +set(CMAKE_CXX_STANDARD 20) + +add_library(MultiThreading SHARED ${SOURCES}) +target_include_directories(MultiThreading PUBLIC ${PROJECT_SOURCE_DIR}/include) +add_executable(MultiThreadingDemo main.cpp) +target_link_libraries(MultiThreadingDemo PUBLIC MultiThreading) diff --git a/include/MultiThreading/Task.h b/include/MultiThreading/Task.h new file mode 100644 index 0000000..ebd7e90 --- /dev/null +++ b/include/MultiThreading/Task.h @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include +#include + +namespace MultiThreading { + class TaskBase; + + template + class Task; +} + +class MultiThreading::TaskBase { +protected: + std::atomic complete = false; +public: + /// @returns whether the task is finished. + [[nodiscard]] bool Complete() const { return complete; } + virtual void Run() = 0; +public: + TaskBase() = default; +}; + +template +class MultiThreading::Task : public TaskBase { +private: + T* result = nullptr; + std::function callable = nullptr; +private: + explicit Task(std::function callable, T* result = nullptr) : TaskBase(), result(result), callable(callable) {} +public: + void Run() final { result ? *result = callable() : callable(); complete = true; } +public: + /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. + /// @param callable The function to run, *usually a lambda or std::bind* + /// @param result If your function returns a value, This is where you want it to go. nullptr for no return value or you don't want it. + /// @note this is shared_ptr so you don't have to delete it. + static std::shared_ptr> Create(std::function callable, T* result = nullptr) { return std::shared_ptr>(new Task(callable, result)); }; + ~Task() = default; +}; + +// Special case for if the task is void return type because templates are weird. +template <> +class MultiThreading::Task : public TaskBase { +private: + std::function callable = nullptr; +private: + explicit Task(std::function callable) : TaskBase(), callable(std::move(callable)) {} +public: + void Run() final { callable(); complete = true; } + + /// Create a Task. This is non-negotiable, This ensures that there's no issues related to the stack frame the task is on being popped before or while the job runs. + /// @param callable The function to run, *usually a lambda or std::bind* + /// @note this is shared_ptr so you don't have to delete it. + static std::shared_ptr> Create(const std::function& callable) { return std::shared_ptr>(new Task(callable)); } +}; \ No newline at end of file diff --git a/include/MultiThreading/Thread.h b/include/MultiThreading/Thread.h new file mode 100644 index 0000000..5b6e1c9 --- /dev/null +++ b/include/MultiThreading/Thread.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace MultiThreading { + class Thread; +} + +class MultiThreading::Thread { +private: + std::thread worker; + std::function current_task; + std::atomic busy{ false }; + std::mutex mtx; + std::condition_variable cv; + bool stop = false; +private: + void Runner(); +public: + [[nodiscard]] bool SetTask(std::function task); + [[nodiscard]] bool SetTask(std::shared_ptr task); + [[nodiscard]] bool Busy() const { return busy; } + void Join() { if (worker.joinable()) worker.join(); }; +public: + Thread() { worker = std::thread([this] { this->Runner(); }); } + explicit Thread(std::shared_ptr task); + explicit Thread(std::function task); + ~Thread(); +}; \ No newline at end of file diff --git a/include/MultiThreading/ThreadPool.h b/include/MultiThreading/ThreadPool.h new file mode 100644 index 0000000..bc92129 --- /dev/null +++ b/include/MultiThreading/ThreadPool.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include +namespace MultiThreading { + class ThreadPool; +} + +class MultiThreading::ThreadPool { +private: + std::vector threads; + std::queue> queue; + std::mutex queue_mutex; +private: + /// @returns nullptr if the queue is empty. + std::shared_ptr Dequeue(); + void Runner(const std::shared_ptr& task); +public: + void Enqueue(const std::shared_ptr& task); + void Enqueue(const std::function& task); +public: + [[nodiscard]] unsigned int ThreadCount() const { return threads.size(); } + [[nodiscard]] unsigned int QueueSize(); +public: + ThreadPool(); + explicit ThreadPool(unsigned int thread_count); + ~ThreadPool(); +}; \ No newline at end of file diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..455f687 --- /dev/null +++ b/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +#include + +using namespace MultiThreading; +void some_test_func(int32_t hello) { + for (unsigned int i = 0; i < 4000000000; i++) + std::cout << "test" << std::endl; +} + +/* +int main() { + // Each task you create can be run by a thread only once. It's marked as complete after. + // If you're running a lambda or std::function directly on the thread, It can be used multiple times. + int32_t a = 0; + auto some_task = Task::Create([] { return some_test_func(4); }); + + // You can start threads in place like this, but you have to wait for the amount of time + // it takes for the thread to start up which in many cases is longer than the job. Use thread pool. + Thread some_thread(some_task); + + // Some work running concurrently with the worker thread. + for (unsigned int i = 0; i < 1000000000; i++) {} + + // When we get to the point in our code where we need the result from the task wait until it's finished. + while (!some_task->Complete()) {} + + // At this point, the thread is sleeping and a new task can be pushed on. + auto some_other_task = Task::Create(std::bind(some_test_func, 5)); + + if (!some_thread.SetTask(some_other_task)) + throw std::runtime_error("Error while pushing the task on-to the thread"); + std::cout << some_thread.Busy() << std::endl; + + // Because the thread exists already and is waiting for jobs, We don't get the penalty of the thread start-up time. + while (!some_other_task->Complete()) { std::cout << some_thread.Busy() << std::endl; } + + std::cout << a << std::endl; +} +*/ + +int main() { + ThreadPool thread_pool(1); + + auto some_task_1 = Task::Create(([] { return some_test_func(1); })); + thread_pool.Enqueue(some_task_1); + std::cout << thread_pool.ThreadCount() << std::endl; + + //delete thread_pool; +} \ No newline at end of file diff --git a/src/Task.cpp b/src/Task.cpp new file mode 100644 index 0000000..d687331 --- /dev/null +++ b/src/Task.cpp @@ -0,0 +1 @@ +#include \ No newline at end of file diff --git a/src/Thread.cpp b/src/Thread.cpp new file mode 100644 index 0000000..a9afdca --- /dev/null +++ b/src/Thread.cpp @@ -0,0 +1,73 @@ +#include +#include +#include +#include + +using namespace MultiThreading; + +Thread::~Thread() { + { + std::lock_guard lock(mtx); + stop = true; + current_task = nullptr; + } + + if (Busy()) + Join(); + + cv.notify_all(); + // Thread exits gracefully. If it does not, your program will likely freeze forever. +} + +bool Thread::SetTask(std::shared_ptr task) { + return Thread::SetTask([task]{ task->Run(); }); +} + +bool Thread::SetTask(std::function task) { + if (busy) + return false; + + { + std::lock_guard lock(mtx); + current_task = std::move(task); + } + cv.notify_all(); + + return true; +} + +void Thread::Runner() { + std::unique_lock lock(mtx); + while (!stop) { + cv.wait(lock, [this] { return stop || current_task != nullptr; }); + + if (stop) + break; + + auto this_task = std::move(current_task); + current_task = nullptr; + + busy = true; + + lock.unlock(); + if (this_task) + this_task(); + lock.lock(); + + busy = false; + } +} + +Thread::Thread(std::shared_ptr task) { + worker = std::thread([this] { this->Runner(); }); + + if (!SetTask(task)) + throw std::runtime_error("Thread constructor failure."); +} + +Thread::Thread(std::function task) { + worker = std::thread([this] { this->Runner(); }); + + if (!SetTask(std::move(task))) + throw std::runtime_error("Thread constructor failure."); +} diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp new file mode 100644 index 0000000..cfad817 --- /dev/null +++ b/src/ThreadPool.cpp @@ -0,0 +1,70 @@ +#include + +using namespace MultiThreading; + +ThreadPool::ThreadPool(unsigned int thread_count) { + for (unsigned int i = 0; i < thread_count; i++) + threads.push_back(new Thread()); +} + +ThreadPool::ThreadPool() { + for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++) + threads.push_back(new Thread()); +} + +void ThreadPool::Enqueue(const std::shared_ptr& task) { + std::lock_guard lock(queue_mutex); + + // Assign it immediately if there's no wait and a thread open. + if (queue.empty()) { + for (auto *t: threads) { + if (t->Busy()) + continue; + + if (!t->SetTask( [this, task] (){ Runner(task); } )) + throw std::runtime_error("There was an error while setting up the task to run on the thread."); + return; + } + } + + // Alternatively it goes in-to the queue. + queue.push(task); +} + +std::shared_ptr ThreadPool::Dequeue() { + std::lock_guard lock(queue_mutex); + if (queue.empty()) + return nullptr; + + auto task = queue.front(); + queue.pop(); + return task; +} + +void ThreadPool::Enqueue(const std::function& task) { + Enqueue(Task::Create(task)); +} + +void ThreadPool::Runner(const std::shared_ptr& task) { + if (!task->Complete()) + task->Run(); + + auto next_task = Dequeue(); + if (!next_task) + return; + Runner(next_task); +} + +unsigned int ThreadPool::QueueSize() { + std::lock_guard lock(queue_mutex); + return queue.size(); +} + +ThreadPool::~ThreadPool() { + // Wait for all tasks to be running. + while (QueueSize() != 0) {} + + // delete t waits for the thread to exit gracefully. + for (auto* t: threads) + delete t; +}