/*
Copyright (C) 2019-Present SKALE Labs
This file is part of sgxwallet.
sgxwallet is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
sgxwallet is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with sgxwallet. If not, see .
@file ServerWorker.cpp
@author Stan Kladko
@date 2021
*/
#include "common.h"
#include "sgxwallet_common.h"
#include
#include
#include "zhelpers.hpp"
#include "Log.h"
#include "ZMQMessage.h"
#include "ServerWorker.h"
std::atomic ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &_ctx, int sock_type, bool _checkSignature,
const string& _caCert ) : checkSignature(_checkSignature),
caCert(_caCert),
isExitRequested(false) {
worker = make_shared(_ctx, sock_type);
if (checkSignature) {
CHECK_STATE(!caCert.empty())
}
index = workerCount.fetch_add(1);
int linger = 0;
zmq_setsockopt(*worker, ZMQ_LINGER, &linger, sizeof(linger));
};
void ServerWorker::doOneServerLoop() noexcept {
string replyStr;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq_pollitem_t items[1];
items[0].socket = *worker;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
zmq::message_t msg;
zmq::message_t copied_msg;
worker->recv(&identity);
copied_id.copy(&identity);
worker->recv(&msg);
int64_t more;
size_t more_size = sizeof(more);
auto rc = zmq_getsockopt(*worker, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE2(rc == 0, ZMQ_COULD_NOT_GET_SOCKOPT);
vector msgData(msg.size() + 1, 0);
memcpy(msgData.data(), msg.data(), msg.size());
CHECK_STATE2(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}',
ZMQ_INVALID_MESSAGE);
memcpy(msgData.data(), msg.data(), msg.size());
auto parsedMsg = ZMQMessage::parse(
(const char *) msgData.data(), msg.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.getMessage();
spdlog::error("Exception in zmq server worker:{}", e.what());
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker";
}
try {
Json::FastWriter fastWriter;
replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
worker->send(copied_id, ZMQ_SNDMORE);
worker->send(replyMsg);
} catch (std::exception &e) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
}
}
void ServerWorker::work() {
worker->connect("inproc://backend");
while (!isExitRequested) {
try {
doOneServerLoop();
} catch (...) {
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
}
spdlog::info("Exited worker thread {}", index);
}
void ServerWorker::requestExit() {
isExitRequested.exchange(true);
spdlog::info("Closed worker socket {}", index);
}