Unverified Commit 10d80482 authored by kladko's avatar kladko

bug/SKALE-3662 Adding libzmq

parent c064a774
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <zmq.hpp> #include <zmq.hpp>
#include "zhelpers.hpp" #include "zhelpers.hpp"
#include "Log.h"
#include "ZMQMessage.h" #include "ZMQMessage.h"
#include "ServerWorker.h" #include "ServerWorker.h"
...@@ -18,50 +19,67 @@ ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), ...@@ -18,50 +19,67 @@ ServerWorker::ServerWorker(zmq::context_t &ctx, int sock_type) : ctx_(ctx),
void ServerWorker::work() { void ServerWorker::work() {
worker_.connect("inproc://backend"); worker_.connect("inproc://backend");
std::string replyStr; std::string replyStr;
while (true) { while (true) {
Json::Value result;
int errStatus = -1 * (10000 + __LINE__);
result["status"] = errStatus;
result["errorMessage"] = "Server error";
try { try {
zmq::message_t msg; zmq::message_t msg;
worker_.recv(&msg); worker_.recv(&msg);
vector <uint8_t> msgData(msg.size() + 1, 0); vector <uint8_t> msgData(msg.size() + 1, 0);
memcpy(msgData.data(), msg.data(), msg.size()); cerr << "Received:" << msgData.data();
sleep(1);
auto parsedMsg = ZMQMessage::parse(
auto parsedMsg = ZMQMessage::parse(msgData, true); (const char*) msgData.data(), msg.size(), true);
CHECK_STATE(parsedMsg); CHECK_STATE(parsedMsg);
auto reply = parsedMsg->process(); result = parsedMsg->process();
Json::FastWriter fastWriter;
replyStr = fastWriter.write(reply);
} }
catch (SGXException &e) {
result["status"] = e.getStatus();
result["errorMessage"] = e.getMessage();
spdlog::error("Exception in zmq server worker:{}", e.what());
}
catch (std::exception &e) { catch (std::exception &e) {
result["errorMessage"] = string(e.what());
spdlog::error("Exception in zmq server worker:{}", e.what()); spdlog::error("Exception in zmq server worker:{}", e.what());
replyStr = "";
} catch (...) { } catch (...) {
spdlog::error("Error in zmq server worker"); spdlog::error("Error in zmq server worker");
replyStr = ""; result["errorMessage"] = "Error in zmq server worker";
} }
try { try {
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1); Json::FastWriter fastWriter;
replyStr = fastWriter.write(result);
replyStr = replyStr.substr(0, replyStr.size() - 1 );
CHECK_STATE(replyStr.size() > 2);
CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
worker_.send(replyMsg); worker_.send(replyMsg);
} catch (std::exception &e) { } catch (std::exception &e) {
spdlog::error("Exception in zmq server send :{}", e.what()); spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) { } catch (...) {
spdlog::error("Unklnown exception in zmq server send"); spdlog::error("Unklnown exception in zmq server worker send");
} }
} }
......
...@@ -32,6 +32,9 @@ shared_ptr <ZMQMessage> ZMQClient::doRequestReply(Json::Value &_req) { ...@@ -32,6 +32,9 @@ shared_ptr <ZMQMessage> ZMQClient::doRequestReply(Json::Value &_req) {
Json::FastWriter fastWriter; Json::FastWriter fastWriter;
string reqStr = fastWriter.write(_req); string reqStr = fastWriter.write(_req);
reqStr = reqStr.substr(0, reqStr.size() - 1);
CHECK_STATE(reqStr.front() == '{');
CHECK_STATE(reqStr.at(reqStr.size() - 1) == '}');
auto resultStr = doZmqRequestReply(reqStr); auto resultStr = doZmqRequestReply(reqStr);
...@@ -47,6 +50,9 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -47,6 +50,9 @@ string ZMQClient::doZmqRequestReply(string &_req) {
reconnect(); reconnect();
CHECK_STATE(clientSocket); CHECK_STATE(clientSocket);
cerr << "Sending:" << _req;
sleep(1);
s_send(*clientSocket, _req); s_send(*clientSocket, _req);
while (true) { while (true) {
...@@ -70,7 +76,6 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -70,7 +76,6 @@ string ZMQClient::doZmqRequestReply(string &_req) {
ZMQClient::ZMQClient(string &ip, uint16_t port) : ctx(1) { ZMQClient::ZMQClient(string &ip, uint16_t port) : ctx(1) {
url = "tcp://" + ip + ":" + to_string(port); url = "tcp://" + ip + ":" + to_string(port);
} }
void ZMQClient::reconnect() { void ZMQClient::reconnect() {
......
...@@ -46,19 +46,16 @@ string ZMQMessage::getStringRapid(const char *_name) { ...@@ -46,19 +46,16 @@ string ZMQMessage::getStringRapid(const char *_name) {
}; };
shared_ptr <ZMQMessage> ZMQMessage::parse(vector <uint8_t> &_msg, bool _isRequest) {
CHECK_STATE(_msg.size() > 2);
CHECK_STATE(_msg.back() == 0);
return parse((const char *) _msg.data(), _msg.size() - 1, _isRequest);
}
shared_ptr <ZMQMessage> ZMQMessage::parse(const char* _msg, shared_ptr <ZMQMessage> ZMQMessage::parse(const char* _msg,
size_t _size, bool _isRequest) { size_t _size, bool _isRequest) {
CHECK_STATE(_size > 2); cerr << "Server got:" << _msg << endl;
CHECK_STATE(_msg); CHECK_STATE(_msg);
CHECK_STATE(_size > 5);
// CHECK NULL TERMINATED // CHECK NULL TERMINATED
CHECK_STATE(_msg[_size] == 0); CHECK_STATE(_msg[_size] == 0);
CHECK_STATE(_msg[_size - 1] == '}'); CHECK_STATE(_msg[_size - 1] == '}');
......
...@@ -1150,7 +1150,7 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") { ...@@ -1150,7 +1150,7 @@ TEST_CASE_METHOD(TestFixtureNoReset, "ZMQ-ecdsa", "[zmq-ecdsa-run]") {
PRINT_SRC_LINE PRINT_SRC_LINE
auto sig = client.ecdsaSignMessageHash(16, keyName, SAMPLE_HASH); auto sig = client.ecdsaSignMessageHash(16, keyName, SAMPLE_HASH);
REQUIRE(sig.size() > 10); REQUIRE(sig.size() > 10);
} catch (JsonRpcException &e) { } catch (exception &e) {
cerr << e.what() << endl; cerr << e.what() << endl;
throw; throw;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment