diff --git a/include/keyple/core/service/cpp/ExecutorService.hpp b/include/keyple/core/service/cpp/ExecutorService.hpp index eae19b1..6a4c068 100644 --- a/include/keyple/core/service/cpp/ExecutorService.hpp +++ b/include/keyple/core/service/cpp/ExecutorService.hpp @@ -13,7 +13,9 @@ #pragma once +#include #include +#include #include #include #include @@ -74,25 +76,35 @@ class KEYPLESERVICE_API ExecutorService final { /** * */ - void run(); + std::mutex mMutex; + + /** + * + */ + std::condition_variable mCondition; /** * */ - std::atomic mRunning; + bool mRunning; /** * */ - std::atomic mTerminated; + bool mTerminated; /** * */ - std::thread* mThread; + std::unique_ptr mThread; + + /** + * + */ + void run(); }; } /* namespace cpp */ } /* namespace service */ } /* namespace core */ -} /* namespace keyple */ +} /* namespace keyple */ \ No newline at end of file diff --git a/src/main/cpp/ExecutorService.cpp b/src/main/cpp/ExecutorService.cpp index d611fc7..9ffa028 100644 --- a/src/main/cpp/ExecutorService.cpp +++ b/src/main/cpp/ExecutorService.cpp @@ -14,6 +14,9 @@ #include "keyple/core/service/cpp/ExecutorService.hpp" #include +#include +#include +#include #include "keyple/core/service/AbstractObservableStateAdapter.hpp" #include "keyple/core/util/cpp/Thread.hpp" @@ -27,40 +30,43 @@ using keyple::core::service::AbstractObservableStateAdapter; using keyple::core::util::cpp::Thread; ExecutorService::ExecutorService() -: mRunning(true) +: mRunning(false) , mTerminated(false) { - mThread = new std::thread(&ExecutorService::run, this); } ExecutorService::~ExecutorService() { - mRunning = false; - - while (!mTerminated) { - Thread::sleep(10); - } + shutdown(); } void ExecutorService::run() { - /* Emulates a SingleThreadExecutor (e.g. only one thread at a time) */ + while (true) { + std::unique_lock lock(mMutex); - while (mRunning) { - if (mPool.size()) { - /* Start first service and wait until completion */ - std::shared_ptr job = mPool[0]; + // Wait until there's a job or the service is shutting down + mCondition.wait(lock, [this]{ + return !mPool.empty() || !mRunning; + }); - if(!job->isCancelled()) { - job->run(); - } - - /* Remove from vector */ - mPool.erase(mPool.begin()); + // Check if we should terminate + if (!mRunning && mPool.empty()) { + break; } - Thread::sleep(100); + // Get the job and remove it from the pool + std::shared_ptr job = mPool.front(); + mPool.erase(mPool.begin()); + + // Unlock the mutex before running the job + // This allows other threads to submit new jobs while one is being processed + lock.unlock(); + + if (!job->isCancelled()) { + job->run(); + } } mTerminated = true; @@ -69,28 +75,47 @@ ExecutorService::run() void ExecutorService::execute(std::shared_ptr job) { - mPool.push_back(job); + { + std::lock_guard lock(mMutex); + if (!mThread) { + mRunning = true; + mThread = std::unique_ptr(new std::thread(&ExecutorService::run, this)); + } + mPool.push_back(job); + } + mCondition.notify_one(); } std::shared_ptr ExecutorService::submit(std::shared_ptr job) { - mPool.push_back(job); - + execute(job); + std::lock_guard lock(mMutex); return mPool.back(); } void ExecutorService::shutdown() { - mRunning = false; + { + std::lock_guard lock(mMutex); + if (!mThread) { + return; + } + mRunning = false; + } - while (!mTerminated) { - Thread::sleep(10); + mCondition.notify_one(); + + if (mThread->joinable()) { + mThread->join(); } + + mThread.reset(); + mTerminated = true; } } /* namespace cpp */ } /* namespace service */ } /* namespace core */ -} /* namespace keyple */ +} /* namespace keyple */ \ No newline at end of file