Unverified Commit fbdd91ac authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent f575911f
......@@ -31,16 +31,20 @@ using namespace std;
#include <iostream>
#include <map>
#include <memory>
#include <sys/types.h>
#include <sys/sysinfo.h>
#include <string.h>
#include <vector>
#include <json/value.h>
#include <boost/throw_exception.hpp>
#include <gmp.h>
#include <thread>
#include <functional>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include "secure_enclave/Verify.h"
#include "InvalidStateException.h"
#include "SGXException.h"
......
......@@ -21,10 +21,9 @@
@date 2021
*/
#include "common.h"
#include "Agent.h"
Agent::Agent() : startedWorkers(false) {};
void Agent::waitOnGlobalStartBarrier() {
......
......@@ -23,12 +23,6 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <atomic>
using namespace std;
class Agent {
protected:
......
......@@ -21,7 +21,14 @@
@date 2021
*/
#include "document.h"
#include "stringbuffer.h"
#include "writer.h"
#include "common.h"
#include "sgxwallet_common.h"
#include "third_party/spdlog/spdlog.h"
#include "ZMQServer.h"
#include "WorkerThreadPool.h"
......
......@@ -23,11 +23,6 @@
#pragma once
#include <vector>
#include <atomic>
#include <thread>
class Agent;
class ZMQServer;
......
......@@ -24,9 +24,6 @@
#pragma once
#include <memory>
#include <vector>
#include <openssl/pem.h>
#include <openssl/evp.h>
#include <openssl/err.h>
......
......@@ -28,13 +28,14 @@
#include "third_party/spdlog/spdlog.h"
#include "common.h"
#include "sgxwallet_common.h"
#include "SGXException.h"
#include "ExitRequestedException.h"
#include "ReqMessage.h"
#include "ZMQMessage.h"
#include "ZMQServer.h"
#include "sgxwallet_common.h"
using namespace std;
......@@ -174,75 +175,109 @@ void ZMQServer::checkForExit() {
}
}
void ZMQServer::doOneServerLoop() {
void ZMQServer::poll() {
zmq_pollitem_t items[1];
items[0].socket = *socket;
items[0].events = ZMQ_POLLIN;
string replyStr;
int pollResult = 0;
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
do {
checkForExit();
pollResult = zmq_poll(items, 1, 1000);
} while (pollResult == 0);
}
zmq::message_t identity;
string ZMQServer::receiveMessage(zmq::message_t& _identity) {
if (!socket->recv(&_identity)) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false. Exiting.");
exit(-11);
}
string stringToParse = "";
if (!_identity.more()) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false. Existing.");
exit(-12);
}
try {
zmq::message_t reqMsg;
zmq_pollitem_t items[1];
items[0].socket = *socket;
items[0].events = ZMQ_POLLIN;
if (!socket->recv(&reqMsg, 0)) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false. Exiting");
exit(-13);
}
int pollResult = 0;
auto result = string((char *) reqMsg.data(), reqMsg.size());
do {
checkForExit();
pollResult = zmq_poll(items, 1, 1000);
} while (pollResult == 0);
CHECK_STATE(result.front() == '{')
CHECK_STATE(result.back() == '}')
return result;
}
if (!socket->recv(&identity)) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket->recv(&identity) returned false. Exiting.");
exit(-11);
}
void ZMQServer::sendToClient(Json::Value& _result, zmq::message_t& _identity ) {
string replyStr;
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
if (!identity.more()) {
// something terrible happened
spdlog::error("Fatal error: zmq_msg_more(identity) returned false. Existing.");
exit(-12);
}
replyStr = fastWriter.write(_result);
zmq::message_t reqMsg;
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
if (!socket->recv(&reqMsg, 0)) {
checkForExit();
// something terrible happened
spdlog::error("Fatal error: socket.recv(&reqMsg, 0) returned false. Exiting");
exit(-13);
if (!socket->send(_identity, ZMQ_SNDMORE)) {
exit(-15);
}
if (!s_send(*socket, replyStr)) {
exit(-16);
}
} catch (ExitRequestedException) {
throw;
} catch (std::exception &e) {
checkForExit();
spdlog::error("Exception in zmq server worker send :{}", e.what());
exit(-17);
} catch (...) {
checkForExit();
spdlog::error("Unklnown exception in zmq server worker send");
exit(-18);
}
stringToParse = string((char *) reqMsg.data(), reqMsg.size());
}
CHECK_STATE(stringToParse.front() == '{')
CHECK_STATE(stringToParse.back() == '}')
void ZMQServer::doOneServerLoop() {
auto parsedMsg = ZMQMessage::parse(
stringToParse.c_str(), stringToParse.size(), true, checkSignature, checkKeyOwnership);
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
zmq::message_t identity;
string msgStr;
if ((dynamic_pointer_cast<BLSSignReqMessage>(parsedMsg)!= nullptr) ||
dynamic_pointer_cast<ECDSASignReqMessage>(parsedMsg)) {
spdlog::info("FUFUFUFUF");
} else {
spdlog::info("HAHAHA");
}
try {
poll();
msgStr = receiveMessage(identity);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
auto msg = ZMQMessage::parse(
msgStr.c_str(), msgStr.size(), true, checkSignature, checkKeyOwnership);
CHECK_STATE2(msg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)!= nullptr) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
spdlog::info("FUFUFUFUF");
} else {
spdlog::info("HAHAHA");
}
result = msg->process();
} catch (ExitRequestedException) {
throw;
} catch (std::exception &e) {
......@@ -250,43 +285,17 @@ void ZMQServer::doOneServerLoop() {
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server :{}", e.what());
spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("Client request :" + stringToParse);
spdlog::error("Client request :" + msgStr);
} catch (...) {
checkForExit();
spdlog::error("Error in zmq server ");
result["errorMessage"] = "Error in zmq server ";
spdlog::error("ID:" + string((char *) identity.data(), identity.size()));
spdlog::error("Client request :" + stringToParse);
spdlog::error("Client request :" + msgStr);
}
try {
Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
sendToClient(result, identity);
replyStr = fastWriter.write(result);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
if (!socket->send(identity, ZMQ_SNDMORE)) {
exit(-15);
}
if (!s_send(*socket, replyStr)) {
exit(-16);
}
} catch (ExitRequestedException) {
throw;
} catch (std::exception &e) {
checkForExit();
spdlog::error("Exception in zmq server worker send :{}", e.what());
exit(-17);
} catch (...) {
checkForExit();
spdlog::error("Unklnown exception in zmq server worker send");
exit(-18);
}
}
void ZMQServer::workerThreadProcessNextMessage() {
......
......@@ -25,22 +25,12 @@
#ifndef SGXWALLET_ZMQServer_H
#define SGXWALLET_ZMQServer_H
#include <vector>
#include <thread>
#include <memory>
#include <functional>
#include <atomic>
#include <zmq.hpp>
#include "zhelpers.hpp"
#include "Agent.h"
#include "WorkerThreadPool.h"
using namespace std;
class ZMQServer : public Agent{
uint64_t workerThreads;
......@@ -85,6 +75,12 @@ public:
void checkForExit();
void poll();
string receiveMessage(zmq::message_t& _identity);
void sendToClient(Json::Value& _result, zmq::message_t& _identity);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment