Unverified Commit 43a7d5b1 authored by kladko's avatar kladko

bug/SKALE-3662 Adding libzmq

parent 83888e7d
...@@ -29,9 +29,9 @@ ...@@ -29,9 +29,9 @@
Json::Value ECDSASignReqMessage::process() { Json::Value ECDSASignReqMessage::process() {
auto base = getUint64Rapid("bs"); auto base = getUint64Rapid("base");
auto keyName = getStringRapid("kn"); auto keyName = getStringRapid("keyName");
auto hash = getStringRapid("mh"); auto hash = getStringRapid("messageHash");
auto result = SGXWalletServer::ecdsaSignMessageHashImpl(base, keyName, hash); auto result = SGXWalletServer::ecdsaSignMessageHashImpl(base, keyName, hash);
result["type"] = ZMQMessage::ECDSA_SIGN_RSP; result["type"] = ZMQMessage::ECDSA_SIGN_RSP;
return result; return result;
......
...@@ -30,36 +30,47 @@ void ServerWorker::work() { ...@@ -30,36 +30,47 @@ void ServerWorker::work() {
result["status"] = errStatus; result["status"] = errStatus;
result["errorMessage"] = "Server error"; result["errorMessage"] = "Server error";
zmq::message_t identity;
zmq::message_t identit2;
zmq::message_t copied_id;
try { try {
zmq::message_t msg; zmq::message_t msg;
zmq::message_t copied_msg;
worker_.recv(&identity);
cerr << identity.size();
copied_id.copy(&identity);
worker_.recv(&msg); worker_.recv(&msg);
int64_t more;
size_t more_size = sizeof(more);
auto rc = zmq_getsockopt (worker_, ZMQ_RCVMORE, &more, &more_size);
CHECK_STATE(rc == 0);
vector <uint8_t> msgData(msg.size() + 1, 0); vector <uint8_t> msgData(msg.size() + 1, 0);
memcpy(msgData.data(), msg.data(), msg.size()); memcpy(msgData.data(), msg.data(), msg.size());
if (msg.size() < 5 || msgData.at(0) != '{' || msgData[msg.size()] != '}') {
cerr << "haha";
continue;
}
cerr << "Received:" << msgData.data(); cerr << "Received:" << msgData.data();
auto parsedMsg = ZMQMessage::parse(
(const char*) msgData.data(), msg.size(), true);
CHECK_STATE(parsedMsg); CHECK_STATE(msg.size() > 5 || msgData.at(0) == '{' || msgData[msg.size()] == '}');
result = parsedMsg->process();
memcpy(msgData.data(), msg.data(), msg.size());
auto parsedMsg = ZMQMessage::parse(
(const char*) msgData.data(), msg.size(), true);
} CHECK_STATE(parsedMsg);
result = parsedMsg->process();
catch (SGXException &e) { } catch (SGXException &e) {
result["status"] = e.getStatus(); result["status"] = e.getStatus();
result["errorMessage"] = e.getMessage(); result["errorMessage"] = e.getMessage();
spdlog::error("Exception in zmq server worker:{}", e.what()); spdlog::error("Exception in zmq server worker:{}", e.what());
...@@ -83,7 +94,13 @@ void ServerWorker::work() { ...@@ -83,7 +94,13 @@ void ServerWorker::work() {
CHECK_STATE(replyStr.front() == '{'); CHECK_STATE(replyStr.front() == '{');
CHECK_STATE(replyStr.back() == '}'); CHECK_STATE(replyStr.back() == '}');
zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1); zmq::message_t replyMsg(replyStr.c_str(), replyStr.size() + 1);
cerr << "sending!!!";
worker_.send(copied_id, ZMQ_SNDMORE);
worker_.send(replyMsg); worker_.send(replyMsg);
cerr << "sent!!!";
} catch (std::exception &e) { } catch (std::exception &e) {
spdlog::error("Exception in zmq server worker send :{}", e.what()); spdlog::error("Exception in zmq server worker send :{}", e.what());
} catch (...) { } catch (...) {
......
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
@author Stan Kladko @author Stan Kladko
@date 2020 @date 2020
*/ */
#include "sys/random.h"
#include "common.h" #include "common.h"
#include "BLSSignReqMessage.h" #include "BLSSignReqMessage.h"
#include "BLSSignRspMessage.h" #include "BLSSignRspMessage.h"
...@@ -51,7 +54,6 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -51,7 +54,6 @@ string ZMQClient::doZmqRequestReply(string &_req) {
CHECK_STATE(clientSocket); CHECK_STATE(clientSocket);
cerr << "Sending:" << _req; cerr << "Sending:" << _req;
sleep(1);
s_send(*clientSocket, _req); s_send(*clientSocket, _req);
...@@ -63,6 +65,11 @@ string ZMQClient::doZmqRequestReply(string &_req) { ...@@ -63,6 +65,11 @@ string ZMQClient::doZmqRequestReply(string &_req) {
// If we got a reply, process it // If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN) { if (items[0].revents & ZMQ_POLLIN) {
string reply = s_recv(*clientSocket); string reply = s_recv(*clientSocket);
cerr << "Received!" + reply;
sleep(1000);
return reply; return reply;
} else { } else {
spdlog::error("W: no response from server, retrying..."); spdlog::error("W: no response from server, retrying...");
...@@ -79,12 +86,19 @@ ZMQClient::ZMQClient(string &ip, uint16_t port) : ctx(1) { ...@@ -79,12 +86,19 @@ ZMQClient::ZMQClient(string &ip, uint16_t port) : ctx(1) {
} }
void ZMQClient::reconnect() { void ZMQClient::reconnect() {
getrandom(identity, 10, 0);
clientSocket = nullptr; // delete previous clientSocket = nullptr; // delete previous
clientSocket = make_unique<zmq::socket_t>(ctx, ZMQ_DEALER); clientSocket = make_unique<zmq::socket_t>(ctx, ZMQ_DEALER);
clientSocket->connect(url); clientSocket->setsockopt(ZMQ_IDENTITY, identity, 10);
// Configure socket to not wait at close time // Configure socket to not wait at close time
int linger = 0; int linger = 0;
clientSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); clientSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
clientSocket->connect(url);
} }
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
#include "ZMQMessage.h" #include "ZMQMessage.h"
#define REQUEST_TIMEOUT 10000 // msecs, (> 1000!) #define REQUEST_TIMEOUT 10000 // msecs, (> 1000!)
class ZMQClient { class ZMQClient {
...@@ -46,6 +48,9 @@ private: ...@@ -46,6 +48,9 @@ private:
std::unique_ptr <zmq::socket_t> clientSocket; std::unique_ptr <zmq::socket_t> clientSocket;
string url; string url;
// generate random identity
char identity[10] = {};
shared_ptr <ZMQMessage> doRequestReply(Json::Value &_req); shared_ptr <ZMQMessage> doRequestReply(Json::Value &_req);
string doZmqRequestReply(string &_req); string doZmqRequestReply(string &_req);
......
...@@ -49,7 +49,7 @@ public: ...@@ -49,7 +49,7 @@ public:
atomic<bool> isExitRequested; atomic<bool> isExitRequested;
enum { enum {
kMaxThread = 5 kMaxThread = 1
}; };
void run(); void run();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment