Internals: Add V3ThreadPool class (#3898)

The thread pool is self tested, but not otherwise used by the code yet.
This commit is contained in:
Kamil Rakoczy 2023-01-27 16:43:50 +01:00 committed by GitHub
parent 5125b94fd8
commit a39c7f7dac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 395 additions and 0 deletions

View File

@ -150,6 +150,7 @@ set(HEADERS
V3SymTable.h
V3Table.h
V3Task.h
V3ThreadPool.h
V3Timing.h
V3Trace.h
V3TraceDecl.h
@ -283,6 +284,7 @@ set(COMMON_SOURCES
V3Subst.cpp
V3Table.cpp
V3Task.cpp
V3ThreadPool.cpp
V3Trace.cpp
V3TraceDecl.cpp
V3Tristate.cpp

View File

@ -265,6 +265,7 @@ RAW_OBJS = \
V3TSP.o \
V3Table.o \
V3Task.o \
V3ThreadPool.o \
V3Timing.o \
V3Trace.o \
V3TraceDecl.o \

198
src/V3ThreadPool.cpp Normal file
View File

@ -0,0 +1,198 @@
// -*- mode: C++; c-file-style: "cc-mode" -*-
//*************************************************************************
// DESCRIPTION: Verilator: Thread pool for Verilator itself
//
// Code available from: https://verilator.org
//
//*************************************************************************
//
// Copyright 2005-2023 by Wilson Snyder. This program is free software; you can
// redistribute it and/or modify it under the terms of either the GNU
// Lesser General Public License Version 3 or the Perl Artistic License
// Version 2.0.
// SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0
//
//*************************************************************************
#include "config_build.h"
#include "V3ThreadPool.h"
#include "V3Error.h"
// c++11 requires definition of static constexpr as well as declaration
constexpr unsigned int V3ThreadPool::FUTUREWAITFOR_MS;
void V3ThreadPool::resize(unsigned n) VL_MT_UNSAFE {
// This function is not thread-safe and can result in race between threads
VerilatedLockGuard lock{m_mutex};
VerilatedLockGuard stoppedJobsLock{m_stoppedJobsMutex};
UASSERT(m_queue.empty(), "Resizing busy thread pool");
// Shut down old threads
m_shutdown = true;
m_stoppedJobs = 0;
m_cv.notify_all();
m_stoppedJobsCV.notify_all();
stoppedJobsLock.unlock();
lock.unlock();
while (!m_workers.empty()) {
m_workers.front().join();
m_workers.pop_front();
}
lock.lock();
// Start new threads
m_shutdown = false;
for (unsigned int i = 1; i < n; ++i) {
m_workers.emplace_back(&V3ThreadPool::startWorker, this, i);
}
}
void V3ThreadPool::startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE {
selfThreadp->workerJobLoop(id);
}
void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE {
while (true) {
// Wait for a notification
waitIfStopRequested();
job_t job;
{
VerilatedLockGuard lock(m_mutex);
m_cv.wait(lock, [&]() VL_REQUIRES(m_mutex) {
return !m_queue.empty() || m_shutdown || m_stopRequested;
});
if (m_shutdown) return; // Terminate if requested
if (stopRequestedStandalone()) { continue; }
// Get the job
UASSERT(!m_queue.empty(), "Job should be available");
job = m_queue.front();
m_queue.pop();
}
// Execute the job
job();
}
}
template <>
void V3ThreadPool::pushJob<void>(std::shared_ptr<std::promise<void>>& prom,
std::function<void()>&& f) VL_MT_SAFE {
if (willExecuteSynchronously()) {
f();
prom->set_value();
} else {
const VerilatedLockGuard lock{m_mutex};
m_queue.push([prom, f] {
f();
prom->set_value();
});
}
}
void V3ThreadPool::requestExclusiveAccess(const V3ThreadPool::job_t&& exclusiveAccessJob)
VL_MT_SAFE {
if (willExecuteSynchronously()) {
exclusiveAccessJob();
} else {
VerilatedLockGuard stoppedJobLock{m_stoppedJobsMutex};
// if some other job already requested exclusive access
// wait until it stops
if (stopRequested()) { waitStopRequested(stoppedJobLock); }
m_stopRequested = true;
waitOtherThreads(stoppedJobLock);
m_exclusiveAccess = true;
exclusiveAccessJob();
m_exclusiveAccess = false;
m_stopRequested = false;
m_stoppedJobsCV.notify_all();
}
}
bool V3ThreadPool::waitIfStopRequested() VL_MT_SAFE {
VerilatedLockGuard stoppedJobLock(m_stoppedJobsMutex);
if (!stopRequested()) return false;
waitStopRequested(stoppedJobLock);
return true;
}
void V3ThreadPool::waitStopRequested(VerilatedLockGuard& stoppedJobLock)
VL_REQUIRES(m_stoppedJobsMutex) {
++m_stoppedJobs;
m_stoppedJobsCV.notify_all();
m_stoppedJobsCV.wait(
stoppedJobLock, [&]() VL_REQUIRES(m_stoppedJobsMutex) { return !m_stopRequested.load(); });
--m_stoppedJobs;
m_stoppedJobsCV.notify_all();
}
void V3ThreadPool::waitOtherThreads(VerilatedLockGuard& stoppedJobLock)
VL_MT_SAFE_EXCLUDES(m_mutex) VL_REQUIRES(m_stoppedJobsMutex) {
++m_stoppedJobs;
m_stoppedJobsCV.notify_all();
m_cv.notify_all();
m_stoppedJobsCV.wait(stoppedJobLock, [&]() VL_REQUIRES(m_stoppedJobsMutex) {
// count also the main thread
return m_stoppedJobs == (m_workers.size() + 1);
});
--m_stoppedJobs;
}
void V3ThreadPool::selfTest() {
VerilatedMutex commonMutex;
int commonValue{0};
auto firstJob = [&](int sleep) -> void {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep});
s().requestExclusiveAccess([&]() {
commonValue = 10;
std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10});
UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue);
});
};
auto secondJob = [&](int sleep) -> void {
VerilatedLockGuard lock{commonMutex};
lock.unlock();
s().waitIfStopRequested();
lock.lock();
std::this_thread::sleep_for(std::chrono::milliseconds{sleep});
commonValue = 1000;
};
auto thirdJob = [&](int sleep) -> void {
VerilatedLockGuard lock{commonMutex};
std::this_thread::sleep_for(std::chrono::milliseconds{sleep});
lock.unlock();
s().requestExclusiveAccess([&]() { firstJob(sleep); });
lock.lock();
commonValue = 1000;
};
std::list<std::future<void>> futures;
futures.push_back(s().enqueue<void>(std::bind(firstJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 200)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 200)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 300)));
while (!futures.empty()) {
s().waitForFuture(futures.front());
futures.pop_front();
}
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
while (!futures.empty()) {
s().waitForFuture(futures.front());
futures.pop_front();
}
s().waitIfStopRequested();
s().requestExclusiveAccess(std::bind(firstJob, 100));
auto forthJob = [&]() -> int { return 1234; };
std::list<std::future<int>> futuresInt;
futuresInt.push_back(s().enqueue<int>(forthJob));
while (!futuresInt.empty()) {
int result = s().waitForFuture(futuresInt.front());
UASSERT(result == 1234, "unexpected future result = " << commonValue);
futuresInt.pop_front();
}
}

169
src/V3ThreadPool.h Normal file
View File

@ -0,0 +1,169 @@
// -*- mode: C++; c-file-style: "cc-mode" -*-
//*************************************************************************
// DESCRIPTION: Verilator: Thread pool for Verilator itself
//
// Code available from: https://verilator.org
//
//*************************************************************************
//
// Copyright 2005-2023 by Wilson Snyder. This program is free software; you can
// redistribute it and/or modify it under the terms of either the GNU
// Lesser General Public License Version 3 or the Perl Artistic License
// Version 2.0.
// SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0
//
//*************************************************************************
#ifndef _V3THREADPOOL_H_
#define _V3THREADPOOL_H_ 1
#include "verilated_threads.h"
#include <condition_variable>
#include <functional>
#include <future>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
//============================================================================
class V3ThreadPool final {
// MEMBERS
static constexpr unsigned int FUTUREWAITFOR_MS = 100;
using job_t = std::function<void()>;
mutable VerilatedMutex m_mutex; // Mutex for use by m_queue
std::queue<job_t> m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs
// We don't need to guard this condition_variable as
// both `notify_one` and `notify_all` functions are atomic,
// `wait` function is not atomic, but we are guarding `m_queue` that is
// used by this condition_variable, so clang checks that we have mutex locked
std::condition_variable_any m_cv; // Conditions to wake up workers
std::list<std::thread> m_workers; // Worker threads
VerilatedMutex m_stoppedJobsMutex; // Used to signal stopped jobs
// Conditions to wake up stopped jobs
std::condition_variable_any m_stoppedJobsCV VL_GUARDED_BY(m_stoppedJobsMutex);
std::atomic_uint m_stoppedJobs{0}; // Currently stopped jobs waiting for wake up
std::atomic_bool m_stopRequested{false}; // Signals to resume stopped jobs
std::atomic_bool m_exclusiveAccess{false}; // Signals that all other threads are stopped
std::atomic_bool m_shutdown{false}; // Termination pending
// CONSTRUCTORS
V3ThreadPool() = default;
~V3ThreadPool() {
VerilatedLockGuard lock{m_mutex};
m_queue = {}; // make sure queue is empty
lock.unlock();
resize(0);
}
public:
// METHODS
// Singleton
static V3ThreadPool& s() VL_MT_SAFE {
static V3ThreadPool s_s;
return s_s;
}
// Resize thread pool to n workers (queue must be empty)
void resize(unsigned n) VL_MT_UNSAFE;
// Enqueue a job for asynchronous execution
template <typename T>
std::future<T> enqueue(std::function<T()>&& f) VL_MT_SAFE;
// Request exclusive access to processing.
// It sends request to stop all other threads and waits for them to stop.
// Other threads needs to manually call 'check_stop_requested' in places where
// they can be stopped.
// When all other threads are stopped, this function executes the job
// and resumes execution of other jobs.
void requestExclusiveAccess(const job_t&& exclusiveAccessJob) VL_MT_SAFE;
// Check if other thread requested exclusive access to processing,
// if so, it waits for it to complete. Afterwards it is resumed.
// Returns true if request was send and we waited, otherwise false
bool waitIfStopRequested() VL_MT_SAFE;
template <typename T>
T waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex);
static void selfTest();
private:
bool willExecuteSynchronously() const VL_MT_SAFE {
return m_workers.empty() || m_exclusiveAccess;
}
// True when any thread requested exclusive access
bool stopRequested() const VL_REQUIRES(m_stoppedJobsMutex) {
// don't wait if shutdown already requested
if (m_shutdown) return false;
return m_stopRequested;
}
bool stopRequestedStandalone() VL_MT_SAFE_EXCLUDES(m_stoppedJobsMutex) {
const VerilatedLockGuard lock{m_stoppedJobsMutex};
return stopRequested();
}
// Waits until exclusive access job completes its job
void waitStopRequested(VerilatedLockGuard& stoppedJobLock) VL_REQUIRES(m_stoppedJobsMutex);
// Waits until all other jobs are stopped
void waitOtherThreads(VerilatedLockGuard& stoppedJobLock) VL_MT_SAFE_EXCLUDES(m_mutex)
VL_REQUIRES(m_stoppedJobsMutex);
template <typename T>
void pushJob(std::shared_ptr<std::promise<T>>& prom, std::function<T()>&& f) VL_MT_SAFE;
void workerJobLoop(int id) VL_MT_SAFE;
static void startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE;
};
template <typename T>
T V3ThreadPool::waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex) {
while (true) {
waitIfStopRequested();
{
std::future_status status
= future.wait_for(std::chrono::milliseconds(V3ThreadPool::FUTUREWAITFOR_MS));
switch (status) {
case std::future_status::deferred: continue;
case std::future_status::timeout: continue;
case std::future_status::ready: return future.get();
}
}
}
}
template <typename T>
std::future<T> V3ThreadPool::enqueue(std::function<T()>&& f) VL_MT_SAFE {
std::shared_ptr<std::promise<T>> prom = std::make_shared<std::promise<T>>();
std::future<T> result = prom->get_future();
pushJob(prom, std::move(f));
const VerilatedLockGuard guard{m_mutex};
m_cv.notify_one();
return result;
}
template <typename T>
void V3ThreadPool::pushJob(std::shared_ptr<std::promise<T>>& prom,
std::function<T()>&& f) VL_MT_SAFE {
if (willExecuteSynchronously()) {
prom->set_value(f());
} else {
const VerilatedLockGuard guard{m_mutex};
m_queue.push([prom, f] { prom->set_value(f()); });
}
}
template <>
void V3ThreadPool::pushJob<void>(std::shared_ptr<std::promise<void>>& prom,
std::function<void()>&& f) VL_MT_SAFE;
#endif // Guard

View File

@ -90,6 +90,7 @@
#include "V3TSP.h"
#include "V3Table.h"
#include "V3Task.h"
#include "V3ThreadPool.h"
#include "V3Timing.h"
#include "V3Trace.h"
#include "V3TraceDecl.h"
@ -600,6 +601,9 @@ static void verilate(const string& argString) {
v3fatalSrc("VERILATOR_DEBUG_SKIP_IDENTICAL w/ --skip-identical: Changes found\n");
} // LCOV_EXCL_STOP
// Adjust thread pool size
V3ThreadPool::s().resize(v3Global.opt.verilateJobs());
// --FRONTEND------------------
// Cleanup
@ -619,6 +623,7 @@ static void verilate(const string& argString) {
V3Partition::selfTest();
V3Partition::selfTestNormalizeCosts();
V3Broken::selfTest();
V3ThreadPool::selfTest();
}
// Read first filename

View File

@ -0,0 +1,20 @@
#!/usr/bin/env perl
if (!$::Driver) { use FindBin; exec("$FindBin::Bin/bootstrap.pl", @ARGV, $0); die; }
# DESCRIPTION: Verilator: Verilog Test driver/expect definition
#
# Copyright 2023 by Wilson Snyder. This program is free software; you
# can redistribute it and/or modify it under the terms of either the GNU
# Lesser General Public License Version 3 or the Perl Artistic License
# Version 2.0.
# SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0
scenarios(vlt => 1);
top_filename("t/t_EXAMPLE.v");
lint(
v_flags => ["--lint-only --verilate-jobs 2 --debug-self-test"],
);
ok(1);
1;