Unverified Commit 5319afa7 authored by kladko's avatar kladko

SKALE-4586 Added concurrent queue

parent 597ecc4d
......@@ -179,7 +179,7 @@ void ZMQServer::checkForExit() {
}
PollResult ZMQServer::pollIncomingAndSendOutgoing() {
void ZMQServer::waitForIncomingAndProcessOutgoingMessages() {
zmq_pollitem_t items[1];
items[0].socket = *socket;
items[0].events = ZMQ_POLLIN;
......@@ -198,7 +198,6 @@ PollResult ZMQServer::pollIncomingAndSendOutgoing() {
}
} while (pollResult == 0);
return GOT_INCOMING_MSG;
}
pair <string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() {
......@@ -277,7 +276,7 @@ void ZMQServer::doOneServerLoop() {
try {
poll();
waitForIncomingAndProcessOutgoingMessages();
tie(msgStr, identity) = receiveMessage();
......@@ -292,9 +291,9 @@ void ZMQServer::doOneServerLoop() {
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
boost::hash<std::string> string_hash;
boost::hash <std::string> string_hash;
auto hash = string_hash(string((const char*) identity->data()));
auto hash = string_hash(string((const char *) identity->data()));
index = hash % (NUM_ZMQ_WORKER_THREADS - 1);
} else {
......@@ -305,7 +304,6 @@ void ZMQServer::doOneServerLoop() {
(msg, identity);
incomingQueue.at(index).enqueue(element);
} catch (ExitRequestedException) {
......@@ -327,8 +325,6 @@ void ZMQServer::doOneServerLoop() {
}
}
void ZMQServer::workerThreadProcessNextMessage(uint64_t _threadNumber) {
......@@ -336,34 +332,44 @@ void ZMQServer::workerThreadProcessNextMessage(uint64_t _threadNumber) {
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
shared_ptr <zmq::message_t> identity = nullptr;
string msgStr;
pair <shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>> element;
try {
while (!incomingQueue.at(_threadNumber)
.wait_dequeue_timed(element, std::chrono::milliseconds(1000))) {
checkForExit();
}
} catch (ExitRequestedException) {
throw;
} catch (exception &e) {
checkForExit();
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("Client request :" + msgStr);
} catch (...) {
checkForExit();
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("Client request :" + msgStr);
}
try {
result = element.first->process();
} catch (ExitRequestedException) {
throw;
} catch (exception &e) {
checkForExit();
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("ID:" + string((char *) element.second->data(), element.second->size()));
spdlog::error("Client request :" + msgStr);
} catch (...) {
checkForExit();
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("ID:" + string((char *) element.second->data(), element.second->size()));
spdlog::error("Client request :" + msgStr);
}
......
......@@ -39,8 +39,6 @@
using namespace moodycamel;
typedef enum {GOT_INCOMING_MSG = 0, GOT_OUTFOING_MSG = 1} PollResult;
static const uint64_t NUM_ZMQ_WORKER_THREADS = 16;
......@@ -91,7 +89,7 @@ public:
void checkForExit();
PollResult pollIncomingAndSendOutGoing();
void waitForIncomingAndProcessOutgoingMessages();
pair<string, shared_ptr<zmq::message_t>> receiveMessage();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment