Unverified Commit 299f7382 authored by kladko's avatar kladko

SKALE-4284

parent 200ade32
/*
Copyright (C) 2019-Present SKALE Labs
This file is part of sgxwallet.
sgxwallet is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
sgxwallet is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with sgxwallet. If not, see <https://www.gnu.org/licenses/>.
@file ServerWorker.cpp
@author Stan Kladko
@date 2021
*/
#include "common.h"
#include "sgxwallet_common.h"
#include <json/writer.h>
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "Log.h"
#include "ZMQMessage.h"
#include "ServerWorker.h"
std::atomic <uint64_t> ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &_ctx, int sock_type, bool _checkSignature,
const string& _caCert ) : checkSignature(_checkSignature),
caCert(_caCert),
isExitRequested(false) {
worker = make_shared<zmq::socket_t>(_ctx, sock_type);
if (checkSignature) {
CHECK_STATE(!caCert.empty())
}
index = workerCount.fetch_add(1);
int linger = 0;
zmq_setsockopt(*worker, ZMQ_LINGER, &linger, sizeof(linger));
};
void ServerWorker::doOneServerLoop() noexcept {
string replyStr;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq_pollitem_t items[1];
items[0].socket = *worker;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
worker->recv(&identity);
copied_id.copy(&identity);
string stringToParse = s_recv(*worker);
auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.what();
spdlog::error("Exception in zmq server worker:{}", e.what());
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker";
}
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
replyStr = fastWriter.write(result);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
worker->send(copied_id, ZMQ_SNDMORE);
s_send(*worker, replyStr);
} catch (std::exception &e) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
}
}
void ServerWorker::work() {
worker->connect("inproc://backend");
while (!isExitRequested) {
try {
doOneServerLoop();
} catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
}
spdlog::info("Exited worker thread {}", index);
}
void ServerWorker::requestExit() {
isExitRequested.exchange(true);
spdlog::info("Closed worker socket {}", index);
}
/*
Copyright (C) 2019-Present SKALE Labs
This file is part of sgxwallet.
sgxwallet is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
sgxwallet is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with sgxwallet. If not, see <https://www.gnu.org/licenses/>.
@file ServerWorker.h
@author Stan Kladko
@date 2021
*/
#ifndef SGXWALLET_SERVERWORKER_H
#define SGXWALLET_SERVERWORKER_H
#include <vector>
#include <thread>
#include <memory>
#include <functional>
#include "abstractstubserver.h"
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "third_party/spdlog/spdlog.h"
#include "document.h"
class ServerWorker {
bool checkSignature = true;
string caCert = "";
public:
ServerWorker(zmq::context_t &ctx, int sock_type, bool _checkSignature, const string& _caCert );
void work();
void requestExit();
private:
shared_ptr<zmq::socket_t> worker;
std::atomic<bool> isExitRequested;
void doOneServerLoop() noexcept;
static std::atomic<uint64_t> workerCount;
uint64_t index;
};
#endif //SGXWALLET_SERVERWORKER_H
...@@ -145,6 +145,9 @@ void ZMQServer::doOneServerLoop() { ...@@ -145,6 +145,9 @@ void ZMQServer::doOneServerLoop() {
zmq::message_t identit2; zmq::message_t identit2;
zmq::message_t copied_id; zmq::message_t copied_id;
string stringToParse = "";
try { try {
zmq_pollitem_t items[1]; zmq_pollitem_t items[1];
...@@ -153,6 +156,7 @@ void ZMQServer::doOneServerLoop() { ...@@ -153,6 +156,7 @@ void ZMQServer::doOneServerLoop() {
int pollResult = 0; int pollResult = 0;
do { do {
pollResult = zmq_poll(items, 1, 1000); pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) { if (isExitRequested) {
...@@ -163,14 +167,14 @@ void ZMQServer::doOneServerLoop() { ...@@ -163,14 +167,14 @@ void ZMQServer::doOneServerLoop() {
if (!socket->recv(&identity)) { if (!socket->recv(&identity)) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false" ); spdlog::error("Fatal error: socket->recv(&identity) returned false");
exit(-11); exit(-11);
} }
if (!identity.more()) { if (!identity.more()) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false" ); spdlog::error("Fatal error: zmq_msg_more(identity) returned false");
exit(-12); exit(-12);
} }
...@@ -181,15 +185,15 @@ void ZMQServer::doOneServerLoop() { ...@@ -181,15 +185,15 @@ void ZMQServer::doOneServerLoop() {
if (!socket->recv(&reqMsg, 0)) { if (!socket->recv(&reqMsg, 0)) {
// something terrible happened // something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false" ); spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false");
exit(-13); exit(-13);
} }
string stringToParse((char*) reqMsg.data(), reqMsg.size()); stringToParse = string((char *) reqMsg.data(), reqMsg.size());
CHECK_STATE(stringToParse.front() = '{') CHECK_STATE(stringToParse.front() == '{')
CHECK_STATE(stringToParse.back() = '}') CHECK_STATE(stringToParse.back() == '}')
auto parsedMsg = ZMQMessage::parse( auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature); stringToParse.c_str(), stringToParse.size(), true, checkSignature);
...@@ -197,11 +201,6 @@ void ZMQServer::doOneServerLoop() { ...@@ -197,11 +201,6 @@ void ZMQServer::doOneServerLoop() {
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE); CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process(); result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.what();
spdlog::error("Exception in zmq server {}", e.what());
} }
catch (std::exception &e) { catch (std::exception &e) {
if (isExitRequested) { if (isExitRequested) {
...@@ -209,12 +208,15 @@ void ZMQServer::doOneServerLoop() { ...@@ -209,12 +208,15 @@ void ZMQServer::doOneServerLoop() {
} }
result["errorMessage"] = string(e.what()); result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what()); spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("Client request :" + stringToParse);
} catch (...) { } catch (...) {
if (isExitRequested) { if (isExitRequested) {
return; return;
} }
spdlog::error("Error in zmq server "); spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server "; result["errorMessage"] = "Error in zmq server ";
spdlog::error("Client request :" + stringToParse);
} }
try { try {
...@@ -228,8 +230,18 @@ void ZMQServer::doOneServerLoop() { ...@@ -228,8 +230,18 @@ void ZMQServer::doOneServerLoop() {
CHECK_STATE(replyStr.front() == '{'); CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}'); CHECK_STATE(replyStr.back() == '}');
socket->send(copied_id, ZMQ_SNDMORE); if (!socket->send(copied_id, ZMQ_SNDMORE)) {
s_send(*socket, replyStr); if (isExitRequested) {
return;
}
exit(-15);
}
if (!s_send(*socket, replyStr)) {
if (isExitRequested) {
return;
}
exit(-16);
}
} catch ( } catch (
std::exception &e std::exception &e
...@@ -238,10 +250,12 @@ void ZMQServer::doOneServerLoop() { ...@@ -238,10 +250,12 @@ void ZMQServer::doOneServerLoop() {
return; return;
} }
spdlog::error("Exception in zmq server worker send :{}", e.what()); spdlog::error("Exception in zmq server worker send :{}", e.what());
exit(-17);
} catch (...) { } catch (...) {
if (isExitRequested) { if (isExitRequested) {
return; return;
} }
spdlog::error("Unklnown exception in zmq server worker send"); spdlog::error("Unklnown exception in zmq server worker send");
exit(-18);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment