Unverified Commit f457b201 authored by kladko's avatar kladko

SKALE-4586 Added Thread Pool

parent ecde0240
...@@ -206,24 +206,48 @@ namespace moodycamel { ...@@ -206,24 +206,48 @@ namespace moodycamel {
AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN
{ {
switch (order) { switch (order) {
case memory_order_relaxed: break; case memory_order_relaxed:
case memory_order_acquire: std::atomic_signal_fence(std::memory_order_acquire); break; break;
case memory_order_release: std::atomic_signal_fence(std::memory_order_release); break; case memory_order_acquire:
case memory_order_acq_rel: std::atomic_signal_fence(std::memory_order_acq_rel); break; std::atomic_signal_fence(std::memory_order_acquire);
case memory_order_seq_cst: std::atomic_signal_fence(std::memory_order_seq_cst); break; break;
default: assert(false); case memory_order_release:
std::atomic_signal_fence(std::memory_order_release);
break;
case memory_order_acq_rel:
std::atomic_signal_fence(std::memory_order_acq_rel);
break;
case memory_order_seq_cst:
std::atomic_signal_fence(std::memory_order_seq_cst);
break;
default:
assert(false);
} }
} }
AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN
{ {
switch (order) { switch (order) {
case memory_order_relaxed: break; case memory_order_relaxed:
case memory_order_acquire: AE_TSAN_ANNOTATE_ACQUIRE(); std::atomic_thread_fence(std::memory_order_acquire); break; break;
case memory_order_release: AE_TSAN_ANNOTATE_RELEASE(); std::atomic_thread_fence(std::memory_order_release); break; case memory_order_acquire:
case memory_order_acq_rel: AE_TSAN_ANNOTATE_ACQUIRE(); AE_TSAN_ANNOTATE_RELEASE(); std::atomic_thread_fence(std::memory_order_acq_rel); break; AE_TSAN_ANNOTATE_ACQUIRE();
case memory_order_seq_cst: AE_TSAN_ANNOTATE_ACQUIRE(); AE_TSAN_ANNOTATE_RELEASE(); std::atomic_thread_fence(std::memory_order_seq_cst); break; std::atomic_thread_fence(std::memory_order_acquire);
default: assert(false); break;
case memory_order_release:
AE_TSAN_ANNOTATE_RELEASE();
std::atomic_thread_fence(std::memory_order_release);
break;
case memory_order_acq_rel:
AE_TSAN_ANNOTATE_ACQUIRE(); AE_TSAN_ANNOTATE_RELEASE();
std::atomic_thread_fence(std::memory_order_acq_rel);
break;
case memory_order_seq_cst:
AE_TSAN_ANNOTATE_ACQUIRE(); AE_TSAN_ANNOTATE_RELEASE();
std::atomic_thread_fence(std::memory_order_seq_cst);
break;
default:
assert(false);
} }
} }
...@@ -237,8 +261,11 @@ namespace moodycamel { ...@@ -237,8 +261,11 @@ namespace moodycamel {
#endif #endif
#ifdef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC #ifdef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC
#include <atomic> #include <atomic>
#endif #endif
#include <utility> #include <utility>
// WARNING: *NOT* A REPLACEMENT FOR std::atomic. READ CAREFULLY: // WARNING: *NOT* A REPLACEMENT FOR std::atomic. READ CAREFULLY:
...@@ -247,21 +274,26 @@ namespace moodycamel { ...@@ -247,21 +274,26 @@ namespace moodycamel {
// at the hardware level -- on most platforms this generally means aligned pointers and integers (only). // at the hardware level -- on most platforms this generally means aligned pointers and integers (only).
namespace moodycamel { namespace moodycamel {
template<typename T> template<typename T>
class weak_atomic class weak_atomic {
{
public: public:
AE_NO_TSAN weak_atomic() : value() { } AE_NO_TSAN weak_atomic() : value() {}
#ifdef AE_VCPP #ifdef AE_VCPP
#pragma warning(push) #pragma warning(push)
#pragma warning(disable: 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning #pragma warning(disable: 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning
#endif #endif
template<typename U> AE_NO_TSAN weak_atomic(U&& x) : value(std::forward<U>(x)) { }
template<typename U>
AE_NO_TSAN weak_atomic(U &&x) : value(std::forward<U>(x)) {}
#ifdef __cplusplus_cli #ifdef __cplusplus_cli
// Work around bug with universal reference/nullptr combination that only appears when /clr is on // Work around bug with universal reference/nullptr combination that only appears when /clr is on
AE_NO_TSAN weak_atomic(nullptr_t) : value(nullptr) { } AE_NO_TSAN weak_atomic(nullptr_t) : value(nullptr) { }
#endif #endif
AE_NO_TSAN weak_atomic(weak_atomic const& other) : value(other.load()) { } AE_NO_TSAN weak_atomic(weak_atomic const &other) : value(other.load()) {}
AE_NO_TSAN weak_atomic(weak_atomic&& other) : value(std::move(other.load())) { }
AE_NO_TSAN weak_atomic(weak_atomic &&other) : value(std::move(other.load())) {}
#ifdef AE_VCPP #ifdef AE_VCPP
#pragma warning(pop) #pragma warning(pop)
#endif #endif
...@@ -303,14 +335,14 @@ namespace moodycamel { ...@@ -303,14 +335,14 @@ namespace moodycamel {
return value; return value;
} }
#else #else
template<typename U> template<typename U>
AE_FORCEINLINE weak_atomic const& operator=(U&& x) AE_NO_TSAN AE_FORCEINLINE weak_atomic const &operator=(U &&x) AE_NO_TSAN {
{
value.store(std::forward<U>(x), std::memory_order_relaxed); value.store(std::forward<U>(x), std::memory_order_relaxed);
return *this; return *this;
} }
AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) AE_NO_TSAN AE_FORCEINLINE weak_atomic const &operator=(weak_atomic const &other) AE_NO_TSAN
{ {
value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed); value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this; return *this;
...@@ -327,6 +359,7 @@ namespace moodycamel { ...@@ -327,6 +359,7 @@ namespace moodycamel {
{ {
return value.fetch_add(increment, std::memory_order_release); return value.fetch_add(increment, std::memory_order_release);
} }
#endif #endif
...@@ -336,7 +369,7 @@ namespace moodycamel { ...@@ -336,7 +369,7 @@ namespace moodycamel {
// `volatile` will make memory access slow, but is guaranteed to be reliable. // `volatile` will make memory access slow, but is guaranteed to be reliable.
volatile T value; volatile T value;
#else #else
std::atomic<T> value; std::atomic <T> value;
#endif #endif
}; };
...@@ -369,8 +402,7 @@ extern "C" { ...@@ -369,8 +402,7 @@ extern "C" {
#include <task.h> #include <task.h>
#endif #endif
namespace moodycamel namespace moodycamel {
{
// Code in the spsc_sema namespace below is an adaptation of Jeff Preshing's // Code in the spsc_sema namespace below is an adaptation of Jeff Preshing's
// portable + lightweight semaphore implementations, originally from // portable + lightweight semaphore implementations, originally from
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
...@@ -392,8 +424,7 @@ namespace moodycamel ...@@ -392,8 +424,7 @@ namespace moodycamel
// 2. Altered source versions must be plainly marked as such, and must not be // 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software. // misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution. // 3. This notice may not be removed or altered from any source distribution.
namespace spsc_sema namespace spsc_sema {
{
#if defined(_WIN32) #if defined(_WIN32)
class Semaphore class Semaphore
{ {
...@@ -655,8 +686,7 @@ namespace moodycamel ...@@ -655,8 +686,7 @@ namespace moodycamel
//--------------------------------------------------------- //---------------------------------------------------------
// LightweightSemaphore // LightweightSemaphore
//--------------------------------------------------------- //---------------------------------------------------------
class LightweightSemaphore class LightweightSemaphore {
{
public: public:
typedef std::make_signed<std::size_t>::type ssize_t; typedef std::make_signed<std::size_t>::type ssize_t;
...@@ -671,10 +701,8 @@ namespace moodycamel ...@@ -671,10 +701,8 @@ namespace moodycamel
// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
// as threads start hitting the kernel semaphore. // as threads start hitting the kernel semaphore.
int spin = 1024; int spin = 1024;
while (--spin >= 0) while (--spin >= 0) {
{ if (m_count.load() > 0) {
if (m_count.load() > 0)
{
m_count.fetch_add_acquire(-1); m_count.fetch_add_acquire(-1);
return true; return true;
} }
...@@ -683,8 +711,7 @@ namespace moodycamel ...@@ -683,8 +711,7 @@ namespace moodycamel
oldCount = m_count.fetch_add_acquire(-1); oldCount = m_count.fetch_add_acquire(-1);
if (oldCount > 0) if (oldCount > 0)
return true; return true;
if (timeout_usecs < 0) if (timeout_usecs < 0) {
{
if (m_sema.wait()) if (m_sema.wait())
return true; return true;
} }
...@@ -695,8 +722,7 @@ namespace moodycamel ...@@ -695,8 +722,7 @@ namespace moodycamel
// it. So we have to re-adjust the count, but only if the semaphore // it. So we have to re-adjust the count, but only if the semaphore
// wasn't signaled enough times for us too since then. If it was, we // wasn't signaled enough times for us too since then. If it was, we
// need to release the semaphore too. // need to release the semaphore too.
while (true) while (true) {
{
oldCount = m_count.fetch_add_release(1); oldCount = m_count.fetch_add_release(1);
if (oldCount < 0) if (oldCount < 0)
return false; // successfully restored things to the way they were return false; // successfully restored things to the way they were
...@@ -708,15 +734,13 @@ namespace moodycamel ...@@ -708,15 +734,13 @@ namespace moodycamel
} }
public: public:
AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount), m_sema() AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount), m_sema() {
{
assert(initialCount >= 0); assert(initialCount >= 0);
} }
bool tryWait() AE_NO_TSAN bool tryWait() AE_NO_TSAN
{ {
if (m_count.load() > 0) if (m_count.load() > 0) {
{
m_count.fetch_add_acquire(-1); m_count.fetch_add_acquire(-1);
return true; return true;
} }
...@@ -738,8 +762,7 @@ namespace moodycamel ...@@ -738,8 +762,7 @@ namespace moodycamel
assert(count >= 0); assert(count >= 0);
ssize_t oldCount = m_count.fetch_add_release(count); ssize_t oldCount = m_count.fetch_add_release(count);
assert(oldCount >= -1); assert(oldCount >= -1);
if (oldCount < 0) if (oldCount < 0) {
{
m_sema.signal(1); m_sema.signal(1);
} }
} }
......
This diff is collapsed.
...@@ -42,9 +42,11 @@ using namespace std; ...@@ -42,9 +42,11 @@ using namespace std;
shared_ptr <ZMQServer> ZMQServer::zmqServer = nullptr; shared_ptr <ZMQServer> ZMQServer::zmqServer = nullptr;
ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string &_caCertFile) ZMQServer::ZMQServer(bool _checkSignature, bool _checkKeyOwnership, const string &_caCertFile)
: outgoingQueue(NUM_ZMQ_WORKER_THREADS), checkSignature(_checkSignature), checkKeyOwnership(_checkKeyOwnership), : incomingQueue(NUM_ZMQ_WORKER_THREADS), checkSignature(_checkSignature), checkKeyOwnership(_checkKeyOwnership),
caCertFile(_caCertFile), ctx(make_shared<zmq::context_t>(1)) { caCertFile(_caCertFile), ctx(make_shared<zmq::context_t>(1)) {
CHECK_STATE(NUM_ZMQ_WORKER_THREADS > 1);
socket = make_shared<zmq::socket_t>(*ctx, ZMQ_ROUTER); socket = make_shared<zmq::socket_t>(*ctx, ZMQ_ROUTER);
if (_checkSignature) { if (_checkSignature) {
...@@ -277,12 +279,20 @@ void ZMQServer::doOneServerLoop() { ...@@ -277,12 +279,20 @@ void ZMQServer::doOneServerLoop() {
CHECK_STATE2(msg, ZMQ_COULD_NOT_PARSE); CHECK_STATE2(msg, ZMQ_COULD_NOT_PARSE);
uint64_t index = 0;
if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)!= nullptr) || if ((dynamic_pointer_cast<BLSSignReqMessage>(msg)!= nullptr) ||
dynamic_pointer_cast<ECDSASignReqMessage>(msg)) { dynamic_pointer_cast<ECDSASignReqMessage>(msg)) {
index = NUM_ZMQ_WORKER_THREADS - 1;
} else { } else {
index = 0;
} }
auto element = pair<shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>>(msg, identity);
incomingQueue.at(index).enqueue(element);
result = msg->process(); result = msg->process();
} catch (ExitRequestedException) { } catch (ExitRequestedException) {
throw; throw;
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "Agent.h" #include "Agent.h"
#include "WorkerThreadPool.h" #include "WorkerThreadPool.h"
#include "ZMQMessage.h"
using namespace moodycamel; using namespace moodycamel;
...@@ -41,6 +42,7 @@ typedef enum {GOT_INCOMING_MSG = 0, GOT_OUTFOING_MSG = 1} PollResult; ...@@ -41,6 +42,7 @@ typedef enum {GOT_INCOMING_MSG = 0, GOT_OUTFOING_MSG = 1} PollResult;
static const uint64_t NUM_ZMQ_WORKER_THREADS = 2; static const uint64_t NUM_ZMQ_WORKER_THREADS = 2;
class ZMQServer : public Agent{ class ZMQServer : public Agent{
uint64_t workerThreads; uint64_t workerThreads;
...@@ -48,9 +50,9 @@ class ZMQServer : public Agent{ ...@@ -48,9 +50,9 @@ class ZMQServer : public Agent{
string caCertFile; string caCertFile;
string caCert; string caCert;
ReaderWriterQueue<pair<string, shared_ptr<zmq_msg_t>>> outgoingQueue; ReaderWriterQueue<pair<string, shared_ptr<zmq::message_t>>> outgoingQueue;
vector<ReaderWriterQueue<pair<string, shared_ptr<zmq_msg_t>>>> incomingQueue; vector<ReaderWriterQueue<pair<shared_ptr<ZMQMessage>, shared_ptr<zmq::message_t>>>> incomingQueue;
bool checkKeyOwnership = true; bool checkKeyOwnership = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment