Unverified Commit 475e9a3c authored by kladko's avatar kladko

bug/SKALE-3662 Adding libzmq

parent d1162820
...@@ -42,7 +42,6 @@ ...@@ -42,7 +42,6 @@
#include <unistd.h> #include <unistd.h>
#include "BLSPrivateKeyShareSGX.h" #include "BLSPrivateKeyShareSGX.h"
#include "sgxwallet_common.h" #include "sgxwallet_common.h"
#include "third_party/intel/create_enclave.h" #include "third_party/intel/create_enclave.h"
...@@ -68,16 +67,16 @@ using namespace std; ...@@ -68,16 +67,16 @@ using namespace std;
void systemHealthCheck() { void systemHealthCheck() {
string ulimit; string ulimit;
try { try {
ulimit = exec( "/bin/bash -c \"ulimit -n\"" ); ulimit = exec("/bin/bash -c \"ulimit -n\"");
} catch ( ... ) { } catch (...) {
spdlog::error("Execution of '/bin/bash -c ulimit -n' failed"); spdlog::error("Execution of '/bin/bash -c ulimit -n' failed");
exit(-15); exit(-15);
} }
int noFiles = strtol( ulimit.c_str(), NULL, 10 ); int noFiles = strtol(ulimit.c_str(), NULL, 10);
auto noUlimitCheck = getenv( "NO_ULIMIT_CHECK" ) != nullptr; auto noUlimitCheck = getenv("NO_ULIMIT_CHECK") != nullptr;
if ( noFiles < 65535 && !noUlimitCheck) { if (noFiles < 65535 && !noUlimitCheck) {
string errStr = string errStr =
"sgxwallet requires setting Linux file descriptor limit to at least 65535 " "sgxwallet requires setting Linux file descriptor limit to at least 65535 "
"You current limit (ulimit -n) is less than 65535. \n Please set it to 65535:" "You current limit (ulimit -n) is less than 65535. \n Please set it to 65535:"
...@@ -89,8 +88,8 @@ void systemHealthCheck() { ...@@ -89,8 +88,8 @@ void systemHealthCheck() {
} }
} }
static ZMQServer* zmqServer = nullptr; static ZMQServer *zmqServer = nullptr;
atomic<bool> exiting(false);
void initUserSpace() { void initUserSpace() {
...@@ -109,15 +108,10 @@ void initUserSpace() { ...@@ -109,15 +108,10 @@ void initUserSpace() {
} }
void exitZMQServer() { void exitZMQServer() {
auto doExit = !exiting.exchange(true); spdlog::info("Exiting zmq server ...");
zmqServer->exitWorkers();
if (doExit) { spdlog::info("Exited zmq server ...");
spdlog::info("Exiting zmq server ..."); zmqServer = nullptr;
zmqServer->exitWorkers();
spdlog::info("Exited zmq server ...");
zmqServer = nullptr;
}
} }
uint64_t initEnclave() { uint64_t initEnclave() {
...@@ -163,7 +157,7 @@ uint64_t initEnclave() { ...@@ -163,7 +157,7 @@ uint64_t initEnclave() {
} }
spdlog::info("Enclave created and started successfully"); spdlog::info("Enclave created and started successfully");
status = trustedEnclaveInit(eid, enclaveLogLevel); status = trustedEnclaveInit(eid, enclaveLogLevel);
} }
...@@ -178,7 +172,6 @@ uint64_t initEnclave() { ...@@ -178,7 +172,6 @@ uint64_t initEnclave() {
} }
void initAll(uint32_t _logLevel, bool _checkCert, bool _autoSign, bool _generateTestKeys) { void initAll(uint32_t _logLevel, bool _checkCert, bool _autoSign, bool _generateTestKeys) {
...@@ -201,9 +194,9 @@ void initAll(uint32_t _logLevel, bool _checkCert, bool _autoSign, bool _generate ...@@ -201,9 +194,9 @@ void initAll(uint32_t _logLevel, bool _checkCert, bool _autoSign, bool _generate
uint64_t counter = 0; uint64_t counter = 0;
uint64_t initResult = 0; uint64_t initResult = 0;
while ((initResult = initEnclave()) != 0 && counter < 10){ while ((initResult = initEnclave()) != 0 && counter < 10) {
sleep(1); sleep(1);
counter ++; counter++;
} }
if (initResult != 0) { if (initResult != 0) {
......
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx),
worker_(ctx_, sock_type) {}; worker_(ctx_, sock_type),
isExitRequested(false) {};
void ServerWorker::work() { void ServerWorker::work() {
worker_.connect("inproc://backend"); worker_.connect("inproc://backend");
...@@ -23,7 +24,7 @@ void ServerWorker::work() { ...@@ -23,7 +24,7 @@ void ServerWorker::work() {
std::string replyStr; std::string replyStr;
while (true) { while (!isExitRequested) {
Json::Value result; Json::Value result;
int errStatus = -1 * (10000 + __LINE__); int errStatus = -1 * (10000 + __LINE__);
...@@ -76,9 +77,15 @@ void ServerWorker::work() { ...@@ -76,9 +77,15 @@ void ServerWorker::work() {
spdlog::error("Exception in zmq server worker:{}", e.what()); spdlog::error("Exception in zmq server worker:{}", e.what());
} }
catch (std::exception &e) { catch (std::exception &e) {
if (isExitRequested) {
return;
}
result["errorMessage"] = string(e.what()); result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what()); spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) { } catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Error in zmq server worker"); spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker"; result["errorMessage"] = "Error in zmq server worker";
} }
...@@ -99,14 +106,22 @@ void ServerWorker::work() { ...@@ -99,14 +106,22 @@ void ServerWorker::work() {
worker_.send(replyMsg); worker_.send(replyMsg);
} catch (std::exception &e) { } catch (std::exception &e) {
if (isExitRequested) {
return;
}
spdlog::error("Exception in zmq server worker send :{}", e.what()); spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) { } catch (...) {
if (isExitRequested) {
return;
}
spdlog::error("Unklnown exception in zmq server worker send"); spdlog::error("Unklnown exception in zmq server worker send");
} }
}
}
}
void ServerWorker::requestExit() {
isExitRequested.exchange(true);
} }
...@@ -25,10 +25,13 @@ public: ...@@ -25,10 +25,13 @@ public:
void work(); void work();
void requestExit();
private: private:
zmq::context_t &ctx_; zmq::context_t &ctx_;
zmq::socket_t worker_; zmq::socket_t worker_;
std::atomic<bool> isExitRequested;
}; };
......
...@@ -63,18 +63,18 @@ void ZMQServer::run() { ...@@ -63,18 +63,18 @@ void ZMQServer::run() {
try { try {
for (int i = 0; i < kMaxThread; ++i) { for (int i = 0; i < kMaxThread; ++i) {
worker.push_back(make_shared<ServerWorker>(ctx_, ZMQ_DEALER)); workers.push_back(make_shared<ServerWorker>(ctx_, ZMQ_DEALER));
worker_thread.push_back(make_shared<std::thread>(std::bind(&ServerWorker::work, worker[i]))); worker_threads.push_back(make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i])));
} }
} catch (std::exception &e) { } catch (std::exception &e) {
spdlog::error("Could not create zmq server workers:{} ", e.what()); spdlog::error("Could not create zmq server workers:{} ", e.what());
exit(-102); exit(-102);
} };
try { try {
zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr); zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr);
} catch (exception& _e) { } catch (exception &_e) {
spdlog::info("Error, exiting zmq server ... {}", _e.what()); spdlog::info("Error, exiting zmq server ... {}", _e.what());
return; return;
} catch (...) { } catch (...) {
...@@ -84,12 +84,25 @@ void ZMQServer::run() { ...@@ -84,12 +84,25 @@ void ZMQServer::run() {
} }
void ZMQServer::exitWorkers() { void ZMQServer::exitWorkers() {
spdlog::info("Emptying threads ..."); auto doExit = !exiting.exchange(true);
worker_thread.empty(); if (doExit) {
spdlog::info("Emptying workers ...");
worker.empty(); spdlog::info("Telling workers to exit");
spdlog::info("Emptied workers ...");
for (auto &&worker : workers) {
worker->requestExit();
}
spdlog::info("joining threads ...");
for (auto &&thread : worker_threads)
thread->join();
}
spdlog::info("Deleting threads ...");
worker_threads.empty();
spdlog::info("Deleting workers ...");
workers.empty();
spdlog::info("Deleted workers ...");
} }
...@@ -43,9 +43,12 @@ using namespace std; ...@@ -43,9 +43,12 @@ using namespace std;
class ZMQServer { class ZMQServer {
public: public:
atomic<bool> exiting;
ZMQServer(); ZMQServer();
atomic<bool> isExitRequested;
enum { enum {
kMaxThread = 1 kMaxThread = 1
...@@ -60,8 +63,11 @@ private: ...@@ -60,8 +63,11 @@ private:
zmq::socket_t frontend_; zmq::socket_t frontend_;
zmq::socket_t backend_; zmq::socket_t backend_;
std::vector<shared_ptr<ServerWorker> > worker; std::vector<shared_ptr<ServerWorker> > workers;
std::vector<shared_ptr<std::thread>> worker_thread; std::vector<shared_ptr<std::thread>> worker_threads;
std::atomic<bool> isExitRequested;
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment