Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions include/keyple/core/service/cpp/ExecutorService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <typeinfo>
#include <vector>
Expand Down Expand Up @@ -74,25 +76,35 @@ class KEYPLESERVICE_API ExecutorService final {
/**
*
*/
void run();
std::mutex mMutex;

/**
*
*/
std::condition_variable mCondition;

/**
*
*/
std::atomic<bool> mRunning;
bool mRunning;

/**
*
*/
std::atomic<bool> mTerminated;
bool mTerminated;

/**
*
*/
std::thread* mThread;
std::unique_ptr<std::thread> mThread;

/**
*
*/
void run();
};

} /* namespace cpp */
} /* namespace service */
} /* namespace core */
} /* namespace keyple */
} /* namespace keyple */
77 changes: 51 additions & 26 deletions src/main/cpp/ExecutorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "keyple/core/service/cpp/ExecutorService.hpp"

#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>

#include "keyple/core/service/AbstractObservableStateAdapter.hpp"
#include "keyple/core/util/cpp/Thread.hpp"
Expand All @@ -27,40 +30,43 @@ using keyple::core::service::AbstractObservableStateAdapter;
using keyple::core::util::cpp::Thread;

ExecutorService::ExecutorService()
: mRunning(true)
: mRunning(false)
, mTerminated(false)
{
mThread = new std::thread(&ExecutorService::run, this);
}

ExecutorService::~ExecutorService()
{
mRunning = false;

while (!mTerminated) {
Thread::sleep(10);
}
shutdown();
}

void
ExecutorService::run()
{
/* Emulates a SingleThreadExecutor (e.g. only one thread at a time) */
while (true) {
std::unique_lock<std::mutex> lock(mMutex);

while (mRunning) {
if (mPool.size()) {
/* Start first service and wait until completion */
std::shared_ptr<Job> job = mPool[0];
// Wait until there's a job or the service is shutting down
mCondition.wait(lock, [this]{
return !mPool.empty() || !mRunning;
});

if(!job->isCancelled()) {
job->run();
}

/* Remove from vector */
mPool.erase(mPool.begin());
// Check if we should terminate
if (!mRunning && mPool.empty()) {
break;
}

Thread::sleep(100);
// Get the job and remove it from the pool
std::shared_ptr<Job> job = mPool.front();
mPool.erase(mPool.begin());

// Unlock the mutex before running the job
// This allows other threads to submit new jobs while one is being processed
lock.unlock();

if (!job->isCancelled()) {
job->run();
}
}

mTerminated = true;
Expand All @@ -69,28 +75,47 @@ ExecutorService::run()
void
ExecutorService::execute(std::shared_ptr<Job> job)
{
mPool.push_back(job);
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mThread) {
mRunning = true;
mThread = std::unique_ptr<std::thread>(new std::thread(&ExecutorService::run, this));
}
mPool.push_back(job);
}
mCondition.notify_one();
}

std::shared_ptr<Job>
ExecutorService::submit(std::shared_ptr<Job> job)
{
mPool.push_back(job);

execute(job);
std::lock_guard<std::mutex> lock(mMutex);
return mPool.back();
}

void
ExecutorService::shutdown()
{
mRunning = false;
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mThread) {
return;
}
mRunning = false;
}

while (!mTerminated) {
Thread::sleep(10);
mCondition.notify_one();

if (mThread->joinable()) {
mThread->join();
}

mThread.reset();
mTerminated = true;
}

} /* namespace cpp */
} /* namespace service */
} /* namespace core */
} /* namespace keyple */
} /* namespace keyple */
Loading