#!/usr/bin/env python3 # pylint: disable=C0103,C0114,C0115,C0116,C0209,C0302,R0902,R0903,R0904,R0912,R0913,R0914,R0915,R0916,W0212,W0511,W0603,W1201 ###################################################################### import argparse import collections import glob import hashlib import json import logging import multiprocessing import os import pickle import platform import re import runpy import shutil import signal import subprocess import sys import time from functools import lru_cache # Eventually use python 3.9's cache from pprint import pformat, pprint from packaging import version if False: # pylint: disable=using-constant-test pprint(pformat("Ignored")) # Prevent unused warning # Map of all scenarios, with the names used to enable them All_Scenarios = { # yapf: disable 'dist': ['dist'], 'atsim': ['simulator', 'simulator_st', 'atsim'], 'ghdl': ['linter', 'simulator', 'simulator_st', 'ghdl'], 'iv': ['simulator', 'simulator_st', 'iv'], 'ms': ['linter', 'simulator', 'simulator_st', 'ms'], 'nc': ['linter', 'simulator', 'simulator_st', 'nc'], 'vcs': ['linter', 'simulator', 'simulator_st', 'vcs'], 'xrun': ['linter', 'simulator', 'simulator_st', 'xrun'], 'xsim': ['linter', 'simulator', 'simulator_st', 'xsim'], 'vlt': ['linter', 'simulator', 'simulator_st', 'vlt_all', 'vlt'], 'vltmt': ['simulator', 'vlt_all', 'vltmt'], # yapf: enable } # Globals test = None Arg_Tests = [] Arg_Driver_Verilator_Flags = [] Quitting = False Vltmt_Threads = 3 # So an 'import vltest_bootstrap' inside test files will do nothing sys.modules['vltest_bootstrap'] = {} ####################################################################### ####################################################################### # Decorators class staticproperty(property): def __get__(self, owner_self, owner_cls): return self.fget() ####################################################################### ####################################################################### # VtOs -- OS extensions class VtOs: @staticmethod def delenv(var: str) -> None: """Delete environment variable, if exists""" if var in os.environ: del os.environ[var] @staticmethod def getenv_def(var: str, default=None) -> str: """Return environment variable, returning default if does not exist""" if var in os.environ: return os.environ[var] return default @staticmethod def mkdir_ok(path: str) -> None: """Make directory, no error if exists""" try: os.mkdir(path) except FileExistsError: pass @staticmethod def run_capture(command: str, check=True) -> str: """Run a command and return results""" proc = subprocess.run([command], capture_output=True, text=True, shell=True, check=False) if check and proc.returncode: sys.exit("%Error: command failed '" + command + "':\n" + proc.stderr + proc.stdout) return str(proc.stdout) @staticmethod def unlink_ok(filename: str) -> None: """Unlink a file, no error if fails""" try: os.unlink(filename) except OSError: pass ####################################################################### ####################################################################### # Capabilities -- What OS/Verilator is built to support class Capabilities: # @lru_cache(maxsize=1024) broken with @staticmethod on older pythons we use _cached_cmake_version = None _cached_cxx_version = None _cached_have_coroutines = None _cached_have_gdb = None _cached_have_sc = None _cached_have_solver = None _cached_make_version = None @staticproperty def cmake_version() -> str: # pylint: disable=no-method-argument if Capabilities._cached_cmake_version is None: out = VtOs.run_capture('cmake --version', check=False) match = re.search(r'cmake version (\d+)\.(\d+)', out, re.IGNORECASE) if match: Capabilities._cached_cmake_version = match.group(1) + "." + match.group(2) else: Capabilities._cached_cmake_version = 0 return Capabilities._cached_cmake_version @staticproperty def cxx_version() -> str: # pylint: disable=no-method-argument if Capabilities._cached_cxx_version is None: Capabilities._cached_cxx_version = VtOs.run_capture(os.environ['MAKE'] + " -C " + os.environ['TEST_REGRESS'] + " -f Makefile print-cxx-version", check=False) return Capabilities._cached_cxx_version @staticproperty def have_coroutines() -> bool: # pylint: disable=no-method-argument if Capabilities._cached_have_coroutines is None: Capabilities._cached_have_coroutines = bool( Capabilities._verilator_get_supported('COROUTINES')) return Capabilities._cached_have_coroutines @staticproperty def have_gdb() -> bool: # pylint: disable=no-method-argument if Capabilities._cached_have_gdb is None: out = VtOs.run_capture('gdb --version 2>/dev/null', check=False) Capabilities._cached_have_gdb = bool('Copyright' in out) return Capabilities._cached_have_gdb @staticproperty def have_sc() -> bool: # pylint: disable=no-method-argument if Capabilities._cached_have_sc is None: if 'SYSTEMC' in os.environ: Capabilities._cached_have_sc = True elif 'SYSTEMC_INCLUDE' in os.environ: Capabilities._cached_have_sc = True elif 'CFG_HAVE_SYSTEMC' in os.environ: Capabilities._cached_have_sc = True else: Capabilities._cached_have_sc = bool( Capabilities._verilator_get_supported('SYSTEMC')) return Capabilities._cached_have_sc @staticproperty def have_solver() -> bool: # pylint: disable=no-method-argument if Capabilities._cached_have_solver is None: out = VtOs.run_capture('(z3 --help || cvc5 --help || cvc4 --help) 2>/dev/null', check=False) Capabilities._cached_have_solver = bool('Usage' in out) return Capabilities._cached_have_solver @staticproperty @lru_cache(maxsize=1024) def make_version() -> str: # pylint: disable=no-method-argument if Capabilities._cached_make_version is None: out = VtOs.run_capture(os.environ['MAKE'] + ' --version', check=False) match = re.search(r'make ([0-9]+\.[0-9]+)', out, flags=re.IGNORECASE) if match: Capabilities._cached_make_version = match.group(1) else: Capabilities._cached_make_version = -1 return Capabilities._cached_make_version # Fetch @staticmethod def warmup_cache() -> None: _ignore = Capabilities.have_coroutines _ignore = Capabilities.have_gdb _ignore = Capabilities.have_sc _ignore = Capabilities.have_solver # Internals @staticmethod def _verilator_get_supported(feature) -> str: # Returns if given feature is supported cmd = "perl " + os.environ['VERILATOR_ROOT'] + "/bin/verilator -get-supported " + feature out = VtOs.run_capture(cmd, check=False).strip() if out == '1': return True if out in ('', '0'): return False sys.exit("%Error: couldn't verilator_get_supported: " + cmd + "\n" + out) ####################################################################### ####################################################################### # Forker class - run multprocess pool of processes # Similar interface to Perl's Parallel::Forker. class Forker: class Job: def __init__(self, _id, name, scenario, args, quiet, rerun_skipping, run_pre_start, run_on_start, run_on_finish): self.fail_max_skip = False self.id = _id self.name = name self.scenario = scenario self.args = args self.quiet = quiet self.rerun_skipping = rerun_skipping self.run_pre_start = run_pre_start self.run_on_start = run_on_start self.run_on_finish = run_on_finish self.mprocess = None # Set once call run() @property def pid(self) -> int: return self.mprocess.pid @property def exitcode(self) -> int: return self.mprocess.exitcode def __init__(self, max_processes): self._max_processes = max_processes self._id_next = 0 self._left = collections.deque() # deque of Job self._running = {} # key of pid, value of Job def is_any_left(self) -> bool: return self.num_running() > 0 or (len(self._left) > 0 and not Quitting) def max_proc(self, n: int) -> None: self._max_processes = n def poll(self) -> bool: """Run threads, returning if more work to do (if False, sleep)""" # We don't use SIGCHLD as conflicted with other handler, instead just poll completed = [] # Need two passes to avoid changing the list we are iterating nrunning = 0 more_now = False for process in self._running.values(): if process.exitcode is not None: completed.append(process) more_now = True else: nrunning += 1 # Start new work now, so running in background while we then collect completions while len(self._left) and nrunning < self._max_processes and not Quitting: process = self._left.popleft() self._run(process) nrunning += 1 more_now = True for process in completed: self._finished(process) return more_now def running(self) -> list: return self._running.values() def num_running(self) -> int: return len(self._running) def schedule(self, name, scenario, args, quiet, rerun_skipping, run_pre_start, run_on_start, run_on_finish): # print("-Forker::schedule: [" + name + "]") process = Forker.Job(self._id_next, name=name, scenario=scenario, args=args, quiet=quiet, rerun_skipping=rerun_skipping, run_pre_start=run_pre_start, run_on_start=run_on_start, run_on_finish=run_on_finish) self._id_next += 1 self._left.append(process) def kill_tree_all(self) -> None: # print("-Forker: kill_tree_all") for process in self._running.values(): process.mprocess.kill() def _run(self, process) -> None: # print("-Forker: [" + process.name + "] run_pre_start") process.run_pre_start(process) ctx = multiprocessing.get_context('forkserver') process.mprocess = ctx.Process( # target=forker.child_start, # pylint: disable=used-before-assignment args=(process, )) process.mprocess.start() # print("-Forker: [" + process.name + "] RUNNING pid=" + str(process.pid)) self._running[process.pid] = process @staticmethod def child_start(process) -> None: # Runs in context of child # print("-Forker: [" + process.name + "] run_on_start") process.run_on_start(process) # print("-Forker: [" + process.name + "] FINISHED run_on_start") sys.exit(0) # Don't close anything def _finished(self, process) -> None: del self._running[process.pid] # print("-Forker: [" + process.name + "] run_on_finish exitcode=" + str(process.exitcode)) process.run_on_finish(process) process.mprocess.close() ####################################################################### ####################################################################### # Runner class class Runner: def __init__(self, driver_log_filename, quiet, ok_cnt=0, fail1_cnt=0, skip_cnt=0): self.driver_log_filename = driver_log_filename self.quiet = quiet # Counts self.all_cnt = 0 self.left_cnt = 0 self.ok_cnt = ok_cnt self.fail1_cnt = fail1_cnt self.fail_cnt = 0 self.skip_cnt = skip_cnt self.skip_msgs = [] self.fail_msgs = [] self.fail_tests = [] self._last_proc_finish_time = 0 self._last_summary_time = 0 self._last_summary_left = 0 self._running_ids = {} self._msg_fail_max_skip = False Runner.runner = self def one_test(self, py_filename: str, scenario: str, rerun_skipping=False) -> None: self.all_cnt += 1 self.left_cnt += 1 forker.schedule(name=py_filename, scenario=scenario, args=Args, quiet=self.quiet, rerun_skipping=rerun_skipping, run_pre_start=self._run_pre_start_static, run_on_start=self._run_on_start_static, run_on_finish=self._run_on_finish_static) @staticmethod def _run_pre_start_static(process) -> None: Runner.runner._run_pre_start(process) # pylint: disable=protected-access def _run_pre_start(self, process) -> None: # Running in context of parent, before run_on_start # Make an identifier that is unique across all current running jobs i = 1 while i in self._running_ids: i += 1 process.running_id = i self._running_ids[process.running_id] = 1 if Args.fail_max and Args.fail_max <= self.fail_cnt: if not self._msg_fail_max_skip: self._msg_fail_max_skip = True print("== Too many test failures; exceeded --fail-max\n", file=sys.stderr) process.fail_max_skip = 1 @staticmethod def _run_on_start_static(process) -> None: # Running in context of child, so can't pass data to parent directly if process.quiet: sys.stdout = open(os.devnull, 'w') # pylint: disable=R1732,unspecified-encoding sys.stderr = open(os.devnull, 'w') # pylint: disable=R1732,unspecified-encoding print("=" * 70) global Args Args = process.args global test test = VlTest(py_filename=process.name, scenario=process.scenario, running_id=process.running_id) test.oprint("=" * 50) test._prep() if process.rerun_skipping: print(" ---------- Earlier logfiles below; test was rerunnable = False\n") os.system("cat " + test.obj_dir + "/*.log") print(" ---------- Earlier logfiles above; test was rerunnable = False\n") elif process.fail_max_skip: test.skip("Too many test failures; exceeded --fail-max") else: VtOs.unlink_ok(test._status_filename) test._read() # Don't put anything other than _exit after _read, # as may call _exit via another path test._exit() @staticmethod def _run_on_finish_static(process) -> None: Runner.runner._run_on_finish(process) # pylint: disable=protected-access def _run_on_finish(self, process) -> None: # Running in context of parent global test test = VlTest(py_filename=process.name, scenario=process.scenario, running_id=process.running_id) test._quit = Quitting test._read_status() if test.ok: self.ok_cnt += 1 elif test._quit: pass elif test._scenario_off and not test.errors: pass elif test._skips and not test.errors and not test.errors_keep_going: self.skip_msgs.append("\t#" + test.soprint("-Skip: " + test._skips)) self.skip_cnt += 1 else: error_msg = test.errors if test.errors else test.errors_keep_going test.oprint("FAILED: " + error_msg) makecmd = VtOs.getenv_def('VERILATOR_MAKE', os.environ['MAKE']) + " &&" upperdir = 'test_regress/' if re.search(r'test_regress', os.getcwd()) else '' self.fail_msgs.append("\t#" + test.soprint("%Error: " + error_msg) + "\t\t" + makecmd + " " + upperdir + test.py_filename + ' '.join(self._manual_args()) + " --" + test.scenario + "\n") self.fail_tests.append(test) self.fail_cnt += 1 self.report(self.driver_log_filename) other = "" for proc in forker.running(): other += " " + proc.name if other != "" and not Args.quiet: test.oprint("Simultaneous running tests:" + other) if Args.stop: sys.exit("%Error: --stop and errors found\n") self.left_cnt -= 1 self._last_proc_finish_time = time.time() if process.running_id: del self._running_ids[process.running_id] def wait_and_report(self) -> None: self.print_summary(force=True) # Wait for all children to finish while forker.is_any_left(): more_now = forker.poll() if not Args.interactive_debugger: self.print_summary(force=False) if not more_now: time.sleep(0.1) self.report(None) self.report(self.driver_log_filename) def report(self, filename: str) -> None: if filename: with open(filename, "w", encoding="utf8") as fh: self._report_fh(fh) else: self._report_fh(sys.stdout) def _report_fh(self, fh) -> None: fh.write("\n") fh.write('=' * 70 + "\n") for f in sorted(self.fail_msgs): fh.write(f.strip() + "\n") for f in sorted(self.skip_msgs): fh.write(f.strip() + "\n") if self.fail_cnt: sumtxt = 'FAILED' elif self.skip_cnt: sumtxt = 'PASSED w/SKIPS' else: sumtxt = 'PASSED' fh.write("==TESTS DONE, " + sumtxt + ": " + self.sprint_summary() + "\n") def print_summary(self, force=False): change = self._last_summary_left != self.left_cnt if (force or ((time.time() - self._last_summary_time) >= 15) or (not self.quiet and change)): self._last_summary_left = self.left_cnt self._last_summary_time = time.time() print("==SUMMARY: " + self.sprint_summary(), file=sys.stderr) if (self._last_proc_finish_time != 0 and ((time.time() - self._last_proc_finish_time) > 15)): self._last_proc_finish_time = time.time() other = "" for proc in forker.running(): other += " " + proc.name print("==STILL RUNNING:" + other, file=sys.stderr) @staticmethod def _py_filename_adjust(py_filename: str, tdir_def: str) -> list: # Return (py_filename, t_dir) for tdir in Args.test_dirs: # pylint: disable=redefined-outer-name # t_dir used both absolutely and under obj_dir try_py_filename = tdir + "/" + os.path.basename(py_filename) if os.path.exists(try_py_filename): # Note most tests require error messages of the form t/x.v # Therefore py_filename must be t/ for local tests # t_dir must be absolute - used under t or under obj_dir tdir_abs = os.path.abspath(tdir) return (try_py_filename, tdir_abs) return (py_filename, os.path.abspath(tdir_def)) def sprint_summary(self) -> str: delta = time.time() - Start # pylint: disable=used-before-assignment # Fudge of 120% works out about right so ETA correctly predicts completion time eta = 1.2 * ((self.all_cnt * (delta / ((self.all_cnt - self.left_cnt) + 0.001))) - delta) if delta < 10: eta = 0 out = "" if self.left_cnt: out += "Left " + str(self.left_cnt) + " " out += "Passed " + str(self.ok_cnt) # Ordered below most severe to least severe out += " Failed " + str(self.fail_cnt) if self.fail1_cnt: out += " Failed-First " + str(self.fail1_cnt) if self.skip_cnt: out += " Skipped " + str(self.skip_cnt) if forker.num_running(): out += " Running " + str(forker.num_running()) if self.left_cnt > 10 and eta > 10: out += " Eta %d:%02d" % (int(eta / 60), eta % 60) out += " Time %d:%02d" % (int(delta / 60), delta % 60) return out def _manual_args(self) -> str: # Return command line with scenarios stripped out = [] for oarg in Args.orig_argv_sw: showarg = True for val in All_Scenarios.values(): for allscarg in val: if oarg == "--" + allscarg: showarg = False # Also strip certain flags that per-test debugging won't want if showarg and oarg != '--rerun' and oarg != '--quiet': out.append(oarg) return out ####################################################################### ####################################################################### # Test exceptions class VtSkipException(Exception): pass class VtErrorException(Exception): pass ####################################################################### ####################################################################### # Test class class VlTest: _file_contents_cache = {} # @lru_cache(maxsize=1024) broken with @staticmethod on older pythons we use _cached_aslr_off = None _cached_cfg_with_ccache = None def __init__(self, py_filename, scenario, running_id): self.py_filename = py_filename # Name of .py file to get setup from self.running_id = running_id self.scenario = scenario self._have_solver_called = False self._inputs = {} self._ok = False self._quit = False self._scenario_off = False # scenarios() didn't match running scenario self._skips = None match = re.match(r'^(.*/)?([^/]*)\.py', self.py_filename) self.name = match.group(2) # Name of this test self.benchmark = Args.benchmark self.benchmarksim = False self.clean_command = None self.context_threads = 0 # Number of threads to allocate in the context self.errors = None self.errors_keep_going = None self.main_time_multiplier = 1 self.make_main = 1 # Make __main.cpp self.make_pli = 0 # need to compile pli self.make_top_shell = 1 # Make a default __top.v file self.rerunnable = True # Rerun if fails self.sc_time_resolution = "SC_PS" # Keep - PS is SystemC default self.sim_time = 1100 # simulation time units for main wrapper self.threads = -1 # --threads (negative means auto based on scenario) self.verbose = Args.verbose self.verilated_randReset = "" self.vm_prefix = "V" + self.name # Make e.g. self.vlt, self.vltmt etc self.vlt = False # Set below also self.vltmt = False # Set below also self.xsim = False # Set below also for ascenario in All_Scenarios: self.__dict__[ascenario] = False self.__dict__[scenario] = True self.vlt_all = self.vlt or self.vltmt # Any Verilator scenario (self.py_filename, self.t_dir) = Runner._py_filename_adjust(self.py_filename, ".") for tdir in Args.test_dirs: # pylint: disable=redefined-outer-name # t_dir used both absolutely and under obj_dir self.t_dir = None if os.path.exists(tdir + "/" + self.name + ".py"): # Note most tests require error messages of the form t/x.v # Therefore py_filename must be t/ for local tests self.py_filename = os.path.relpath(tdir + "/" + self.name + ".py") # t_dir must be absolute - used under t or under obj_dir self.t_dir = os.path.abspath(tdir) break if not self.t_dir: sys.exit("%Error: Can't locate dir for " + self.name) scen_dir = os.path.relpath(self.t_dir + "/../obj_" + self.scenario) # Simplify filenames on local runs scen_dir = re.sub(r'^t/\.\./', '', scen_dir) # Not mkpath so error if try to build somewhere odd VtOs.mkdir_ok(scen_dir) self.obj_dir = scen_dir + "/" + self.name define_opt = self._define_opt_calc() # All compilers self.v_flags = [] if self.xsim: self.v_flags += ["-f input.xsim.vc"] elif os.path.exists('input.vc'): self.v_flags += ["-f input.vc"] if not re.search(r'/test_regress', self.t_dir): # Don't include standard dir, only site's self.v_flags += ["+incdir+" + self.t_dir + " -y " + self.t_dir] self.v_flags += [define_opt + "TEST_OBJ_DIR=" + self.obj_dir] if Args.verbose: self.v_flags += [define_opt + "TEST_VERBOSE=1"] if Args.benchmark: self.v_flags += [define_opt + "TEST_BENCHMARK=Args.benchmark"] if Args.trace: self.v_flags += [define_opt + "WAVES=1"] self.v_flags2 = [] # Overridden in some sim files self.v_other_filenames = [] self.all_run_flags = [] self.pli_flags = [ "-I" + os.environ['VERILATOR_ROOT'] + "/include/vltstd", "-fPIC", "-shared" ] if platform.system() == 'Darwin': self.pli_flags += ["-Wl,-undefined,dynamic_lookup"] else: self.pli_flags += ["-rdynamic"] if Args.verbose: self.pli_flags += ["-DTEST_VERBOSE=1"] self.pli_flags += ["-o", self.obj_dir + "/libvpi.so"] self.tool_c_flags = [] # ATSIM self.atsim_define = 'ATSIM' self.atsim_flags = [ "-c", "+sv", "+define+ATSIM", ("+sv_dir+" + self.obj_dir + "/.athdl_compile") ] self.atsim_flags2 = [] # Overridden in some sim files self.atsim_run_flags = [] # GHDL self.ghdl_define = 'GHDL' self.ghdl_work_dir = self.obj_dir + "/ghdl_compile" self.ghdl_flags = [("-v" if Args.debug else ""), ("--workdir=" + self.obj_dir + "/ghdl_compile")] self.ghdl_flags2 = [] # Overridden in some sim files self.ghdl_run_flags = [] # IV self.iv_define = 'IVERILOG' self.iv_flags = ["+define+IVERILOG", "-g2012", ("-o" + self.obj_dir + "/simiv")] self.iv_flags2 = [] # Overridden in some sim files self.iv_run_flags = [] # VCS self.vcs_define = 'VCS' self.vcs_flags = [ "+vcs+lic+wait", "+cli", "-debug_access", "+define+VCS+1", "-q", "-sverilog", "-CFLAGS", "'-DVCS'" ] self.vcs_flags2 = [] # Overridden in some sim files self.vcs_run_flags = ["+vcs+lic_wait"] # NC self.nc_define = 'NC' self.nc_flags = [ "+licqueue", "+nowarn+LIBNOU", "+define+NC=1", "-q", "+assert", "+sv", "-c", ("+access+r" if Args.trace else "") ] self.nc_flags2 = [] # Overridden in some sim files self.nc_run_flags = ["+licqueue", "-q", "+assert", "+sv", "-R"] # ModelSim self.ms_define = 'MS' self.ms_flags = [ "-sv", "-work", (self.obj_dir + "/work"), "+define+MS=1", "-ccflags", '\"-DMS=1\"' ] self.ms_flags2 = [] # Overridden in some sim files self.ms_run_flags = ["-lib", self.obj_dir + "/work", "-c", "-do", "'run -all;quit'"] # XSim self.xsim_define = 'XSIM' self.xsim_flags = [ "--nolog", "--sv", "--define", "XSIM", "--work", (self.name + "=" + self.obj_dir + "/xsim") ] self.xsim_flags2 = [] # Overridden in some sim files self.xsim_run_flags = [ "--nolog", "--runall", "--lib", (self.name + "=" + self.obj_dir + "/xsim"), (" --debug all" if Args.trace else "") ] self.xsim_run_flags2 = [] # Overridden in some sim files # Xcelium (xrun) self.xrun = 0 self.xrun_define = 'XRUN' self.xrun_flags = [] # Doesn't really have a compile step self.xrun_flags2 = [] # Overridden in some sim files self.xrun_run_flags = [ "-64", "-access", "+rwc", "-newsv", "-sv", "-xmlibdirname", self.obj_dir + "/work", "-l", self.obj_dir + "/history", "-quiet", "-plinowarn" ] # Verilator self.verilator_define = 'VERILATOR' self.verilator_flags = [ "-cc", "-Mdir", self.obj_dir, "--fdedup", # As currently disabled unless -O3 "--debug-check", "--comp-limit-members", "10" ] self.verilator_flags2 = [] self.verilator_flags3 = ["--clk clk"] self.verilator_make_gmake = True self.verilator_make_cmake = False self.verilated_debug = Args.verilated_debug self._status_filename = self.obj_dir + "/V" + self.name + ".status" self.coverage_filename = self.obj_dir + "/coverage.dat" self.golden_filename = re.sub(r'\.py$', '.out', self.py_filename) self.main_filename = self.obj_dir + "/" + self.vm_prefix + "__main.cpp" self.compile_log_filename = self.obj_dir + "/vlt_compile.log" self.run_log_filename = self.obj_dir + "/vlt_sim.log" self.stats = self.obj_dir + "/V" + self.name + "__stats.txt" self.top_filename = re.sub(r'\.py$', '', self.py_filename) + '.' + self.v_suffix self.pli_filename = re.sub(r'\.py$', '', self.py_filename) + '.cpp' self.top_shell_filename = self.obj_dir + "/" + self.vm_prefix + "__top.v" def _define_opt_calc(self) -> str: return "--define " if self.xsim else "+define+" def init_benchmarksim(self) -> None: # Simulations with benchmarksim enabled append to the same file between runs. # Test files must ensure a clean benchmark data file before executing tests. filename = self.benchmarksim_filename with open(filename, 'w', encoding="utf8") as fh: fh.write("# Verilator simulation benchmark data\n") fh.write("# Test name: " + self.name + "\n") fh.write("# Top file: " + self.top_filename + "\n") fh.write("evals, time[s]\n") def soprint(self, message: str) -> str: message = message.rstrip() + "\n" message = self.scenario + "/" + self.name + ": " + message return message def oprint(self, message: str) -> None: message = message.rstrip() + "\n" print(self.soprint(message), end="") def error(self, message: str) -> None: """Called from tests as: error("Reason message") Newline is optional. Only first line is passed to summaries Throws a VtErrorException, so rest of testing is not executed""" message = message.rstrip() + "\n" print("%Warning: " + self.scenario + "/" + self.name + ": " + message, file=sys.stderr, end="") if not self.errors: message = re.sub(r'\n.*', '\n', message) self.errors = message raise VtErrorException def error_keep_going(self, message: str) -> None: """Called from tests as: error_keep_going("Reason message") Newline is optional. Only first line is passed to summaries""" if self._quit: return message = message.rstrip() + "\n" print("%Warning: " + self.scenario + "/" + self.name + ": " + message, file=sys.stderr, end="") if not self.errors_keep_going: message = re.sub(r'\n.*', '\n', message) self.errors_keep_going = message def skip(self, message: str) -> None: """Called from tests as: skip("Reason message"[, ...]) Newline is optional. Only first line is passed to summaries. Throws a VtSkipException, so rest of testing is not executed""" message = message.rstrip() + "\n" print("-Skip: " + self.scenario + "/" + self.name + ": " + message, file=sys.stderr, end="") if not self._skips: message = re.sub(r'\n.*', '\n', message) self._skips = message raise VtSkipException def scenarios(self, *scenario_list) -> None: """Called from tests as: scenarios() to specify which scenarios this test runs under. Where ... is one cases listed in All_Scenarios. All scenarios must be on one line; this is parsed outside Python.""" enabled_scenarios = {} for param in scenario_list: hit = False for allsc, allscs in All_Scenarios.items(): for allscarg in allscs: if param == allscarg: hit = True enabled_scenarios[allsc] = True if not hit: self.error("scenarios(...) has unknown scenario '" + param + "'") if not enabled_scenarios.get(self.scenario, None): self._scenario_off = True self.skip("scenario '" + self.scenario + "' not enabled for test") # self._exit() implied by skip's exception @staticmethod def _prefilter_scenario(py_filename: str, scenario: str) -> bool: """Read a python file to see if scenarios require it to be run. Much faster than parsing the file for a runtime check.""" (py_filename, _) = Runner._py_filename_adjust(py_filename, ".") with open(py_filename, 'r', encoding="utf-8") as fh: for line in fh: m = re.search(r'^\s*test.scenarios\((.*?)\)', line) if m: for param in re.findall(r"""["']([^,]*)["']""", m.group(1)): for allscarg in All_Scenarios[scenario]: if param == allscarg: return True return False def _prep(self) -> None: VtOs.mkdir_ok(self.obj_dir) # Ok if already exists def _read(self) -> None: if not os.path.exists(self.py_filename): self.error("Can't open " + self.py_filename) return global test test = self sys.path.append(self.t_dir) # To find vltest_bootstrap.py # print("_read/exec py_filename=" + self.py_filename) # print("_read/exec dir=", ' '.join(dir())) # print("_read/exec vars=", ' '.join(vars().keys())) # print("_read/exec globals=", ' '.join(globals().keys())) # print("_read/exec locals=", ' '.join(locals().keys())) try: runpy.run_path(self.py_filename, globals()) except (VtErrorException, VtSkipException): pass def _exit(self): if self.ok: self.oprint("Self PASSED") elif self._skips and not self.errors: self.oprint("-Skip: " + self._skips) else: # If no worse errors, promote errors_keep_going to normal errors if not self.errors and self.errors_keep_going: self.errors = self.errors_keep_going if not self.errors: self.error("Missing ok") self.oprint("%Error: " + self.errors) self._write_status() sys.exit(0) def _write_status(self) -> None: with open(self._status_filename, "wb") as fh: pass_to_driver = { '_ok': self._ok, '_scenario_off': self._scenario_off, '_skips': self._skips, 'errors': self.errors, } pickle.dump(pass_to_driver, fh) def _read_status(self) -> None: filename = self._status_filename if not os.path.isfile(filename): self.error_keep_going("Child test did not return status (test has Python error?): " + self.py_filename) return with open(filename, "rb") as fh: dic = pickle.load(fh) for k in dic.keys(): # print("_read_status " + filename + ".test['" + k + "]=" + pformat(dic[k])) setattr(self, k, dic[k]) #---------------------------------------------------------------------- # Methods invoked by tests def clean(self) -> None: """Called on a rerun to cleanup files.""" if self.clean_command: os.system(self.clean_command) # Prevents false-failures when switching compilers # Remove old results to force hard rebuild os.system('/bin/rm -rf ' + self.obj_dir + '__fail1') os.system('/bin/mv ' + self.obj_dir + ' ' + self.obj_dir + '__fail1') def clean_objs(self) -> None: os.system("/bin/rm -rf " + ' '.join(glob.glob(self.obj_dir + "/*"))) def _checkflags(self, param): checkflags = ( ' ' + ' '.join(param['v_flags']) + # ' ' + ' '.join(param['v_flags2']) + # ' ' + ' '.join(param['verilator_flags']) + # ' ' + ' '.join(param['verilator_flags2']) + # ' ' + ' '.join(param['verilator_flags3']) + ' ') return checkflags def compile_vlt_cmd(self, **kwargs) -> list: """Return default command list to run verilator""" param = {'stdout_filename': None} param.update(vars(self)) param.update(kwargs) vlt_cmd = [ "perl", os.environ["VERILATOR_ROOT"] + "/bin/verilator", *self._compile_vlt_flags(**param), param['top_filename'], *param['v_other_filenames'] ] if param['stdout_filename']: vlt_cmd += ["> " + param['stdout_filename']] return vlt_cmd def _compile_vlt_flags(self, **param) -> list: checkflags = self._checkflags(param) d_verilator_flags = ' ' + ' '.join(self.driver_verilator_flags) + ' ' self.pins_sc_uint_bool = ( # pylint: disable=attribute-defined-outside-init bool( re.search(r'-pins-sc-uint-bool\b', checkflags) or re.search(r'-pins-sc-uint-bool\b', d_verilator_flags))) self.savable = ( # pylint: disable=attribute-defined-outside-init bool(re.search(r'-savable\b', checkflags))) self.coverage = ( # pylint: disable=attribute-defined-outside-init bool(re.search(r'-coverage\b', checkflags))) self.sanitize = param.get('sanitize', None) # pylint: disable=attribute-defined-outside-init self.sc = ( # pylint: disable=attribute-defined-outside-init bool(re.search(r'-sc\b', checkflags))) self.timing = ( # pylint: disable=attribute-defined-outside-init bool(re.search(r'( -?-timing\b| -?-binary\b)', checkflags))) self.trace = ( # pylint: disable=attribute-defined-outside-init bool(Args.trace or re.search(r'-trace\b|-trace-fst\b', checkflags))) if re.search(r'-trace-fst', checkflags): if self.sc: self.trace_format = 'fst-sc' # pylint: disable=attribute-defined-outside-init else: self.trace_format = 'fst-c' # pylint: disable=attribute-defined-outside-init elif self.sc: self.trace_format = 'vcd-sc' # pylint: disable=attribute-defined-outside-init else: self.trace_format = 'vcd-c' # pylint: disable=attribute-defined-outside-init if param.get('benchmarksim', None): self.benchmarksim = True # pylint: disable=attribute-defined-outside-init verilator_flags = [*param.get('verilator_flags', "")] if Args.gdb: verilator_flags += ["--gdb"] if Args.gdbbt: verilator_flags += ["--gdbbt"] if Args.rr: verilator_flags += ["--rr"] if Args.trace: verilator_flags += ["--trace"] if Args.gdbsim or Args.rrsim: verilator_flags += ["-CFLAGS -ggdb -LDFLAGS -ggdb"] verilator_flags += ["--x-assign unique"] # More likely to be buggy if param['vltmt']: verilator_flags += ["--debug-partition"] if param['threads'] >= 0: verilator_flags += ["--threads", str(param['threads'])] if param['vltmt'] and re.search(r'-trace-fst ', checkflags): verilator_flags += ["--trace-threads 2"] if self.sanitize: verilator_flags += [ "-CFLAGS -fsanitize=address,undefined -LDFLAGS -fsanitize=address,undefined" ] if param['verilator_make_cmake']: verilator_flags += ["--make cmake"] if param['verilator_make_gmake']: verilator_flags += ["--make gmake"] if param['make_main'] and param['verilator_make_gmake']: verilator_flags += ["--exe"] if param['make_main'] and param['verilator_make_gmake']: verilator_flags += ["../" + self.main_filename] cmdargs = [ "--prefix", param['vm_prefix'], *verilator_flags, *param['verilator_flags2'], *param['verilator_flags3'], *param['v_flags'], *param['v_flags2'], # Flags from driver cmdline override default flags and # flags from the test itself *self.driver_verilator_flags, ] return cmdargs def lint(self, **kwargs) -> None: """Run a linter. Arguments similar to run(); default arguments are from self""" param = {} param.update(vars(self)) param.update({ # Lint-specific default overrides 'make_main': False, 'make_top_shell': False, 'verilator_flags2': ["--lint-only"], 'verilator_make_gmake': False }) param.update(kwargs) self.compile(**param) def compile(self, **kwargs) -> None: """Run simulation compiler. Arguments similar to run(); default arguments are from self""" param = { 'expect_filename': None, 'fails': False, 'make_flags': [], 'tee': True, 'timing_loop': False, } param.update(vars(self)) param.update(kwargs) if self.verbose: self.oprint("Compile") if param['vlt'] and param['threads'] > 1: self.error("'threads =' argument must be <= 1 for vlt scenario") # Compute automatic parameter values checkflags = self._checkflags(param) if re.search(r'(^|\s)-?-threads\s', checkflags): self.error("Specify threads via 'threads=' argument, not as a command line option") if param['threads'] < 0 and param['vltmt']: param['threads'] = calc_threads(Vltmt_Threads) if not param['context_threads']: param['context_threads'] = param['threads'] if (param['threads'] >= 1) else 1 if re.search(r'( -?-main\b| -?-binary\b)', checkflags): param['make_main'] = False if re.search(r'( -?-build\b| -?-binary\b)', checkflags): param['verilator_make_cmake'] = False param['verilator_make_gmake'] = False self.threads = param['threads'] self.context_threads = param['context_threads'] self.compile_vlt_cmd(**param) if not re.search(r'TEST_DUMPFILE', ' '.join(self.v_flags)): self.v_flags += [self._define_opt_calc() + "TEST_DUMPFILE=" + self.trace_filename] if not param['make_top_shell']: self.top_shell_filename = "" else: self.top_shell_filename = self.obj_dir + "/" + self.vm_prefix + "__top." + self.v_suffix param['top_shell_filename'] = self.top_shell_filename if param['atsim']: tool_define = param['atsim_define'] self._make_top(param['make_top_shell']) self.run(logfile=self.obj_dir + "/atsim_compile.log", fails=param['fails'], cmd=[ VtOs.getenv_def('VERILATOR_ATSIM', "atsim"), ' '.join(param['atsim_flags']), ' '.join(param['atsim_flags2']), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ' '.join(param['v_other_filenames']), ]) elif param['ghdl']: tool_define = param['ghdl_define'] VtOs.mkdir_ok(self.ghdl_work_dir) self._make_top(param['make_top_shell']) tool_exe = VtOs.getenv_def('VERILATOR_GHDL', "ghdl") self.run( logfile=self.obj_dir + "/ghdl_compile.log", fails=param['fails'], cmd=[ tool_exe, # Add -c here, as having -c twice freaks it out ("" if re.search(r' -c\b', tool_exe) else "-c"), ' '.join(param['ghdl_flags']), ' '.join(param['ghdl_flags2']), #' '.join(param['v_flags']), # Not supported #' '.join(param['v_flags2']), # Not supported param['top_filename'], param['top_shell_filename'], ' '.join(param['v_other_filenames']), "-e t", ]) elif param['vcs']: tool_define = param['vcs_define'] self._make_top(param['make_top_shell']) self.run(logfile=self.obj_dir + "/vcs_compile.log", fails=param['fails'], cmd=[ VtOs.getenv_def('VERILATOR_VCS', "vcs"), ' '.join(param['vcs_flags']), ' '.join(param['vcs_flags2']), ("-CFLAGS -DTEST_VERBOSE=1" if Args.verbose else ""), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ' '.join(param['v_other_filenames']), ]) elif param['nc']: tool_define = param['nc_define'] self._make_top(param['make_top_shell']) self.run(logfile=self.obj_dir + "/nc_compile.log", fails=param['fails'], cmd=[ VtOs.getenv_def('VERILATOR_NCVERILOG', "ncverilog"), ' '.join(param['nc_flags']), ' '.join(param['nc_flags2']), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ' '.join(param['v_other_filenames']), ]) elif param['ms']: tool_define = param['ms_define'] self._make_top(param['make_top_shell']) self.run(logfile=self.obj_dir + "/ms_compile.log", fails=param['fails'], cmd=[ ("vlib " + self.obj_dir + "/work && "), VtOs.getenv_def('VERILATOR_MODELSIM', "vlog"), ' '.join(param['ms_flags']), ' '.join(param['ms_flags2']), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ]) elif param['iv']: tool_define = param['iv_define'] self._make_top(param['make_top_shell']) cmd = (VtOs.getenv_def('VERILATOR_IVERILOG', "iverilog"), ' '.join(param['iv_flags']), ' '.join(param['iv_flags2']), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ' '.join(param['v_other_filenames'])) cmd = list(map(lambda str: re.sub(r'\+define\+', '-D ', str), cmd)) self.run(logfile=self.obj_dir + "/iv_compile.log", fails=param['fails'], cmd=cmd) elif param['xrun']: tool_define = param['xrun_define'] self._make_top(param['make_top_shell']) elif param['xsim']: tool_define = param['xsim_define'] self._make_top(param['make_top_shell']) self.run(logfile=self.obj_dir + "/xsim_compile.log", fails=param['fails'], cmd=[ VtOs.getenv_def('VERILATOR_XVLOG', "xvlog"), ' '.join(param['xsim_flags']), ' '.join(param['xsim_flags2']), ' '.join(param['v_flags']), ' '.join(param['v_flags2']), param['top_filename'], param['top_shell_filename'], ]) elif param['vlt_all']: tool_define = param['verilator_define'] if self.sc and not self.have_sc: self.skip("Test requires SystemC; ignore error since not installed\n") return if self.timing and not self.have_coroutines: self.skip("Test requires Coroutines; ignore error since not available\n") return if param['verilator_make_cmake'] and not self.have_cmake: self.skip( "Test requires CMake; ignore error since not available or version too old\n") return if not param['fails'] and param['make_main']: self._make_main(param['timing_loop']) if (param['verilator_make_gmake'] or (not param['verilator_make_gmake'] and not param['verilator_make_cmake'])): vlt_cmd = self.compile_vlt_cmd(**param) if self.verbose: self.oprint("Running Verilator (gmake)") if Args.verilation: self.run(logfile=self.obj_dir + "/vlt_compile.log", fails=param['fails'], tee=param['tee'], expect_filename=param['expect_filename'], verilator_run=True, cmd=vlt_cmd) if param['verilator_make_cmake']: vlt_args = self._compile_vlt_flags(**param) if self.verbose: self.oprint("Running cmake") VtOs.mkdir_ok(self.obj_dir) csources = [] if param['make_main']: csources.append(self.main_filename) self.run( logfile=self.obj_dir + "/vlt_cmake.log", fails=param['fails'], tee=param['tee'], expect_filename=param['expect_filename'], verilator_run=True, cmd=[ "cd \"" + self.obj_dir + "\" && cmake", "\"" + self.t_dir + "/..\"", "-DTEST_VERILATOR_ROOT=" + os.environ['VERILATOR_ROOT'], "-DTEST_NAME=" + self.name, "-DTEST_CSOURCES=\"" + ' '.join(csources) + "\"", "-DTEST_VERILATOR_ARGS=\"" + ' '.join(vlt_args) + "\"", "-DTEST_VERILATOR_SOURCES=\"" + param['top_filename'] + ' ' + ' '.join(param['v_other_filenames']) + "\"", "-DTEST_VERBOSE=\"" + ("1" if self.verbose else "0") + "\"", "-DTEST_SYSTEMC=\"" + ("1" if self.sc else "0") + "\"", "-DCMAKE_PREFIX_PATH=\"" + (VtOs.getenv_def('SYSTEMC_INCLUDE', VtOs.getenv_def('SYSTEMC', '')) + "/..\""), "-DTEST_OPT_FAST=\"" + ("-Os" if param['benchmark'] else "-O0") + "\"", "-DTEST_OPT_GLOBAL=\"" + ("-Os" if param['benchmark'] else "-O0") + "\"", "-DTEST_VERILATION=\"" + ("1" if Args.verilation else "0") + "\"", ]) if not param['fails'] and param['verilator_make_gmake']: if self.verbose: self.oprint("Running make (gmake)") self.run( logfile=self.obj_dir + "/vlt_gcc.log", entering=self.obj_dir, cmd=[ os.environ['MAKE'], "-C " + self.obj_dir, "-f " + os.path.abspath(os.path.dirname(__file__)) + "/Makefile_obj", ("" if self.verbose else "--no-print-directory"), "VM_PREFIX=" + self.vm_prefix, "TEST_OBJ_DIR=" + self.obj_dir, "CPPFLAGS_DRIVER=-D" + self.name.upper(), ("CPPFLAGS_DRIVER2=-DTEST_VERBOSE=1" if self.verbose else ""), ("" if param['benchmark'] else "OPT_FAST=-O0"), ("" if param['benchmark'] else "OPT_GLOBAL=-O0"), self.vm_prefix, # bypass default rule, as we don't need archive *param['make_flags'], ]) if not param['fails'] and param['verilator_make_cmake']: if self.verbose: self.oprint("Running cmake --build") self.run(logfile=self.obj_dir + "/vlt_cmake_build.log", cmd=[ "cmake", "--build", self.obj_dir, ("--verbose" if self.verbose else ""), ]) else: self.error("No compile step defined for '%s' scenario" % self.scenario()) if param['make_pli']: if self.verbose: self.oprint("Compile vpi") cmd = [ os.environ['CXX'], *param['pli_flags'], "-D" + tool_define, "-DIS_VPI", VtOs.getenv_def('CFLAGS', ''), self.pli_filename ] self.run(logfile=self.obj_dir + "/pli_compile.log", fails=param['fails'], cmd=cmd) def execute(self, **kwargs) -> None: """Run simulation executable. Arguments similar to run(); default arguments are from self""" # Default arguments are from self # params may be expect or {tool}_expect param = { 'aslr_off': False, 'entering': False, 'check_finished': False, 'executable': None, 'expect_filename': None, 'fails': False, 'run_env': '', 'tee': True, } param.update(vars(self)) param.update(kwargs) if self.verbose: self.oprint("Run") if not self.verbose: os.environ['SYSTEMC_DISABLE_COPYRIGHT_MESSAGE'] = 'DISABLE' else: VtOs.delenv('SYSTEMC_DISABLE_COPYRIGHT_MESSAGE') if not self._have_solver_called: os.environ['VERILATOR_SOLVER'] = "test.py-file-needs-have_solver()-call" if param['check_finished'] is None and not param['fails']: param['check_finished'] = 1 run_env = param['run_env'] if run_env: run_env = run_env + ' ' if param['atsim']: self.run( logfile=self.obj_dir + "/atsim_sim.log", fails=param['fails'], cmd=[ "echo q | " + run_env + self.obj_dir + "/athdl_sv", ' '.join(param['atsim_run_flags']), ' '.join(param['all_run_flags']), ], *param, expect_filename=param['atsim_run_expect_filename'], ) elif param['ghdl']: self.run( logfile=self.obj_dir + "/ghdl_sim.log", fails=param['fails'], cmd=[ run_env + self.obj_dir + "/simghdl", ' '.join(param['ghdl_run_flags']), ' '.join(param['all_run_flags']), ], *param, expect_filename=param['ghdl_run_expect_filename'], ) elif param['iv']: cmd = [ run_env + self.obj_dir + "/simiv", ' '.join(param['iv_run_flags']), ' '.join(param['all_run_flags']), ] if param['use_libvpi']: # Don't enter command line on $stop, include vpi cmd += ["vvp -n -m " + self.obj_dir + "/libvpi.so"] self.run( logfile=self.obj_dir + "/iv_sim.log", fails=param['fails'], cmd=cmd, *param, expect_filename=param['iv_run_expect_filename'], ) elif param['ms']: pli_opt = "" if param['use_libvpi']: pli_opt = "-pli " + self.obj_dir + "/libvpi.so" self.run( logfile=self.obj_dir + "/ms_sim.log", fails=param['fails'], cmd=[ "echo q | " + run_env + VtOs.getenv_def('VERILATOR_MODELSIM', "vsim"), ' '.join(param['ms_run_flags']), ' '.join(param['all_run_flags']), pli_opt, (" top") ], *param, expect_filename=param['ms_expect_filename'], ) elif param['nc']: self.run( logfile=self.obj_dir + "/nc_sim.log", fails=param['fails'], cmd=[ "echo q | " + run_env + VtOs.getenv_def('VERILATOR_NCVERILOG', "ncverilog"), ' '.join(param['nc_run_flags']), ' '.join(param['all_run_flags']), ], *param, expect_filename=param['nc_run_expect_filename'], ) elif param['vcs']: # my $fh = IO::File->new(">simv.key") or die "%Error: $! simv.key," # fh.print("quit\n"); fh.close() self.run( logfile=self.obj_dir + "/vcs_sim.log", cmd=[ "echo q | " + run_env + "./simv", ' '.join(param['vcs_run_flags']), ' '.join(param['all_run_flags']), ], *param, expect_filename=param['vcs_run_expect_filename'], ) elif param['xrun']: pli_opt = "" if param['use_libvpi']: pli_opt = "-loadvpi " + self.obj_dir + "/libvpi.so:vpi_compat_bootstrap" self.run( logfile=self.obj_dir + "/xrun_sim.log", fails=param['fails'], cmd=[ "echo q | " + run_env + VtOs.getenv_def('VERILATOR_XRUN', "xrun"), ' '.join(param['xrun_run_flags']), ' '.join(param['xrun_flags2']), ' '.join(param['all_run_flags']), pli_opt, param['top_filename'], ], *param, expect_filename=param['xrun_run_expect_filename'], ) elif param['xsim']: self.run( logfile=self.obj_dir + "/xsim_sim.log", fails=param['fails'], cmd=[ run_env + VtOs.getenv_def('VERILATOR_XELAB', "xelab"), ' '.join(param['xsim_run_flags']), ' '.join(param['xsim_run_flags2']), ' '.join(param['all_run_flags']), (" " + self.name + ".top") ], *param, expect_filename=param['xsim_expect_filename'], ) elif param['vlt_all']: if not param['executable']: param['executable'] = self.obj_dir + "/" + param['vm_prefix'] debugger = "" if Args.gdbsim: debugger = VtOs.getenv_def('VERILATOR_GDB', "gdb") + " " elif Args.rrsim: debugger = "rr record " self.run( cmd=[ (run_env + debugger + param['executable'] + (" -ex 'run " if Args.gdbsim else "")), *param['all_run_flags'], ("'" if Args.gdbsim else ""), ], aslr_off=param['aslr_off'], # Disable address space layour randomization check_finished=param['check_finished'], # Check for All Finished entering=param['entering'], # Print entering directory information expect_filename=param['expect_filename'], fails=param['fails'], logfile=param.get('logfile', self.obj_dir + "/vlt_sim.log"), tee=param['tee'], verilator_run=True, ) else: self.error("No execute step for this simulator") #--------------------------------------------------------------- # Accessors @property def aslr_off(self) -> str: if VlTest._cached_aslr_off is None: out = VtOs.run_capture('setarch --addr-no-randomize echo OK 2>/dev/null`', check=False) if re.search(r'OK', out): VlTest._cached_aslr_off = "setarch --addr-no-randomize " else: VlTest._cached_aslr_off = "" return VlTest._cached_aslr_off @property def benchmarksim_filename(self) -> str: return self.obj_dir + "/" + self.name + "_benchmarksim.csv" @property def driver_verilator_flags(self) -> list: return Arg_Driver_Verilator_Flags @property def get_default_vltmt_threads(self) -> int: return Vltmt_Threads @property def ok(self) -> bool: if self.errors or self.errors_keep_going or self._skips: self._ok = False return self._ok def passes(self, is_ok=True): if not self.errors: self._ok = is_ok @property def too_few_cores(self) -> bool: return calc_threads(Vltmt_Threads) < Vltmt_Threads @property def trace_filename(self) -> str: if re.match(r'^fst', self.trace_format): return self.obj_dir + "/simx.fst" return self.obj_dir + "/simx.vcd" def skip_if_too_few_cores(self) -> None: if self.too_few_cores: self.skip("Skipping due to too few cores") @property def v_suffix(self) -> str: return "v" @property def wno_unopthreads_for_few_cores(self) -> str: if self.too_few_cores: print("Too few cores, using -Wno-UNOPTTHREADS") return "-Wno-UNOPTTHREADS" return "" #--------------------------------------------------------------- # Capabilities @property def cmake_version(self) -> str: return Capabilities.cmake_version @property def cxx_version(self) -> str: return Capabilities.cxx_version @property def have_cmake(self) -> bool: ver = Capabilities.cmake_version return ver and version.parse(ver) >= version.parse("3.8") @property def have_coroutines(self) -> bool: return Capabilities.have_coroutines @property def have_gdb(self) -> bool: return Capabilities.have_gdb @property def have_sc(self) -> bool: return Capabilities.have_sc @property def have_solver(self) -> bool: self._have_solver_called = True return Capabilities.have_solver @property def make_version(self) -> str: return Capabilities.make_version #--------------------------------------------------------------- # OS functions def getenv_def(self, var: str, default=None) -> str: """Return environment variable, returning default if does not exist""" return VtOs.getenv_def(var, default) def mkdir_ok(self, filename) -> None: """Make directory, no error if exists""" if test.verbose: print("\tmkdir " + filename) VtOs.mkdir_ok(filename) def run_capture(self, cmd: str, check=True) -> str: """Run a command and return results""" if test.verbose: print("\t" + cmd) return VtOs.run_capture(cmd, check=check) def setenv(self, var: str, val: str) -> None: """Set enviornment variable""" print("\texport %s='%s'" % (var, val)) os.environ[var] = val def unlink_ok(self, filename) -> None: """Unlink a file, no error if fails""" if test.verbose: print("\trm " + filename) VtOs.unlink_ok(filename) #---------------------------------------------------------------------- def run( self, # cmd: list, aslr_off=False, # Disable address space layour randomization check_finished=False, # Check for All Finished entering=None, # Print entering directory information expect_filename=None, # Filename that should match logfile fails=False, # Command should fail logfile=None, # Filename to write putput to tee=True, verilator_run=False) -> str: # Move gcov data to parallel area try: command = ' '.join(cmd) except TypeError: print('run(cmd=' + pformat(cmd)) command = ' '.join(cmd) if aslr_off and aslr_off != "": prefix = self.aslr_off if prefix: command = prefix + " " + command if Args.benchmark and re.match(r'^cd ', command): command = "time " + command if verilator_run: # Gcov fails when parallel jobs write same data file, # so we make sure .gcda output dir is unique across all running jobs. # We can't just put each one in an unique obj_dir as it uses too much disk. # Must use absolute path as some execute()s have different PWD self.setenv('GCOV_PREFIX_STRIP', '99') self.setenv('GCOV_PREFIX', os.path.abspath(__file__ + "/../obj_dist/gcov_" + str(self.running_id))) os.makedirs(os.environ['GCOV_PREFIX'], exist_ok=True) else: VtOs.delenv('GCOV_PREFIX_STRIP') VtOs.delenv('GCOV_PREFIX') print("\t" + command + ((" > " + logfile) if logfile else "")) if entering: print("driver: Entering directory '" + os.path.abspath(entering) + "'") # Execute command redirecting output, keeping order between stderr and stdout. # Must do low-level IO so GCC interaction works (can't be line-based) status = None if True: # process_caller_block # pylint: disable=using-constant-test logfh = None if logfile: logfh = open(logfile, 'wb') # pylint: disable=consider-using-with with subprocess.Popen(command, shell=True, bufsize=0, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc: rawbuf = bytearray(2048) while proc.poll() is None: got = proc.stdout.readinto(rawbuf) if got: data = rawbuf[0:got] if tee: sys.stdout.write(data.decode('latin-1')) if Args.interactive_debugger: sys.stdout.flush() if logfh: logfh.write(data) if logfh: logfh.close() rc = proc.returncode # Negative if killed by signal if (rc in ( -4, # SIGILL -8, # SIGFPA -11)): # SIGSEGV self.error("Exec failed with core dump") status = 10 elif rc: status = 10 else: status = 0 sys.stdout.flush() sys.stderr.flush() if entering: print("driver: Leaving directory '" + os.path.abspath(entering) + "'") if not fails and status: firstline = "" if logfile: with open(logfile, 'r', encoding="utf8") as fh: for line in fh: line = line.rstrip() if re.match(r'^- ', line): # Debug message continue firstline = line break self.error("Exec of " + cmd[0] + " failed: " + firstline) if fails and status: print("(Exec expected to fail, and did.)") if fails and not status: self.error("Exec of " + cmd[0] + " ok, but expected to fail") if self.errors or self._skips: return False # Read the log file a couple of times to allow for NFS delays if check_finished: for tryn in range(Args.log_retries - 1, -1, -1): if tryn != Args.log_retries - 1: time.sleep(1) moretry = tryn != 0 if not self._run_log_try(logfile, check_finished, moretry): break if expect_filename: self.files_identical(logfile, expect_filename, is_logfile=True) return False return True def _run_log_try(self, logfile: str, check_finished: bool, moretry: bool) -> bool: # If moretry, then return true to try again with open(logfile, 'r', encoding='latin-1', newline='\n') as fh: if not fh and moretry: return True wholefile = str(fh.read()) # Finished? if check_finished and not re.search(r'\*\-\* All Finished \*\-\*', wholefile): if moretry: return True self.error("Missing '*-* All Finished *-*'") return False ####################################################################### # Little utilities @staticmethod def _try_regex(text: str, regex) -> None: # Try to eval a regexp # Returns: # 1 if $text ~= /$regex/ms # 0 if no match # -1 if $regex is invalid, doesn't compile try: m = re.search(regex, text) return 1 if m else 0 except re.error: return -1 def _make_main(self, timing_loop: bool) -> None: if timing_loop and self.sc: self.error("Cannot use timing loop and SystemC together!") self._read_inputs_v() filename = self.main_filename with open(filename, "w", encoding="utf8") as fh: fh.write("// Test defines\n") fh.write("#define MAIN_TIME_MULTIPLIER " + str(int(round(self.main_time_multiplier, 0))) + "\n") fh.write("#include \n") if self.benchmarksim: fh.write("#include \n") fh.write("#include \n") fh.write("#include \n") fh.write("// OS header\n") fh.write('#include "verilatedos.h"' + "\n") fh.write("// Generated header\n") fh.write('#include "' + self.vm_prefix + '.h"' + "\n") fh.write("// General headers\n") fh.write('#include "verilated.h"' + "\n") if self.sc: fh.write('#include "systemc.h"' + "\n") if self.trace and self.trace_format == 'fst-c': fh.write("#include \"verilated_fst_c.h\"\n") if self.trace and self.trace_format == 'fst-sc': fh.write("#include \"verilated_fst_sc.h\"\n") if self.trace and self.trace_format == 'vcd-c': fh.write("#include \"verilated_vcd_c.h\"\n") if self.trace and self.trace_format == 'vcd-sc': fh.write("#include \"verilated_vcd_sc.h\"\n") if self.savable: fh.write("#include \"verilated_save.h\"\n") fh.write("std::unique_ptr<" + self.vm_prefix + "> topp;\n") if self.savable: fh.write("\n") fh.write("void save_model(const char* filenamep) {\n") fh.write(" VL_PRINTF(\"Saving model to '%s'\\n\", filenamep);\n") fh.write(" VerilatedSave os;\n") fh.write(" os.open(filenamep);\n") fh.write(" os << *topp;\n") fh.write(" os.close();\n") fh.write("}\n") fh.write("\n") fh.write("void restore_model(const char* filenamep) {\n") fh.write(" VL_PRINTF(\"Restoring model from '%s'\\n\", filenamep);\n") fh.write(" VerilatedRestore os;\n") fh.write(" os.open(filenamep);\n") fh.write(" os >> *topp;\n") fh.write(" os.close();\n") fh.write("}\n") #### Main if self.sc: fh.write("extern int sc_main(int argc, char** argv);\n") fh.write("int sc_main(int argc, char** argv) {\n") if 'fastclk' in self._inputs: if self.pins_sc_uint_bool: fh.write(" sc_signal> fastclk;\n") else: fh.write(" sc_signal fastclk;\n") if 'clk' in self._inputs: if self.pins_sc_uint_bool: fh.write(" sc_signal> clk;\n") else: fh.write(" sc_signal clk;\n") fh.write(" sc_set_time_resolution(1, " + self.sc_time_resolution + ");\n") fh.write(" sc_time sim_time(" + str(self.sim_time) + ", " + self.sc_time_resolution + ");\n") else: fh.write("int main(int argc, char** argv) {\n") fh.write(" uint64_t sim_time = " + str(self.sim_time) + ";\n") fh.write( " const std::unique_ptr contextp{new VerilatedContext};\n") fh.write(" contextp->threads(" + str(self.context_threads) + ");\n") fh.write(" contextp->commandArgs(argc, argv);\n") fh.write(" contextp->debug(" + ('1' if self.verilated_debug else '0') + ");\n") fh.write(" srand48(5);\n") # Ensure determinism if self.verilated_randReset is not None and self.verilated_randReset != "": fh.write(" contextp->randReset(" + str(self.verilated_randReset) + ");\n") fh.write(" topp.reset(new " + self.vm_prefix + "{\"top\"});\n") if self.verilated_debug: fh.write(" contextp->internalsDump()\n;") if self.sc: if 'fastclk' in self._inputs: fh.write(" topp->fastclk(fastclk);\n") if 'clk' in self._inputs: fh.write(" topp->clk(clk);\n") setp = "" else: fh.write(" topp->eval();\n") setp = "topp->" if self.benchmarksim: fh.write(" std::chrono::time_point starttime;\n") fh.write(" bool warm = false;\n") fh.write(" uint64_t n_evals = 0;\n") if self.trace: fh.write("\n") fh.write("#if VM_TRACE\n") fh.write(" contextp->traceEverOn(true);\n") if self.trace_format == 'fst-c': fh.write(" std::unique_ptr tfp{new VerilatedFstC};\n") if self.trace_format == 'fst-sc': fh.write(" std::unique_ptr tfp{new VerilatedFstSc};\n") if self.trace_format == 'vcd-c': fh.write(" std::unique_ptr tfp{new VerilatedVcdC};\n") if self.trace_format == 'vcd-sc': fh.write(" std::unique_ptr tfp{new VerilatedVcdSc};\n") if self.sc: fh.write(" sc_core::sc_start(sc_core::SC_ZERO_TIME);" + " // Finish elaboration before trace and open\n") fh.write(" topp->trace(tfp.get(), 99);\n") fh.write(" tfp->open(\"" + self.trace_filename + "\");\n") if self.trace and not self.sc: fh.write(" if (tfp) tfp->dump(contextp->time());\n") fh.write("#endif\n") if self.savable: fh.write(" const char* save_time_strp" " = contextp->commandArgsPlusMatch(\"save_time=\");\n") fh.write(" unsigned int save_time = !save_time_strp[0]" " ? 0 : std::atoi(save_time_strp + std::strlen(\"+save_time=\"));\n") fh.write(" const char* save_restore_strp" " = contextp->commandArgsPlusMatch(\"save_restore=\");\n") fh.write(" unsigned int save_restore = !save_restore_strp[0] ? 0 : 1;\n") if self.savable: fh.write(" if (save_restore) {\n") fh.write(" restore_model(\"" + self.obj_dir + "/saved.vltsv\");\n") fh.write(" } else {\n") else: fh.write(" {\n") if 'fastclk' in self._inputs: fh.write(" " + setp + "fastclk = false;\n") if 'clk' in self._inputs: fh.write(" " + setp + "clk = false;\n") if not timing_loop: self._print_advance_time(fh, 10, None) fh.write(" }\n") timestamp = "sc_time_stamp()" if self.sc else "contextp->time()" fh.write(" while (") if not timing_loop or 'clk' in self._inputs: fh.write("(" + timestamp + " < sim_time * MAIN_TIME_MULTIPLIER) && ") fh.write("!contextp->gotFinish()) {\n") if timing_loop: fh.write(" topp->eval();\n") if self.trace: fh.write("#if VM_TRACE\n") fh.write(" if (tfp) tfp->dump(contextp->time());\n") fh.write("#endif // VM_TRACE\n") if 'clk' in self._inputs: fh.write(" const uint64_t cycles" " = contextp->time() / MAIN_TIME_MULTIPLIER;\n") fh.write(" uint64_t new_time = (cycles + 1) * MAIN_TIME_MULTIPLIER;\n") fh.write(" if (topp->eventsPending() &&\n") fh.write(" topp->nextTimeSlot()" " / MAIN_TIME_MULTIPLIER <= cycles) {\n") fh.write(" new_time = topp->nextTimeSlot();\n") fh.write(" } else {\n") if self.pins_sc_uint_bool: fh.write(" " + setp + "clk.write(!" + setp + "clk.read());\n") else: fh.write(" " + setp + "clk = !" + setp + "clk;\n") fh.write(" }\n") fh.write(" contextp->time(new_time);\n") else: fh.write(" if (!topp->eventsPending()) break;\n") fh.write(" contextp->time(topp->nextTimeSlot());\n") else: for i in range(5): action = False if 'fastclk' in self._inputs: if self.pins_sc_uint_bool: fh.write(" " + setp + "fastclk.write(!" + setp + "fastclk.read());\n") else: fh.write(" " + setp + "fastclk = !" + setp + "fastclk;\n") action = True if i == 0 and 'clk' in self._inputs: if self.pins_sc_uint_bool: fh.write(" " + setp + "clk.write(!" + setp + "clk.read());\n") else: fh.write(" " + setp + "clk = !" + setp + "clk;\n") action = True if self.savable: fh.write(" if (save_time && " + timestamp + " == save_time) {\n") fh.write(" save_model(\"" + self.obj_dir + "/saved.vltsv\");\n") fh.write(" printf(\"Exiting after save_model\\n\");\n") fh.write(" topp.reset(nullptr);\n") fh.write(" return 0;\n") fh.write(" }\n") self._print_advance_time(fh, 1, action) if self.benchmarksim: fh.write(" if (VL_UNLIKELY(!warm)) {\n") fh.write(" starttime = std::chrono::steady_clock::now();\n") fh.write(" warm = true;\n") fh.write(" } else {\n") fh.write(" ++n_evals;\n") fh.write(" }\n") fh.write(" }\n") if self.benchmarksim: fh.write(" {\n") fh.write(" const std::chrono::duration exec_s" " = std::chrono::steady_clock::now() - starttime;\n") fh.write(" std::ofstream benchfile(\"" + self.benchmarksim_filename + "\", std::ofstream::out | std::ofstream::app);\n") fh.write(" benchfile << std::fixed << std::setprecision(9)" " << n_evals << \",\" << exec_s.count() << std::endl;\n") fh.write(" benchfile.close();\n") fh.write(" }\n") fh.write(" if (!contextp->gotFinish()) {\n") fh.write(' vl_fatal(__FILE__, __LINE__, "main",' + ' "%Error: Timeout; never got a $finish");' + "\n") fh.write(" }\n") fh.write(" topp->final();\n") fh.write("\n") if self.coverage: fh.write("#if VM_COVERAGE\n") fh.write(" contextp->coveragep()->write(\"" + self.coverage_filename + "\");\n") fh.write("#endif // VM_COVERAGE\n") if self.trace: fh.write("#if VM_TRACE\n") fh.write(" if (tfp) tfp->close();\n") fh.write(" tfp.reset();\n") fh.write("#endif // VM_TRACE\n") fh.write(" topp.reset();\n") fh.write(" return 0;\n") fh.write("}\n") def _print_advance_time(self, fh, timeinc: str, action: bool) -> None: setp = "" if self.sc else "topp->" if self.sc: fh.write(" sc_start(" + str(timeinc) + " * MAIN_TIME_MULTIPLIER, " + self.sc_time_resolution + ");\n") else: if action: fh.write(" " + setp + "eval();\n") if self.trace and not self.sc: fh.write("#if VM_TRACE\n") fh.write(" if (tfp) tfp->dump(contextp->time());\n") fh.write("#endif // VM_TRACE\n") fh.write(" contextp->timeInc(" + str(timeinc) + " * MAIN_TIME_MULTIPLIER);\n") ####################################################################### def _make_top(self, needed=True) -> None: if not needed: return self._make_top_v() def _make_top_v(self) -> None: self._read_inputs_v() with open(self.top_shell_filename(), 'w', encoding="utf8") as fh: fh.write("module top;\n") for inp in sorted(self._inputs.keys()): fh.write(" reg " + inp + ";\n") # Inst fh.write(" t t (\n") comma = "" for inp in sorted(self._inputs.keys()): fh.write(" " + comma + "." + inp + " (" + inp + ")\n") comma = "," fh.write(" );\n") # Waves fh.write("\n") fh.write("`ifdef WAVES\n") fh.write(" initial begin\n") fh.write(" $display(\"-Tracing Waves to Dumpfile: " + self.trace_filename + "\");\n") fh.write(" $dumpfile(\"" + self.trace_filename + "\");\n") fh.write(" $dumpvars(0, top);\n") fh.write(" end\n") fh.write("`endif\n") # Test fh.write("\n") fh.write(" initial begin\n") if 'fastclk' in self._inputs: fh.write(" fastclk = 0;\n") if 'clk' in self._inputs: fh.write(" clk = 0;\n") fh.write(" #10;\n") if 'fastclk' in self._inputs: fh.write(" fastclk = 1;\n") if 'clk' in self._inputs: fh.write(" clk = 1;\n") fh.write(" while (" + time + " < " + self.sim_time + ") begin\n") for i in range(6): fh.write(" #1;\n") if 'fastclk' in self._inputs: fh.write(" fastclk = !fastclk;\n") if i == 4 and 'clk' in self._inputs: fh.write(" clk = !clk;\n") fh.write(" end\n") fh.write(" end\n") fh.write("endmodule\n") ####################################################################### def _read_inputs_v(self) -> None: filename = self.top_filename if not os.path.exists(filename): filename = self.t_dir + '/' + filename with open(filename, 'r', encoding="utf8") as fh: get_sigs = True inputs = {} for line in fh: if get_sigs: m = re.match(r'^\s*input\s*(\S+)\s*(\/[^\/]+\/|)\s*;', line) if m: inputs[m.group(1)] = m.group(1) if re.match(r'^\s*(function|task|endmodule)', line): get_sigs = False # Ignore any earlier inputs; Module 't' has precedence if re.match(r'^\s*module\s+t\b', line): inputs = {} get_sigs = True for sig, val in inputs.items(): self._inputs[sig] = val ####################################################################### # File utilities def files_identical(self, fn1: str, fn2: str, is_logfile=False) -> None: """Test if two files have identical contents""" for tryn in range(Args.log_retries, -1, -1): if tryn != Args.log_retries - 1: time.sleep(1) moretry = tryn != 0 if not self._files_identical_try( fn1=fn1, fn2=fn2, is_logfile=is_logfile, moretry=moretry): break def _files_identical_try(self, fn1: str, fn2: str, is_logfile: bool, moretry: bool) -> bool: # If moretry, then return true to try again try: f1 = open( # pylint: disable=consider-using-with fn1, 'r', encoding='latin-1', newline='\n') except FileNotFoundError: f1 = None if not moretry: self.error("Files_identical file does not exist: " + fn1) return True # Retry try: f2 = open( # pylint: disable=consider-using-with fn2, 'r', encoding='latin-1', newline='\n') except FileNotFoundError: f2 = None if not moretry: self.copy_if_golden(fn1, fn2) self.error("Files_identical file does not exist: " + fn2) return True # Retry again = self._files_identical_reader(f1, f2, fn1=fn1, fn2=fn2, is_logfile=is_logfile, moretry=moretry) if f1: f1.close() if f2: f2.close() return again def _files_identical_reader(self, f1, f2, fn1: str, fn2: str, is_logfile: bool, moretry: bool) -> None: # If moretry, then return true to try again l1s = f1.readlines() l2s = f2.readlines() if f2 else [] # print(" rawGOT="+pformat(l1s)+"\n rawEXP="+pformat(l2s)) if is_logfile: l1o = [] for line in l1s: if (re.match(r'^- [^\n]+\n', line) # or re.match(r'^- [a-z.0-9]+:\d+:[^\n]+\n', line) or re.match(r'^-node:', line) # or re.match(r'^dot [^\n]+\n', line) # or re.match(r'^Aborted', line) # or re.match(r'^In file: .*\/sc_.*:\d+', line) # or re.match(r'^libgcov.*', line) # or re.match(r'--- \/tmp\/', line) # t_difftree.py or re.match(r'\+\+\+ \/tmp\/', line) # t_difftree.py or re.match(r'^==[0-9]+== ?[^\n]*\n', line)): # valgrind continue # Don't put control chars or unstable lines into source repository while True: (line, didn) = re.subn(r'(Internal Error: [^\n]+?\.(cpp|h)):[0-9]+', r'\1:#', line) if not didn: break # --vlt vs --vltmt run differences line = re.sub(r'^-V\{t[0-9]+,[0-9]+\}', '-V{t#,#}', line) line = re.sub(r'\r', '<#013>', line) line = re.sub(r'Command Failed[^\n]+', 'Command Failed', line) line = re.sub(r'Version: Verilator[^\n]+', 'Version: Verilator ###', line) line = re.sub(r'CPU Time: +[0-9.]+ seconds[^\n]+', 'CPU Time: ###', line) line = re.sub(r'\?v=[0-9.]+', '?v=latest', line) # warning URL line = re.sub(r'_h[0-9a-f]{8}_', '_h########_', line) line = re.sub(r'%Error: /[^: ]+/([^/:])', r'%Error: .../\1', line) # Avoid absolute paths line = re.sub(r' \/[^ ]+\/verilated_std.sv', ' verilated_std.sv', line) # (line, n) = re.subn(r'Exiting due to.*', r"Exiting due to", line) if n: l1o.append(line) break # Trunc rest l1o.append(line) # l1s = l1o for lineno_m1 in range(0, max(len(l1s), len(l2s))): l1 = l1s[lineno_m1] if lineno_m1 < len(l1s) else "*EOF*\n" l2 = l2s[lineno_m1] if lineno_m1 < len(l2s) else "*EOF*\n" if l1 != l2: # print(" clnGOT="+pformat(l1s)+"\n clnEXP="+pformat(l2s)) if moretry: return True # Retry self.error_keep_going("Line " + str(lineno_m1) + " miscompares; " + fn1 + " != " + fn2) for c in range(min(len(l1), len(l2))): if ord(l1[c]) != ord(l2[c]): print("Miscompare starts at column " + str(c) + (" w/ F1(got)=0x%02x F2(exp)=0x%02x" % (ord(l1[c]), ord(l2[c]))), file=sys.stderr) break print("F1(got): " + l1 + "F2(exp): " + l2, file=sys.stderr) if 'HARNESS_UPDATE_GOLDEN' in os.environ: # Update golden files with current print("%Warning: HARNESS_UPDATE_GOLDEN set: cp " + fn1 + " " + fn2, file=sys.stderr) with open(fn2, 'w', encoding="utf8") as fhw: fhw.write(''.join(l1s)) else: print("To update reference: HARNESS_UPDATE_GOLDEN=1 {command} or --golden", file=sys.stderr) return False # No retry - bad return False # No retry - good def files_identical_sorted(self, fn1: str, fn2: str, is_logfile=False) -> None: """Test if two files, after sorting both, have identical contents""" # Set LC_ALL as suggested in the sort manpage to avoid sort order # changes from the locale. os.environ['LC_ALL'] = 'C' fn1sort = fn1 + '.sort' self.run(cmd=['sort', fn1, "> " + fn1sort]) self.files_identical(fn1sort, fn2, is_logfile) def copy_if_golden(self, fn1: str, fn2: str) -> None: """Copy a file if updating golden .out files""" if 'HARNESS_UPDATE_GOLDEN' in os.environ: # Update golden files with current print("%Warning: HARNESS_UPDATE_GOLDEN set: cp " + fn1 + " " + fn2, file=sys.stderr) shutil.copy(fn1, fn2) def vcd_identical(self, fn1: str, fn2: str) -> None: """Test if two VCD files have logically-identical contents""" # vcddiff to check transitions, if installed cmd = "vcddiff --help" out = test.run_capture(cmd, check=True) cmd = 'vcddiff ' + fn1 + ' ' + fn2 out = test.run_capture(cmd, check=True) if out != "": cmd = 'vcddiff ' + fn2 + " " + fn1 # Reversed arguments out = VtOs.run_capture(cmd, check=False) if out != "": print(out) self.copy_if_golden(fn1, fn2) self.error("VCD miscompares " + fn2 + " " + fn1) # vcddiff doesn't check module and variable scope, so check that # Also provides backup if vcddiff not installed h1 = self._vcd_read(fn1) h2 = self._vcd_read(fn2) a = json.dumps(h1, sort_keys=True, indent=1) b = json.dumps(h2, sort_keys=True, indent=1) if a != b: self.copy_if_golden(fn1, fn2) self.error("VCD hier miscompares " + fn1 + " " + fn2 + "\nGOT=" + a + "\nEXP=" + b + "\n") def fst2vcd(self, fn1: str, fn2: str) -> None: cmd = "fst2vcd -h" out = VtOs.run_capture(cmd, check=False) if out == "" or not re.search(r'Usage:', out): self.skip("No fst2vcd installed") return cmd = 'fst2vcd -e -f "' + fn1 + '" -o "' + fn2 + '"' print("\t " + cmd + "\n") # Always print to help debug race cases out = VtOs.run_capture(cmd, check=False) print(out) def fst_identical(self, fn1: str, fn2: str) -> None: """Test if two FST files have logically-identical contents""" tmp = fn1 + ".vcd" self.fst2vcd(fn1, tmp) self.vcd_identical(tmp, fn2) def _vcd_read(self, filename: str) -> str: data = {} with open(filename, 'r', encoding='latin-1') as fh: hier_stack = ["TOP"] var = [] for line in fh: match1 = re.search(r'\$scope (module|struct|interface)\s+(\S+)', line) match2 = re.search(r'(\$var (\S+)\s+\d+\s+)\S+\s+(\S+)', line) match3 = re.search(r'(\$attrbegin .* \$end)', line) line = line.rstrip() # print("VR"+ ' '*len(hier_stack) +" L " + line) if match1: # $scope name = match1.group(2) # print("VR"+ ' '*len(hier_stack) +" scope " + line) hier_stack += [name] scope = '.'.join(hier_stack) data[scope] = match1.group(1) + " " + name elif match2: # $var # print("VR"+ ' '*len(hier_stack) +" var " + line) scope = '.'.join(hier_stack) var = match2.group(2) data[scope + "." + var] = match2.group(1) + match2.group(3) elif match3: # $attrbegin # print("VR"+ ' '*len(hier_stack) +" attr " + line) if var: scope = '.'.join(hier_stack) data[scope + "." + var + "#"] = match3.group(1) elif re.search(r'\$enddefinitions', line): break n = len(re.findall(r'\$upscope', line)) if n: for i in range(0, n): # pylint: disable=unused-variable # print("VR"+ ' '*len(hier_stack) +" upscope " + line) hier_stack.pop() return data def inline_checks(self) -> None: covfn = self.coverage_filename contents = self.file_contents(covfn) if self.verbose: self.oprint("Extract checks") with open(self.top_filename, 'r', encoding="utf8") as fh: flineno = 0 for line in fh: flineno += 1 if re.search(r'CHECK', line): match1 = re.search( r'CHECK_COVER *\( *((-|[0-9])+) *,' r'*"([^"]+)" *, *("([^"]+)" *,|) *(\d+) *\)', line) match2 = re.search(r'CHECK_COVER_MISSING *\( *((-|[0-9])+) *\)', line) if match1: lineno = flineno + int(match1.group(1)) hier = match1.group(3) comment = match1.group(5) count = match1.group(6) regexp = "\001l\002" + str(lineno) if comment: regexp += ".*\001o\002" + re.escape(comment) if hier: regexp += ".*\001h\002" + re.escape(hier) regexp += ".*' " + str(count) if not re.search(regexp, contents): self.error("CHECK_COVER: " + covfn + ":" + str(flineno) + ": Regexp not found: " + regexp + "\n" + "From " + self.top_filename + ": " + line) elif match2: lineno = flineno + int(match2.group(1)) regexp = "\001l\002" + str(lineno) if re.search(regexp, contents): self.error("CHECK_COVER_MISSING: " + covfn + ":" + str(flineno) + ": Regexp found: " + regexp + "\n" + "From " + self.top_filename + ": " + line) else: self.error(self.top_filename + ":" + str(flineno) + ": Unknown CHECK request: " + line) @staticmethod def cfg_with_ccache() -> bool: if VlTest._cached_cfg_with_ccache is None: mkf = VlTest._file_contents_static(os.environ['VERILATOR_ROOT'] + "/include/verilated.mk") VlTest._cached_cfg_with_ccache = bool(re.match(r'OBJCACHE \?= ccache', mkf)) return VlTest._cached_cfg_with_ccache def glob_some(self, pattern: str) -> list: """Return list of filenames matching a glob, with at least one match required.""" files = glob.glob(pattern) # print("glob_some('" + pattern + "') files =\n " + pformat(files)) if not files: self.error("glob_one: pattern '" + pattern + "' does not match any files") return ['No_file_found'] return sorted(files) def glob_one(self, pattern: str) -> str: """Return a filename matching a glob, with exactly one match required.""" files = self.glob_some(pattern) if files and len(files) > 1: msg = "glob_one: pattern '" + pattern + "' matches multiple files:\n" for file in files: msg += file + "\n" self.error(msg) return 'Multiple_files_found' return files[0] def file_grep_not(self, filename: str, regexp) -> None: contents = self.file_contents(filename) if contents == "_Already_Errored_": return if re.search(regexp, contents, re.MULTILINE): self.error("File_grep_not: " + filename + ": Regexp found: '" + regexp + "'") def file_grep(self, filename: str, regexp, expvalue=None) -> list: contents = self.file_contents(filename) if contents == "_Already_Errored_": return None match = re.search(regexp, contents, re.MULTILINE) if not match: self.error("File_grep: " + filename + ": Regexp not found: " + regexp) return None if expvalue and str(expvalue) != match.group(1): self.error("File_grep: " + filename + ": Got='" + match.group(1) + "' Expected='" + str(expvalue) + "' in regexp: '" + regexp + "'") return None return [match.groups()] def file_grep_count(self, filename: str, regexp, expcount: int) -> None: contents = self.file_contents(filename) if contents == "_Already_Errored_": return count = len(re.findall(regexp, contents)) if expcount != count: self.error("File_grep_count: " + filename + ": Got='" + count + "' Expected='" + expcount + "' in regexp: '" + regexp + "'") def file_grep_any(self, filenames: list, regexp, expvalue=None) -> None: for filename in filenames: contents = self.file_contents(filename) if contents == "_Already_Errored_": return match = re.search(regexp, contents) if match: if expvalue and str(expvalue) != match.group(1): self.error("file_grep: " + filename + ": Got='" + match.group(1) + "' Expected='" + str(expvalue) + "' in regexp: " + regexp) return msg = "file_grep_any: Regexp '" + regexp + "' not found in any of the following files:\n" for filename in filenames: msg += filename + "\n" self.error(msg) def file_contents(self, filename: str) -> str: if filename not in self._file_contents_cache: if not os.path.exists(filename): self._file_contents_cache[filename] = "_Already_Errored_" self.error("File_contents file not found: " + filename) else: with open(filename, 'r', encoding='latin-1') as fh: self._file_contents_cache[filename] = str(fh.read()) return self._file_contents_cache[filename] @staticmethod def _file_contents_static(filename: str) -> str: if filename not in VlTest._file_contents_cache: if not os.path.exists(filename): VlTest._file_contents_cache[filename] = "_Already_Errored_" sys.exit("_file_contents_static file not found: " + filename) else: with open(filename, 'r', encoding='latin-1') as fh: VlTest._file_contents_cache[filename] = str(fh.read()) return VlTest._file_contents_cache[filename] def write_wholefile(self, filename: str, contents: str) -> None: with open(filename, 'wb') as fh: fh.write(contents.encode('latin-1')) if filename in self._file_contents_cache: del self._file_contents_cache[filename] def file_sed(self, in_filename: str, out_filename, edit_lambda) -> None: contents = self.file_contents(in_filename) contents = edit_lambda(contents) self.write_wholefile(out_filename, contents) def extract( self, in_filename: str, out_filename: str, regexp=r'.*', lineno_adjust=-9999, # lines=None) -> None: #'#, #-#' temp_fn = out_filename temp_fn = re.sub(r'.*/', '', temp_fn) temp_fn = self.obj_dir + "/" + temp_fn out = [] emph = "" lineno = 0 lineno_out = 0 with open(in_filename, 'r', encoding="latin-1") as fh: for line in fh: lineno += 1 if re.search(regexp, line) and self._lineno_match(lineno, lines): match = re.search(r't/[A-Za-z0-9_]+.v:(\d+):(\d+):', line) if match: mlineno = int(match.group(1)) + lineno_adjust col = int(match.group(2)) mlineno = max(1, mlineno) line = re.sub(r't/[A-Za-z0-9_]+.v:(\d+):(\d+):', 'example.v:' + str(mlineno) + ':' + str(col), line) out.append(" " + line) lineno_out += 1 if '<--' in line: if emph: emph += "," emph += str(lineno_out) with open(temp_fn, 'w', encoding="latin-1") as fhw: lang = " sv" if re.search(r'\.s?vh?$', in_filename) else "" fhw.write(".. comment: generated by " + self.name + "\n") fhw.write(".. code-block::" + lang + "\n") if lang != "" and len(out) > 1: fhw.write(" :linenos:\n") if emph: fhw.write(" :emphasize-lines: " + emph + "\n") fhw.write("\n") for line in out: fhw.write(line) self.files_identical(temp_fn, out_filename) @staticmethod def _lineno_match(lineno: int, lines: str) -> bool: if not lines: return True for lc in lines.split(','): match1 = re.match(r'^(\d+)$', lc) match2 = re.match(r'^(\d+)-(\d+)$', lc) if match1 and int(match1.group(1)) == lineno: return True if match2 and int(match2.group(1)) <= lineno <= int(match2.group(2)): return True return False ###################################################################### ###################################################################### # Global Functions def calc_jobs() -> int: ok_threads = max_procs() print("driver.py: Found %d cores, using -j %d" % (ok_threads, ok_threads)) return ok_threads def calc_threads(default_threads) -> int: ok_threads = max_procs() return ok_threads if (ok_threads < default_threads) else default_threads def _calc_hashset() -> list: match = re.match(r'^(\d+)/(\d+)$', Args.hashset) if not match: sys.exit("%Error: Need number/number format for --hashset: " + Args.hashset) setn = int(match.group(1)) nsets = int(match.group(2)) new = [] global Arg_Tests for t in Arg_Tests: checksum = int(hashlib.sha256(t.encode("utf-8")).hexdigest()[0:4], 16) if (setn % nsets) == (checksum % nsets): new.append(t) Arg_Tests = new ####################################################################### ####################################################################### # Verilator utilities @lru_cache(maxsize=1) def max_procs() -> int: procs = multiprocessing.cpu_count() if procs < 2: print("driver.py: Python didn't find at least two CPUs") return procs def _parameter(param: str) -> None: global _Parameter_Next_Level if _Parameter_Next_Level: if not re.match(r'^(\d+)$', param): sys.exit("%Error: Expected number following " + _Parameter_Next_Level + ": " + param) Arg_Driver_Verilator_Flags.append(param) _Parameter_Next_Level = None elif re.search(r'\.py', param): Arg_Tests.append(param) elif re.match(r'^-?(-debugi|-dumpi)', param): Arg_Driver_Verilator_Flags.append(param) _Parameter_Next_Level = param elif re.match(r'^-?(-W||-debug-check)', param): Arg_Driver_Verilator_Flags.append(param) else: sys.exit("%Error: Unknown parameter: " + param) def run_them() -> None: VtOs.mkdir_ok("obj_dist") timestart = time.strftime("%Y%m%d_%H%M%S") runner = Runner(driver_log_filename="obj_dist/driver_" + timestart + ".log", quiet=Args.quiet) for test_py in Arg_Tests: for scenario in sorted(set(Args.scenarios)): if VlTest._prefilter_scenario(test_py, scenario): runner.one_test(py_filename=test_py, scenario=scenario) runner.wait_and_report() if Args.rerun and runner.fail_cnt and not Quitting: print('=' * 70) print('=' * 70) print("RERUN ==\n") # Avoid parallel run to ensure that isn't causing problems # If > 10 failures something more wrong and get results quickly if runner.fail_cnt < 10: forker.max_proc(1) orig_runner = runner runner = Runner(driver_log_filename="obj_dist/driver_" + timestart + "_rerun.log", quiet=False, fail1_cnt=orig_runner.fail_cnt, ok_cnt=orig_runner.ok_cnt, skip_cnt=orig_runner.skip_cnt) for ftest in orig_runner.fail_tests: # Reschedule test if ftest.rerunnable: ftest.clean() runner.one_test(py_filename=ftest.py_filename, scenario=ftest.scenario, rerun_skipping=not ftest.rerunnable) runner.wait_and_report() if runner.fail_cnt: sys.exit(10) ###################################################################### ###################################################################### # Main if __name__ == '__main__': os.environ['PYTHONUNBUFFERED'] = "1" if ('VERILATOR_ROOT' not in os.environ) and os.path.isfile('../bin/verilator'): os.environ['VERILATOR_ROOT'] = os.getcwd() + "/.." if 'MAKE' not in os.environ: os.environ['MAKE'] = "make" if 'CXX' not in os.environ: os.environ['CXX'] = "c++" if 'TEST_REGRESS' in os.environ: sys.exit("%Error: TEST_REGRESS environment variable is already set") os.environ['TEST_REGRESS'] = os.getcwd() forker = None Start = time.time() _Parameter_Next_Level = None def sig_int(signum, env) -> None: # pylint: disable=unused-argument global Quitting if Quitting: sys.exit("\nQuitting (immediately)...") print("\nQuitting... (send another interrupt signal for immediate quit)") Quitting = True if forker: forker.kill_tree_all() signal.signal(signal.SIGINT, sig_int) #--------------------------------------------------------------------- parser = argparse.ArgumentParser( allow_abbrev=False, formatter_class=argparse.RawDescriptionHelpFormatter, description="""Run Verilator regression tests""", epilog="""driver.py invokes Verilator or another simulator on each test file. See docs/internals.rst in the distribution for more information. Copyright 2024-2024 by Wilson Snyder. This program is free software; you can redistribute it and/or modify it under the terms of either the GNU Lesser General Public License Version 3 or the Perl Artistic License Version 2.0. SPDX-License-Identifier: LGPL-3.0-only OR Artistic-2.0""") parser.add_argument('--benchmark', action='store', help='enable benchmarking') parser.add_argument('--debug', action='store_const', const=9, help='enable debug') # --debugi: see _parameter() parser.add_argument('--fail-max', action='store', default=None, help='run Verilator executable with gdb') parser.add_argument('--gdb', action='store_true', help='run Verilator executable with gdb') parser.add_argument('--gdbbt', action='store_true', help='run Verilated executable with gdb and backtrace') parser.add_argument('--gdbsim', action='store_true', help='run Verilated executable with gdb') parser.add_argument('--golden', '--gold', action='store_true', help='update golden .out files') parser.add_argument('--hashset', action='store', help='split tests based on /') parser.add_argument('--jobs', '-j', action='store', default=0, type=int, help='parallel job count (0=cpu count)') parser.add_argument('--quiet', action='store_true', help='suppress output except failures and progress') parser.add_argument('--rerun', action='store_true', help='rerun all tests that fail') parser.add_argument('--rr', action='store_true', help='run Verilator executable with rr') parser.add_argument('--rrsim', action='store_true', help='run Verilated executable with rr') parser.add_argument('--sanitize', action='store_true', help='run address sanitizer') parser.add_argument('--site', action='store_true', help='include VERILATOR_TEST_SITE test list') parser.add_argument('--stop', action='store_true', help='stop on the first error') parser.add_argument('--trace', action='store_true', help='enable simulator waveform tracing') parser.add_argument('--verbose', action='store_true', help='compile and run test in verbose mode') parser.add_argument( '--verilation', # -no-verilation undocumented debugging action='store_true', default=True, help="don't run verilator compile() phase") parser.add_argument('--verilated-debug', action='store_true', help='enable Verilated executable debug') ## Scenarios for scen, v in All_Scenarios.items(): parser.add_argument('--' + scen, dest='scenarios', action='append_const', const=scen, help='scenario-enable ' + scen) (Args, rest) = parser.parse_known_intermixed_args() for arg in rest: _parameter(arg) if Args.debug: Arg_Driver_Verilator_Flags.append("--debug --no-skip-identical") logging.basicConfig(level=logging.DEBUG) logging.info("In driver.py, ARGV=" + ' '.join(sys.argv)) # Use Args for some global information, so gets passed to forked process Args.interactive_debugger = Args.gdb or Args.gdbsim or Args.rr or Args.rrsim if Args.jobs > 1 and Args.interactive_debugger: sys.exit("%Error: Unable to use -j > 1 with --gdb* and --rr* options") if Args.golden: os.environ['HARNESS_UPDATE_GOLDEN'] = '1' if Args.jobs == 0: Args.jobs = 1 if Args.interactive_debugger else calc_jobs() if not Args.scenarios: Args.scenarios = [] Args.scenarios.append('dist') Args.scenarios.append('vlt') Args.orig_argv_sw = [] for arg in sys.argv: if re.match(r'^-', arg) and not re.match(r'^-j', arg): Args.orig_argv_sw.append(arg) Args.test_dirs = ["t"] if 'VERILATOR_TESTS_SITE' in os.environ: if Args.site or len(Arg_Tests) >= 1: for tdir in os.environ['VERILATOR_TESTS_SITE'].split(':'): Args.test_dirs.append(tdir) if not Arg_Tests: # Run everything uniq = {} for tdir in Args.test_dirs: # Uniquify by inode, so different paths to same place get combined stats = os.stat(tdir) if stats.st_ino not in uniq: uniq[stats.st_ino] = 1 Arg_Tests += sorted(glob.glob(tdir + "/t_*.py")) if Args.hashset: _calc_hashset() # Number of retries when reading logfiles, generally only need many # retries when system is busy running a lot of tests Args.log_retries = 10 if (len(Arg_Tests) > 3) else 2 forker = Forker(Args.jobs) if len(Arg_Tests) >= 2 and Args.jobs >= 2: # Read supported into master process, so don't call every subprocess Capabilities.warmup_cache() # Without this tests such as t_debug_sigsegv_bt_bad.py will occasionally # block on input and cause a SIGSTOP, then a "fg" was needed to resume testing. print("== Many jobs; redirecting STDIN", file=sys.stderr) # sys.stdin = open("/dev/null", 'r', encoding="utf8") # pylint: disable=consider-using-with run_them()