Unverified Commit cb7c1a63 authored by kladko's avatar kladko

bug/SKALE-3662 Adding libzmq

parent 94fd8174
...@@ -105,6 +105,7 @@ void initUserSpace() { ...@@ -105,6 +105,7 @@ void initUserSpace() {
zmqServer = new ZMQServer(); zmqServer = new ZMQServer();
static std::thread serverThread(std::bind(&ZMQServer::run, zmqServer)); static std::thread serverThread(std::bind(&ZMQServer::run, zmqServer));
serverThread.detach();
} }
void exitZMQServer() { void exitZMQServer() {
......
...@@ -13,10 +13,15 @@ ...@@ -13,10 +13,15 @@
#include "ServerWorker.h" #include "ServerWorker.h"
std::atomic<uint64_t> ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx),
worker_(ctx_, sock_type), worker_(ctx_, sock_type),
isExitRequested(false) {}; isExitRequested(false) {
index = workerCount.fetch_add(1);
int linger = 0;
zmq_setsockopt (worker_, ZMQ_LINGER, &linger, sizeof (linger));
};
void ServerWorker::work() { void ServerWorker::work() {
worker_.connect("inproc://backend"); worker_.connect("inproc://backend");
...@@ -84,7 +89,7 @@ void ServerWorker::work() { ...@@ -84,7 +89,7 @@ void ServerWorker::work() {
spdlog::error("Exception in zmq server worker:{}", e.what()); spdlog::error("Exception in zmq server worker:{}", e.what());
} catch (...) { } catch (...) {
if (isExitRequested) { if (isExitRequested) {
return; goto clean;
} }
spdlog::error("Error in zmq server worker"); spdlog::error("Error in zmq server worker");
result["errorMessage"] = "Error in zmq server worker"; result["errorMessage"] = "Error in zmq server worker";
...@@ -107,21 +112,25 @@ void ServerWorker::work() { ...@@ -107,21 +112,25 @@ void ServerWorker::work() {
} catch (std::exception &e) { } catch (std::exception &e) {
if (isExitRequested) { if (isExitRequested) {
return; goto clean;
} }
spdlog::error("Exception in zmq server worker send :{}", e.what()); spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) { } catch (...) {
if (isExitRequested) { if (isExitRequested) {
return; goto clean;
} }
spdlog::error("Unklnown exception in zmq server worker send"); spdlog::error("Unklnown exception in zmq server worker send");
} }
} }
clean:
spdlog::info("Exited worker thread {}", index);
} }
void ServerWorker::requestExit() { void ServerWorker::requestExit() {
isExitRequested.exchange(true); isExitRequested.exchange(true);
zmq_close(worker_);
spdlog::info("Closed worker socket {}", index);
} }
...@@ -32,6 +32,9 @@ private: ...@@ -32,6 +32,9 @@ private:
zmq::socket_t worker_; zmq::socket_t worker_;
std::atomic<bool> isExitRequested; std::atomic<bool> isExitRequested;
static std::atomic<uint64_t> workerCount;
uint64_t index;
}; };
......
...@@ -146,6 +146,5 @@ string ZMQClient::ecdsaSignMessageHash(int base, const std::string &keyName, con ...@@ -146,6 +146,5 @@ string ZMQClient::ecdsaSignMessageHash(int base, const std::string &keyName, con
auto result = dynamic_pointer_cast<ECDSASignRspMessage>(doRequestReply(p)); auto result = dynamic_pointer_cast<ECDSASignRspMessage>(doRequestReply(p));
CHECK_STATE(result); CHECK_STATE(result);
CHECK_STATE(result->getStatus() == 0); CHECK_STATE(result->getStatus() == 0);
sleep(5);
return result->getSignature(); return result->getSignature();
} }
\ No newline at end of file
...@@ -57,7 +57,7 @@ public: ...@@ -57,7 +57,7 @@ public:
uint64_t getUint64Rapid(const char *_name); uint64_t getUint64Rapid(const char *_name);
uint64_t getStatus() { uint64_t getStatus() {
getUint64Rapid("status"); return getUint64Rapid("status");
} }
static shared_ptr<ZMQMessage> parse(vector<uint8_t> &_msg, bool _isRequest); static shared_ptr<ZMQMessage> parse(vector<uint8_t> &_msg, bool _isRequest);
......
...@@ -34,6 +34,12 @@ ZMQServer::ZMQServer() ...@@ -34,6 +34,12 @@ ZMQServer::ZMQServer()
: isExitRequested(false), ctx_(make_shared<zmq::context_t>(1)), : isExitRequested(false), ctx_(make_shared<zmq::context_t>(1)),
frontend_(*ctx_, ZMQ_ROUTER), frontend_(*ctx_, ZMQ_ROUTER),
backend_(*ctx_, ZMQ_DEALER) { backend_(*ctx_, ZMQ_DEALER) {
int linger = 0;
zmq_setsockopt (frontend_, ZMQ_LINGER, &linger, sizeof (linger));
zmq_setsockopt (backend_, ZMQ_LINGER, &linger, sizeof (linger));
} }
...@@ -65,7 +71,9 @@ void ZMQServer::run() { ...@@ -65,7 +71,9 @@ void ZMQServer::run() {
try { try {
for (int i = 0; i < kMaxThread; ++i) { for (int i = 0; i < kMaxThread; ++i) {
workers.push_back(make_shared<ServerWorker>(*ctx_, ZMQ_DEALER)); workers.push_back(make_shared<ServerWorker>(*ctx_, ZMQ_DEALER));
worker_threads.push_back(make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i]))); auto th = make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i]));
th->detach();
worker_threads.push_back(th);
} }
} catch (std::exception &e) { } catch (std::exception &e) {
spdlog::error("Could not create zmq server workers:{} ", e.what()); spdlog::error("Could not create zmq server workers:{} ", e.what());
...@@ -97,28 +105,36 @@ void ZMQServer::exitWorkers() { ...@@ -97,28 +105,36 @@ void ZMQServer::exitWorkers() {
auto doExit = !isExitRequested.exchange(true); auto doExit = !isExitRequested.exchange(true);
if (doExit) { if (doExit) {
spdlog::info("Tell workers to exit"); spdlog::info("Tell workers to exit");
for (auto &&worker : workers) { for (auto &&worker : workers) {
worker->requestExit(); worker->requestExit();
} }
// close server sockets
spdlog::info("Closing server sockets ...");
zmq_close(frontend_);
zmq_close(backend_);
spdlog::info("Closed server sockets");
spdlog::info("Terminating context ..."); spdlog::info("Terminating context ...");
// terminate context (destructor will be called) // terminate context (destructor will be called)
ctx_ = nullptr; ctx_ = nullptr;
spdlog::info("Terminated context ..."); spdlog::info("Terminated context ...");
spdlog::info("joining threads ...");
for (auto &&thread : worker_threads)
thread->join();
} }
spdlog::info("Deleting threads ..."); spdlog::info("Deleting threads ...");
worker_threads.empty(); worker_threads.empty();
spdlog::info("Deleting workers ..."); spdlog::info("Deleting workers ...");
workers.empty();
spdlog::info("Deleted workers ..."); spdlog::info("Deleted workers ...");
} }
......
...@@ -1136,30 +1136,16 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") { ...@@ -1136,30 +1136,16 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") {
string keyName = ""; string keyName = "";
try { PRINT_SRC_LINE
PRINT_SRC_LINE keyName = genECDSAKeyAPI(c);
keyName = genECDSAKeyAPI(c); PRINT_SRC_LINE
PRINT_SRC_LINE for (int i = 1; i < 1000; i++) {
} catch (Exception & e)
{
cerr << e.what() << endl;
throw;
}
try {
PRINT_SRC_LINE
auto sig = client->ecdsaSignMessageHash(16, keyName, SAMPLE_HASH); auto sig = client->ecdsaSignMessageHash(16, keyName, SAMPLE_HASH);
REQUIRE(sig.size() > 10); REQUIRE(sig.size() > 10);
}
cerr << sig << endl; PRINT_SRC_LINE
} catch (...) {
client = nullptr;
sleep(10000);
}
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment