WorkerThreadPool.cpp 2.22 KB
Newer Older
kladko's avatar
kladko committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
    Copyright (C) 2021 SKALE Labs

    This file is part of skale-consensus.

    skale-consensus is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    skale-consensus is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with skale-consensus.  If not, see <https://www.gnu.org/licenses/>.

    @file WorkerThreadPool.cpp
    @author Stan Kladko
    @date 2021
*/

kladko's avatar
kladko committed
24 25 26 27 28 29
#include "document.h"
#include "stringbuffer.h"
#include "writer.h"



kladko's avatar
kladko committed
30
#include "common.h"
kladko's avatar
kladko committed
31
#include "sgxwallet_common.h"
kladko's avatar
kladko committed
32
#include "third_party/spdlog/spdlog.h"
kladko's avatar
kladko committed
33
#include "ZMQServer.h"
kladko's avatar
kladko committed
34 35 36
#include "WorkerThreadPool.h"


kladko's avatar
kladko committed
37 38 39
/**
 * Constructs the pool and immediately starts its worker threads.
 *
 * @param _numThreads number of worker threads to start; checked to be non-zero.
 * @param _agent      server handed to every worker thread; checked to be non-null.
 *                    The pool stores the raw pointer and does not take ownership here.
 *
 * Both arguments are validated with CHECK_STATE before any thread is created.
 */
WorkerThreadPool::WorkerThreadPool(uint64_t _numThreads, ZMQServer *_agent) : joined(false) {
    CHECK_STATE(_numThreads > 0);
    CHECK_STATE(_agent);

    spdlog::info("Creating thread pool. Threads count:" + to_string(_numThreads));

    // Members must be set before createThread() runs, since it reads `agent`.
    agent = _agent;
    numThreads = _numThreads;

    const uint64_t threadCount = static_cast< uint64_t >(numThreads);
    for (uint64_t threadIndex = 0; threadIndex < threadCount; ++threadIndex) {
        createThread(threadIndex);
    }

    spdlog::info("Created thread pool");
}


void WorkerThreadPool::joinAll() {

kladko's avatar
kladko committed
58
    spdlog::info("Joining worker threads ...");
kladko's avatar
kladko committed
59

kladko's avatar
kladko committed
60 61
    if (joined.exchange(true))
        return;
kladko's avatar
kladko committed
62 63 64 65 66 67

    for (auto &&thread : threadpool) {
        if (thread->joinable())
            thread->join();
        CHECK_STATE(!thread->joinable());
    }
kladko's avatar
kladko committed
68 69

    spdlog::info("Joined worker threads.");
kladko's avatar
kladko committed
70 71 72 73 74 75 76 77
}

/**
 * Reports whether joinAll() has already been entered.
 *
 * @return current value of the `joined` flag. The flag is set at the start of
 *         joinAll(), so `true` does not by itself mean the joins have completed.
 */
bool WorkerThreadPool::isJoined() const {
    return joined.load();
}

/**
 * Destroys the pool without joining its threads.
 *
 * NOTE(review): nothing here calls joinAll(). Destroying a std::thread that is
 * still joinable calls std::terminate, so callers presumably must invoke
 * joinAll() before the pool (and the last reference to each thread) goes away
 * -- confirm against the owner of this object.
 */
WorkerThreadPool::~WorkerThreadPool() = default;
kladko's avatar
kladko committed
78 79 80 81 82 83 84 85 86 87

void WorkerThreadPool::createThread(uint64_t _threadNumber) {

    spdlog::info("Starting ZMQ worker thread " + to_string(_threadNumber) );

    this->threadpool.push_back(
            make_shared< thread >( ZMQServer::workerThreadMessageProcessLoop, agent ) );

    spdlog::info("Started ZMQ worker thread " + to_string(_threadNumber) );
}