Unverified Commit 4bcbcf5f authored by Stan Kladko's avatar Stan Kladko Committed by GitHub

Merge pull request #312 from skalenetwork/bug/SKALE-4131-ZMQ-hangs

SKALE-4131
parents 80a373cd 3b28d440
...@@ -23,10 +23,6 @@ jobs: ...@@ -23,10 +23,6 @@ jobs:
- name: test - name: test
run: python3 scripts/docker_test.py DockerfileSimulation sgxwallet_sim run: python3 scripts/docker_test.py DockerfileSimulation sgxwallet_sim
- name: build and deploy docker image - name: build and deploy docker image
if: |
contains(github.ref, 'develop') || contains(github.ref, 'beta') ||
contains(github.ref, 'master') || contains(github.ref, 'stable') ||
contains(github.ref, 'SECURE_ENCLAVE_CHANGES')
run : | run : |
sudo rm -rf /home/runner/work/sgxwallet/sgxwallet/sgx_data sudo rm -rf /home/runner/work/sgxwallet/sgxwallet/sgx_data
export BRANCH=${GITHUB_REF##*/} export BRANCH=${GITHUB_REF##*/}
......
...@@ -82,30 +82,12 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -82,30 +82,12 @@ void ServerWorker::doOneServerLoop() noexcept {
} }
} while (pollResult == 0); } while (pollResult == 0);
zmq::message_t msg;
zmq::message_t copied_msg;
worker->recv(&identity); worker->recv(&identity);
copied_id.copy(&identity); copied_id.copy(&identity);
worker->recv(&msg); string stringToParse = s_recv(*worker);
int64_t more;
size_t more_size = sizeof(more);
auto rc = zmq_getsockopt(*worker, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE2(rc == 0, ZMQ_COULD_NOT_GET_SOCKOPT);
vector <uint8_t> msgData(msg.size() + 1, 0);
memcpy(msgData.data(), msg.data(), msg.size());
CHECK_STATE2(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}',
ZMQ_INVALID_MESSAGE);
memcpy(msgData.data(), msg.data(), msg.size());
auto parsedMsg = ZMQMessage::parse( auto parsedMsg = ZMQMessage::parse(
(const char *) msgData.data(), msg.size(), true, checkSignature); stringToParse.c_str(), stringToParse.size(), true, checkSignature);
CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE); CHECK_STATE2(parsedMsg, ZMQ_COULD_NOT_PARSE);
...@@ -133,17 +115,16 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -133,17 +115,16 @@ void ServerWorker::doOneServerLoop() noexcept {
try { try {
Json::FastWriter fastWriter; Json::FastWriter fastWriter;
fastWriter.omitEndingLineFeed();
replyStr = fastWriter.write(result); replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1);
CHECK_STATE(replyStr.size() > 2); CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{'); CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}'); CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
worker->send(copied_id, ZMQ_SNDMORE); worker->send(copied_id, ZMQ_SNDMORE);
worker->send(replyMsg); s_send(*worker, replyStr);
} catch (std::exception &e) { } catch (std::exception &e) {
if (isExitRequested) { if (isExitRequested) {
......
...@@ -112,9 +112,7 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -112,9 +112,7 @@ string ZMQClient::doZmqRequestReply(string &_req) {
// If we got a reply, process it // If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN) { if (items[0].revents & ZMQ_POLLIN) {
string reply = s_recv(*clientSocket); string reply = s_recv(*clientSocket);
CHECK_STATE(reply.size() > 5); CHECK_STATE(reply.size() > 5);
reply = reply.substr(0, reply.size() - 1);
spdlog::debug("ZMQ client received reply:{}", reply); spdlog::debug("ZMQ client received reply:{}", reply);
CHECK_STATE(reply.front() == '{'); CHECK_STATE(reply.front() == '{');
CHECK_STATE(reply.back() == '}'); CHECK_STATE(reply.back() == '}');
...@@ -285,28 +283,28 @@ ZMQClient::ZMQClient(const string &ip, uint16_t port, bool _sign, const string & ...@@ -285,28 +283,28 @@ ZMQClient::ZMQClient(const string &ip, uint16_t port, bool _sign, const string &
} }
void ZMQClient::reconnect() { void ZMQClient::reconnect() {
lock_guard< recursive_mutex > lock( mutex );
lock_guard <recursive_mutex> lock(mutex);
auto pid = getProcessID(); auto pid = getProcessID();
if (clientSockets.count(pid) > 0) { if ( clientSockets.count( pid ) > 0 ) {
clientSockets.erase(pid); clientSockets.erase( pid );
} }
uint64_t randNumber;
CHECK_STATE(getrandom( &randNumber, sizeof(uint64_t), 0 ) == sizeof(uint64_t));
char identity[10]; string identity = to_string(135) + ":" + to_string(randNumber);
getrandom(identity, 10, 0);
auto clientSocket = make_shared<zmq::socket_t>(ctx, ZMQ_DEALER); auto clientSocket = make_shared< zmq::socket_t >( ctx, ZMQ_DEALER );
clientSocket->setsockopt(ZMQ_IDENTITY, identity, 10); clientSocket->setsockopt( ZMQ_IDENTITY, identity.c_str(), identity.size() + 1);
// Configure socket to not wait at close time // Configure socket to not wait at close time
int linger = 0; int linger = 0;
clientSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); clientSocket->setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) );
clientSocket->connect(url); clientSocket->connect( url );
clientSockets.insert({pid, clientSocket}); clientSockets.insert( { pid, clientSocket } );
} }
string ZMQClient::blsSignMessageHash(const std::string &keyShareName, const std::string &messageHash, int t, int n) { string ZMQClient::blsSignMessageHash(const std::string &keyShareName, const std::string &messageHash, int t, int n) {
Json::Value p; Json::Value p;
p["type"] = ZMQMessage::BLS_SIGN_REQ; p["type"] = ZMQMessage::BLS_SIGN_REQ;
......
...@@ -49,7 +49,7 @@ ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile) ...@@ -49,7 +49,7 @@ ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
workerThreads = 4; // do four threads for now workerThreads = 1; // do one thread for now
if (_checkSignature) { if (_checkSignature) {
CHECK_STATE(!_caCertFile.empty()); CHECK_STATE(!_caCertFile.empty());
...@@ -107,6 +107,9 @@ void ZMQServer::run() { ...@@ -107,6 +107,9 @@ void ZMQServer::run() {
throw SGXException(ZMQ_COULD_NOT_CREATE_WORKERS, "Could not create zmq server workers."); throw SGXException(ZMQ_COULD_NOT_CREATE_WORKERS, "Could not create zmq server workers.");
}; };
spdlog::info("Created {} zmq server workers ...", workerThreads);
spdlog::info("Creating zmq proxy.");
try { try {
zmq::proxy(static_cast<void *>(*frontend), static_cast<void *>(*backend), nullptr); zmq::proxy(static_cast<void *>(*frontend), static_cast<void *>(*backend), nullptr);
......
...@@ -103,14 +103,12 @@ inline int getValue() { //Note: this value is in KB! ...@@ -103,14 +103,12 @@ inline int getValue() { //Note: this value is in KB!
#define CHECK_STATE(_EXPRESSION_) \ #define CHECK_STATE(_EXPRESSION_) \
if (!(_EXPRESSION_)) { \ if (!(_EXPRESSION_)) { \
auto __msg__ = std::string("State check failed::") + #_EXPRESSION_ + " " + std::string(__FILE__) + ":" + std::to_string(__LINE__); \ auto __msg__ = std::string("State check failed::") + #_EXPRESSION_ + " " + std::string(__FILE__) + ":" + std::to_string(__LINE__); \
print_stack(__LINE__); \
\ \
BOOST_THROW_EXCEPTION(SGXException(-100, string(__CLASS_NAME__) + ":" + __msg__));} BOOST_THROW_EXCEPTION(SGXException(-100, string(__CLASS_NAME__) + ":" + __msg__));}
#define CHECK_STATE2(_EXPRESSION_, __STATUS__) \ #define CHECK_STATE2(_EXPRESSION_, __STATUS__) \
if (!(_EXPRESSION_)) { \ if (!(_EXPRESSION_)) { \
auto __msg__ = std::string("State check failed::") + #_EXPRESSION_ + " " + std::string(__FILE__) + ":" + std::to_string(__LINE__); \ auto __msg__ = std::string("State check failed::") + #_EXPRESSION_ + " " + std::string(__FILE__) + ":" + std::to_string(__LINE__); \
print_stack(__LINE__); \
\ \
BOOST_THROW_EXCEPTION(SGXException(__STATUS__, string(__CLASS_NAME__) + ":" + __msg__));} BOOST_THROW_EXCEPTION(SGXException(__STATUS__, string(__CLASS_NAME__) + ":" + __msg__));}
......
...@@ -9,6 +9,7 @@ services: ...@@ -9,6 +9,7 @@ services:
- "1028:1028" - "1028:1028"
- "1029:1029" - "1029:1029"
- "1030:1030" - "1030:1030"
- "1031:1031"
devices: devices:
- "/dev/isgx" - "/dev/isgx"
- "/dev/mei0" - "/dev/mei0"
......
...@@ -9,6 +9,7 @@ services: ...@@ -9,6 +9,7 @@ services:
- "1028:1028" - "1028:1028"
- "1029:1029" - "1029:1029"
- "1030:1030" - "1030:1030"
- "1031:1031"
volumes: volumes:
- ./sgx_data:/usr/src/sdk/sgx_data - ./sgx_data:/usr/src/sdk/sgx_data
- /dev/urandom:/dev/random - /dev/urandom:/dev/random
......
...@@ -178,11 +178,24 @@ int main(int argc, char *argv[]) { ...@@ -178,11 +178,24 @@ int main(int argc, char *argv[]) {
enclaveLogLevel = L_TRACE; enclaveLogLevel = L_TRACE;
} }
cerr << "Calling initAll ..." << endl;
initAll(enclaveLogLevel, checkClientCertOption, checkClientCertOption, autoSignClientCertOption, generateTestKeys); initAll(enclaveLogLevel, checkClientCertOption, checkClientCertOption, autoSignClientCertOption, generateTestKeys);
cerr << "Completed initAll." << endl;
ifstream is("sgx_data/4node.json");
if (generateTestKeys && !is.good() && !!ExitHandler::shouldExit()) { //check if test keys already exist
string TEST_KEYS_4_NODE = "sgx_data/4node.json";
ifstream is(TEST_KEYS_4_NODE);
auto keysExist = is.good();
if (keysExist) {
cerr << "Found test keys." << endl;
}
if (generateTestKeys && !keysExist && !ExitHandler::shouldExit()) {
cerr << "Generating test keys ..." << endl; cerr << "Generating test keys ..." << endl;
HttpClient client(RPC_ENDPOINT); HttpClient client(RPC_ENDPOINT);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment