Unverified Commit 6545d811 authored by kladko's avatar kladko

bug/SKALE-3751-enable-zeromq

parent cb7c1a63
...@@ -13,14 +13,14 @@ ...@@ -13,14 +13,14 @@
#include "ServerWorker.h" #include "ServerWorker.h"
std::atomic<uint64_t> ServerWorker::workerCount(1); std::atomic <uint64_t> ServerWorker::workerCount(1);
ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx),
worker_(ctx_, sock_type), worker_(ctx_, sock_type),
isExitRequested(false) { isExitRequested(false) {
index = workerCount.fetch_add(1); index = workerCount.fetch_add(1);
int linger = 0; int linger = 0;
zmq_setsockopt (worker_, ZMQ_LINGER, &linger, sizeof (linger)); zmq_setsockopt(worker_, ZMQ_LINGER, &linger, sizeof(linger));
}; };
void ServerWorker::work() { void ServerWorker::work() {
...@@ -33,7 +33,7 @@ void ServerWorker::work() { ...@@ -33,7 +33,7 @@ void ServerWorker::work() {
Json::Value result; Json::Value result;
int errStatus = -1 * (10000 + __LINE__); int errStatus = -1 * (10000 + __LINE__);
result["status"] = errStatus; result["status"] = errStatus;
result["errorMessage"] = "Server error"; result["errorMessage"] = "Server error";
...@@ -42,17 +42,30 @@ void ServerWorker::work() { ...@@ -42,17 +42,30 @@ void ServerWorker::work() {
zmq::message_t copied_id; zmq::message_t copied_id;
try { try {
zmq_pollitem_t items[1];
items[0].socket = worker_;
items[0].events = ZMQ_POLLIN;
int pollResult = 0;
do {
pollResult = zmq_poll(items, 1, 1000);
if (isExitRequested) {
goto clean;
}
} while (pollResult == 0);
zmq::message_t msg; zmq::message_t msg;
zmq::message_t copied_msg; zmq::message_t copied_msg;
worker_.recv(&identity); worker_.recv(&identity);
cerr << identity.size();
copied_id.copy(&identity); copied_id.copy(&identity);
worker_.recv(&msg); worker_.recv(&msg);
int64_t more; int64_t more;
size_t more_size = sizeof(more); size_t more_size = sizeof(more);
auto rc = zmq_getsockopt (worker_, ZMQ_RCVMORE, &more, &more_size); auto rc = zmq_getsockopt(worker_, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE(rc == 0); CHECK_STATE(rc == 0);
...@@ -60,21 +73,17 @@ void ServerWorker::work() { ...@@ -60,21 +73,17 @@ void ServerWorker::work() {
memcpy(msgData.data(), msg.data(), msg.size()); memcpy(msgData.data(), msg.data(), msg.size());
cerr << "Received:" << msgData.data();
CHECK_STATE(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}'); CHECK_STATE(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}');
memcpy(msgData.data(), msg.data(), msg.size()); memcpy(msgData.data(), msg.data(), msg.size());
auto parsedMsg = ZMQMessage::parse( auto parsedMsg = ZMQMessage::parse(
(const char*) msgData.data(), msg.size(), true); (const char *) msgData.data(), msg.size(), true);
CHECK_STATE(parsedMsg); CHECK_STATE(parsedMsg);
result = parsedMsg->process(); result = parsedMsg->process();
} catch (SGXException &e) { } catch (SGXException &e) {
result["status"] = e.getStatus(); result["status"] = e.getStatus();
...@@ -100,9 +109,9 @@ void ServerWorker::work() { ...@@ -100,9 +109,9 @@ void ServerWorker::work() {
Json::FastWriter fastWriter; Json::FastWriter fastWriter;
replyStr = fastWriter.write(result); replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1 ); replyStr = replyStr.substr(0, replyStr.size() - 1);
CHECK_STATE(replyStr.size() > 2); CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{'); CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}'); CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1); zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
...@@ -123,7 +132,7 @@ void ServerWorker::work() { ...@@ -123,7 +132,7 @@ void ServerWorker::work() {
} }
} }
clean: clean:
spdlog::info("Exited worker thread {}", index); spdlog::info("Exited worker thread {}", index);
} }
......
...@@ -72,7 +72,7 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -72,7 +72,7 @@ string ZMQClient::doZmqRequestReply(string &_req) {
reconnect(); reconnect();
CHECK_STATE(clientSocket); CHECK_STATE(clientSocket);
spdlog::info("ZMQ client sending: \n {}" , _req); spdlog::debug("ZMQ client sending: \n {}" , _req);
s_send(*clientSocket, _req); s_send(*clientSocket, _req);
...@@ -87,7 +87,7 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -87,7 +87,7 @@ string ZMQClient::doZmqRequestReply(string &_req) {
CHECK_STATE(reply.size() > 5); CHECK_STATE(reply.size() > 5);
reply = reply.substr(0, reply.size() - 1); reply = reply.substr(0, reply.size() - 1);
spdlog::info("ZMQ client received reply:{}", reply); spdlog::debug("ZMQ client received reply:{}", reply);
CHECK_STATE(reply.front() == '{'); CHECK_STATE(reply.front() == '{');
CHECK_STATE(reply.back() == '}'); CHECK_STATE(reply.back() == '}');
......
...@@ -45,12 +45,12 @@ ZMQServer::ZMQServer() ...@@ -45,12 +45,12 @@ ZMQServer::ZMQServer()
void ZMQServer::run() { void ZMQServer::run() {
auto port = BASE_PORT + 4; auto port = BASE_PORT + 5;
spdlog::info("Starting zmq server ..."); spdlog::info("Starting zmq server ...");
try { try {
frontend_.bind("tcp://*:" + to_string(BASE_PORT + 5)); frontend_.bind("tcp://*:" + to_string(port));
} catch (...) { } catch (...) {
spdlog::error("Server task could not bind to port:{}", port); spdlog::error("Server task could not bind to port:{}", port);
exit(-100); exit(-100);
...@@ -107,10 +107,6 @@ void ZMQServer::exitWorkers() { ...@@ -107,10 +107,6 @@ void ZMQServer::exitWorkers() {
spdlog::info("Tell workers to exit"); spdlog::info("Tell workers to exit");
for (auto &&worker : workers) { for (auto &&worker : workers) {
...@@ -126,14 +122,20 @@ void ZMQServer::exitWorkers() { ...@@ -126,14 +122,20 @@ void ZMQServer::exitWorkers() {
spdlog::info("Closed server sockets"); spdlog::info("Closed server sockets");
spdlog::info("Terminating context ..."); spdlog::info("Terminating context ...");
// terminate context (destructor will be called) // terminate context (destructor will be called)
ctx_ = nullptr; ctx_ = nullptr;
spdlog::info("Terminated context ..."); spdlog::info("Terminated context ...");
spdlog::info("Deleting threads ...");
worker_threads.empty();
} }
spdlog::info("Deleting threads ...");
worker_threads.empty();
spdlog::info("Deleting workers ..."); spdlog::info("Deleting workers ...");
spdlog::info("Deleted workers ..."); spdlog::info("Deleted workers ...");
} }
......
...@@ -47,7 +47,7 @@ public: ...@@ -47,7 +47,7 @@ public:
ZMQServer(); ZMQServer();
enum { enum {
kMaxThread = 1 kMaxThread = 16
}; };
void run(); void run();
......
...@@ -77,6 +77,7 @@ public: ...@@ -77,6 +77,7 @@ public:
} }
~TestFixture() { ~TestFixture() {
exitZMQServer();
TestUtils::destroyEnclave(); TestUtils::destroyEnclave();
} }
}; };
...@@ -1125,7 +1126,7 @@ TEST_CASE_METHOD(TestFixtureNoReset, "Second run", "[second-run]") { ...@@ -1125,7 +1126,7 @@ TEST_CASE_METHOD(TestFixtureNoReset, "Second run", "[second-run]") {
TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") { TEST_CASE_METHOD(TestFixture, "ZMQ-ecdsa", "[zmq-ecdsa]") {
HttpClient htp(RPC_ENDPOINT); HttpClient htp(RPC_ENDPOINT);
StubClient c(htp, JSONRPC_CLIENT_V2); StubClient c(htp, JSONRPC_CLIENT_V2);
...@@ -1138,9 +1139,22 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") { ...@@ -1138,9 +1139,22 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") {
PRINT_SRC_LINE PRINT_SRC_LINE
keyName = genECDSAKeyAPI(c); keyName = genECDSAKeyAPI(c);
int end = 100000;
string sh = string(SAMPLE_HASH);
PRINT_SRC_LINE PRINT_SRC_LINE
for (int i = 1; i < 1000; i++) { for (int i = 1; i < 10000; i++) {
auto sig = client->ecdsaSignMessageHash(16, keyName, SAMPLE_HASH);
auto hash = sh.substr(0, sh.size() - 6) + to_string(end + i);
auto sig = client->ecdsaSignMessageHash(16, keyName, hash);
REQUIRE(sig.size() > 10); REQUIRE(sig.size() > 10);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment