Unverified Commit ecde0240 authored by kladko's avatar kladko

SKALE-4586 Added Queue

parent 832ff75a
This diff is collapsed.
This diff is collapsed.
......@@ -31,7 +31,6 @@
#include <openssl/rand.h>
#include "third_party/spdlog/spdlog.h"
#include <zmq.hpp>
#include "zhelpers.hpp"
#include <jsonrpccpp/client.h>
......
......@@ -42,7 +42,7 @@ using namespace std;
shared_ptr <ZMQServer> ZMQServer::zmqServer = nullptr;
ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string &_caCertFile)
: checkSignature(_checkSignature), checkKeyOwnership(_checkKeyOwnership),
: outgoingQueue(NUM_ZMQ_WORKER_THREADS), checkSignature(_checkSignature), checkKeyOwnership(_checkKeyOwnership),
caCertFile(_caCertFile), ctx(make_shared<zmq::context_t>(1)) {
socket = make_shared<zmq::socket_t>(*ctx, ZMQ_ROUTER);
......@@ -103,7 +103,7 @@ void ZMQServer::run() {
spdlog::info("Exited zmq server loop");
}
std::atomic<bool> ZMQServer::isExitRequested(false);
atomic<bool> ZMQServer::isExitRequested(false);
void ZMQServer::exitZMQServer() {
// if already exited do not exit
......@@ -151,7 +151,7 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) {
zmqServer = make_shared<ZMQServer>(_checkSignature, _checkKeyOwnership, rootCAPath);
CHECK_STATE(zmqServer)
serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread = make_shared<thread>(bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach();
spdlog::info("Releasing SGX worker threads ...");
......@@ -163,7 +163,7 @@ void ZMQServer::initZMQServer(bool _checkSignature, bool _checkKeyOwnership) {
spdlog::info("Inited zmq server.");
}
shared_ptr <std::thread> ZMQServer::serverThread = nullptr;
shared_ptr <thread> ZMQServer::serverThread = nullptr;
ZMQServer::~ZMQServer() {
exitZMQServer();
......@@ -192,38 +192,41 @@ PollResult ZMQServer::poll() {
return GOT_INCOMING_MSG;
}
string ZMQServer::receiveMessage(zmq::message_t& _identity) {
if (!socket->recv(&_identity)) {
pair<string, shared_ptr<zmq::message_t>> ZMQServer::receiveMessage() {
auto identity = make_shared<zmq::message_t>();
if (!socket->recv(identity.get())) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false. Exiting.");
exit(-11);
}
if (!_identity.more()) {
if (!identity->more()) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false. Existing.");
exit(-12);
}
zmq::message_t reqMsg;
auto reqMsg = make_shared<zmq::message_t>();
if (!socket->recv(&reqMsg, 0)) {
if (!socket->recv(reqMsg.get(), 0)) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false. Exiting");
exit(-13);
}
auto result = string((char *) reqMsg.data(), reqMsg.size());
auto result = string((char *) reqMsg->data(), reqMsg->size());
CHECK_STATE(result.front() == '{')
CHECK_STATE(result.back() == '}')
return result;
return {result, identity};
}
void ZMQServer::sendToClient(Json::Value& _result, zmq::message_t& _identity ) {
void ZMQServer::sendToClient(Json::Value& _result, shared_ptr<zmq::message_t>& _identity ) {
string replyStr;
try {
Json::FastWriter fastWriter;
......@@ -235,7 +238,7 @@ void ZMQServer::sendToClient(Json::Value& _result, zmq::message_t& _identity )
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
if (!socket->send(_identity, ZMQ_SNDMORE)) {
if (!socket->send(*_identity, ZMQ_SNDMORE)) {
exit(-15);
}
if (!s_send(*socket, replyStr)) {
......@@ -243,7 +246,7 @@ void ZMQServer::sendToClient(Json::Value& _result, zmq::message_t& _identity )
}
} catch (ExitRequestedException) {
throw;
} catch (std::exception &e) {
} catch (exception &e) {
checkForExit();
spdlog::error("Exception in zmq server worker send :{}", e.what());
exit(-17);
......@@ -260,14 +263,14 @@ void ZMQServer::doOneServerLoop() {
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
zmq::message_t identity;
shared_ptr<zmq::message_t> identity = nullptr;
string msgStr;
try {
poll();
msgStr = receiveMessage(identity);
tie(msgStr, identity) = receiveMessage();
auto msg = ZMQMessage::parse(
msgStr.c_str(), msgStr.size(), true, checkSignature, checkKeyOwnership);
......@@ -276,25 +279,24 @@ void ZMQServer::doOneServerLoop() {
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)!= nullptr) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
spdlog::info("FUFUFUFUF");
} else {
spdlog::info("HAHAHA");
}
result = msg->process();
} catch (ExitRequestedException) {
throw;
} catch (std::exception &e) {
} catch (exception &e) {
checkForExit();
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr);
} catch (...) {
checkForExit();
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("ID:" + string((char *) identity->data(), identity->size()));
spdlog::error("Client request :" + msgStr);
}
......
......@@ -25,14 +25,22 @@
#ifndef SGXWALLET_ZMQServer_H
#define SGXWALLET_ZMQServer_H
#include "third_party/readerwriterqueue.h"
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "Agent.h"
#include "WorkerThreadPool.h"
using namespace moodycamel;
typedef enum {GOT_INCOMING_MSG = 0, GOT_OUTFOING_MSG = 1} PollResult;
static const uint64_t NUM_ZMQ_WORKER_THREADS = 2;
class ZMQServer : public Agent{
uint64_t workerThreads;
......@@ -40,12 +48,16 @@ class ZMQServer : public Agent{
string caCertFile;
string caCert;
ReaderWriterQueue<pair<string, shared_ptr<zmq_msg_t>>> outgoingQueue;
vector<ReaderWriterQueue<pair<string, shared_ptr<zmq_msg_t>>>> incomingQueue;
bool checkKeyOwnership = true;
shared_ptr<zmq::context_t> ctx;
shared_ptr<zmq::socket_t> socket;
static std::atomic<bool> isExitRequested;
static atomic<bool> isExitRequested;
void doOneServerLoop();
......@@ -57,7 +69,6 @@ public:
shared_ptr<WorkerThreadPool> threadPool = nullptr;
static shared_ptr<std::thread> serverThread;
ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string& _caCertFile);
......@@ -79,9 +90,9 @@ public:
PollResult poll();
string receiveMessage(zmq::message_t& _identity);
pair<string, shared_ptr<zmq::message_t>> receiveMessage();
void sendToClient(Json::Value& _result, zmq::message_t& _identity);
void sendToClient(Json::Value& _result, shared_ptr<zmq::message_t>& _identity);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment