Unverified Commit 9b1b6981 authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent d4e26311
......@@ -27,41 +27,39 @@
#include "WorkerThreadPool.h"
void WorkerThreadPool::startService() {
WorkerThreadPool::WorkerThreadPool(uint64_t _numThreads, ZMQServer *_agent) : joined(false) {
CHECK_STATE(_numThreads > 0);
CHECK_STATE(_agent);
CHECK_STATE(!started.exchange(true))
spdlog::info("Creating thread pool. Threads count:" + to_string(_numThreads));
this->agent = _agent;
this->numThreads = _numThreads;;
LOCK(m)
for (uint64_t i = 0; i < (uint64_t) numThreads; i++) {
createThread(i);
}
}
spdlog::info("Created thread pool");
WorkerThreadPool::WorkerThreadPool(uint64_t _numThreads, ZMQServer *_agent) : started(false), joined(false) {
CHECK_STATE(_numThreads > 0);
CHECK_STATE(_agent);
spdlog::info("Started thread pool. Threads count:" + to_string(_numThreads));
this->agent = _agent;
this->numThreads = _numThreads;;
}
void WorkerThreadPool::joinAll() {
if (joined)
return;
LOCK(m);
spdlog::info("Joining worker threads ...");
joined = true;
if (joined.exchange(true))
return;
for (auto &&thread : threadpool) {
if (thread->joinable())
thread->join();
CHECK_STATE(!thread->joinable());
}
spdlog::info("Joined worker threads.");
}
bool WorkerThreadPool::isJoined() const {
......
......@@ -33,8 +33,6 @@ class ZMQServer;
class WorkerThreadPool {
atomic_bool started;
void createThread( uint64_t threadNumber );
recursive_mutex m;
......@@ -53,8 +51,6 @@ public:
virtual ~WorkerThreadPool();
virtual void startService();
void joinAll();
bool isJoined() const;
......
......@@ -94,10 +94,26 @@ void ZMQServer::run() {
std::atomic<bool> ZMQServer::isExitRequested(false);
void ZMQServer::exitZMQServer() {
isExitRequested.exchange(true);
// if already exited do not exit
spdlog::info("exitZMQServer called");
if (isExitRequested.exchange(true)) {
spdlog::info("Exit is already under way");
return;
}
spdlog::info("Exiting ZMQServer");
spdlog::info("Joining worker thread pool threads ...");
zmqServer->threadPool->joinAll();
spdlog::info("Joined worker thread pool threads");
spdlog::info("Shutting down ZMQ contect");
zmqServer->ctx->shutdown();
spdlog::info("Shut down ZMQ contect");
spdlog::info("Closing ZMQ server socket ...");
zmqServer->socket->close();
spdlog::info("Closed ZMQ server socket");
spdlog::info("Closing ZMQ context ...");
zmqServer->ctx->close();
spdlog::info("Closed ZMQ context.");
spdlog::info("Exited zmq server.");
}
......@@ -118,7 +134,7 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) {
spdlog::info("Read CA.", rootCAPath);
};
spdlog::info("Initing zmq server.");
spdlog::info("Initing zmq server ...");
zmqServer = make_shared<ZMQServer>(_checkSignature, _checkKeyOwnership, rootCAPath);
......@@ -126,22 +142,20 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) {
serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach();
zmqServer->releaseWorkers();
spdlog::info("Inited zmq server.");
spdlog::info("Starting zmq server ...");
spdlog::info("Releasing SGX worker threads ...");
zmqServer->releaseWorkers();
spdlog::info("Started zmq server.");
spdlog::info("Released SGX worker threads.");
spdlog::info("Inited zmq server.");
}
shared_ptr <std::thread> ZMQServer::serverThread = nullptr;
ZMQServer::~ZMQServer() {}
ZMQServer::~ZMQServer() {
exitZMQServer();
}
void ZMQServer::doOneServerLoop() {
......@@ -265,7 +279,7 @@ void ZMQServer::workerThreadMessageProcessLoop(ZMQServer* _agent ) {
_agent->waitOnGlobalStartBarrier();
// do work forever until told to exit
while (!isExitRequested) {
sleep(1000);
usleep(1000000);
cerr << "WORKER LOOP" << endl;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment