Unverified Commit 4066708d authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent ca858196
...@@ -29,13 +29,11 @@ void Agent::notifyAllConditionVariables() { ...@@ -29,13 +29,11 @@ void Agent::notifyAllConditionVariables() {
queueCond.notify_all(); queueCond.notify_all();
} }
Agent::Agent() : startedWorkers(false) {};
Agent::Agent() : startedRun(false) {};
void Agent::waitOnGlobalStartBarrier() { void Agent::waitOnGlobalStartBarrier() {
unique_lock<mutex> mlock(queueMutex); unique_lock<mutex> mlock(queueMutex);
while (!startedRun) { while (!startedWorkers) {
queueCond.wait(mlock); queueCond.wait(mlock);
} }
} }
...@@ -44,9 +42,10 @@ Agent::~Agent() { ...@@ -44,9 +42,10 @@ Agent::~Agent() {
} }
void Agent::releaseGlobalStartBarrier() { void Agent::releaseWorkers() {
if (startedRun.exchange(true)) { if (startedWorkers.exchange(true)) {
// already started
return; return;
} }
......
...@@ -33,7 +33,7 @@ class Agent { ...@@ -33,7 +33,7 @@ class Agent {
protected: protected:
atomic_bool startedRun; atomic_bool startedWorkers;
mutex messageMutex; mutex messageMutex;
condition_variable messageCond; condition_variable messageCond;
...@@ -52,9 +52,9 @@ public: ...@@ -52,9 +52,9 @@ public:
virtual ~Agent(); virtual ~Agent();
void releaseGlobalStartBarrier(); void releaseWorkers();
void waitOnGlobalStartBarrier(); void waitOnGlobalStartBarrier();
recursive_mutex& getMainMutex() { return m; } recursive_mutex& getMainMutex() { return m; }
}; };
...@@ -23,15 +23,16 @@ ...@@ -23,15 +23,16 @@
#include "common.h" #include "common.h"
#include "third_party/spdlog/spdlog.h" #include "third_party/spdlog/spdlog.h"
#include "ZMQServer.h"
#include "WorkerThreadPool.h" #include "WorkerThreadPool.h"
void WorkerThreadPool::startService() { void WorkerThreadPool::startService() {
lock_guard<recursive_mutex> lock(threadPoolMutex);
CHECK_STATE(!started.exchange(true)) CHECK_STATE(!started.exchange(true))
LOCK(m)
for (uint64_t i = 0; i < (uint64_t) numThreads; i++) { for (uint64_t i = 0; i < (uint64_t) numThreads; i++) {
createThread(i); createThread(i);
} }
...@@ -39,7 +40,7 @@ void WorkerThreadPool::startService() { ...@@ -39,7 +40,7 @@ void WorkerThreadPool::startService() {
} }
WorkerThreadPool::WorkerThreadPool(uint64_t _numThreads, Agent *_agent) : started(false), joined(false) { WorkerThreadPool::WorkerThreadPool(uint64_t _numThreads, ZMQServer *_agent) : started(false), joined(false) {
CHECK_STATE(_numThreads > 0); CHECK_STATE(_numThreads > 0);
CHECK_STATE(_agent); CHECK_STATE(_agent);
spdlog::info("Started thread pool. Threads count:" + to_string(_numThreads)); spdlog::info("Started thread pool. Threads count:" + to_string(_numThreads));
...@@ -52,7 +53,7 @@ void WorkerThreadPool::joinAll() { ...@@ -52,7 +53,7 @@ void WorkerThreadPool::joinAll() {
if (joined) if (joined)
return; return;
lock_guard<recursive_mutex> lock(threadPoolMutex); LOCK(m);
joined = true; joined = true;
...@@ -69,3 +70,13 @@ bool WorkerThreadPool::isJoined() const { ...@@ -69,3 +70,13 @@ bool WorkerThreadPool::isJoined() const {
WorkerThreadPool::~WorkerThreadPool(){ WorkerThreadPool::~WorkerThreadPool(){
} }
void WorkerThreadPool::createThread(uint64_t _threadNumber) {
spdlog::info("Starting ZMQ worker thread " + to_string(_threadNumber) );
this->threadpool.push_back(
make_shared< thread >( ZMQServer::workerThreadMessageProcessLoop, agent ) );
spdlog::info("Started ZMQ worker thread " + to_string(_threadNumber) );
}
...@@ -27,30 +27,30 @@ ...@@ -27,30 +27,30 @@
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include "Agent.h"
class Agent;
class ZMQServer;
class WorkerThreadPool { class WorkerThreadPool {
atomic_bool started; atomic_bool started;
virtual void createThread( uint64_t threadNumber ) = 0; void createThread( uint64_t threadNumber );
recursive_mutex m;
protected: protected:
atomic_bool joined; atomic_bool joined;
vector<shared_ptr<thread>> threadpool; vector<shared_ptr<thread>> threadpool;
recursive_mutex threadPoolMutex;
uint64_t numThreads = 0; uint64_t numThreads = 0;
Agent* agent = nullptr; ZMQServer* agent = nullptr;
protected:
WorkerThreadPool(uint64_t _numThreads, Agent *_agent);
public: public:
WorkerThreadPool(uint64_t _numThreads, ZMQServer *_agent);
virtual ~WorkerThreadPool(); virtual ~WorkerThreadPool();
virtual void startService(); virtual void startService();
...@@ -59,5 +59,4 @@ public: ...@@ -59,5 +59,4 @@ public:
bool isJoined() const; bool isJoined() const;
}; };
...@@ -55,6 +55,9 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string ...@@ -55,6 +55,9 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string
int linger = 0; int linger = 0;
zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger));
threadPool = make_shared<WorkerThreadPool>(1, this);
} }
void ZMQServer::run() { void ZMQServer::run() {
...@@ -73,7 +76,7 @@ void ZMQServer::run() { ...@@ -73,7 +76,7 @@ void ZMQServer::run() {
spdlog::info("Bound port ..."); spdlog::info("Bound port ...");
waitOnGlobalStartBarrier();
spdlog::info("Started zmq read loop ..."); spdlog::info("Started zmq read loop ...");
...@@ -123,11 +126,13 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) { ...@@ -123,11 +126,13 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) {
serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer)); serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach(); serverThread->detach();
zmqServer->releaseWorkers();
spdlog::info("Inited zmq server."); spdlog::info("Inited zmq server.");
spdlog::info("Starting zmq server ..."); spdlog::info("Starting zmq server ...");
zmqServer->releaseGlobalStartBarrier(); zmqServer->releaseWorkers();
spdlog::info("Started zmq server."); spdlog::info("Started zmq server.");
...@@ -254,3 +259,12 @@ void ZMQServer::doOneServerLoop() { ...@@ -254,3 +259,12 @@ void ZMQServer::doOneServerLoop() {
exit(-18); exit(-18);
} }
} }
void ZMQServer::workerThreadMessageProcessLoop(ZMQServer* _agent ) {
CHECK_STATE(_agent);
_agent->waitOnGlobalStartBarrier();
while (!isExitRequested) {
sleep(100);
}
}
\ No newline at end of file
...@@ -36,22 +36,36 @@ ...@@ -36,22 +36,36 @@
#include "zhelpers.hpp" #include "zhelpers.hpp"
#include "Agent.h" #include "Agent.h"
#include "WorkerThreadPool.h"
using namespace std; using namespace std;
class ZMQServer : Agent{ class ZMQServer : public Agent{
uint64_t workerThreads; uint64_t workerThreads;
string caCertFile;
string caCert;
bool checkKeyOwnership = true;
shared_ptr<zmq::context_t> ctx;
shared_ptr<zmq::socket_t> socket;
static std::atomic<bool> isExitRequested;
void doOneServerLoop();
public: public:
bool checkSignature = false; bool checkSignature = false;
string caCertFile = "";
string caCert = "";
static shared_ptr<ZMQServer> zmqServer; static shared_ptr<ZMQServer> zmqServer;
shared_ptr<WorkerThreadPool> threadPool = nullptr;
static shared_ptr<std::thread> serverThread; static shared_ptr<std::thread> serverThread;
ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string& _caCertFile); ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string& _caCertFile);
...@@ -63,15 +77,7 @@ public: ...@@ -63,15 +77,7 @@ public:
static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership); static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership);
static void exitZMQServer(); static void exitZMQServer();
private: static void workerThreadMessageProcessLoop(ZMQServer* agent );
bool checkKeyOwnership = true;
shared_ptr<zmq::context_t> ctx;
shared_ptr<zmq::socket_t> socket;
static std::atomic<bool> isExitRequested;
void doOneServerLoop();
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment