Unverified Commit 94fd8174 authored by kladko's avatar kladko

bug/SKALE-3662 Adding libzmq

parent 475e9a3c
...@@ -31,9 +31,10 @@ ...@@ -31,9 +31,10 @@
using namespace std; using namespace std;
ZMQServer::ZMQServer() ZMQServer::ZMQServer()
: isExitRequested(false), ctx_(1), : isExitRequested(false), ctx_(make_shared<zmq::context_t>(1)),
frontend_(ctx_, ZMQ_ROUTER), frontend_(*ctx_, ZMQ_ROUTER),
backend_(ctx_, ZMQ_DEALER) {} backend_(*ctx_, ZMQ_DEALER) {
}
void ZMQServer::run() { void ZMQServer::run() {
...@@ -63,7 +64,7 @@ void ZMQServer::run() { ...@@ -63,7 +64,7 @@ void ZMQServer::run() {
try { try {
for (int i = 0; i < kMaxThread; ++i) { for (int i = 0; i < kMaxThread; ++i) {
workers.push_back(make_shared<ServerWorker>(ctx_, ZMQ_DEALER)); workers.push_back(make_shared<ServerWorker>(*ctx_, ZMQ_DEALER));
worker_threads.push_back(make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i]))); worker_threads.push_back(make_shared<std::thread>(std::bind(&ServerWorker::work, workers[i])));
} }
} catch (std::exception &e) { } catch (std::exception &e) {
...@@ -75,9 +76,17 @@ void ZMQServer::run() { ...@@ -75,9 +76,17 @@ void ZMQServer::run() {
try { try {
zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr); zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr);
} catch (exception &_e) { } catch (exception &_e) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
}
spdlog::info("Error, exiting zmq server ... {}", _e.what()); spdlog::info("Error, exiting zmq server ... {}", _e.what());
return; return;
} catch (...) { } catch (...) {
if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread");
return;
}
spdlog::info("Error, exiting zmq server ..."); spdlog::info("Error, exiting zmq server ...");
return; return;
} }
...@@ -85,15 +94,23 @@ void ZMQServer::run() { ...@@ -85,15 +94,23 @@ void ZMQServer::run() {
void ZMQServer::exitWorkers() { void ZMQServer::exitWorkers() {
auto doExit = !exiting.exchange(true); auto doExit = !isExitRequested.exchange(true);
if (doExit) { if (doExit) {
spdlog::info("Telling workers to exit"); spdlog::info("Tell workers to exit");
for (auto &&worker : workers) { for (auto &&worker : workers) {
worker->requestExit(); worker->requestExit();
} }
spdlog::info("Terminating context ...");
// terminate context (destructor will be called)
ctx_ = nullptr;
spdlog::info("Terminated context ...");
spdlog::info("joining threads ..."); spdlog::info("joining threads ...");
for (auto &&thread : worker_threads) for (auto &&thread : worker_threads)
thread->join(); thread->join();
......
...@@ -44,12 +44,8 @@ using namespace std; ...@@ -44,12 +44,8 @@ using namespace std;
class ZMQServer { class ZMQServer {
public: public:
atomic<bool> exiting;
ZMQServer(); ZMQServer();
enum { enum {
kMaxThread = 1 kMaxThread = 1
}; };
...@@ -59,7 +55,7 @@ public: ...@@ -59,7 +55,7 @@ public:
void exitWorkers(); void exitWorkers();
private: private:
zmq::context_t ctx_; shared_ptr<zmq::context_t> ctx_;
zmq::socket_t frontend_; zmq::socket_t frontend_;
zmq::socket_t backend_; zmq::socket_t backend_;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment