Unverified Commit 5fd62090 authored by Stan Kladko's avatar Stan Kladko Committed by GitHub

Merge pull request #327 from skalenetwork/bug/SKALE-4284-exception-in-server-worker

Bug/skale 4284 exception in server worker
parents 350c35ce 299f7382
......@@ -70,7 +70,7 @@ bin_PROGRAMS = sgxwallet testw sgx_util
## have to be explicitly listed
COMMON_SRC = SGXException.cpp ExitHandler.cpp ZMQClient.cpp BLSSignRspMessage.cpp ECDSASignRspMessage.cpp ECDSASignReqMessage.cpp BLSSignReqMessage.cpp ZMQMessage.cpp ZMQServer.cpp ServerWorker.cpp InvalidStateException.cpp Exception.cpp InvalidArgumentException.cpp Log.cpp \
COMMON_SRC = SGXException.cpp ExitHandler.cpp ZMQClient.cpp BLSSignRspMessage.cpp ECDSASignRspMessage.cpp ECDSASignReqMessage.cpp BLSSignReqMessage.cpp ZMQMessage.cpp ZMQServer.cpp InvalidStateException.cpp Exception.cpp InvalidArgumentException.cpp Log.cpp \
SGXWalletServer.cpp SGXRegistrationServer.cpp CSRManagerServer.cpp BLSCrypto.cpp \
DKGCrypto.cpp ServerInit.cpp BLSPrivateKeyShareSGX.cpp LevelDB.cpp ServerDataChecker.cpp SEKManager.cpp \
third_party/intel/sgx_stub.c third_party/intel/sgx_detect_linux.c third_party/intel/create_enclave.c third_party/intel/oc_alloc.c \
......
/*
Copyright (C) 2019-Present SKALE Labs
This file is part of sgxwallet.
sgxwallet is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
sgxwallet is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with sgxwallet. If not, see <https://www.gnu.org/licenses/>.
@file ServerWorker.cpp
@author Stan Kladko
@date 2021
*/
#include "common.h"
#include "sgxwallet_common.h"
#include <json/writer.h>
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "Log.h"
#include "ZMQMessage.h"
#include "ServerWorker.h"
std::atomic <uint64_t> ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &_ctx, int sock_type, bool _checkSignature,
const string& _caCert ) : checkSignature(_checkSignature),
caCert(_caCert),
isExitRequested(false) {
worker = make_shared<zmq::socket_t>(_ctx, sock_type);
if (checkSignature) {
CHECK_STATE(!caCert.empty())
}
index = workerCount.fetch_add(1);
int linger = 0;
zmq_setsockopt(*worker, ZMQ_LINGER, &linger, sizeof(linger));
};
void ServerWorker::doOneServerLoop() noexcept {
string replyStr;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq_pollitem_t items[1];
items[0].socket = *worker;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
worker->recv(&identity);
copied_id.copy(&identity);
string stringToParse = s_recv(*worker);
auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.what();
spdlog::error("Exception in zmq server worker:{}", e.what());
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker";
}
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
replyStr = fastWriter.write(result);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
worker->send(copied_id, ZMQ_SNDMORE);
s_send(*worker, replyStr);
} catch (std::exception &e) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
}
}
void ServerWorker::work() {
worker->connect("inproc://backend");
while (!isExitRequested) {
try {
doOneServerLoop();
} catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
}
spdlog::info("Exited worker thread {}", index);
}
void ServerWorker::requestExit() {
isExitRequested.exchange(true);
spdlog::info("Closed worker socket {}", index);
}
/*
Copyright (C) 2019-Present SKALE Labs
This file is part of sgxwallet.
sgxwallet is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
sgxwallet is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with sgxwallet. If not, see <https://www.gnu.org/licenses/>.
@file ServerWorker.h
@author Stan Kladko
@date 2021
*/
#ifndef SGXWALLET_SERVERWORKER_H
#define SGXWALLET_SERVERWORKER_H
#include <vector>
#include <thread>
#include <memory>
#include <functional>
#include "abstractstubserver.h"
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "third_party/spdlog/spdlog.h"
#include "document.h"
class ServerWorker {
bool checkSignature = true;
string caCert = "";
public:
ServerWorker(zmq::context_t &ctx, int sock_type, bool _checkSignature, const string& _caCert );
void work();
void requestExit();
private:
shared_ptr<zmq::socket_t> worker;
std::atomic<bool> isExitRequested;
void doOneServerLoop() noexcept;
static std::atomic<uint64_t> workerCount;
uint64_t index;
};
#endif //SGXWALLET_SERVERWORKER_H
......@@ -30,6 +30,7 @@
#include "common.h"
#include "SGXException.h"
#include "ZMQMessage.h"
#include "ZMQServer.h"
#include "sgxwallet_common.h"
......@@ -39,17 +40,10 @@ shared_ptr <ZMQServer> ZMQServer::zmqServer = nullptr;
ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
: checkSignature(_checkSignature),
caCertFile(_caCertFile), ctx_(make_shared<zmq::context_t>(1)) {
caCertFile(_caCertFile), ctx(make_shared<zmq::context_t>(1)) {
frontend = make_shared<zmq::socket_t>(*ctx_, ZMQ_ROUTER);
backend = make_shared<zmq::socket_t>(*ctx_, ZMQ_DEALER);
//workerThreads = 2 * thread::hardware_concurrency();
workerThreads = 1; // do one thread for now
socket = make_shared<zmq::socket_t>(*ctx, ZMQ_ROUTER);
if (_checkSignature) {
CHECK_STATE(!_caCertFile.empty());
......@@ -61,9 +55,7 @@ ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
int linger = 0;
zmq_setsockopt(*frontend, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt(*backend, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt(*socket, ZMQ_LINGER, &linger, sizeof(linger));
}
......@@ -75,8 +67,8 @@ void ZMQServer::run() {
spdlog::info("Starting zmq server on port {} ...", port);
try {
CHECK_STATE(frontend);
frontend->bind("tcp://*:" + to_string(port));
CHECK_STATE(socket);
socket->bind("tcp://*:" + to_string(port));
} catch (...) {
spdlog::error("Server task could not bind to port:{}", port);
throw SGXException(ZMQ_COULD_NOT_BIND_FRONT_END, "Server task could not bind.");
......@@ -84,84 +76,30 @@ void ZMQServer::run() {
spdlog::info("Bound port ...");
try {
CHECK_STATE(backend);
backend->bind("inproc://backend");
} catch (exception &e) {
spdlog::error("Could not bind to zmq backend: {}", e.what());
throw SGXException(ZMQ_COULD_NOT_BIND_BACK_END, "Could not bind to zmq backend.");
}
spdlog::info("Creating {} zmq server workers ...", workerThreads);
try {
for (int i = 0; i < workerThreads; ++i) {
workers.push_back(make_shared<ServerWorker>(*ctx_, ZMQ_DEALER,
this->checkSignature, this->caCert));
auto th = make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i]));
worker_threads.push_back(th);
}
} catch (std::exception &e) {
spdlog::error("Could not create zmq server workers:{} ", e.what());
throw SGXException(ZMQ_COULD_NOT_CREATE_WORKERS, "Could not create zmq server workers.");
};
spdlog::info("Created {} zmq server workers ...", workerThreads);
spdlog::info("Creating zmq proxy.");
try {
zmq::proxy(static_cast<void *>(*frontend), static_cast<void *>(*backend), nullptr);
spdlog::info("Exited zmq proxy");
} catch (exception &_e) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
}
spdlog::info("Error, exiting zmq server ... {}", _e.what());
return;
} catch (...) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
while (!isExitRequested) {
try {
zmqServer->doOneServerLoop();
} catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
spdlog::info("Error, exiting zmq server ...");
throw SGXException(ZMQ_COULD_NOT_CREATE_PROXY, "Error, exiting zmq server.");
}
}
void ZMQServer::exitAll() {
spdlog::info("Exiting zmq server workers ...");
for (auto &&worker : workers) {
worker->requestExit();
}
for (auto &&workerThread : worker_threads) {
workerThread->join();
}
spdlog::info("Exited zmq server workers ...");
spdlog::info("Exited zmq server loop");
}
std::atomic<bool> ZMQServer::isExitRequested(false);
void ZMQServer::exitZMQServer() {
auto doExit = !isExitRequested.exchange(true);
if (doExit) {
zmqServer->exitAll();
spdlog::info("deleting zmq server");
zmqServer = nullptr;
spdlog::info("deleted zmq server ");
}
isExitRequested.exchange(true);
zmqServer->ctx->shutdown();
zmqServer->socket->close();
zmqServer->ctx->close();
spdlog::info("Exited zmq server.");
}
void ZMQServer::initZMQServer(bool _checkSignature) {
static bool initedServer = false;
CHECK_STATE(!initedServer)
......@@ -172,42 +110,152 @@ void ZMQServer::initZMQServer(bool _checkSignature) {
string rootCAPath = "";
if (_checkSignature) {
rootCAPath = string(SGXDATA_FOLDER) + "cert_data/rootCA.pem";
spdlog::info("Reading root CA from {}", rootCAPath);
CHECK_STATE(access(rootCAPath.c_str(), F_OK) == 0);
};
zmqServer = make_shared<ZMQServer>(_checkSignature, rootCAPath);
CHECK_STATE(zmqServer)
serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach();
spdlog::info("Inited zmq server ...");
}
shared_ptr <std::thread> ZMQServer::serverThread = nullptr;
ZMQServer::~ZMQServer() {
}
spdlog::info("Deleting worker threads");
worker_threads.clear();
spdlog::info("Deleted worker threads");
spdlog::info("Deleting workers ...");
workers.clear();
spdlog::info("Deleted workers ...");
void ZMQServer::doOneServerLoop() {
spdlog::info("Deleting front end and back end");
frontend = nullptr;
backend = nullptr;
spdlog::info("Deleted front end and back end");
string replyStr;
spdlog::info("Deleting server thread");
ZMQServer::serverThread = nullptr;
spdlog::info("Deleted server thread");
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
string stringToParse = "";
try {
zmq_pollitem_t items[1];
items[0].socket = *socket;
items[0].events = ZMQ_POLLIN;
spdlog::info("Deleting ZMQ context");
ctx_ = nullptr;
spdlog::info("Deleted ZMQ context");
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
if (!socket->recv(&identity)) {
// something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false");
exit(-11);
}
if (!identity.more()) {
// something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false");
exit(-12);
}
copied_id.copy(&identity);
zmq::message_t reqMsg;
if (!socket->recv(&reqMsg, 0)) {
// something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false");
exit(-13);
}
stringToParse = string((char *) reqMsg.data(), reqMsg.size());
CHECK_STATE(stringToParse.front() == '{')
CHECK_STATE(stringToParse.back() == '}')
auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("Client request :" + stringToParse);
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("Client request :" + stringToParse);
}
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
replyStr = fastWriter.write(result);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
if (!socket->send(copied_id, ZMQ_SNDMORE)) {
if (isExitRequested) {
return;
}
exit(-15);
}
if (!s_send(*socket, replyStr)) {
if (isExitRequested) {
return;
}
exit(-16);
}
} catch (
std::exception &e
) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
exit(-17);
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
exit(-18);
}
}
......@@ -35,9 +35,6 @@
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "ServerWorker.h"
using namespace std;
......@@ -61,23 +58,19 @@ public:
void run();
void exitAll();
static void initZMQServer(bool _checkSignature);
static void exitZMQServer();
private:
shared_ptr<zmq::context_t> ctx_;
shared_ptr<zmq::socket_t> frontend;
shared_ptr<zmq::socket_t> backend;
std::vector<shared_ptr<ServerWorker> > workers;
std::vector<shared_ptr<std::thread>> worker_threads;
shared_ptr<zmq::context_t> ctx;
shared_ptr<zmq::socket_t> socket;
static std::atomic<bool> isExitRequested;
void doOneServerLoop();
};
......
#!/bin/bash
sudo apt install -y build-essential make gcc g++ yasm python libprotobuf10 flex bison automake
sudo apt install -y ccache cmake ccache autoconf texinfo libgcrypt20-dev libgnutls28-dev libtool pkg-config
sudo apt install -y ocaml ocaml-build
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment