mirror of
https://github.com/verilator/verilator.git
synced 2025-01-19 12:54:02 +00:00
1070 lines
48 KiB
Python
Executable File
1070 lines
48 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: MIT
|
|
# Copyright 2018-present, Bryan Gillespie
|
|
"""
|
|
Author: Bryan Gillespie
|
|
https://github.com/RPGillespie6/fastcov
|
|
|
|
A massively parallel gcov wrapper for generating intermediate coverage formats fast
|
|
|
|
The goal of fastcov is to generate code coverage intermediate formats as fast as possible,
|
|
even for large projects with hundreds of gcda objects. The intermediate formats may then be
|
|
consumed by a report generator such as lcov's genhtml, or a dedicated frontend such as coveralls.
|
|
|
|
Sample Usage:
|
|
$ cd build_dir
|
|
$ ./fastcov.py --zerocounters
|
|
$ <run unit tests>
|
|
$ ./fastcov.py --exclude /usr/include test/ --lcov -o report.info
|
|
$ genhtml -o code_coverage report.info
|
|
"""
|
|
import re
|
|
import os
|
|
import sys
|
|
import glob
|
|
import json
|
|
import time
|
|
import fnmatch
|
|
import logging
|
|
import argparse
|
|
import threading
|
|
import subprocess
|
|
import multiprocessing
|
|
from pathlib import Path
|
|
|
|
FASTCOV_VERSION = (1,15)
|
|
MINIMUM_PYTHON = (3,5)
|
|
MINIMUM_GCOV = (9,0,0)
|
|
|
|
# Interesting metrics
|
|
START_TIME = time.monotonic()
|
|
GCOVS_TOTAL = 0
|
|
GCOVS_SKIPPED = 0
|
|
|
|
# Gcov Coverage File Extensions
|
|
GCOV_GCNO_EXT = ".gcno" # gcno = "[gc]ov [no]te"
|
|
GCOV_GCDA_EXT = ".gcda" # gcda = "[gc]ov [da]ta"
|
|
|
|
# For when things go wrong...
|
|
# Start error codes at 3 because 1-2 are special
|
|
# See https://stackoverflow.com/a/1535733/2516916
|
|
EXIT_CODE = 0
|
|
EXIT_CODES = {
|
|
"gcov_version": 3,
|
|
"python_version": 4,
|
|
"unsupported_coverage_format": 5,
|
|
"excl_not_found": 6,
|
|
"bad_chunk_file": 7,
|
|
"missing_json_key": 8,
|
|
}
|
|
|
|
# Disable all logging in case developers are using this as a module
|
|
logging.disable(level=logging.CRITICAL)
|
|
|
|
class FastcovFormatter(logging.Formatter):
|
|
def format(self, record):
|
|
record.levelname = record.levelname.lower()
|
|
log_message = super(FastcovFormatter, self).format(record)
|
|
return "[{:.3f}s] {}".format(stopwatch(), log_message)
|
|
|
|
class DiffParseError(Exception):
|
|
pass
|
|
|
|
class DiffParser(object):
|
|
def _refinePaths(self, diff_metadata, diff_base_dir):
|
|
diff_metadata.pop('/dev/null', None)
|
|
diff_metadata.pop('', None)
|
|
for key, value in diff_metadata.copy().items():
|
|
diff_metadata.pop(key)
|
|
#sources without added lines will be excluded
|
|
if value:
|
|
newpath = os.path.join(diff_base_dir, key) if diff_base_dir else os.path.abspath(key)
|
|
diff_metadata[newpath] = value
|
|
|
|
def _parseTargetFile(self, line_with_target_file):
|
|
#f.e. '+++ b/README.md1' or '+++ b/README.md1 timestamp'
|
|
target_source = line_with_target_file[4:].partition('\t')[0].strip()
|
|
target_source = target_source[2:] if target_source.startswith('b/') else target_source
|
|
return target_source
|
|
|
|
def _parseHunkBoundaries(self, line_with_hunk_boundaries, line_index):
|
|
#f.e. '@@ -121,4 +122,4 @@ Time to process all gcda and parse all gcov:'
|
|
# Here ['-121,4', '+122,4']
|
|
lines_info = line_with_hunk_boundaries[3:].partition("@@")[0].strip().split(' ')
|
|
if len(lines_info) != 2:
|
|
raise DiffParseError("Found invalid hunk. Line #{}. {}".format(line_index, line_with_hunk_boundaries))
|
|
# Here ['122','4']
|
|
target_lines_info = lines_info[1].strip('+').partition(',')
|
|
target_line_current = int(target_lines_info[0])
|
|
target_lines_count = int(target_lines_info[2]) if target_lines_info[2] else 1
|
|
|
|
# Here ['121','4']
|
|
source_lines_info = lines_info[0].strip('-').partition(',')
|
|
source_line_current = int(source_lines_info[0])
|
|
source_lines_count = int(source_lines_info[2]) if source_lines_info[2] else 1
|
|
return target_line_current, target_lines_count, source_line_current, source_lines_count
|
|
|
|
def parseDiffFile(self, diff_file, diff_base_dir, fallback_encodings=[]):
|
|
diff_metadata = {}
|
|
target_source = None
|
|
target_hunk = set()
|
|
target_line_current = 0
|
|
target_line_end = 0
|
|
source_line_current = 0
|
|
source_line_end = 0
|
|
found_hunk = False
|
|
for i, line in enumerate(getSourceLines(diff_file, fallback_encodings), 1):
|
|
line = line.rstrip()
|
|
if not found_hunk:
|
|
if line.startswith('+++ '):
|
|
# refresh file
|
|
target_source = self._parseTargetFile(line)
|
|
elif line.startswith('@@ '):
|
|
# refresh hunk
|
|
target_line_current, target_lines_count, source_line_current, source_lines_count = self._parseHunkBoundaries(line, i)
|
|
target_line_end = target_line_current + target_lines_count
|
|
source_line_end = source_line_current + source_lines_count
|
|
target_hunk = set()
|
|
found_hunk = True
|
|
continue
|
|
if target_line_current > target_line_end or source_line_current > source_line_end:
|
|
raise DiffParseError("Hunk longer than expected. Line #{}. {}".format(i, line))
|
|
|
|
if line.startswith('+'):
|
|
#line related to target
|
|
target_hunk.add(target_line_current)
|
|
target_line_current = target_line_current + 1
|
|
elif line.startswith(' ') or line == '':
|
|
# line related to both
|
|
target_line_current = target_line_current + 1
|
|
source_line_current = source_line_current + 1
|
|
elif line.startswith('-'):
|
|
# line related to source
|
|
source_line_current = source_line_current + 1
|
|
elif not line.startswith('\\'): # No newline at end of file
|
|
# line with newline marker is not included into any boundaries
|
|
raise DiffParseError("Found unrecognized hunk line type. Line #{}. {}".format(i, line))
|
|
|
|
if target_line_current == target_line_end and source_line_current == source_line_end:
|
|
# Checked all lines, save data
|
|
if target_source in diff_metadata:
|
|
diff_metadata[target_source] = target_hunk.union(diff_metadata[target_source])
|
|
else:
|
|
diff_metadata[target_source] = target_hunk
|
|
target_hunk = set()
|
|
found_hunk = False
|
|
|
|
if target_line_current != target_line_end or source_line_current != source_line_end:
|
|
raise DiffParseError("Unexpected end of file. Expected hunk with {} target lines, {} source lines".format(
|
|
target_line_end - target_line_current, source_line_end - source_line_current))
|
|
|
|
self._refinePaths(diff_metadata, diff_base_dir)
|
|
return diff_metadata
|
|
|
|
def filterByDiff(self, diff_file, dir_base_dir, fastcov_json, fallback_encodings=[]):
|
|
diff_metadata = self.parseDiffFile(diff_file, dir_base_dir, fallback_encodings)
|
|
logging.debug("Include only next files: {}".format(diff_metadata.keys()))
|
|
excluded_files_count = 0
|
|
excluded_lines_count = 0
|
|
for source in list(fastcov_json["sources"].keys()):
|
|
diff_lines = diff_metadata.get(source, None)
|
|
if not diff_lines:
|
|
excluded_files_count = excluded_files_count + 1
|
|
logging.debug("Exclude {} according to diff file".format(source))
|
|
fastcov_json["sources"].pop(source)
|
|
continue
|
|
for test_name, report_data in fastcov_json["sources"][source].copy().items():
|
|
#No info about functions boundaries, removing all
|
|
for function in list(report_data["functions"].keys()):
|
|
report_data["functions"].pop(function, None)
|
|
for line in list(report_data["lines"].keys()):
|
|
if line not in diff_lines:
|
|
excluded_lines_count = excluded_lines_count + 1
|
|
report_data["lines"].pop(line)
|
|
for branch_line in list(report_data["branches"].keys()):
|
|
if branch_line not in diff_lines:
|
|
report_data["branches"].pop(branch_line)
|
|
if len(report_data["lines"]) == 0:
|
|
fastcov_json["sources"][source].pop(test_name)
|
|
if len(fastcov_json["sources"][source]) == 0:
|
|
excluded_files_count = excluded_files_count + 1
|
|
logging.debug('Exclude {} file as it has no lines due to diff filter'.format(source))
|
|
fastcov_json["sources"].pop(source)
|
|
logging.info("Excluded {} files and {} lines according to diff file".format(excluded_files_count, excluded_lines_count))
|
|
return fastcov_json
|
|
|
|
def chunks(l, n):
|
|
"""Yield successive n-sized chunks from l."""
|
|
for i in range(0, len(l), n):
|
|
yield l[i:i + n]
|
|
|
|
def setExitCode(key):
|
|
global EXIT_CODE
|
|
EXIT_CODE = EXIT_CODES[key]
|
|
|
|
def setExitCodeRaw(code):
|
|
global EXIT_CODE
|
|
EXIT_CODE = code
|
|
|
|
def incrementCounters(total, skipped):
|
|
global GCOVS_TOTAL
|
|
global GCOVS_SKIPPED
|
|
GCOVS_TOTAL += total
|
|
GCOVS_SKIPPED += skipped
|
|
|
|
def stopwatch():
|
|
"""Return number of seconds since last time this was called."""
|
|
global START_TIME
|
|
end_time = time.monotonic()
|
|
delta = end_time - START_TIME
|
|
START_TIME = end_time
|
|
return delta
|
|
|
|
def parseVersionFromLine(version_str):
|
|
"""Given a string containing a dotted integer version, parse out integers and return as tuple."""
|
|
version = re.search(r'(\d+\.\d+\.\d+)', version_str)
|
|
|
|
if not version:
|
|
return (0,0,0)
|
|
|
|
return tuple(map(int, version.group(1).split(".")))
|
|
|
|
def getGcovVersion(gcov):
|
|
p = subprocess.Popen([gcov, "-v"], stdout=subprocess.PIPE)
|
|
output = p.communicate()[0].decode('UTF-8')
|
|
p.wait()
|
|
return parseVersionFromLine(output.split("\n")[0])
|
|
|
|
def tryParseNumber(s):
|
|
try:
|
|
return int(s)
|
|
except ValueError:
|
|
# Log a warning if not hyphen
|
|
if s != "-":
|
|
logging.warning("Unsupported numerical value '%s', using 0", s)
|
|
|
|
# Default to 0 if we can't parse the number (e.g. "-", "NaN", etc.)
|
|
return 0
|
|
|
|
def removeFiles(files):
|
|
for file in files:
|
|
os.remove(file)
|
|
|
|
def processPrefix(path, prefix, prefix_strip):
|
|
p = Path(path)
|
|
if p.exists() or not p.is_absolute():
|
|
return path
|
|
|
|
if prefix_strip > 0:
|
|
segments = p.parts
|
|
|
|
if len(segments) < prefix_strip + 1:
|
|
logging.warning("Couldn't strip %i path levels from %s.", prefix_strip, path)
|
|
return path
|
|
|
|
segments = segments[prefix_strip+1:]
|
|
p = Path(segments[0])
|
|
segments = segments[1:]
|
|
for s in segments:
|
|
p = p.joinpath(s)
|
|
|
|
if len(prefix) > 0:
|
|
if p.is_absolute():
|
|
p = Path(prefix).joinpath(p.relative_to('/'))
|
|
else:
|
|
p = Path(prefix).joinpath(p)
|
|
|
|
return str(p)
|
|
|
|
|
|
def getFilteredCoverageFiles(coverage_files, exclude):
|
|
def excludeGcda(gcda):
|
|
for ex in exclude:
|
|
if ex in gcda:
|
|
logging.debug("Omitting %s due to '--exclude-gcda %s'", gcda, ex)
|
|
return False
|
|
return True
|
|
return list(filter(excludeGcda, coverage_files))
|
|
|
|
def globCoverageFiles(cwd, coverage_type):
|
|
return glob.glob(os.path.join(os.path.abspath(cwd), "**/*" + coverage_type), recursive=True)
|
|
|
|
def findCoverageFiles(cwd, coverage_files, use_gcno):
|
|
coverage_type = "user provided"
|
|
if not coverage_files:
|
|
# gcov strips off extension of whatever you pass it and searches [extensionless name] + .gcno/.gcda
|
|
# We should pass either gcno or gcda, but not both - if you pass both it will be processed twice
|
|
coverage_type = GCOV_GCNO_EXT if use_gcno else GCOV_GCDA_EXT
|
|
coverage_files = globCoverageFiles(cwd, coverage_type)
|
|
|
|
logging.info("Found {} coverage files ({})".format(len(coverage_files), coverage_type))
|
|
logging.debug("Coverage files found:\n %s", "\n ".join(coverage_files))
|
|
return coverage_files
|
|
|
|
def gcovWorker(data_q, metrics_q, args, chunk, gcov_filter_options):
|
|
base_report = {"sources": {}}
|
|
gcovs_total = 0
|
|
gcovs_skipped = 0
|
|
error_exit = False
|
|
|
|
gcov_bin = args.gcov
|
|
gcov_args = ["--json-format", "--stdout"]
|
|
if args.branchcoverage or args.xbranchcoverage:
|
|
gcov_args.append("--branch-probabilities")
|
|
|
|
encoding = sys.stdout.encoding if sys.stdout.encoding else 'UTF-8'
|
|
workdir = args.cdirectory if args.cdirectory else "."
|
|
|
|
p = subprocess.Popen([gcov_bin] + gcov_args + chunk, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
|
|
for i, line in enumerate(iter(p.stdout.readline, b'')):
|
|
try:
|
|
intermediate_json = json.loads(line.decode(encoding))
|
|
except json.decoder.JSONDecodeError as e:
|
|
logging.error("Could not process chunk file '{}' ({}/{})".format(chunk[i], i+1, len(chunk)))
|
|
logging.error(str(e))
|
|
setExitCode("bad_chunk_file")
|
|
continue
|
|
|
|
if "current_working_directory" not in intermediate_json:
|
|
logging.error("Missing 'current_working_directory' for data file: {}".format(intermediate_json))
|
|
setExitCode("missing_json_key")
|
|
continue
|
|
|
|
intermediate_json_files = processGcovs(args.cdirectory, intermediate_json["files"], intermediate_json["current_working_directory"], gcov_filter_options)
|
|
for f in intermediate_json_files:
|
|
distillSource(f, base_report["sources"], args.test_name, args.xbranchcoverage)
|
|
gcovs_total += len(intermediate_json["files"])
|
|
gcovs_skipped += len(intermediate_json["files"]) - len(intermediate_json_files)
|
|
|
|
p.wait()
|
|
data_q.put(base_report)
|
|
metrics_q.put((gcovs_total, gcovs_skipped))
|
|
|
|
sys.exit(EXIT_CODE)
|
|
|
|
def processGcdas(args, coverage_files, gcov_filter_options):
|
|
chunk_size = max(args.minimum_chunk, int(len(coverage_files) / args.jobs) + 1)
|
|
|
|
processes = []
|
|
data_q = multiprocessing.Queue()
|
|
metrics_q = multiprocessing.Queue()
|
|
for chunk in chunks(coverage_files, chunk_size):
|
|
p = multiprocessing.Process(target=gcovWorker, args=(data_q, metrics_q, args, chunk, gcov_filter_options))
|
|
processes.append(p)
|
|
p.start()
|
|
|
|
logging.info("Spawned {} gcov processes, each processing at most {} coverage files".format(len(processes), chunk_size))
|
|
|
|
fastcov_jsons = []
|
|
for p in processes:
|
|
fastcov_jsons.append(data_q.get())
|
|
incrementCounters(*metrics_q.get())
|
|
|
|
for p in processes:
|
|
p.join()
|
|
if p.exitcode != 0:
|
|
setExitCodeRaw(p.exitcode)
|
|
|
|
base_fastcov = fastcov_jsons.pop()
|
|
for fj in fastcov_jsons:
|
|
combineReports(base_fastcov, fj)
|
|
|
|
return base_fastcov
|
|
|
|
def shouldFilterSource(source, gcov_filter_options):
|
|
"""Returns true if the provided source file should be filtered due to CLI options, otherwise returns false."""
|
|
# If explicit sources were passed, check for match
|
|
if gcov_filter_options["sources"]:
|
|
if source not in gcov_filter_options["sources"]:
|
|
logging.debug("Filtering coverage for '%s' due to option '--source-files'", source)
|
|
return True
|
|
|
|
# Check exclude filter
|
|
for ex in gcov_filter_options["exclude"]:
|
|
if ex in source:
|
|
logging.debug("Filtering coverage for '%s' due to option '--exclude %s'", source, ex)
|
|
return True
|
|
|
|
# Check exclude filter
|
|
for ex_glob in gcov_filter_options["exclude_glob"]:
|
|
if fnmatch.fnmatch(source, ex_glob):
|
|
logging.debug("Filtering coverage for '%s' due to option '--exclude-glob %s'", source, ex_glob)
|
|
return True
|
|
|
|
# Check include filter
|
|
if gcov_filter_options["include"]:
|
|
included = False
|
|
for inc in gcov_filter_options["include"]:
|
|
if inc in source:
|
|
included = True
|
|
break
|
|
|
|
if not included:
|
|
logging.debug("Filtering coverage for '%s' due to option '--include %s'", source, " ".join(gcov_filter_options["include"]))
|
|
return True
|
|
|
|
return False
|
|
|
|
def filterFastcov(fastcov_json, args):
|
|
logging.info("Performing filtering operations (if applicable)")
|
|
gcov_filter_options = getGcovFilterOptions(args)
|
|
for source in list(fastcov_json["sources"].keys()):
|
|
if shouldFilterSource(source, gcov_filter_options):
|
|
del fastcov_json["sources"][source]
|
|
|
|
def processGcov(cwd, gcov, source_base_dir, files, gcov_filter_options):
|
|
# Uses cwd if set, else source_base_dir from gcov json. If both are empty, uses "."
|
|
base_dir = cwd if cwd else source_base_dir
|
|
base_dir = base_dir if base_dir else "."
|
|
# Add absolute path
|
|
gcov["file_abs"] = os.path.abspath(os.path.join(base_dir, gcov["file"]))
|
|
|
|
if shouldFilterSource(gcov["file_abs"], gcov_filter_options):
|
|
return
|
|
|
|
files.append(gcov)
|
|
logging.debug("Accepted coverage for '%s'", gcov["file_abs"])
|
|
|
|
def processGcovs(cwd, gcov_files, source_base_dir, gcov_filter_options):
|
|
files = []
|
|
for gcov in gcov_files:
|
|
processGcov(cwd, gcov, source_base_dir, files, gcov_filter_options)
|
|
return files
|
|
|
|
def dumpBranchCoverageToLcovInfo(f, branches):
|
|
branch_miss = 0
|
|
branch_found = 0
|
|
brda = []
|
|
for line_num, branch_counts in branches.items():
|
|
for i, count in enumerate(branch_counts):
|
|
# Branch (<line number>, <block number>, <branch number>, <taken>)
|
|
brda.append((line_num, int(i/2), i, count))
|
|
branch_miss += int(count == 0)
|
|
branch_found += 1
|
|
for v in sorted(brda):
|
|
f.write("BRDA:{},{},{},{}\n".format(*v))
|
|
f.write("BRF:{}\n".format(branch_found)) # Branches Found
|
|
f.write("BRH:{}\n".format(branch_found - branch_miss)) # Branches Hit
|
|
|
|
def dumpToLcovInfo(fastcov_json, output):
|
|
with open(output, "w") as f:
|
|
sources = fastcov_json["sources"]
|
|
for sf in sorted(sources.keys()):
|
|
for tn in sorted(sources[sf].keys()):
|
|
data = sources[sf][tn]
|
|
f.write("TN:{}\n".format(tn)) #Test Name - used mainly in conjuction with genhtml --show-details
|
|
f.write("SF:{}\n".format(sf)) #Source File
|
|
|
|
fn_miss = 0
|
|
fn = []
|
|
fnda = []
|
|
for function, fdata in data["functions"].items():
|
|
fn.append((fdata["start_line"], function)) # Function Start Line
|
|
fnda.append((fdata["execution_count"], function)) # Function Hits
|
|
fn_miss += int(fdata["execution_count"] == 0)
|
|
# NOTE: lcov sorts FN, but not FNDA.
|
|
for v in sorted(fn):
|
|
f.write("FN:{},{}\n".format(*v))
|
|
for v in sorted(fnda):
|
|
f.write("FNDA:{},{}\n".format(*v))
|
|
f.write("FNF:{}\n".format(len(data["functions"]))) #Functions Found
|
|
f.write("FNH:{}\n".format((len(data["functions"]) - fn_miss))) #Functions Hit
|
|
|
|
if data["branches"]:
|
|
dumpBranchCoverageToLcovInfo(f, data["branches"])
|
|
|
|
line_miss = 0
|
|
da = []
|
|
for line_num, count in data["lines"].items():
|
|
da.append((line_num, count))
|
|
line_miss += int(count == 0)
|
|
for v in sorted(da):
|
|
f.write("DA:{},{}\n".format(*v)) # Line
|
|
f.write("LF:{}\n".format(len(data["lines"]))) #Lines Found
|
|
f.write("LH:{}\n".format((len(data["lines"]) - line_miss))) #Lines Hit
|
|
f.write("end_of_record\n")
|
|
|
|
def getSourceLines(source, fallback_encodings=[]):
|
|
"""Return a list of lines from the provided source, trying to decode with fallback encodings if the default fails."""
|
|
default_encoding = sys.getdefaultencoding()
|
|
for encoding in [default_encoding] + fallback_encodings:
|
|
try:
|
|
with open(source, encoding=encoding) as f:
|
|
return f.readlines()
|
|
except UnicodeDecodeError:
|
|
pass
|
|
|
|
logging.warning("Could not decode '{}' with {} or fallback encodings ({}); ignoring errors".format(source, default_encoding, ",".join(fallback_encodings)))
|
|
with open(source, errors="ignore") as f:
|
|
return f.readlines()
|
|
|
|
def containsMarker(markers, strBody):
|
|
for marker in markers:
|
|
if marker in strBody:
|
|
return True
|
|
return False
|
|
|
|
# Returns whether source coverage changed or not
|
|
def exclProcessSource(fastcov_sources, source, exclude_branches_sw, include_branches_sw, exclude_line_marker, fallback_encodings, gcov_prefix, gcov_prefix_strip):
|
|
source_to_open = processPrefix(source, gcov_prefix, gcov_prefix_strip)
|
|
|
|
# Before doing any work, check if this file even needs to be processed
|
|
if not exclude_branches_sw and not include_branches_sw:
|
|
# Ignore unencodable characters
|
|
with open(source_to_open, errors="ignore") as f:
|
|
if not containsMarker(exclude_line_marker + ["LCOV_EXCL"], f.read()):
|
|
return False
|
|
|
|
# If we've made it this far we have to check every line
|
|
|
|
start_line = 0
|
|
end_line = 0
|
|
# Start enumeration at line 1 because the first line of the file is line 1 not 0
|
|
for i, line in enumerate(getSourceLines(source_to_open, fallback_encodings), 1):
|
|
# Cycle through test names (likely only 1)
|
|
for test_name in fastcov_sources[source]:
|
|
fastcov_data = fastcov_sources[source][test_name]
|
|
|
|
# Check if branch coverage should be deleted based on CLI options
|
|
if (exclude_branches_sw or include_branches_sw) and (i in fastcov_data["branches"]):
|
|
del_exclude_br = exclude_branches_sw and any(line.lstrip().startswith(e) for e in exclude_branches_sw)
|
|
del_include_br = include_branches_sw and all(not line.lstrip().startswith(e) for e in include_branches_sw)
|
|
if del_exclude_br or del_include_br:
|
|
del fastcov_data["branches"][i]
|
|
|
|
# Skip to next line as soon as possible
|
|
if not containsMarker(exclude_line_marker + ["LCOV_EXCL"], line):
|
|
continue
|
|
|
|
# Build line to function dict so can quickly delete by line number
|
|
line_to_func = {}
|
|
for f in fastcov_data["functions"].keys():
|
|
l = fastcov_data["functions"][f]["start_line"]
|
|
if l not in line_to_func:
|
|
line_to_func[l] = set()
|
|
line_to_func[l].add(f)
|
|
|
|
if any(marker in line for marker in exclude_line_marker):
|
|
for key in ["lines", "branches"]:
|
|
if i in fastcov_data[key]:
|
|
del fastcov_data[key][i]
|
|
if i in line_to_func:
|
|
for key in line_to_func[i]:
|
|
if key in fastcov_data["functions"]:
|
|
del fastcov_data["functions"][key]
|
|
elif "LCOV_EXCL_START" in line:
|
|
start_line = i
|
|
elif "LCOV_EXCL_STOP" in line:
|
|
end_line = i
|
|
|
|
if not start_line:
|
|
end_line = 0
|
|
continue
|
|
|
|
for key in ["lines", "branches"]:
|
|
for line_num in list(fastcov_data[key].keys()):
|
|
if start_line <= line_num <= end_line:
|
|
del fastcov_data[key][line_num]
|
|
|
|
for line_num in range(start_line, end_line):
|
|
if line_num in line_to_func:
|
|
for key in line_to_func[line_num]:
|
|
if key in fastcov_data["functions"]:
|
|
del fastcov_data["functions"][key]
|
|
|
|
start_line = end_line = 0
|
|
elif "LCOV_EXCL_BR_LINE" in line:
|
|
if i in fastcov_data["branches"]:
|
|
del fastcov_data["branches"][i]
|
|
|
|
# Source coverage changed
|
|
return True
|
|
|
|
def exclMarkerWorker(data_q, fastcov_sources, chunk, exclude_branches_sw, include_branches_sw, exclude_line_marker, fallback_encodings, gcov_prefix, gcov_prefix_strip):
|
|
changed_sources = []
|
|
|
|
for source in chunk:
|
|
try:
|
|
if exclProcessSource(fastcov_sources, source, exclude_branches_sw, include_branches_sw, exclude_line_marker, fallback_encodings, gcov_prefix, gcov_prefix_strip):
|
|
changed_sources.append((source, fastcov_sources[source]))
|
|
except FileNotFoundError:
|
|
logging.error("Could not find '%s' to scan for exclusion markers...", source)
|
|
setExitCode("excl_not_found") # Set exit code because of error
|
|
|
|
# Write out changed sources back to main fastcov file
|
|
data_q.put(changed_sources)
|
|
|
|
# Exit current process with appropriate code
|
|
sys.exit(EXIT_CODE)
|
|
|
|
def processExclusionMarkers(fastcov_json, jobs, exclude_branches_sw, include_branches_sw, exclude_line_marker, min_chunk_size, fallback_encodings, gcov_prefix, gcov_prefix_strip):
|
|
chunk_size = max(min_chunk_size, int(len(fastcov_json["sources"]) / jobs) + 1)
|
|
|
|
processes = []
|
|
data_q = multiprocessing.Queue()
|
|
for chunk in chunks(list(fastcov_json["sources"].keys()), chunk_size):
|
|
p = multiprocessing.Process(target=exclMarkerWorker, args=(data_q, fastcov_json["sources"], chunk, exclude_branches_sw, include_branches_sw, exclude_line_marker, fallback_encodings, gcov_prefix, gcov_prefix_strip))
|
|
processes.append(p)
|
|
p.start()
|
|
|
|
logging.info("Spawned {} exclusion marker scanning processes, each processing at most {} source files".format(len(processes), chunk_size))
|
|
|
|
changed_sources = []
|
|
for p in processes:
|
|
changed_sources += data_q.get()
|
|
|
|
for p in processes:
|
|
p.join()
|
|
if p.exitcode != 0:
|
|
setExitCodeRaw(p.exitcode)
|
|
|
|
for changed_source in changed_sources:
|
|
fastcov_json["sources"][changed_source[0]] = changed_source[1]
|
|
|
|
def validateSources(fastcov_json, gcov_prefix, gcov_prefix_strip):
|
|
logging.info("Checking if all sources exist")
|
|
for source in fastcov_json["sources"].keys():
|
|
source = processPrefix(source, gcov_prefix, gcov_prefix_strip)
|
|
if not os.path.exists(source):
|
|
logging.error("Cannot find '{}'".format(source))
|
|
|
|
def distillFunction(function_raw, functions):
|
|
function_name = function_raw["name"]
|
|
# NOTE: need to explicitly cast all counts coming from gcov to int - this is because gcov's json library
|
|
# will pass as scientific notation (i.e. 12+e45)
|
|
start_line = int(function_raw["start_line"])
|
|
execution_count = int(function_raw["execution_count"])
|
|
if function_name not in functions:
|
|
functions[function_name] = {
|
|
"start_line": start_line,
|
|
"execution_count": execution_count
|
|
}
|
|
else:
|
|
functions[function_name]["execution_count"] += execution_count
|
|
|
|
def emptyBranchSet(branch1, branch2):
|
|
return (branch1["count"] == 0 and branch2["count"] == 0)
|
|
|
|
def matchingBranchSet(branch1, branch2):
|
|
return (branch1["count"] == branch2["count"])
|
|
|
|
def filterExceptionalBranches(branches):
|
|
filtered_branches = []
|
|
exception_branch = False
|
|
for i in range(0, len(branches), 2):
|
|
if i+1 >= len(branches):
|
|
filtered_branches.append(branches[i])
|
|
break
|
|
|
|
# Filter exceptional branch noise
|
|
if branches[i+1]["throw"]:
|
|
exception_branch = True
|
|
continue
|
|
|
|
# Filter initializer list noise
|
|
if exception_branch and emptyBranchSet(branches[i], branches[i+1]) and len(filtered_branches) >= 2 and matchingBranchSet(filtered_branches[-1], filtered_branches[-2]):
|
|
return []
|
|
|
|
filtered_branches.append(branches[i])
|
|
filtered_branches.append(branches[i+1])
|
|
|
|
return filtered_branches
|
|
|
|
def distillLine(line_raw, lines, branches, include_exceptional_branches):
|
|
line_number = int(line_raw["line_number"])
|
|
count = int(line_raw["count"])
|
|
if count < 0:
|
|
if "function_name" in line_raw:
|
|
logging.warning("Ignoring negative count found in '%s'.", line_raw["function_name"])
|
|
else:
|
|
logging.warning("Ignoring negative count.")
|
|
count = 0
|
|
|
|
if line_number not in lines:
|
|
lines[line_number] = count
|
|
else:
|
|
lines[line_number] += count
|
|
|
|
# Filter out exceptional branches by default unless requested otherwise
|
|
if not include_exceptional_branches:
|
|
line_raw["branches"] = filterExceptionalBranches(line_raw["branches"])
|
|
|
|
# Increment all branch counts
|
|
for i, branch in enumerate(line_raw["branches"]):
|
|
if line_number not in branches:
|
|
branches[line_number] = []
|
|
blen = len(branches[line_number])
|
|
glen = len(line_raw["branches"])
|
|
if blen < glen:
|
|
branches[line_number] += [0] * (glen - blen)
|
|
branches[line_number][i] += int(branch["count"])
|
|
|
|
def distillSource(source_raw, sources, test_name, include_exceptional_branches):
|
|
source_name = source_raw["file_abs"]
|
|
if source_name not in sources:
|
|
sources[source_name] = {
|
|
test_name: {
|
|
"functions": {},
|
|
"branches": {},
|
|
"lines": {}
|
|
}
|
|
}
|
|
|
|
for function in source_raw["functions"]:
|
|
distillFunction(function, sources[source_name][test_name]["functions"])
|
|
|
|
for line in source_raw["lines"]:
|
|
distillLine(line, sources[source_name][test_name]["lines"], sources[source_name][test_name]["branches"], include_exceptional_branches)
|
|
|
|
def dumpToJson(intermediate, output):
|
|
with open(output, "w") as f:
|
|
json.dump(intermediate, f)
|
|
|
|
def getGcovFilterOptions(args):
|
|
return {
|
|
"sources": set([os.path.abspath(s) for s in args.sources]), #Make paths absolute, use set for fast lookups
|
|
"include": args.includepost,
|
|
"exclude": args.excludepost,
|
|
"exclude_glob":args.excludepost_glob
|
|
}
|
|
|
|
def addDicts(dict1, dict2):
|
|
"""Add dicts together by value. i.e. addDicts({"a":1,"b":0}, {"a":2}) == {"a":3,"b":0}."""
|
|
result = {k:v for k,v in dict1.items()}
|
|
for k,v in dict2.items():
|
|
if k in result:
|
|
result[k] += v
|
|
else:
|
|
result[k] = v
|
|
|
|
return result
|
|
|
|
def addLists(list1, list2):
|
|
"""Add lists together by value. i.e. addLists([1,1], [2,2]) == [3,3]."""
|
|
# Find big list and small list
|
|
blist, slist = list(list2), list(list1)
|
|
if len(list1) > len(list2):
|
|
blist, slist = slist, blist
|
|
|
|
# Overlay small list onto big list
|
|
for i, b in enumerate(slist):
|
|
blist[i] += b
|
|
|
|
return blist
|
|
|
|
def combineReports(base, overlay):
|
|
for source, scov in overlay["sources"].items():
|
|
# Combine Source Coverage
|
|
if source not in base["sources"]:
|
|
base["sources"][source] = scov
|
|
continue
|
|
|
|
for test_name, tcov in scov.items():
|
|
# Combine Source Test Name Coverage
|
|
if test_name not in base["sources"][source]:
|
|
base["sources"][source][test_name] = tcov
|
|
continue
|
|
|
|
# Drill down and create convenience variable
|
|
base_data = base["sources"][source][test_name]
|
|
|
|
# Combine Line Coverage
|
|
base_data["lines"] = addDicts(base_data["lines"], tcov["lines"])
|
|
|
|
# Combine Branch Coverage
|
|
for branch, cov in tcov["branches"].items():
|
|
if branch not in base_data["branches"]:
|
|
base_data["branches"][branch] = cov
|
|
else:
|
|
base_data["branches"][branch] = addLists(base_data["branches"][branch], cov)
|
|
|
|
# Combine Function Coverage
|
|
for function, cov in tcov["functions"].items():
|
|
if function not in base_data["functions"]:
|
|
base_data["functions"][function] = cov
|
|
else:
|
|
base_data["functions"][function]["execution_count"] += cov["execution_count"]
|
|
|
|
def parseInfo(path):
|
|
"""Parse an lcov .info file into fastcov json."""
|
|
fastcov_json = {
|
|
"sources": {}
|
|
}
|
|
|
|
with open(path) as f:
|
|
for line in f:
|
|
if line.startswith("TN:"):
|
|
current_test_name = line[3:].strip()
|
|
elif line.startswith("SF:"):
|
|
current_sf = line[3:].strip()
|
|
fastcov_json["sources"].setdefault(current_sf, {
|
|
current_test_name: {
|
|
"functions": {},
|
|
"branches": {},
|
|
"lines": {},
|
|
}
|
|
})
|
|
current_data = fastcov_json["sources"][current_sf][current_test_name]
|
|
elif line.startswith("FN:"):
|
|
line_num, function_name = line[3:].strip().split(",")
|
|
current_data["functions"][function_name] = {}
|
|
current_data["functions"][function_name]["start_line"] = tryParseNumber(line_num)
|
|
elif line.startswith("FNDA:"):
|
|
count, function_name = line[5:].strip().split(",")
|
|
current_data["functions"][function_name]["execution_count"] = tryParseNumber(count)
|
|
elif line.startswith("DA:"):
|
|
line_num, count = line[3:].strip().split(",")
|
|
current_data["lines"][line_num] = tryParseNumber(count)
|
|
elif line.startswith("BRDA:"):
|
|
branch_tokens = line[5:].strip().split(",")
|
|
line_num, count = branch_tokens[0], branch_tokens[-1]
|
|
if line_num not in current_data["branches"]:
|
|
current_data["branches"][line_num] = []
|
|
current_data["branches"][line_num].append(tryParseNumber(count))
|
|
|
|
return fastcov_json
|
|
|
|
def convertKeysToInt(report):
|
|
for source in report["sources"].keys():
|
|
for test_name in report["sources"][source].keys():
|
|
report_data = report["sources"][source][test_name]
|
|
report_data["lines"] = {int(k):v for k,v in report_data["lines"].items()}
|
|
report_data["branches"] = {int(k):v for k,v in report_data["branches"].items()}
|
|
|
|
def parseAndCombine(paths):
|
|
base_report = {}
|
|
|
|
for path in paths:
|
|
if path.endswith(".json"):
|
|
with open(path) as f:
|
|
report = json.load(f)
|
|
elif path.endswith(".info"):
|
|
report = parseInfo(path)
|
|
else:
|
|
logging.error("Currently only fastcov .json and lcov .info supported for combine operations, aborting due to %s...\n", path)
|
|
sys.exit(EXIT_CODES["unsupported_coverage_format"])
|
|
|
|
# In order for sorting to work later when we serialize,
|
|
# make sure integer keys are int
|
|
convertKeysToInt(report)
|
|
|
|
if not base_report:
|
|
base_report = report
|
|
logging.info("Setting {} as base report".format(path))
|
|
else:
|
|
combineReports(base_report, report)
|
|
logging.info("Adding {} to base report".format(path))
|
|
|
|
return base_report
|
|
|
|
def getCombineCoverage(args):
|
|
logging.info("Performing combine operation")
|
|
fastcov_json = parseAndCombine(args.combine)
|
|
filterFastcov(fastcov_json, args)
|
|
return fastcov_json
|
|
|
|
def getGcovCoverage(args):
|
|
# Need at least python 3.5 because of use of recursive glob
|
|
checkPythonVersion(sys.version_info[0:2])
|
|
|
|
# Need at least gcov 9.0.0 because that's when gcov JSON and stdout streaming was introduced
|
|
checkGcovVersion(getGcovVersion(args.gcov))
|
|
|
|
# Get list of gcda files to process
|
|
coverage_files = findCoverageFiles(args.directory, args.coverage_files, args.use_gcno)
|
|
|
|
# If gcda/gcno filtering is enabled, filter them out now
|
|
if args.excludepre:
|
|
coverage_files = getFilteredCoverageFiles(coverage_files, args.excludepre)
|
|
logging.info("Found {} coverage files after filtering".format(len(coverage_files)))
|
|
|
|
# We "zero" the "counters" by simply deleting all gcda files
|
|
if args.zerocounters:
|
|
removeFiles(globCoverageFiles(args.directory, GCOV_GCDA_EXT))
|
|
logging.info("Removed {} .gcda files".format(len(coverage_files)))
|
|
sys.exit()
|
|
|
|
# Fire up one gcov per cpu and start processing gcdas
|
|
gcov_filter_options = getGcovFilterOptions(args)
|
|
fastcov_json = processGcdas(args, coverage_files, gcov_filter_options)
|
|
|
|
# Summarize processing results
|
|
logging.info("Processed {} .gcov files ({} total, {} skipped)".format(GCOVS_TOTAL - GCOVS_SKIPPED, GCOVS_TOTAL, GCOVS_SKIPPED))
|
|
logging.debug("Final report will contain coverage for the following %d source files:\n %s", len(fastcov_json["sources"]), "\n ".join(fastcov_json["sources"]))
|
|
|
|
return fastcov_json
|
|
|
|
def formatCoveredItems(covered, total):
|
|
coverage = (covered * 100.0) / total if total > 0 else 100.0
|
|
coverage = round(coverage, 2)
|
|
return "{:.2f}%, {}/{}".format(coverage, covered, total)
|
|
|
|
def dumpStatistic(fastcov_json):
|
|
total_lines = 0
|
|
covered_lines = 0
|
|
total_functions = 0
|
|
covered_functions = 0
|
|
total_files = len(fastcov_json["sources"])
|
|
covered_files = 0
|
|
|
|
for source_name, source in fastcov_json["sources"].items():
|
|
is_file_covered = False
|
|
for test_name, test in source.items():
|
|
total_lines += len(test["lines"])
|
|
for execution_count in test["lines"].values():
|
|
covered_lines += 1 if execution_count > 0 else 0
|
|
is_file_covered = is_file_covered or execution_count > 0
|
|
total_functions += len(test["functions"])
|
|
for function in test["functions"].values():
|
|
covered_functions += 1 if function['execution_count'] > 0 else 0
|
|
is_file_covered = is_file_covered or function['execution_count'] > 0
|
|
if is_file_covered:
|
|
covered_files = covered_files + 1
|
|
|
|
logging.info("Files Coverage: {}".format(formatCoveredItems(covered_files, total_files)))
|
|
logging.info("Functions Coverage: {}".format(formatCoveredItems(covered_functions, total_functions)))
|
|
logging.info("Lines Coverage: {}".format(formatCoveredItems(covered_lines, total_lines)))
|
|
|
|
def dumpFile(fastcov_json, args):
|
|
if args.lcov:
|
|
dumpToLcovInfo(fastcov_json, args.output)
|
|
logging.info("Created lcov info file '{}'".format(args.output))
|
|
else:
|
|
dumpToJson(fastcov_json, args.output)
|
|
logging.info("Created fastcov json file '{}'".format(args.output))
|
|
|
|
if args.dump_statistic:
|
|
dumpStatistic(fastcov_json)
|
|
|
|
def tupleToDotted(tup):
|
|
return ".".join(map(str, tup))
|
|
|
|
def parseArgs():
|
|
parser = argparse.ArgumentParser(description='A parallel gcov wrapper for fast coverage report generation')
|
|
parser.add_argument('-z', '--zerocounters', dest='zerocounters', action="store_true", help='Recursively delete all gcda files')
|
|
|
|
# Enable Branch Coverage
|
|
parser.add_argument('-b', '--branch-coverage', dest='branchcoverage', action="store_true", help='Include only the most useful branches in the coverage report.')
|
|
parser.add_argument('-B', '--exceptional-branch-coverage', dest='xbranchcoverage', action="store_true", help='Include ALL branches in the coverage report (including potentially noisy exceptional branches).')
|
|
parser.add_argument('-A', '--exclude-br-lines-starting-with', dest='exclude_branches_sw', nargs="+", metavar='', default=[], help='Exclude branches from lines starting with one of the provided strings (i.e. assert, return, etc.)')
|
|
parser.add_argument('-a', '--include-br-lines-starting-with', dest='include_branches_sw', nargs="+", metavar='', default=[], help='Include only branches from lines starting with one of the provided strings (i.e. if, else, while, etc.)')
|
|
parser.add_argument('-X', '--skip-exclusion-markers', dest='skip_exclusion_markers', action="store_true", help='Skip reading source files to search for lcov exclusion markers (such as "LCOV_EXCL_LINE")')
|
|
parser.add_argument('-x', '--scan-exclusion-markers', dest='scan_exclusion_markers', action="store_true", help='(Combine operations) Force reading source files to search for lcov exclusion markers (such as "LCOV_EXCL_LINE")')
|
|
|
|
# Capture untested file coverage as well via gcno
|
|
parser.add_argument('-n', '--process-gcno', dest='use_gcno', action="store_true", help='Process both gcno and gcda coverage files. This option is useful for capturing untested files in the coverage report.')
|
|
|
|
# Filtering Options
|
|
parser.add_argument('-s', '--source-files', dest='sources', nargs="+", metavar='', default=[], help='Filter: Specify exactly which source files should be included in the final report. Paths must be either absolute or relative to current directory.')
|
|
parser.add_argument('-e', '--exclude', dest='excludepost', nargs="+", metavar='', default=[], help='Filter: Exclude source files from final report if they contain one of the provided substrings (i.e. /usr/include test/, etc.)')
|
|
parser.add_argument('-eg', '--exclude-glob', dest='excludepost_glob', nargs="+", metavar='', default=[], help='Filter: Exclude source files by glob pattern from final report if they contain one of the provided substrings (i.e. /usr/include test/, etc.)')
|
|
parser.add_argument('-i', '--include', dest='includepost', nargs="+", metavar='', default=[], help='Filter: Only include source files in final report that contain one of the provided substrings (i.e. src/ etc.)')
|
|
parser.add_argument('-f', '--gcda-files', dest='coverage_files', nargs="+", metavar='', default=[], help='Filter: Specify exactly which gcda or gcno files should be processed. Note that specifying gcno causes both gcno and gcda to be processed.')
|
|
parser.add_argument('-E', '--exclude-gcda', dest='excludepre', nargs="+", metavar='', default=[], help='Filter: Exclude gcda or gcno files from being processed via simple find matching (not regex)')
|
|
parser.add_argument('-u', '--diff-filter', dest='diff_file', default='', help='Unified diff file with changes which will be included into final report')
|
|
parser.add_argument('-ub', '--diff-base-dir', dest='diff_base_dir', default='', help='Base directory for sources in unified diff file, usually repository dir')
|
|
parser.add_argument('-ce', '--custom-exclusion-marker', dest='exclude_line_marker', nargs="+", metavar='', default=["LCOV_EXCL_LINE"], help='Filter: Add filter for lines that will be excluded from coverage (same behavior as "LCOV_EXCL_LINE")')
|
|
|
|
parser.add_argument('-g', '--gcov', dest='gcov', default='gcov', help='Which gcov binary to use')
|
|
|
|
parser.add_argument('-d', '--search-directory', dest='directory', default=".", help='Base directory to recursively search for gcda files (default: .)')
|
|
parser.add_argument('-c', '--compiler-directory', dest='cdirectory', default="", help='Base directory compiler was invoked from (default: . or read from gcov) \
|
|
This needs to be set if invoking fastcov from somewhere other than the base compiler directory. No need to set it if gcc version > 9.1')
|
|
|
|
parser.add_argument('-j', '--jobs', dest='jobs', type=int, default=multiprocessing.cpu_count(), help='Number of parallel gcov to spawn (default: {}).'.format(multiprocessing.cpu_count()))
|
|
parser.add_argument('-m', '--minimum-chunk-size', dest='minimum_chunk', type=int, default=5, help='Minimum number of files a thread should process (default: 5). \
|
|
If you have only 4 gcda files but they are monstrously huge, you could change this value to a 1 so that each thread will only process 1 gcda. Otherwise fastcov will spawn only 1 thread to process all of them.')
|
|
|
|
parser.add_argument('-F', '--fallback-encodings', dest='fallback_encodings', nargs="+", metavar='', default=[], help='List of encodings to try if opening a source file with the default fails (i.e. latin1, etc.). This option is not usually needed.')
|
|
|
|
parser.add_argument('-l', '--lcov', dest='lcov', action="store_true", help='Output in lcov info format instead of fastcov json')
|
|
parser.add_argument('-o', '--output', dest='output', default="", help='Name of output file (default: coverage.json or coverage.info, depends on --lcov option)')
|
|
parser.add_argument('-q', '--quiet', dest='quiet', action="store_true", help='Suppress output to stdout')
|
|
|
|
parser.add_argument('-t', '--test-name', dest='test_name', default="", help='Specify a test name for the coverage. Equivalent to lcov\'s `-t`.')
|
|
parser.add_argument('-C', '--add-tracefile', dest='combine', nargs="+", help='Combine multiple coverage files into one. If this flag is specified, fastcov will do a combine operation instead invoking gcov. Equivalent to lcov\'s `-a`.')
|
|
|
|
parser.add_argument('-V', '--verbose', dest="verbose", action="store_true", help="Print more detailed information about what fastcov is doing")
|
|
parser.add_argument('-w', '--validate-sources', dest="validate_sources", action="store_true", help="Check if every source file exists")
|
|
parser.add_argument('-p', '--dump-statistic', dest="dump_statistic", action="store_true", help="Dump total statistic at the end")
|
|
parser.add_argument('-v', '--version', action="version", version='%(prog)s {version}'.format(version=__version__), help="Show program's version number and exit")
|
|
|
|
parser.add_argument('-gps', '--gcov_prefix_strip', dest="gcov_prefix_strip", action="store", default=0, type=int, help="The number of initial directory names to strip off the absolute paths in the object file.")
|
|
parser.add_argument('-gp', '--gcov_prefix', dest="gcov_prefix", action="store", default="", help="The prefix to add to the paths in the object file.")
|
|
|
|
args = parser.parse_args()
|
|
if not args.output:
|
|
args.output = 'coverage.info' if args.lcov else 'coverage.json'
|
|
return args
|
|
|
|
def checkPythonVersion(version):
|
|
"""Exit if the provided python version is less than the supported version."""
|
|
if version < MINIMUM_PYTHON:
|
|
sys.stderr.write("Minimum python version {} required, found {}\n".format(tupleToDotted(MINIMUM_PYTHON), tupleToDotted(version)))
|
|
sys.exit(EXIT_CODES["python_version"])
|
|
|
|
def checkGcovVersion(version):
|
|
"""Exit if the provided gcov version is less than the supported version."""
|
|
if version < MINIMUM_GCOV:
|
|
sys.stderr.write("Minimum gcov version {} required, found {}\n".format(tupleToDotted(MINIMUM_GCOV), tupleToDotted(version)))
|
|
sys.exit(EXIT_CODES["gcov_version"])
|
|
|
|
def setupLogging(quiet, verbose):
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(FastcovFormatter("[%(levelname)s]: %(message)s"))
|
|
|
|
root = logging.getLogger()
|
|
root.setLevel(logging.INFO)
|
|
root.addHandler(handler)
|
|
|
|
if not quiet:
|
|
logging.disable(level=logging.NOTSET) # Re-enable logging
|
|
|
|
if verbose:
|
|
root.setLevel(logging.DEBUG)
|
|
|
|
def main():
|
|
args = parseArgs()
|
|
|
|
# Setup logging
|
|
setupLogging(args.quiet, args.verbose)
|
|
|
|
if args.gcov_prefix_strip > 0:
|
|
os.environ["GCOV_PREFIX_STRIP"] = str(args.gcov_prefix_strip)
|
|
|
|
if len(args.gcov_prefix) > 0:
|
|
os.environ["GCOV_PREFIX"] = args.gcov_prefix
|
|
|
|
# Get report from appropriate source
|
|
if args.combine:
|
|
fastcov_json = getCombineCoverage(args)
|
|
skip_exclusion_markers = not args.scan_exclusion_markers
|
|
else:
|
|
fastcov_json = getGcovCoverage(args)
|
|
skip_exclusion_markers = args.skip_exclusion_markers
|
|
|
|
# Scan for exclusion markers
|
|
if not skip_exclusion_markers:
|
|
processExclusionMarkers(fastcov_json, args.jobs, args.exclude_branches_sw, args.include_branches_sw, args.exclude_line_marker, args.minimum_chunk, args.fallback_encodings, args.gcov_prefix, args.gcov_prefix_strip)
|
|
logging.info("Scanned {} source files for exclusion markers".format(len(fastcov_json["sources"])))
|
|
|
|
if args.diff_file:
|
|
logging.info("Filtering according to {} file".format(args.diff_file))
|
|
DiffParser().filterByDiff(args.diff_file, args.diff_base_dir, fastcov_json, args.fallback_encodings)
|
|
|
|
if args.validate_sources:
|
|
validateSources(fastcov_json, args.gcov_prefix, args.gcov_prefix_strip)
|
|
|
|
# Dump to desired file format
|
|
dumpFile(fastcov_json, args)
|
|
|
|
# If there was an error along the way, but we still completed the pipeline...
|
|
if EXIT_CODE:
|
|
sys.exit(EXIT_CODE)
|
|
|
|
# Set package version... it's way down here so that we can call tupleToDotted
|
|
__version__ = tupleToDotted(FASTCOV_VERSION)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|