Unverified Commit d898fb07 authored by kladko's avatar kladko

SKALE-4284

parent f08c08d5
......@@ -70,7 +70,7 @@ bin_PROGRAMS = sgxwallet testw sgx_util
## have to be explicitly listed
COMMON_SRC = SGXException.cpp ExitHandler.cpp ZMQClient.cpp BLSSignRspMessage.cpp ECDSASignRspMessage.cpp ECDSASignReqMessage.cpp BLSSignReqMessage.cpp ZMQMessage.cpp ZMQServer.cpp ServerWorker.cpp InvalidStateException.cpp Exception.cpp InvalidArgumentException.cpp Log.cpp \
COMMON_SRC = SGXException.cpp ExitHandler.cpp ZMQClient.cpp BLSSignRspMessage.cpp ECDSASignRspMessage.cpp ECDSASignReqMessage.cpp BLSSignReqMessage.cpp ZMQMessage.cpp ZMQServer.cpp InvalidStateException.cpp Exception.cpp InvalidArgumentException.cpp Log.cpp \
SGXWalletServer.cpp SGXRegistrationServer.cpp CSRManagerServer.cpp BLSCrypto.cpp \
DKGCrypto.cpp ServerInit.cpp BLSPrivateKeyShareSGX.cpp LevelDB.cpp ServerDataChecker.cpp SEKManager.cpp \
third_party/intel/sgx_stub.c third_party/intel/sgx_detect_linux.c third_party/intel/create_enclave.c third_party/intel/oc_alloc.c \
......
......@@ -30,6 +30,7 @@
#include "common.h"
#include "SGXException.h"
#include "ZMQMessage.h"
#include "ZMQServer.h"
#include "sgxwallet_common.h"
......@@ -42,14 +43,7 @@ ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
caCertFile(_caCertFile), ctx_(make_shared<zmq::context_t>(1)) {
frontend = make_shared<zmq::socket_t>(*ctx_, ZMQ_ROUTER);
backend = make_shared<zmq::socket_t>(*ctx_, ZMQ_DEALER);
//workerThreads = 2 * thread::hardware_concurrency();
workerThreads = 1; // do one thread for now
socket = make_shared<zmq::socket_t>(*ctx_, ZMQ_ROUTER);
if (_checkSignature) {
CHECK_STATE(!_caCertFile.empty());
......@@ -61,9 +55,7 @@ ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
int linger = 0;
zmq_setsockopt(*frontend, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt(*backend, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger));
}
......@@ -75,8 +67,8 @@ void ZMQServer::run() {
spdlog::info("Starting zmq server on port {} ...", port);
try {
CHECK_STATE(frontend);
frontend->bind("tcp://*:" + to_string(port));
CHECK_STATE(socket);
socket->bind("tcp://*:" + to_string(port));
} catch (...) {
spdlog::error("Server task could not bind to port:{}", port);
throw SGXException(ZMQ_COULD_NOT_BIND_FRONT_END, "Server task could not bind.");
......@@ -84,68 +76,10 @@ void ZMQServer::run() {
spdlog::info("Bound port ...");
try {
CHECK_STATE(backend);
backend->bind("inproc://backend");
} catch (exception &e) {
spdlog::error("Could not bind to zmq backend: {}", e.what());
throw SGXException(ZMQ_COULD_NOT_BIND_BACK_END, "Could not bind to zmq backend.");
}
spdlog::info("Creating {} zmq server workers ...", workerThreads);
try {
for (int i = 0; i < workerThreads; ++i) {
workers.push_back(make_shared<ServerWorker>(*ctx_, ZMQ_DEALER,
this->checkSignature, this->caCert));
auto th = make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i]));
worker_threads.push_back(th);
}
} catch (std::exception &e) {
spdlog::error("Could not create zmq server workers:{} ", e.what());
throw SGXException(ZMQ_COULD_NOT_CREATE_WORKERS, "Could not create zmq server workers.");
};
spdlog::info("Created {} zmq server workers ...", workerThreads);
spdlog::info("Creating zmq proxy.");
try {
zmq::proxy(static_cast<void *>(*frontend), static_cast<void *>(*backend), nullptr);
spdlog::info("Exited zmq proxy");
} catch (exception &_e) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
}
spdlog::info("Error, exiting zmq server ... {}", _e.what());
return;
} catch (...) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
}
spdlog::info("Error, exiting zmq server ...");
throw SGXException(ZMQ_COULD_NOT_CREATE_PROXY, "Error, exiting zmq server.");
}
}
void ZMQServer::exitAll() {
spdlog::info("Exiting zmq server workers ...");
for (auto &&worker : workers) {
worker->requestExit();
}
for (auto &&workerThread : worker_threads) {
workerThread->join();
}
spdlog::info("Exited zmq server workers ...");
}
std::atomic<bool> ZMQServer::isExitRequested(false);
......@@ -154,6 +88,7 @@ void ZMQServer::exitZMQServer() {
auto doExit = !isExitRequested.exchange(true);
if (doExit) {
zmqServer->exitAll();
spdlog::info("deleting zmq server");
......@@ -179,35 +114,148 @@ void ZMQServer::initZMQServer(bool _checkSignature) {
};
zmqServer = make_shared<ZMQServer>(_checkSignature, rootCAPath);
serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach();
spdlog::info("Inited zmq server ...");
CHECK_STATE(zmqServer);
while (!isExitRequested) {
try {
zmqServer->doOneServerLoop();
} catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
}
}
shared_ptr <std::thread> ZMQServer::serverThread = nullptr;
ZMQServer::~ZMQServer() {
spdlog::info("Deleting worker threads");
worker_threads.clear();
spdlog::info("Deleted worker threads");
spdlog::info("Deleting workers ...");
workers.clear();
spdlog::info("Deleted workers ...");
spdlog::info("Deleting front end");
socket = nullptr;
spdlog::info("Deleted front end");
spdlog::info("Deleting front end and back end");
frontend = nullptr;
backend = nullptr;
spdlog::info("Deleted front end and back end");
spdlog::info("Deleting server thread");
ZMQServer::serverThread = nullptr;
spdlog::info("Deleted server thread");
spdlog::info("Deleting ZMQ context");
ctx_ = nullptr;
spdlog::info("Deleted ZMQ context");
}
void ZMQServer::doOneServerLoop() {
string replyStr;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq_pollitem_t items[1];
items[0].socket = *socket;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
if (!socket->recv(&identity)) {
// something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false" );
exit(-11);
}
if (!identity.more()) {
// something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false" );
exit(-12);
}
copied_id.copy(&identity);
zmq::message_t reqMsg;
if (!socket->recv(&reqMsg, 0)) {
// something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false" );
exit(-13);
}
string stringToParse((char*) reqMsg.data(), reqMsg.size());
CHECK_STATE(stringToParse.front() = '{')
CHECK_STATE(stringToParse.back() = '}')
auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.what();
spdlog::error("Exception in zmq server {}", e.what());
}
catch (
std::exception &e
) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
}
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
replyStr = fastWriter.write(result);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
socket->send(copied_id, ZMQ_SNDMORE);
s_send(*socket, replyStr);
} catch (
std::exception &e
) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
}
}
......@@ -35,9 +35,6 @@
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "ServerWorker.h"
using namespace std;
......@@ -70,14 +67,12 @@ public:
private:
shared_ptr<zmq::context_t> ctx_;
shared_ptr<zmq::socket_t> frontend;
shared_ptr<zmq::socket_t> backend;
std::vector<shared_ptr<ServerWorker> > workers;
std::vector<shared_ptr<std::thread>> worker_threads;
shared_ptr<zmq::socket_t> socket;
static std::atomic<bool> isExitRequested;
void doOneServerLoop();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment