Unverified Commit 9e9f36bc authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent a36f87e3
...@@ -21,12 +21,9 @@ ...@@ -21,12 +21,9 @@
@date 2018 @date 2018
*/ */
#include "common.h"
#include "Log.h"
#include "ExitRequestedException.h" #include "ExitRequestedException.h"
ExitRequestedException::ExitRequestedException(const std::string &_message, const string& _className) : ExitRequestedException::ExitRequestedException() {
Exception(_message, _className) {
fatal = false;
} }
...@@ -23,10 +23,9 @@ ...@@ -23,10 +23,9 @@
#pragma once #pragma once
#include "Exception.h" #include <exception>
class ExitRequestedException : public std::exception {
class ExitRequestedException : public Exception {
public: public:
ExitRequestedException( const std::string& _message, const std::string& _className ); ExitRequestedException();
}; };
...@@ -61,7 +61,7 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string ...@@ -61,7 +61,7 @@ ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string
} }
void ZMQServer::run() { void ZMQServer::initListenSocket() {
auto port = BASE_PORT + 5; auto port = BASE_PORT + 5;
...@@ -71,20 +71,29 @@ void ZMQServer::run() { ...@@ -71,20 +71,29 @@ void ZMQServer::run() {
CHECK_STATE(socket); CHECK_STATE(socket);
socket->bind("tcp://*:" + to_string(port)); socket->bind("tcp://*:" + to_string(port));
} catch (...) { } catch (...) {
spdlog::error("Server task could not bind to port:{}", port); spdlog::error("Zmq server task could not bind to port:{}", port);
throw SGXException(ZMQ_COULD_NOT_BIND_FRONT_END, "Server task could not bind."); throw SGXException(ZMQ_COULD_NOT_BIND_FRONT_END, "Server task could not bind.");
} }
spdlog::info("Bound port ..."); spdlog::info("ZMQ server socket created and bound.");
}
void ZMQServer::run() {
spdlog::info("Started zmq read loop ..."); zmqServer->initListenSocket();
spdlog::info("Started zmq read loop.");
while (!isExitRequested) { while (!isExitRequested) {
try { try {
zmqServer->doOneServerLoop(); zmqServer->doOneServerLoop();
} catch (...) { } catch (ExitRequestedException& e) {
spdlog::info("Exit requested. Exiting server loop");
break;
}
catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!"); spdlog::error("doOneServerLoop threw exception. This should never happen!");
} }
} }
...@@ -158,6 +167,12 @@ ZMQServer::~ZMQServer() { ...@@ -158,6 +167,12 @@ ZMQServer::~ZMQServer() {
exitZMQServer(); exitZMQServer();
} }
void ZMQServer::checkForExit() {
if (isExitRequested) {
throw ExitRequestedException();
}
}
void ZMQServer::doOneServerLoop() { void ZMQServer::doOneServerLoop() {
string replyStr; string replyStr;
...@@ -172,7 +187,6 @@ void ZMQServer::doOneServerLoop() { ...@@ -172,7 +187,6 @@ void ZMQServer::doOneServerLoop() {
try { try {
zmq_pollitem_t items[1]; zmq_pollitem_t items[1];
items[0].socket = *socket; items[0].socket = *socket;
items[0].events = ZMQ_POLLIN; items[0].events = ZMQ_POLLIN;
...@@ -180,21 +194,19 @@ void ZMQServer::doOneServerLoop() { ...@@ -180,21 +194,19 @@ void ZMQServer::doOneServerLoop() {
int pollResult = 0; int pollResult = 0;
do { do {
checkForExit();
pollResult = zmq_poll(items, 1, 1000); pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0); } while (pollResult == 0);
if (!socket->recv(&identity)) { if (!socket->recv(&identity)) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false"); spdlog::error("Fatal error: socket->recv(&identity) returned false. Exiting.");
exit(-11); exit(-11);
} }
if (!identity.more()) { if (!identity.more()) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false"); spdlog::error("Fatal error: zmq_msg_more(identity) returned false. Existing.");
exit(-12); exit(-12);
} }
...@@ -202,7 +214,7 @@ void ZMQServer::doOneServerLoop() { ...@@ -202,7 +214,7 @@ void ZMQServer::doOneServerLoop() {
if (!socket->recv(&reqMsg, 0)) { if (!socket->recv(&reqMsg, 0)) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false"); spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false. Exiting");
exit(-13); exit(-13);
} }
...@@ -216,23 +228,18 @@ void ZMQServer::doOneServerLoop() { ...@@ -216,23 +228,18 @@ void ZMQServer::doOneServerLoop() {
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE); CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process(); result = parsedMsg->process();
} catch (std::exception &e) { } catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what()); result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what()); spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char*) identity.data(), identity.size())); spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("Client request :" + stringToParse); spdlog::error("Client request :" + stringToParse);
} catch (...) { } catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server "); spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server "; result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char*) identity.data(), identity.size())); spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("Client request :" + stringToParse); spdlog::error("Client request :" + stringToParse);
} }
...@@ -260,7 +267,7 @@ void ZMQServer::doOneServerLoop() { ...@@ -260,7 +267,7 @@ void ZMQServer::doOneServerLoop() {
exit(-16); exit(-16);
} }
} catch ( std::exception &e ) { } catch (std::exception &e) {
if (isExitRequested) { if (isExitRequested) {
return; return;
} }
...@@ -280,16 +287,16 @@ void ZMQServer::workerThreadProcessNextMessage() { ...@@ -280,16 +287,16 @@ void ZMQServer::workerThreadProcessNextMessage() {
cerr << "WORKER LOOP" << endl; cerr << "WORKER LOOP" << endl;
} }
void ZMQServer::workerThreadMessageProcessLoop(ZMQServer* _agent ) { void ZMQServer::workerThreadMessageProcessLoop(ZMQServer *_agent) {
CHECK_STATE(_agent); CHECK_STATE(_agent);
_agent->waitOnGlobalStartBarrier(); _agent->waitOnGlobalStartBarrier();
// do work forever until told to exit // do work forever until told to exit
while (!isExitRequested) { while (!isExitRequested) {
try { try {
_agent->workerThreadProcessNextMessage(); _agent->workerThreadProcessNextMessage();
} catch (ExitRequestedException& e) { } catch (ExitRequestedException &e) {
break; break;
} catch (Exception& e) { } catch (Exception &e) {
spdlog::error(string("Caught exception in worker thread loop:") + e.what()); spdlog::error(string("Caught exception in worker thread loop:") + e.what());
} }
} }
......
...@@ -74,6 +74,8 @@ public: ...@@ -74,6 +74,8 @@ public:
void run(); void run();
void initListenSocket();
static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership); static void initZMQServer(bool _checkSignature, bool _checkKeyOwnership);
static void exitZMQServer(); static void exitZMQServer();
...@@ -81,6 +83,8 @@ public: ...@@ -81,6 +83,8 @@ public:
void workerThreadProcessNextMessage(); void workerThreadProcessNextMessage();
void checkForExit();
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment