Unverified Commit 73e2a031 authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent f457b201
This diff is collapsed.
...@@ -81,7 +81,7 @@ void WorkerThreadPool::createThread(uint64_t _threadNumber) { ...@@ -81,7 +81,7 @@ void WorkerThreadPool::createThread(uint64_t _threadNumber) {
spdlog::info("Starting ZMQ worker thread " + to_string(_threadNumber) ); spdlog::info("Starting ZMQ worker thread " + to_string(_threadNumber) );
this->threadpool.push_back( this->threadpool.push_back(
make_shared< thread >( ZMQServer::workerThreadMessageProcessLoop, agent ) ); make_shared< thread >( ZMQServer::workerThreadMessageProcessLoop, agent, _threadNumber ) );
spdlog::info("Started ZMQ worker thread " + to_string(_threadNumber) ); spdlog::info("Started ZMQ worker thread " + to_string(_threadNumber) );
} }
...@@ -61,7 +61,7 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string ...@@ -61,7 +61,7 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string
zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger));
threadPool = make_shared<WorkerThreadPool>(1, this); threadPool = make_shared<WorkerThreadPool>(NUM_ZMQ_WORKER_THREADS, this);
} }
...@@ -93,7 +93,7 @@ void ZMQServer::run() { ...@@ -93,7 +93,7 @@ void ZMQServer::run() {
while (!isExitRequested) { while (!isExitRequested) {
try { try {
zmqServer->doOneServerLoop(); zmqServer->doOneServerLoop();
} catch (ExitRequestedException& e) { } catch (ExitRequestedException &e) {
spdlog::info("Exit requested. Exiting server loop"); spdlog::info("Exit requested. Exiting server loop");
break; break;
} }
...@@ -178,7 +178,6 @@ void ZMQServer::checkForExit() { ...@@ -178,7 +178,6 @@ void ZMQServer::checkForExit() {
} }
PollResult ZMQServer::poll() { PollResult ZMQServer::poll() {
zmq_pollitem_t items[1]; zmq_pollitem_t items[1];
items[0].socket = *socket; items[0].socket = *socket;
...@@ -189,12 +188,19 @@ PollResult ZMQServer::poll() { ...@@ -189,12 +188,19 @@ PollResult ZMQServer::poll() {
do { do {
checkForExit(); checkForExit();
pollResult = zmq_poll(items, 1, 1); pollResult = zmq_poll(items, 1, 1);
pair <Json::Value, shared_ptr<zmq::message_t>> element;
// send all items in outgoing queue
while (outgoingQueue.try_dequeue(element)) {
sendToClient(element.first, element.second);
}
} while (pollResult == 0); } while (pollResult == 0);
return GOT_INCOMING_MSG; return GOT_INCOMING_MSG;
} }
pair<string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() { pair <string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() {
auto identity = make_shared<zmq::message_t>(); auto identity = make_shared<zmq::message_t>();
...@@ -228,7 +234,7 @@ pair<string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() { ...@@ -228,7 +234,7 @@ pair<string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() {
return {result, identity}; return {result, identity};
} }
void ZMQServer::sendToClient(Json::Value& _result, shared_ptr<zmq::message_t>& _identity ) { void ZMQServer::sendToClient(Json::Value &_result, shared_ptr <zmq::message_t> &_identity) {
string replyStr; string replyStr;
try { try {
Json::FastWriter fastWriter; Json::FastWriter fastWriter;
...@@ -265,7 +271,7 @@ void ZMQServer::doOneServerLoop() { ...@@ -265,7 +271,7 @@ void ZMQServer::doOneServerLoop() {
Json::Value result; Json::Value result;
result["status"] = ZMQ_SERVER_ERROR; result["status"] = ZMQ_SERVER_ERROR;
shared_ptr<zmq::message_t> identity = nullptr; shared_ptr <zmq::message_t> identity = nullptr;
string msgStr; string msgStr;
try { try {
...@@ -282,18 +288,18 @@ void ZMQServer::doOneServerLoop() { ...@@ -282,18 +288,18 @@ void ZMQServer::doOneServerLoop() {
uint64_t index = 0; uint64_t index = 0;
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)!= nullptr) || if ((dynamic_pointer_cast<BLSSignReqMessage>(msg) != nullptr) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) { dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
index = NUM_ZMQ_WORKER_THREADS - 1; index = NUM_ZMQ_WORKER_THREADS - 1;
} else { } else {
index = 0; index = 0;
} }
auto element = pair<shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>>(msg, identity); auto element = pair < shared_ptr < ZMQMessage >, shared_ptr<zmq::message_t>>
(msg, identity);
incomingQueue.at(index).enqueue(element); incomingQueue.at(index).enqueue(element);
result = msg->process();
} catch (ExitRequestedException) { } catch (ExitRequestedException) {
throw; throw;
} catch (exception &e) { } catch (exception &e) {
...@@ -302,30 +308,69 @@ void ZMQServer::doOneServerLoop() { ...@@ -302,30 +308,69 @@ void ZMQServer::doOneServerLoop() {
spdlog::error("Exception in zmq server :{}", e.what()); spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char *) identity->data(), identity->size())); spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr); spdlog::error("Client request :" + msgStr);
sendToClient(result, identity);
} catch (...) { } catch (...) {
checkForExit(); checkForExit();
spdlog::error("Error in zmq server "); spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server "; result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char *) identity->data(), identity->size())); spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr); spdlog::error("Client request :" + msgStr);
sendToClient(result, identity);
} }
sendToClient(result, identity);
} }
void ZMQServer::workerThreadProcessNextMessage() { void ZMQServer::workerThreadProcessNextMessage(uint64_t _threadNumber) {
usleep(1000000);
cerr << "WORKER LOOP" << endl;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
shared_ptr <zmq::message_t> identity = nullptr;
string msgStr;
pair <shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>> element;
try {
while (!incomingQueue.at(_threadNumber)
.wait_dequeue_timed(element, std::chrono::milliseconds(100))) {
}
result = element.first->process();
} catch (ExitRequestedException) {
throw;
} catch (exception &e) {
checkForExit();
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr);
} catch (...) {
checkForExit();
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr);
}
pair <Json::Value, shared_ptr<zmq::message_t>> fullResult(result, element.second);
outgoingQueue.enqueue(fullResult);
} }
void ZMQServer::workerThreadMessageProcessLoop(ZMQServer *_agent) { void ZMQServer::workerThreadMessageProcessLoop(ZMQServer *_agent, uint64_t _threadNumber) {
CHECK_STATE(_agent); CHECK_STATE(_agent);
_agent->waitOnGlobalStartBarrier(); _agent->waitOnGlobalStartBarrier();
// do work forever until told to exit // do work forever until told to exit
while (!isExitRequested) { while (!isExitRequested) {
try { try {
_agent->workerThreadProcessNextMessage(); _agent->workerThreadProcessNextMessage(_threadNumber);
} catch (ExitRequestedException &e) { } catch (ExitRequestedException &e) {
break; break;
} catch (Exception &e) { } catch (Exception &e) {
......
...@@ -50,9 +50,9 @@ class ZMQServer : public Agent{ ...@@ -50,9 +50,9 @@ class ZMQServer : public Agent{
string caCertFile; string caCertFile;
string caCert; string caCert;
ReaderWriterQueue<pair<string, shared_ptr<zmq::message_t>>> outgoingQueue; BlockingReaderWriterQueue<pair<Json::Value, shared_ptr<zmq::message_t>>> outgoingQueue;
vector<ReaderWriterQueue<pair<shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>>>> incomingQueue; vector<BlockingReaderWriterQueue<pair<shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>>>> incomingQueue;
bool checkKeyOwnership = true; bool checkKeyOwnership = true;
...@@ -84,9 +84,9 @@ public: ...@@ -84,9 +84,9 @@ public:
static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership); static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership);
static void exitZMQServer(); static void exitZMQServer();
static void workerThreadMessageProcessLoop(ZMQServer* agent ); static void workerThreadMessageProcessLoop(ZMQServer* agent, uint64_t _threadNumber );
void workerThreadProcessNextMessage(); void workerThreadProcessNextMessage(uint64_t _threadNumber);
void checkForExit(); void checkForExit();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment