Unverified Commit 61cf7127 authored by kladko's avatar kladko

SKALE-4586 Added concurrentqueue

parent 5319afa7
......@@ -179,6 +179,15 @@ void ZMQServer::checkForExit() {
}
void ZMQServer::sendMessagesInOutgoingMessageQueueIfAny() {
pair <Json::Value, shared_ptr<zmq::message_t>> element;
// send all items in outgoing queue
while (outgoingQueue.try_dequeue(element)) {
sendToClient(element.first, element.second);
}
}
void ZMQServer::waitForIncomingAndProcessOutgoingMessages() {
zmq_pollitem_t items[1];
items[0].socket = *socket;
......@@ -190,12 +199,8 @@ void ZMQServer::waitForIncomingAndProcessOutgoingMessages() {
checkForExit();
pollResult = zmq_poll(items, 1, 1);
pair <Json::Value, shared_ptr<zmq::message_t>> element;
sendMessagesInOutgoingMessageQueueIfAny();
// send all items in outgoing queue
while (outgoingQueue.try_dequeue(element)) {
sendToClient(element.first, element.second);
}
} while (pollResult == 0);
}
......@@ -280,31 +285,33 @@ void ZMQServer::doOneServerLoop() {
tie(msgStr, identity) = receiveMessage();
auto msg = ZMQMessage::parse(
msgStr.c_str(), msgStr.size(), true, checkSignature, checkKeyOwnership);
{
CHECK_STATE2(msg, ZMQ_COULD_NOT_PARSE);
auto msg = ZMQMessage::parse(
msgStr.c_str(), msgStr.size(), true, checkSignature, checkKeyOwnership);
CHECK_STATE2(msg, ZMQ_COULD_NOT_PARSE);
uint64_t index = 0;
uint64_t index = 0;
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
boost::hash <std::string> string_hash;
boost::hash <std::string> string_hash;
auto hash = string_hash(string((const char *) identity->data()));
auto hash = string_hash(string((const char *) identity->data()));
index = hash % (NUM_ZMQ_WORKER_THREADS - 1);
} else {
index = NUM_ZMQ_WORKER_THREADS - 1;
}
index = hash % (NUM_ZMQ_WORKER_THREADS - 1);
} else {
index = NUM_ZMQ_WORKER_THREADS - 1;
}
auto element = pair < shared_ptr < ZMQMessage >, shared_ptr<zmq::message_t>>
(msg, identity);
auto element = pair < shared_ptr < ZMQMessage >, shared_ptr<zmq::message_t>>
(msg, identity);
incomingQueue.at(index).enqueue(element);
incomingQueue.at(index).enqueue(element);
}
} catch (ExitRequestedException) {
throw;
......
......@@ -95,6 +95,8 @@ public:
void sendToClient(Json::Value& _result, shared_ptr<zmq::message_t>& _identity);
void sendMessagesInOutgoingMessageQueueIfAny();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment