Unverified Commit 0e0dcaad authored by kladko's avatar kladko

SKALE-3751-zeromq

parent 12590c20
...@@ -11,12 +11,28 @@ ...@@ -11,12 +11,28 @@
#include <jsonrpccpp/client.h> #include <jsonrpccpp/client.h>
#include "ZMQMessage.h" #include "ZMQMessage.h"
#define REQUEST_TIMEOUT 2500 // msecs, (> 1000!)
class ZMQClient { class ZMQClient {
ZMQClient(string &ip, uint16_t port) : ctx(1),
clientSocket(ctx_, ZMQ_REQ) {
url = "tcp://" + ip + ":" + to_string(port);
}
Json::Value blsSignMessageHash(const std::string& keyShareName, const std::string& messageHash, int t, int n) void reconnect() {
{ clientSocket = nullptr; // delete previous
clientSocket = make_unique < zmq::socket_t > clientSocket(ctx_, ZMQ_REQ);
clienSocket->connect(url);
// Configure socket to not wait at close time
int linger = 0;
clientSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
}
Json::Value blsSignMessageHash(const std::string &keyShareName, const std::string &messageHash, int t, int n) {
Json::Value p; Json::Value p;
p["type"] = ZMQMessage::BLS_SIGN_REQ; p["type"] = ZMQMessage::BLS_SIGN_REQ;
p["keyShareName"] = keyShareName; p["keyShareName"] = keyShareName;
...@@ -30,22 +46,62 @@ class ZMQClient { ...@@ -30,22 +46,62 @@ class ZMQClient {
throw jsonrpc::JsonRpcException(jsonrpc::Errors::ERROR_CLIENT_INVALID_RESPONSE, result.toStyledString()); throw jsonrpc::JsonRpcException(jsonrpc::Errors::ERROR_CLIENT_INVALID_RESPONSE, result.toStyledString());
} }
Json::Value sendRequest(Json::Value& _req) {}; shared_ptr<ZMQMessage> doRequestReply(Json::Value &_req) {
Json::Value ecdsaSignMessageHash(int base, const std::string& keyName, const std::string& messageHash) Json::FastWriter fastWriter;
{ string reqStr = fastWriter.write(_req);
Json::Value p;
p["type"] = ZMQMessage::ECDSA_SIGN_REQ; auto resultStr = doZmqRequestReply(reqStr);
p["base"] = base;
p["keyName"] = keyName; return ZMQMessage::parse(resultStr);
p["messageHash"] = messageHash;
Json::Value result = sendRequest(p); }
if (result.isObject())
return result; string doZmqRequestReply(string &_req) {
else
throw jsonrpc::JsonRpcException(jsonrpc::Errors::ERROR_CLIENT_INVALID_RESPONSE, result.toStyledString()); stringstream request;
s_send(*client, _req.str());
while (true) {
// Poll socket for a reply, with timeout
zmq::pollitem_t items[] = {
{static_cast<void *>(*client), 0, ZMQ_POLLIN, 0}};
zmq::poll(&items[0], 1, REQUEST_TIMEOUT);
// If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN) {
reply = s_recv(*client);
return reply;
} else {
spdlog::error("W: no response from server, retrying...");
reconnect();
// Send request again, on new socket
s_send(*client, _req.str());
}
}
} }
};
Json::Value ecdsaSignMessageHash(int base, const std::string &keyName, const std::string &messageHash) {
Json::Value p;
p["type"] = ZMQMessage::ECDSA_SIGN_REQ;
p["base"] = base;
p["keyName"] = keyName;
p["messageHash"] = messageHash;
Json::Value result = sendRequest(p);
if (result.isObject())
return result;
else
throw jsonrpc::JsonRpcException(jsonrpc::Errors::ERROR_CLIENT_INVALID_RESPONSE, result.toStyledString());
}
private:
zmq::context_t ctx;
unique_ptr <zmq::socket_t> clientSocket;
string url;
}; };
......
...@@ -28,11 +28,10 @@ ...@@ -28,11 +28,10 @@
#include "ZMQMessage.h" #include "ZMQMessage.h"
uint64_t ZMQMessage::getUint64Rapid(const char *_name) { uint64_t ZMQMessage::getUint64Rapid(const char *_name) {
CHECK_STATE(_name); CHECK_STATE(_name);
CHECK_STATE(d->HasMember(_name)); CHECK_STATE(d->HasMember(_name));
const rapidjson::Value& a = (*d)[_name]; const rapidjson::Value &a = (*d)[_name];
CHECK_STATE(a.IsUint64()); CHECK_STATE(a.IsUint64());
return a.GetUint64(); return a.GetUint64();
}; };
...@@ -44,31 +43,53 @@ string ZMQMessage::getStringRapid(const char *_name) { ...@@ -44,31 +43,53 @@ string ZMQMessage::getStringRapid(const char *_name) {
return (*d)[_name].GetString(); return (*d)[_name].GetString();
}; };
shared_ptr<ZMQMessage> ZMQMessage::parse(vector<uint8_t>& _msg) { shared_ptr <ZMQMessage> ZMQMessage::parse(vector <uint8_t> &_msg, bool _isRequest) {
CHECK_STATE(_msg.at(_msg.size() - 1) == 0); CHECK_STATE(_msg.at(_msg.size() - 1) == 0);
auto d = make_shared<rapidjson::Document>(); auto d = make_shared<rapidjson::Document>();
d->Parse((const char*) _msg.data()); d->Parse((const char *) _msg.data());
CHECK_STATE(!d->HasParseError()); CHECK_STATE(!d->HasParseError());
CHECK_STATE(d->IsObject()) CHECK_STATE(d->IsObject())
CHECK_STATE(d->HasMember("type")); CHECK_STATE(d->HasMember("type"));
CHECK_STATE((*d)["type"].IsString()); CHECK_STATE((*d)["type"].IsString());
auto type = (*d)["type"].GetString(); auto type = (*d)["type"].GetString();
shared_ptr <ZMQMessage> result;
shared_ptr<ZMQMessage> result; if (isRequest) {
return buildRequest(type, d);
} else {
return buildResponse(type, d);
}
}
shared_ptr <ZMQMessage> ZMQMessage::buildRequest(string type, shared_ptr <rapidjson::Document> _d) {
if (type == ZMQMessage::BLS_SIGN_REQ) { if (type == ZMQMessage::BLS_SIGN_REQ) {
result = make_shared<BLSSignReqMessage>(d); return
make_shared<BLSSignReqMessage>(d);
} else if (type == ZMQMessage::ECDSA_SIGN_REQ) { } else if (type == ZMQMessage::ECDSA_SIGN_REQ) {
result = make_shared<ECDSASignReqMessage>(d); return
make_shared<ECDSASignReqMessage>(d);
} else { } else {
throw SGXException(-301, "Incorrect zmq message type: " + string(type)); BOOST_THROW_EXCEPTION((-301, "Incorrect zmq message type: " +
string(type)));
} }
return result;
} }
shared_ptr <ZMQMessage> ZMQMessage::buildRequest(string& type, shared_ptr <rapidjson::Document> _d) {
if (type == ZMQMessage::BLS_SIGN_RSP) {
return
make_shared<BLSSignRspMessage>(d);
} else if (type == ZMQMessage::ECDSA_SIGN_REQ) {
return
make_shared<ECDSASignRspMessage>(d);
} else {
BOOST_THROW_EXCEPTION(InvalidStateException("Incorrect zmq message request type: " + string(type),
__CLASS_NAME__)
);
}
}
\ No newline at end of file
...@@ -57,7 +57,10 @@ public: ...@@ -57,7 +57,10 @@ public:
uint64_t getUint64Rapid(const char *_name); uint64_t getUint64Rapid(const char *_name);
static shared_ptr<ZMQMessage> parse(vector<uint8_t> &_msg); static shared_ptr<ZMQMessage> parse(vector<uint8_t> &_msg, bool _isRequest);
shared_ptr<ZMQMessage> buildRequest(string& type, shared_ptr <rapidjson::Document> _d);
shared_ptr<ZMQMessage> buildResponse(string& type, shared_ptr <rapidjson::Document> _d);
virtual Json::Value process() = 0; virtual Json::Value process() = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment