Unverified Commit fc264d08 authored by kladko's avatar kladko

bug/SKALE-3751-enable-zeromq

parent 6f537ff7
......@@ -23,6 +23,7 @@
#include "common.h"
#include "sgxwallet_common.h"
#include <json/writer.h>
......@@ -52,114 +53,123 @@ ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type, bool _checkSignat
zmq_setsockopt(worker_, ZMQ_LINGER, &linger, sizeof(linger));
};
void ServerWorker::work() {
worker_.connect("inproc://backend");
std::string replyStr;
void ServerWorker::doOneServerLoop() noexcept {
string replyStr;
while (!isExitRequested) {
Json::Value result;
result["status"] = ZMQ_SERVER_ERROR;
result["errorMessage"] = "";
Json::Value result;
int errStatus = -1 * (10000 + __LINE__);
result["status"] = errStatus;
result["errorMessage"] = "Server error";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try {
zmq_pollitem_t items[1];
items[0].socket = worker_;
items[0].events = ZMQ_POLLIN;
zmq_pollitem_t items[1];
items[0].socket = worker_;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
return;
}
} while (pollResult == 0);
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
goto clean;
}
} while (pollResult == 0);
zmq::message_t msg;
zmq::message_t copied_msg;
worker_.recv(&identity);
copied_id.copy(&identity);
worker_.recv(&msg);
zmq::message_t msg;
zmq::message_t copied_msg;
worker_.recv(&identity);
copied_id.copy(&identity);
worker_.recv(&msg);
int64_t more;
size_t more_size = sizeof(more);
auto rc = zmq_getsockopt(worker_, ZMQ_RCVMORE, &more, &more_size);
int64_t more;
size_t more_size = sizeof(more);
auto rc = zmq_getsockopt(worker_, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE(rc == 0);
CHECK_STATE2(rc == 0, ZMQ_COULD_NOT_GET_SOCKOPT);
vector <uint8_t> msgData(msg.size() + 1, 0);
vector <uint8_t> msgData(msg.size() + 1, 0);
memcpy(msgData.data(), msg.data(), msg.size());
memcpy(msgData.data(), msg.data(), msg.size());
CHECK_STATE2(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}',
ZMQ_INVALID_MESSAGE);
CHECK_STATE(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}');
memcpy(msgData.data(), msg.data(), msg.size());
memcpy(msgData.data(), msg.data(), msg.size());
auto parsedMsg = ZMQMessage::parse(
(const char *) msgData.data(), msg.size(), true, checkSignature);
auto parsedMsg = ZMQMessage::parse(
(const char *) msgData.data(), msg.size(), true, checkSignature);
CHECK_STATE(parsedMsg);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
result = parsedMsg->process();
result = parsedMsg->process();
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.getMessage();
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.getMessage();
spdlog::error("Exception in zmq server worker:{}", e.what());
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) {
if (isExitRequested) {
goto clean;
}
spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker";
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker";
}
try {
try {
Json::FastWriter fastWriter;
Json::FastWriter fastWriter;
replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1);
replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
worker_.send(copied_id, ZMQ_SNDMORE);
worker_.send(replyMsg);
worker_.send(copied_id, ZMQ_SNDMORE);
worker_.send(replyMsg);
} catch (std::exception &e) {
if (isExitRequested) {
goto clean;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (std::exception &e) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send");
}
}
void ServerWorker::work() {
worker_.connect("inproc://backend");
while (!isExitRequested) {
try {
doOneServerLoop();
} catch (...) {
if (isExitRequested) {
goto clean;
}
spdlog::error("Unklnown exception in zmq server worker send");
spdlog::error("doOneServerLoop threw exception. This should never happen!");
}
}
clean:
spdlog::info("Exited worker thread {}", index);
}
......
......@@ -54,6 +54,8 @@ private:
std::atomic<bool> isExitRequested;
void doOneServerLoop() noexcept;
static std::atomic<uint64_t> workerCount;
uint64_t index;
};
......
......@@ -30,7 +30,7 @@
#include <streambuf>
#include <regex>
#include "sgxwallet_common.h"
#include "common.h"
#include "BLSCrypto.h"
#include "BLSSignReqMessage.h"
......@@ -137,6 +137,7 @@ string ZMQClient::readFileIntoString(const string &_fileName) {
}
void ZMQClient::verifySig(EVP_PKEY* _pubkey, const string& _str, const string& _sig) {
CHECK_STATE(_pubkey);
......@@ -145,12 +146,12 @@ void ZMQClient::verifySig(EVP_PKEY* _pubkey, const string& _str, const string& _
static std::regex r("\\s+");
auto msgToSign = std::regex_replace(_str, r, "");
vector<uint8_t> binSig(256,0);
uint64_t binLen = 0;
CHECK_STATE(hex2carray(_sig.c_str(), &binLen, binSig.data(), binSig.size()));
CHECK_STATE2(hex2carray(_sig.c_str(), &binLen, binSig.data(), binSig.size()),
ZMQ_COULD_NOT_PARSE);
CHECK_STATE(binLen > 0);
......@@ -170,7 +171,8 @@ void ZMQClient::verifySig(EVP_PKEY* _pubkey, const string& _str, const string& _
CHECK_STATE(EVP_DigestVerifyFinal(mdctx, binSig.data(), binLen) == 1);
CHECK_STATE2(EVP_DigestVerifyFinal(mdctx, binSig.data(), binLen) == 1,
ZMQ_COULD_NOT_VERIFY_SIG);
if (mdctx) EVP_MD_CTX_destroy(mdctx);
......
......@@ -22,6 +22,7 @@
*/
#include "common.h"
#include "sgxwallet_common.h"
#include <third_party/cryptlite/sha256.h>
#include <iostream>
#include <fstream>
......@@ -51,36 +52,37 @@ string ZMQMessage::getStringRapid(const char *_name) {
};
shared_ptr <ZMQMessage> ZMQMessage::parse(const char *_msg,
size_t _size, bool _isRequest,
bool _verifySig) {
CHECK_STATE(_msg);
CHECK_STATE(_size > 5);
CHECK_STATE2(_size > 5, ZMQ_INVALID_MESSAGE_SIZE);
// CHECK NULL TERMINATED
CHECK_STATE(_msg[_size] == 0);
CHECK_STATE(_msg[_size - 1] == '}');
CHECK_STATE(_msg[0] == '{');
CHECK_STATE2(_msg[_size - 1] == '}', ZMQ_INVALID_MESSAGE);
CHECK_STATE2(_msg[0] == '{', ZMQ_INVALID_MESSAGE);
auto d = make_shared<rapidjson::Document>();
cerr << _msg << endl;
d->Parse(_msg);
CHECK_STATE(!d->HasParseError());
CHECK_STATE(d->IsObject())
CHECK_STATE2(!d->HasParseError(), ZMQ_COULD_NOT_PARSE);
CHECK_STATE2(d->IsObject(), ZMQ_COULD_NOT_PARSE);
CHECK_STATE(d->HasMember("type"));
CHECK_STATE((*d)["type"].IsString());
CHECK_STATE2(d->HasMember("type"), ZMQ_NO_TYPE_IN_MESSAGE);
CHECK_STATE2((*d)["type"].IsString(), ZMQ_NO_TYPE_IN_MESSAGE);
string type = (*d)["type"].GetString();
if (_verifySig) {
CHECK_STATE(d->HasMember("cert"));
CHECK_STATE(d->HasMember("msgSig"));
CHECK_STATE((*d)["cert"].IsString());
auto cert = make_shared<string>((*d)["cert"].GetString());
CHECK_STATE2(d->HasMember("cert"),ZMQ_NO_CERT_IN_MESSAGE);
CHECK_STATE2(d->HasMember("msgSig"), ZMQ_NO_SIG_IN_MESSAGE);
CHECK_STATE2((*d)["cert"].IsString(), ZMQ_NO_CERT_IN_MESSAGE);
CHECK_STATE2((*d)["msgSig"].IsString(), ZMQ_NO_SIG_IN_MESSAGE);
auto cert = make_shared<string>((*d)["cert"].GetString());
string hash = cryptlite::sha256::hash_hex(*cert);
auto filepath = "/tmp/sgx_wallet_cert_hash_" + hash;
......@@ -103,7 +105,6 @@ shared_ptr <ZMQMessage> ZMQMessage::parse(const char *_msg,
auto handles = ZMQClient::readPublicKeyFromCertStr(*cert);
CHECK_STATE(handles.first);
CHECK_STATE(handles.second);
verifiedCerts.put(*cert, handles);
remove(cert->c_str());
}
......@@ -112,9 +113,7 @@ shared_ptr <ZMQMessage> ZMQMessage::parse(const char *_msg,
CHECK_STATE(publicKey);
CHECK_STATE((*d)["msgSig"].IsString());
auto msgSig = make_shared<string>((*d)["msgSig"].GetString());
cerr << "Got msgSig:" << msgSig << endl;
d->RemoveMember("msgSig");
......
......@@ -159,6 +159,16 @@ extern bool autoconfirm;
#define ECDSA_SIGN_INVALID_KEY_HEX -86
#define SET_SEK_INVALID_SEK_HEX -87
#define TEST_INVALID_HEX -88
#define ZMQ_SERVER_ERROR -89
#define ZMQ_COULD_NOT_PARSE -90
#define ZMQ_INVALID_MESSAGE -91
#define ZMQ_COULD_NOT_GET_SOCKOPT -92
#define ZMQ_INVALID_MESSAGE_SIZE -93
#define ZMQ_NO_TYPE_IN_MESSAGE -94
#define ZMQ_NO_SIG_IN_MESSAGE -95
#define ZMQ_NO_CERT_IN_MESSAGE -96
#define ZMQ_COULD_NOT_VERIFY_SIG -97
#define SGX_ENCLAVE_ERROR -666
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment