diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54d979b8a..b25c900b5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -150,6 +150,7 @@ set(HEADERS V3SymTable.h V3Table.h V3Task.h + V3ThreadPool.h V3Timing.h V3Trace.h V3TraceDecl.h @@ -283,6 +284,7 @@ set(COMMON_SOURCES V3Subst.cpp V3Table.cpp V3Task.cpp + V3ThreadPool.cpp V3Trace.cpp V3TraceDecl.cpp V3Tristate.cpp diff --git a/src/Makefile_obj.in b/src/Makefile_obj.in index dd177da20..9a515360b 100644 --- a/src/Makefile_obj.in +++ b/src/Makefile_obj.in @@ -265,6 +265,7 @@ RAW_OBJS = \ V3TSP.o \ V3Table.o \ V3Task.o \ + V3ThreadPool.o \ V3Timing.o \ V3Trace.o \ V3TraceDecl.o \ diff --git a/src/V3ThreadPool.cpp b/src/V3ThreadPool.cpp new file mode 100644 index 000000000..6f4ba5ed6 --- /dev/null +++ b/src/V3ThreadPool.cpp @@ -0,0 +1,198 @@ +// -*- mode: C++; c-file-style: "cc-mode" -*- +//************************************************************************* +// DESCRIPTION: Verilator: Thread pool for Verilator itself +// +// Code available from: https://verilator.org +// +//************************************************************************* +// +// Copyright 2005-2023 by Wilson Snyder. This program is free software; you can +// redistribute it and/or modify it under the terms of either the GNU +// Lesser General Public License Version 3 or the Perl Artistic License +// Version 2.0. +// SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0 +// +//************************************************************************* + +#include "config_build.h" + +#include "V3ThreadPool.h" + +#include "V3Error.h" + +// c++11 requires definition of static constexpr as well as declaration +constexpr unsigned int V3ThreadPool::FUTUREWAITFOR_MS; + +void V3ThreadPool::resize(unsigned n) VL_MT_UNSAFE { + // This function is not thread-safe and can result in race between threads + VerilatedLockGuard lock{m_mutex}; + VerilatedLockGuard stoppedJobsLock{m_stoppedJobsMutex}; + UASSERT(m_queue.empty(), "Resizing busy thread pool"); + // Shut down old threads + m_shutdown = true; + m_stoppedJobs = 0; + m_cv.notify_all(); + m_stoppedJobsCV.notify_all(); + stoppedJobsLock.unlock(); + lock.unlock(); + while (!m_workers.empty()) { + m_workers.front().join(); + m_workers.pop_front(); + } + lock.lock(); + // Start new threads + m_shutdown = false; + for (unsigned int i = 1; i < n; ++i) { + m_workers.emplace_back(&V3ThreadPool::startWorker, this, i); + } +} + +void V3ThreadPool::startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE { + selfThreadp->workerJobLoop(id); +} + +void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE { + while (true) { + // Wait for a notification + waitIfStopRequested(); + job_t job; + { + VerilatedLockGuard lock(m_mutex); + m_cv.wait(lock, [&]() VL_REQUIRES(m_mutex) { + return !m_queue.empty() || m_shutdown || m_stopRequested; + }); + if (m_shutdown) return; // Terminate if requested + if (stopRequestedStandalone()) { continue; } + // Get the job + UASSERT(!m_queue.empty(), "Job should be available"); + + job = m_queue.front(); + m_queue.pop(); + } + + // Execute the job + job(); + } +} + +template <> +void V3ThreadPool::pushJob(std::shared_ptr>& prom, + std::function&& f) VL_MT_SAFE { + if (willExecuteSynchronously()) { + f(); + prom->set_value(); + } else { + const VerilatedLockGuard lock{m_mutex}; + m_queue.push([prom, f] { + f(); + prom->set_value(); + }); + } +} + +void V3ThreadPool::requestExclusiveAccess(const V3ThreadPool::job_t&& exclusiveAccessJob) + VL_MT_SAFE { + if (willExecuteSynchronously()) { + exclusiveAccessJob(); + } else { + VerilatedLockGuard stoppedJobLock{m_stoppedJobsMutex}; + // if some other job already requested exclusive access + // wait until it stops + if (stopRequested()) { waitStopRequested(stoppedJobLock); } + m_stopRequested = true; + waitOtherThreads(stoppedJobLock); + m_exclusiveAccess = true; + exclusiveAccessJob(); + m_exclusiveAccess = false; + m_stopRequested = false; + m_stoppedJobsCV.notify_all(); + } +} + +bool V3ThreadPool::waitIfStopRequested() VL_MT_SAFE { + VerilatedLockGuard stoppedJobLock(m_stoppedJobsMutex); + if (!stopRequested()) return false; + waitStopRequested(stoppedJobLock); + return true; +} + +void V3ThreadPool::waitStopRequested(VerilatedLockGuard& stoppedJobLock) + VL_REQUIRES(m_stoppedJobsMutex) { + ++m_stoppedJobs; + m_stoppedJobsCV.notify_all(); + m_stoppedJobsCV.wait( + stoppedJobLock, [&]() VL_REQUIRES(m_stoppedJobsMutex) { return !m_stopRequested.load(); }); + --m_stoppedJobs; + m_stoppedJobsCV.notify_all(); +} + +void V3ThreadPool::waitOtherThreads(VerilatedLockGuard& stoppedJobLock) + VL_MT_SAFE_EXCLUDES(m_mutex) VL_REQUIRES(m_stoppedJobsMutex) { + ++m_stoppedJobs; + m_stoppedJobsCV.notify_all(); + m_cv.notify_all(); + m_stoppedJobsCV.wait(stoppedJobLock, [&]() VL_REQUIRES(m_stoppedJobsMutex) { + // count also the main thread + return m_stoppedJobs == (m_workers.size() + 1); + }); + --m_stoppedJobs; +} + +void V3ThreadPool::selfTest() { + VerilatedMutex commonMutex; + int commonValue{0}; + + auto firstJob = [&](int sleep) -> void { + std::this_thread::sleep_for(std::chrono::milliseconds{sleep}); + s().requestExclusiveAccess([&]() { + commonValue = 10; + std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10}); + UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue); + }); + }; + auto secondJob = [&](int sleep) -> void { + VerilatedLockGuard lock{commonMutex}; + lock.unlock(); + s().waitIfStopRequested(); + lock.lock(); + std::this_thread::sleep_for(std::chrono::milliseconds{sleep}); + commonValue = 1000; + }; + auto thirdJob = [&](int sleep) -> void { + VerilatedLockGuard lock{commonMutex}; + std::this_thread::sleep_for(std::chrono::milliseconds{sleep}); + lock.unlock(); + s().requestExclusiveAccess([&]() { firstJob(sleep); }); + lock.lock(); + commonValue = 1000; + }; + std::list> futures; + + futures.push_back(s().enqueue(std::bind(firstJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 100))); + futures.push_back(s().enqueue(std::bind(firstJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 200))); + futures.push_back(s().enqueue(std::bind(firstJob, 200))); + futures.push_back(s().enqueue(std::bind(firstJob, 300))); + while (!futures.empty()) { + s().waitForFuture(futures.front()); + futures.pop_front(); + } + futures.push_back(s().enqueue(std::bind(thirdJob, 100))); + futures.push_back(s().enqueue(std::bind(thirdJob, 100))); + while (!futures.empty()) { + s().waitForFuture(futures.front()); + futures.pop_front(); + } + s().waitIfStopRequested(); + s().requestExclusiveAccess(std::bind(firstJob, 100)); + auto forthJob = [&]() -> int { return 1234; }; + std::list> futuresInt; + futuresInt.push_back(s().enqueue(forthJob)); + while (!futuresInt.empty()) { + int result = s().waitForFuture(futuresInt.front()); + UASSERT(result == 1234, "unexpected future result = " << commonValue); + futuresInt.pop_front(); + } +} diff --git a/src/V3ThreadPool.h b/src/V3ThreadPool.h new file mode 100644 index 000000000..b074491be --- /dev/null +++ b/src/V3ThreadPool.h @@ -0,0 +1,169 @@ +// -*- mode: C++; c-file-style: "cc-mode" -*- +//************************************************************************* +// DESCRIPTION: Verilator: Thread pool for Verilator itself +// +// Code available from: https://verilator.org +// +//************************************************************************* +// +// Copyright 2005-2023 by Wilson Snyder. This program is free software; you can +// redistribute it and/or modify it under the terms of either the GNU +// Lesser General Public License Version 3 or the Perl Artistic License +// Version 2.0. +// SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0 +// +//************************************************************************* + +#ifndef _V3THREADPOOL_H_ +#define _V3THREADPOOL_H_ 1 + +#include "verilated_threads.h" + +#include +#include +#include +#include +#include +#include +#include + +//============================================================================ + +class V3ThreadPool final { + // MEMBERS + static constexpr unsigned int FUTUREWAITFOR_MS = 100; + using job_t = std::function; + + mutable VerilatedMutex m_mutex; // Mutex for use by m_queue + std::queue m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs + // We don't need to guard this condition_variable as + // both `notify_one` and `notify_all` functions are atomic, + // `wait` function is not atomic, but we are guarding `m_queue` that is + // used by this condition_variable, so clang checks that we have mutex locked + std::condition_variable_any m_cv; // Conditions to wake up workers + std::list m_workers; // Worker threads + VerilatedMutex m_stoppedJobsMutex; // Used to signal stopped jobs + // Conditions to wake up stopped jobs + std::condition_variable_any m_stoppedJobsCV VL_GUARDED_BY(m_stoppedJobsMutex); + std::atomic_uint m_stoppedJobs{0}; // Currently stopped jobs waiting for wake up + std::atomic_bool m_stopRequested{false}; // Signals to resume stopped jobs + std::atomic_bool m_exclusiveAccess{false}; // Signals that all other threads are stopped + + std::atomic_bool m_shutdown{false}; // Termination pending + + // CONSTRUCTORS + V3ThreadPool() = default; + ~V3ThreadPool() { + VerilatedLockGuard lock{m_mutex}; + m_queue = {}; // make sure queue is empty + lock.unlock(); + resize(0); + } + +public: + // METHODS + // Singleton + static V3ThreadPool& s() VL_MT_SAFE { + static V3ThreadPool s_s; + return s_s; + } + + // Resize thread pool to n workers (queue must be empty) + void resize(unsigned n) VL_MT_UNSAFE; + + // Enqueue a job for asynchronous execution + template + std::future enqueue(std::function&& f) VL_MT_SAFE; + + // Request exclusive access to processing. + // It sends request to stop all other threads and waits for them to stop. + // Other threads needs to manually call 'check_stop_requested' in places where + // they can be stopped. + // When all other threads are stopped, this function executes the job + // and resumes execution of other jobs. + void requestExclusiveAccess(const job_t&& exclusiveAccessJob) VL_MT_SAFE; + + // Check if other thread requested exclusive access to processing, + // if so, it waits for it to complete. Afterwards it is resumed. + // Returns true if request was send and we waited, otherwise false + bool waitIfStopRequested() VL_MT_SAFE; + + template + T waitForFuture(std::future& future) VL_MT_SAFE_EXCLUDES(m_mutex); + + static void selfTest(); + +private: + bool willExecuteSynchronously() const VL_MT_SAFE { + return m_workers.empty() || m_exclusiveAccess; + } + + // True when any thread requested exclusive access + bool stopRequested() const VL_REQUIRES(m_stoppedJobsMutex) { + // don't wait if shutdown already requested + if (m_shutdown) return false; + return m_stopRequested; + } + + bool stopRequestedStandalone() VL_MT_SAFE_EXCLUDES(m_stoppedJobsMutex) { + const VerilatedLockGuard lock{m_stoppedJobsMutex}; + return stopRequested(); + } + + // Waits until exclusive access job completes its job + void waitStopRequested(VerilatedLockGuard& stoppedJobLock) VL_REQUIRES(m_stoppedJobsMutex); + + // Waits until all other jobs are stopped + void waitOtherThreads(VerilatedLockGuard& stoppedJobLock) VL_MT_SAFE_EXCLUDES(m_mutex) + VL_REQUIRES(m_stoppedJobsMutex); + + template + void pushJob(std::shared_ptr>& prom, std::function&& f) VL_MT_SAFE; + + void workerJobLoop(int id) VL_MT_SAFE; + + static void startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE; +}; + +template +T V3ThreadPool::waitForFuture(std::future& future) VL_MT_SAFE_EXCLUDES(m_mutex) { + while (true) { + waitIfStopRequested(); + { + std::future_status status + = future.wait_for(std::chrono::milliseconds(V3ThreadPool::FUTUREWAITFOR_MS)); + switch (status) { + case std::future_status::deferred: continue; + case std::future_status::timeout: continue; + case std::future_status::ready: return future.get(); + } + } + } +} + +template +std::future V3ThreadPool::enqueue(std::function&& f) VL_MT_SAFE { + std::shared_ptr> prom = std::make_shared>(); + std::future result = prom->get_future(); + pushJob(prom, std::move(f)); + const VerilatedLockGuard guard{m_mutex}; + m_cv.notify_one(); + return result; +} + +template +void V3ThreadPool::pushJob(std::shared_ptr>& prom, + std::function&& f) VL_MT_SAFE { + if (willExecuteSynchronously()) { + prom->set_value(f()); + } else { + const VerilatedLockGuard guard{m_mutex}; + m_queue.push([prom, f] { prom->set_value(f()); }); + } +} + +template <> +void V3ThreadPool::pushJob(std::shared_ptr>& prom, + std::function&& f) VL_MT_SAFE; + +#endif // Guard diff --git a/src/Verilator.cpp b/src/Verilator.cpp index 76956b5e8..73c542ac9 100644 --- a/src/Verilator.cpp +++ b/src/Verilator.cpp @@ -90,6 +90,7 @@ #include "V3TSP.h" #include "V3Table.h" #include "V3Task.h" +#include "V3ThreadPool.h" #include "V3Timing.h" #include "V3Trace.h" #include "V3TraceDecl.h" @@ -600,6 +601,9 @@ static void verilate(const string& argString) { v3fatalSrc("VERILATOR_DEBUG_SKIP_IDENTICAL w/ --skip-identical: Changes found\n"); } // LCOV_EXCL_STOP + // Adjust thread pool size + V3ThreadPool::s().resize(v3Global.opt.verilateJobs()); + // --FRONTEND------------------ // Cleanup @@ -619,6 +623,7 @@ static void verilate(const string& argString) { V3Partition::selfTest(); V3Partition::selfTestNormalizeCosts(); V3Broken::selfTest(); + V3ThreadPool::selfTest(); } // Read first filename diff --git a/test_regress/t/t_a3_selftest_thread.pl b/test_regress/t/t_a3_selftest_thread.pl new file mode 100755 index 000000000..a47da043c --- /dev/null +++ b/test_regress/t/t_a3_selftest_thread.pl @@ -0,0 +1,20 @@ +#!/usr/bin/env perl +if (!$::Driver) { use FindBin; exec("$FindBin::Bin/bootstrap.pl", @ARGV, $0); die; } +# DESCRIPTION: Verilator: Verilog Test driver/expect definition +# +# Copyright 2023 by Wilson Snyder. This program is free software; you +# can redistribute it and/or modify it under the terms of either the GNU +# Lesser General Public License Version 3 or the Perl Artistic License +# Version 2.0. +# SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0 + +scenarios(vlt => 1); + +top_filename("t/t_EXAMPLE.v"); + +lint( + v_flags => ["--lint-only --verilate-jobs 2 --debug-self-test"], + ); + +ok(1); +1;