Unverified Commit eb1b2fa8 authored by kladko's avatar kladko

bug/SKALE-3751-enable-zeromq

parent 24d26a09
...@@ -37,20 +37,20 @@ ...@@ -37,20 +37,20 @@
std::atomic <uint64_t> ServerWorker::workerCount(1); std::atomic <uint64_t> ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type, bool _checkSignature, ServerWorker::ServerWorker(zmq::context_t &_ctx, int sock_type, bool _checkSignature,
const string& _caCert ) : checkSignature(_checkSignature), const string& _caCert ) : checkSignature(_checkSignature),
caCert(_caCert), caCert(_caCert),
ctx_(ctx),
worker_(ctx_, sock_type),
isExitRequested(false) { isExitRequested(false) {
worker = make_shared<zmq::socket_t>(_ctx, sock_type);
if (checkSignature) { if (checkSignature) {
CHECK_STATE(!caCert.empty()) CHECK_STATE(!caCert.empty())
} }
index = workerCount.fetch_add(1); index = workerCount.fetch_add(1);
int linger = 0; int linger = 0;
zmq_setsockopt(worker_, ZMQ_LINGER, &linger, sizeof(linger)); zmq_setsockopt(*worker, ZMQ_LINGER, &linger, sizeof(linger));
}; };
...@@ -70,7 +70,7 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -70,7 +70,7 @@ void ServerWorker::doOneServerLoop() noexcept {
zmq_pollitem_t items[1]; zmq_pollitem_t items[1];
items[0].socket = worker_; items[0].socket = *worker;
items[0].events = ZMQ_POLLIN; items[0].events = ZMQ_POLLIN;
int pollResult = 0; int pollResult = 0;
...@@ -85,13 +85,13 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -85,13 +85,13 @@ void ServerWorker::doOneServerLoop() noexcept {
zmq::message_t msg; zmq::message_t msg;
zmq::message_t copied_msg; zmq::message_t copied_msg;
worker_.recv(&identity); worker->recv(&identity);
copied_id.copy(&identity); copied_id.copy(&identity);
worker_.recv(&msg); worker->recv(&msg);
int64_t more; int64_t more;
size_t more_size = sizeof(more); size_t more_size = sizeof(more);
auto rc = zmq_getsockopt(worker_, ZMQ_RCVMORE, &more, &more_size); auto rc = zmq_getsockopt(*worker, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE2(rc == 0, ZMQ_COULD_NOT_GET_SOCKOPT); CHECK_STATE2(rc == 0, ZMQ_COULD_NOT_GET_SOCKOPT);
...@@ -142,8 +142,8 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -142,8 +142,8 @@ void ServerWorker::doOneServerLoop() noexcept {
CHECK_STATE(replyStr.back() == '}'); CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1); zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
worker_.send(copied_id, ZMQ_SNDMORE); worker->send(copied_id, ZMQ_SNDMORE);
worker_.send(replyMsg); worker->send(replyMsg);
} catch (std::exception &e) { } catch (std::exception &e) {
if (isExitRequested) { if (isExitRequested) {
...@@ -159,7 +159,7 @@ void ServerWorker::doOneServerLoop() noexcept { ...@@ -159,7 +159,7 @@ void ServerWorker::doOneServerLoop() noexcept {
} }
void ServerWorker::work() { void ServerWorker::work() {
worker_.connect("inproc://backend"); worker->connect("inproc://backend");
while (!isExitRequested) { while (!isExitRequested) {
...@@ -176,7 +176,6 @@ void ServerWorker::work() { ...@@ -176,7 +176,6 @@ void ServerWorker::work() {
void ServerWorker::requestExit() { void ServerWorker::requestExit() {
isExitRequested.exchange(true); isExitRequested.exchange(true);
zmq_close(worker_);
spdlog::info("Closed worker socket {}", index); spdlog::info("Closed worker socket {}", index);
} }
...@@ -49,8 +49,7 @@ public: ...@@ -49,8 +49,7 @@ public:
void requestExit(); void requestExit();
private: private:
zmq::context_t &ctx_; shared_ptr<zmq::socket_t> worker;
zmq::socket_t worker_;
std::atomic<bool> isExitRequested; std::atomic<bool> isExitRequested;
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include <streambuf> #include <streambuf>
#include "third_party/spdlog/spdlog.h" #include "third_party/spdlog/spdlog.h"
#include "common.h" #include "common.h"
...@@ -35,14 +34,15 @@ ...@@ -35,14 +34,15 @@
using namespace std; using namespace std;
ZMQServer *ZMQServer::zmqServer = nullptr; shared_ptr <ZMQServer> ZMQServer::zmqServer = nullptr;
ZMQServer::ZMQServer(bool _checkSignature, const string& _caCertFile) ZMQServer::ZMQServer(bool _checkSignature, const string &_caCertFile)
: checkSignature(_checkSignature), : checkSignature(_checkSignature),
caCertFile(_caCertFile), isExitRequested(false), ctx_(make_shared<zmq::context_t>(1)), caCertFile(_caCertFile), isExitRequested(false), ctx_(make_shared<zmq::context_t>(1)) {
frontend_(*ctx_, ZMQ_ROUTER),
backend_(*ctx_, ZMQ_DEALER) {
frontend = make_shared<zmq::socket_t>(*ctx_, ZMQ_ROUTER);
backend = make_shared<zmq::socket_t>(*ctx_, ZMQ_DEALER);
workerThreads = 2 * thread::hardware_concurrency(); workerThreads = 2 * thread::hardware_concurrency();
...@@ -59,8 +59,8 @@ ZMQServer::ZMQServer(bool _checkSignature, const string& _caCertFile) ...@@ -59,8 +59,8 @@ ZMQServer::ZMQServer(bool _checkSignature, const string& _caCertFile)
} }
int linger = 0; int linger = 0;
zmq_setsockopt (frontend_, ZMQ_LINGER, &linger, sizeof (linger)); zmq_setsockopt(*frontend, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt (backend_, ZMQ_LINGER, &linger, sizeof (linger)); zmq_setsockopt(*backend, ZMQ_LINGER, &linger, sizeof(linger));
} }
...@@ -72,7 +72,7 @@ void ZMQServer::run() { ...@@ -72,7 +72,7 @@ void ZMQServer::run() {
spdlog::info("Starting zmq server on port {} ...", port); spdlog::info("Starting zmq server on port {} ...", port);
try { try {
frontend_.bind("tcp://*:" + to_string(port)); frontend->bind("tcp://*:" + to_string(port));
} catch (...) { } catch (...) {
spdlog::error("Server task could not bind to port:{}", port); spdlog::error("Server task could not bind to port:{}", port);
exit(-100); exit(-100);
...@@ -81,7 +81,7 @@ void ZMQServer::run() { ...@@ -81,7 +81,7 @@ void ZMQServer::run() {
spdlog::info("Bound port ..."); spdlog::info("Bound port ...");
try { try {
backend_.bind("inproc://backend"); backend->bind("inproc://backend");
} catch (exception &e) { } catch (exception &e) {
spdlog::error("Could not bind to zmq backend: {}", e.what()); spdlog::error("Could not bind to zmq backend: {}", e.what());
exit(-101); exit(-101);
...@@ -105,7 +105,7 @@ void ZMQServer::run() { ...@@ -105,7 +105,7 @@ void ZMQServer::run() {
try { try {
zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr); zmq::proxy(static_cast<void *>(*frontend), static_cast<void *>(*backend), nullptr);
} catch (exception &_e) { } catch (exception &_e) {
if (isExitRequested) { if (isExitRequested) {
spdlog::info("Exited ZMQServer main thread"); spdlog::info("Exited ZMQServer main thread");
...@@ -129,7 +129,6 @@ void ZMQServer::exitWorkers() { ...@@ -129,7 +129,6 @@ void ZMQServer::exitWorkers() {
if (doExit) { if (doExit) {
spdlog::info("Tell workers to exit"); spdlog::info("Tell workers to exit");
for (auto &&worker : workers) { for (auto &&worker : workers) {
...@@ -138,36 +137,35 @@ void ZMQServer::exitWorkers() { ...@@ -138,36 +137,35 @@ void ZMQServer::exitWorkers() {
// close server sockets // close server sockets
spdlog::info("Closing server sockets ..."); spdlog::info("Deleting threads ...");
worker_threads.empty();
zmq_close(frontend_); spdlog::info("Deleting workers ...");
zmq_close(backend_); workers.clear();
spdlog::info("Deleted workers ...");
spdlog::info("Closed server sockets"); spdlog::info("Deleting front end and back end");
frontend = nullptr;
backend = nullptr;
spdlog::info("Terminating context ..."); spdlog::info("Deleted front end and back end");
spdlog::info("Terminating context ...");
// terminate context (destructor will be called) // terminate context (destructor will be called)
ctx_ = nullptr; ctx_ = nullptr;
spdlog::info("Terminated context ..."); spdlog::info("Terminated context ...");
spdlog::info("Deleting threads ...");
worker_threads.empty();
} }
spdlog::info("Deleting workers ...");
spdlog::info("Deleted workers ...");
} }
void ZMQServer::exitZMQServer() { void ZMQServer::exitZMQServer() {
spdlog::info("Exiting zmq server ..."); spdlog::info("Exiting zmq server workers ...");
zmqServer->exitWorkers(); zmqServer->exitWorkers();
spdlog::info("Exited zmq server ..."); spdlog::info("Exited zmq server ...");
spdlog::info("deleting zmq server");
zmqServer = nullptr; zmqServer = nullptr;
spdlog::info("deleted zmq server ");
} }
void ZMQServer::initZMQServer(bool _checkSignature) { void ZMQServer::initZMQServer(bool _checkSignature) {
...@@ -186,8 +184,8 @@ void ZMQServer::initZMQServer(bool _checkSignature) { ...@@ -186,8 +184,8 @@ void ZMQServer::initZMQServer(bool _checkSignature) {
CHECK_STATE(access(rootCAPath.c_str(), F_OK) == 0); CHECK_STATE(access(rootCAPath.c_str(), F_OK) == 0);
}; };
zmqServer = new ZMQServer(_checkSignature, rootCAPath); zmqServer = make_shared<ZMQServer>(_checkSignature, rootCAPath);
serverThread =make_shared<thread> (std::bind(&ZMQServer::run, ZMQServer::zmqServer)); serverThread = make_shared<thread>(std::bind(&ZMQServer::run, ZMQServer::zmqServer));
serverThread->detach(); serverThread->detach();
...@@ -195,4 +193,22 @@ void ZMQServer::initZMQServer(bool _checkSignature) { ...@@ -195,4 +193,22 @@ void ZMQServer::initZMQServer(bool _checkSignature) {
spdlog::info("Inited zmq server ..."); spdlog::info("Inited zmq server ...");
} }
shared_ptr<std::thread> ZMQServer::serverThread = nullptr; shared_ptr <std::thread> ZMQServer::serverThread = nullptr;
\ No newline at end of file
ZMQServer::~ZMQServer() {
spdlog::info("Deleting server thread");
ZMQServer::serverThread = nullptr;
spdlog::info("Deleted server thread");
spdlog::info("Deleting worker threads");
worker_threads.clear();
spdlog::info("Deleted worker threads");
spdlog::info("Deleting workers");
workers.clear();
spdlog::info("Deleted workers");
spdlog::info("Deleting server context");
ctx_ = nullptr;
spdlog::info("Deleted server context");
}
\ No newline at end of file
...@@ -51,12 +51,13 @@ public: ...@@ -51,12 +51,13 @@ public:
string caCertFile = ""; string caCertFile = "";
string caCert = ""; string caCert = "";
static ZMQServer *zmqServer; static shared_ptr<ZMQServer> zmqServer;
static shared_ptr<std::thread> serverThread; static shared_ptr<std::thread> serverThread;
ZMQServer(bool _checkSignature, const string& _caCertFile); ZMQServer(bool _checkSignature, const string& _caCertFile);
~ZMQServer();
void run(); void run();
...@@ -69,13 +70,12 @@ public: ...@@ -69,13 +70,12 @@ public:
private: private:
shared_ptr<zmq::context_t> ctx_; shared_ptr<zmq::context_t> ctx_;
zmq::socket_t frontend_; shared_ptr<zmq::socket_t> frontend;
zmq::socket_t backend_; shared_ptr<zmq::socket_t> backend;
std::vector<shared_ptr<ServerWorker> > workers; std::vector<shared_ptr<ServerWorker> > workers;
std::vector<shared_ptr<std::thread>> worker_threads; std::vector<shared_ptr<std::thread>> worker_threads;
std::atomic<bool> isExitRequested; std::atomic<bool> isExitRequested;
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment