#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
# Copyright 2018-present, Bryan Gillespie
"""
Author : Bryan Gillespie
https : / / github . com / RPGillespie6 / fastcov
A massively parallel gcov wrapper for generating intermediate coverage formats fast
The goal of fastcov is to generate code coverage intermediate formats as fast as possible ,
even for large projects with hundreds of gcda objects . The intermediate formats may then be
consumed by a report generator such as lcov ' s genhtml, or a dedicated frontend such as coveralls.
Sample Usage :
$ cd build_dir
$ . / fastcov . py - - zerocounters
$ < run unit tests >
$ . / fastcov . py - - exclude / usr / include test / - - lcov - o report . info
$ genhtml - o code_coverage report . info
"""
import re
import os
import sys
import glob
import json
import time
import logging
import argparse
import threading
import subprocess
import multiprocessing
FASTCOV_VERSION = (1, 7)
MINIMUM_PYTHON = (3, 5)
MINIMUM_GCOV = (9, 0, 0)

# Interesting metrics
START_TIME = time.monotonic()
GCOVS_TOTAL = 0
GCOVS_SKIPPED = 0

# For when things go wrong...
# Start error codes at 3 because 1-2 are special
# See https://stackoverflow.com/a/1535733/2516916
EXIT_CODE = 0
EXIT_CODES = {
    "gcov_version": 3,
    "python_version": 4,
    "unsupported_coverage_format": 5,
    "excl_not_found": 6,
}

# Disable all logging in case developers are using this as a module
logging.disable(level=logging.CRITICAL)

class FastcovFormatter(logging.Formatter):
    def format(self, record):
        record.levelname = record.levelname.lower()
        log_message = super(FastcovFormatter, self).format(record)
        return "[{:.3f}s] {}".format(stopwatch(), log_message)

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

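# Example: splitting five items into chunks of two gives
#   list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]
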
def setExitCode(key):
    global EXIT_CODE
    EXIT_CODE = EXIT_CODES[key]

def incrementCounters(total, skipped):
    global GCOVS_TOTAL
    global GCOVS_SKIPPED
    GCOVS_TOTAL += total
    GCOVS_SKIPPED += skipped

def stopwatch():
    """Return the number of seconds since the last time this was called."""
    global START_TIME
    end_time = time.monotonic()
    delta = end_time - START_TIME
    START_TIME = end_time
    return delta

def parseVersionFromLine(version_str):
    """Given a string containing a dotted integer version, parse out the integers and return them as a tuple."""
    version = re.search(r'(\d+\.\d+\.\d+)', version_str)
    if not version:
        return (0, 0, 0)
    return tuple(map(int, version.group(1).split(".")))

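# Example: the first line of `gcov -v` typically looks something like
# "gcov (GCC) 9.3.0" (the exact banner varies by platform), which parses to
# (9, 3, 0). A line with no dotted triple falls back to (0, 0, 0).
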
def getGcovVersion(gcov):
    p = subprocess.Popen([gcov, "-v"], stdout=subprocess.PIPE)
    output = p.communicate()[0].decode('UTF-8')
    p.wait()
    return parseVersionFromLine(output.split("\n")[0])

def removeFiles(files):
    for file in files:
        os.remove(file)

def getFilteredCoverageFiles(coverage_files, exclude):
    def excludeGcda(gcda):
        for ex in exclude:
            if ex in gcda:
                logging.debug("Omitting %s due to '--exclude-gcda %s'", gcda, ex)
                return False
        return True
    return list(filter(excludeGcda, coverage_files))

def findCoverageFiles(cwd, coverage_files, use_gcno):
    coverage_type = "user provided"
    if not coverage_files:
        coverage_type = "gcno" if use_gcno else "gcda"
        coverage_files = glob.glob(os.path.join(os.path.abspath(cwd), "**/*." + coverage_type), recursive=True)

    logging.info("Found {} coverage files ({})".format(len(coverage_files), coverage_type))
    logging.debug("Coverage files found:\n%s", "\n".join(coverage_files))
    return coverage_files

def gcovWorker(data_q, metrics_q, args, chunk, gcov_filter_options):
    base_report = {"sources": {}}
    gcovs_total = 0
    gcovs_skipped = 0

    # -i: intermediate (JSON) format, -t: stream to stdout, -b: add branch data
    gcov_args = "-it"
    if args.branchcoverage or args.xbranchcoverage:
        gcov_args += "b"

    p = subprocess.Popen([args.gcov, gcov_args] + chunk, cwd=args.cdirectory, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    # gcov writes one JSON document per line; distill each as it arrives
    for line in iter(p.stdout.readline, b''):
        intermediate_json = json.loads(line.decode(sys.stdout.encoding))
        intermediate_json_files = processGcovs(args.cdirectory, intermediate_json["files"], gcov_filter_options)
        for f in intermediate_json_files:
            distillSource(f, base_report["sources"], args.test_name, args.xbranchcoverage)
        gcovs_total += len(intermediate_json["files"])
        gcovs_skipped += len(intermediate_json["files"]) - len(intermediate_json_files)
    p.wait()

    data_q.put(base_report)
    metrics_q.put((gcovs_total, gcovs_skipped))

def processGcdas(args, coverage_files, gcov_filter_options):
    chunk_size = max(args.minimum_chunk, int(len(coverage_files) / args.jobs) + 1)

    processes = []
    data_q = multiprocessing.Queue()
    metrics_q = multiprocessing.Queue()
    for chunk in chunks(coverage_files, chunk_size):
        p = multiprocessing.Process(target=gcovWorker, args=(data_q, metrics_q, args, chunk, gcov_filter_options))
        processes.append(p)
        p.start()

    logging.info("Spawned {} gcov processes, each processing at most {} coverage files".format(len(processes), chunk_size))

    # Drain the queues before joining; a worker that has queued a large report
    # may not exit until its data has been consumed
    fastcov_jsons = []
    for p in processes:
        fastcov_jsons.append(data_q.get())
        incrementCounters(*metrics_q.get())

    for p in processes:
        p.join()

    base_fastcov = fastcov_jsons.pop()
    for fj in fastcov_jsons:
        combineReports(base_fastcov, fj)

    return base_fastcov

def shouldFilterSource(source, gcov_filter_options):
    """Return True if the provided source file should be filtered due to CLI options, otherwise return False."""
    # If explicit sources were passed, check for a match
    if gcov_filter_options["sources"]:
        if source not in gcov_filter_options["sources"]:
            logging.debug("Filtering coverage for '%s' due to option '--source-files'", source)
            return True

    # Check the exclude filter
    for ex in gcov_filter_options["exclude"]:
        if ex in source:
            logging.debug("Filtering coverage for '%s' due to option '--exclude %s'", source, ex)
            return True

    # Check the include filter
    if gcov_filter_options["include"]:
        included = False
        for inc in gcov_filter_options["include"]:
            if inc in source:
                included = True
                break

        if not included:
            logging.debug("Filtering coverage for '%s' due to option '--include %s'", source, " ".join(gcov_filter_options["include"]))
            return True

    return False

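# Example of the substring-based semantics, for a hypothetical layout: with
# --include src/ --exclude src/third_party, the file /abs/src/foo.cpp is kept,
# /abs/src/third_party/bar.cpp is excluded, and /usr/include/vector is
# filtered for lacking an --include match.
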
def filterFastcov(fastcov_json, args):
    logging.info("Performing filtering operations (if applicable)")
    gcov_filter_options = getGcovFilterOptions(args)
    for source in list(fastcov_json["sources"].keys()):
        if shouldFilterSource(source, gcov_filter_options):
            del fastcov_json["sources"][source]

def processGcov(cwd, gcov, files, gcov_filter_options):
    # Add absolute path
    gcov["file_abs"] = os.path.abspath(os.path.join(cwd, gcov["file"]))

    if shouldFilterSource(gcov["file_abs"], gcov_filter_options):
        return

    files.append(gcov)
    logging.debug("Accepted coverage for '%s'", gcov["file_abs"])

def processGcovs(cwd, gcov_files, gcov_filter_options):
    files = []
    for gcov in gcov_files:
        processGcov(cwd, gcov, files, gcov_filter_options)
    return files

def dumpBranchCoverageToLcovInfo(f, branches):
    branch_miss = 0
    branch_found = 0
    brda = []
    for line_num, branch_counts in branches.items():
        for i, count in enumerate(branch_counts):
            # Branch (<line number>, <block number>, <branch number>, <taken>)
            brda.append((line_num, int(i / 2), i, count))
            branch_miss += int(count == 0)
            branch_found += 1

    for v in sorted(brda):
        f.write("BRDA:{},{},{},{}\n".format(*v))

    f.write("BRF:{}\n".format(branch_found))                # Branches Found
    f.write("BRH:{}\n".format(branch_found - branch_miss))  # Branches Hit

def dumpToLcovInfo(fastcov_json, output):
    with open(output, "w") as f:
        sources = fastcov_json["sources"]
        for sf in sorted(sources.keys()):
            for tn in sorted(sources[sf].keys()):
                data = sources[sf][tn]
                f.write("TN:{}\n".format(tn))  # Test Name - used mainly in conjunction with genhtml --show-details
                f.write("SF:{}\n".format(sf))  # Source File

                fn_miss = 0
                fn = []
                fnda = []
                for function, fdata in data["functions"].items():
                    fn.append((fdata["start_line"], function))         # Function Start Line
                    fnda.append((fdata["execution_count"], function))  # Function Hits
                    fn_miss += int(fdata["execution_count"] == 0)

                # NOTE: lcov sorts FN, but not FNDA.
                for v in sorted(fn):
                    f.write("FN:{},{}\n".format(*v))

                for v in sorted(fnda):
                    f.write("FNDA:{},{}\n".format(*v))

                f.write("FNF:{}\n".format(len(data["functions"])))            # Functions Found
                f.write("FNH:{}\n".format(len(data["functions"]) - fn_miss))  # Functions Hit

                if data["branches"]:
                    dumpBranchCoverageToLcovInfo(f, data["branches"])

                line_miss = 0
                da = []
                for line_num, count in data["lines"].items():
                    da.append((line_num, count))
                    line_miss += int(count == 0)

                for v in sorted(da):
                    f.write("DA:{},{}\n".format(*v))  # Line

                f.write("LF:{}\n".format(len(data["lines"])))              # Lines Found
                f.write("LH:{}\n".format(len(data["lines"]) - line_miss))  # Lines Hit
                f.write("end_of_record\n")

def getSourceLines(source, fallback_encodings=[]):
    """Return a list of lines from the provided source, trying to decode with fallback encodings if the default fails."""
    default_encoding = sys.getdefaultencoding()
    for encoding in [default_encoding] + fallback_encodings:
        try:
            with open(source, encoding=encoding) as f:
                return f.readlines()
        except UnicodeDecodeError:
            pass

    logging.warning("Could not decode '{}' with {} or fallback encodings ({}); ignoring errors".format(source, default_encoding, ",".join(fallback_encodings)))
    with open(source, errors="ignore") as f:
        return f.readlines()

def exclProcessSource(fastcov_sources, source, exclude_branches_sw, include_branches_sw, fallback_encodings):
    start_line = 0
    end_line = 0
    # Start enumeration at line 1 because the first line of the file is line 1 not 0
    for i, line in enumerate(getSourceLines(source, fallback_encodings), 1):
        # Cycle through test names (likely only 1)
        for test_name in fastcov_sources[source]:
            fastcov_data = fastcov_sources[source][test_name]

            # Build line-to-function dict so we can quickly delete by line number
            line_to_func = {}
            for f in fastcov_data["functions"].keys():
                l = fastcov_data["functions"][f]["start_line"]
                if l not in line_to_func:
                    line_to_func[l] = set()
                line_to_func[l].add(f)

            if i in fastcov_data["branches"]:
                del_exclude_br = exclude_branches_sw and any(line.lstrip().startswith(e) for e in exclude_branches_sw)
                del_include_br = include_branches_sw and all(not line.lstrip().startswith(e) for e in include_branches_sw)
                if del_exclude_br or del_include_br:
                    del fastcov_data["branches"][i]

            if "LCOV_EXCL" not in line:
                continue

            if "LCOV_EXCL_LINE" in line:
                for key in ["lines", "branches"]:
                    if i in fastcov_data[key]:
                        del fastcov_data[key][i]
                if i in line_to_func:
                    for key in line_to_func[i]:
                        if key in fastcov_data["functions"]:
                            del fastcov_data["functions"][key]
            elif "LCOV_EXCL_START" in line:
                start_line = i
            elif "LCOV_EXCL_STOP" in line:
                end_line = i

                if not start_line:
                    end_line = 0
                    continue

                for key in ["lines", "branches"]:
                    for line_num in list(fastcov_data[key].keys()):
                        if start_line <= line_num <= end_line:
                            del fastcov_data[key][line_num]

                for line_num in range(start_line, end_line):
                    if line_num in line_to_func:
                        for key in line_to_func[line_num]:
                            if key in fastcov_data["functions"]:
                                del fastcov_data["functions"][key]

                start_line = end_line = 0
            elif "LCOV_EXCL_BR_LINE" in line:
                if i in fastcov_data["branches"]:
                    del fastcov_data["branches"][i]

def exclMarkerWorker(fastcov_sources, chunk, exclude_branches_sw, include_branches_sw, fallback_encodings):
    for source in chunk:
        try:
            exclProcessSource(fastcov_sources, source, exclude_branches_sw, include_branches_sw, fallback_encodings)
        except FileNotFoundError:
            logging.error("Could not find '%s' to scan for exclusion markers...", source)
            setExitCode("excl_not_found")  # Set exit code because of error

def scanExclusionMarkers(fastcov_json, jobs, exclude_branches_sw, include_branches_sw, min_chunk_size, fallback_encodings):
    chunk_size = max(min_chunk_size, int(len(fastcov_json["sources"]) / jobs) + 1)

    threads = []
    for chunk in chunks(list(fastcov_json["sources"].keys()), chunk_size):
        t = threading.Thread(target=exclMarkerWorker, args=(fastcov_json["sources"], chunk, exclude_branches_sw, include_branches_sw, fallback_encodings))
        threads.append(t)
        t.start()

    logging.info("Spawned {} threads each scanning at most {} source files".format(len(threads), chunk_size))
    for t in threads:
        t.join()

def distillFunction(function_raw, functions):
    function_name = function_raw["name"]
    # NOTE: need to explicitly cast all counts coming from gcov to int - this is because gcov's json library
    # will emit large counts in scientific notation (e.g. 1.2e+45)
    start_line = int(function_raw["start_line"])
    execution_count = int(function_raw["execution_count"])
    if function_name not in functions:
        functions[function_name] = {
            "start_line": start_line,
            "execution_count": execution_count
        }
    else:
        functions[function_name]["execution_count"] += execution_count

def emptyBranchSet(branch1, branch2):
    return (branch1["count"] == 0 and branch2["count"] == 0)

def matchingBranchSet(branch1, branch2):
    return (branch1["count"] == branch2["count"])

def filterExceptionalBranches(branches):
    filtered_branches = []
    exception_branch = False
    for i in range(0, len(branches), 2):
        if i + 1 >= len(branches):
            filtered_branches.append(branches[i])
            break

        # Filter exceptional branch noise
        if branches[i + 1]["throw"]:
            exception_branch = True
            continue

        # Filter initializer list noise
        if exception_branch and emptyBranchSet(branches[i], branches[i + 1]) and len(filtered_branches) >= 2 and matchingBranchSet(filtered_branches[-1], filtered_branches[-2]):
            return []

        filtered_branches.append(branches[i])
        filtered_branches.append(branches[i + 1])

    return filtered_branches

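# Note on the heuristic above: it assumes gcov lists a line's branches in
# pairs, where a pair whose second member carries gcov's "throw" flag belongs
# to an exception-handling path. Such pairs are dropped, and an all-zero pair
# following two matching counts is taken to be initializer-list noise, in
# which case the line's branch data is discarded entirely.
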
def distillLine(line_raw, lines, branches, include_exceptional_branches):
    line_number = int(line_raw["line_number"])
    count = int(line_raw["count"])
    if line_number not in lines:
        lines[line_number] = count
    else:
        lines[line_number] += count

    # Filter out exceptional branches by default unless requested otherwise
    if not include_exceptional_branches:
        line_raw["branches"] = filterExceptionalBranches(line_raw["branches"])

    # Increment all branch counts
    for i, branch in enumerate(line_raw["branches"]):
        if line_number not in branches:
            branches[line_number] = []

        blen = len(branches[line_number])
        glen = len(line_raw["branches"])
        if blen < glen:
            branches[line_number] += [0] * (glen - blen)

        branches[line_number][i] += int(branch["count"])

def distillSource(source_raw, sources, test_name, include_exceptional_branches):
    source_name = source_raw["file_abs"]
    if source_name not in sources:
        sources[source_name] = {
            test_name: {
                "functions": {},
                "branches": {},
                "lines": {},
            }
        }

    for function in source_raw["functions"]:
        distillFunction(function, sources[source_name][test_name]["functions"])

    for line in source_raw["lines"]:
        distillLine(line, sources[source_name][test_name]["lines"], sources[source_name][test_name]["branches"], include_exceptional_branches)

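# Sketch of the distilled in-memory shape (fastcov json), with hypothetical
# values:
#   {"sources": {"/abs/path/foo.cpp": {
#       "": {                          # test name; "" unless --test-name is given
#           "functions": {"main": {"start_line": 3, "execution_count": 1}},
#           "branches": {5: [1, 0]},
#           "lines": {4: 1, 5: 2}}}}}
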
def dumpToJson(intermediate, output):
    with open(output, "w") as f:
        json.dump(intermediate, f)

def getGcovFilterOptions(args):
    return {
        "sources": set([os.path.abspath(s) for s in args.sources]),  # Make paths absolute, use set for fast lookups
        "include": args.includepost,
        "exclude": args.excludepost,
    }

def addDicts(dict1, dict2):
    """Add dicts together by value. i.e. addDicts({"a":1, "b":0}, {"a":2}) == {"a":3, "b":0}."""
    result = {k: v for k, v in dict1.items()}
    for k, v in dict2.items():
        if k in result:
            result[k] += v
        else:
            result[k] = v
    return result

def addLists(list1, list2):
    """Add lists together by value. i.e. addLists([1,1], [2,2]) == [3,3]."""
    # Find big list and small list
    blist, slist = list(list2), list(list1)
    if len(list1) > len(list2):
        blist, slist = slist, blist

    # Overlay small list onto big list
    for i, b in enumerate(slist):
        blist[i] += b

    return blist

def combineReports(base, overlay):
    for source, scov in overlay["sources"].items():
        # Combine Source Coverage
        if source not in base["sources"]:
            base["sources"][source] = scov
            continue

        for test_name, tcov in scov.items():
            # Combine Source Test Name Coverage
            if test_name not in base["sources"][source]:
                base["sources"][source][test_name] = tcov
                continue

            # Drill down and create convenience variable
            base_data = base["sources"][source][test_name]

            # Combine Line Coverage
            base_data["lines"] = addDicts(base_data["lines"], tcov["lines"])

            # Combine Branch Coverage
            for branch, cov in tcov["branches"].items():
                if branch not in base_data["branches"]:
                    base_data["branches"][branch] = cov
                else:
                    base_data["branches"][branch] = addLists(base_data["branches"][branch], cov)

            # Combine Function Coverage
            for function, cov in tcov["functions"].items():
                if function not in base_data["functions"]:
                    base_data["functions"][function] = cov
                else:
                    base_data["functions"][function]["execution_count"] += cov["execution_count"]

def parseInfo(path):
    """Parse an lcov .info file into fastcov json."""
    fastcov_json = {
        "sources": {}
    }

    with open(path) as f:
        for line in f:
            if line.startswith("TN:"):
                current_test_name = line[3:].strip()
            elif line.startswith("SF:"):
                current_sf = line[3:].strip()
                fastcov_json["sources"][current_sf] = {
                    current_test_name: {
                        "functions": {},
                        "branches": {},
                        "lines": {},
                    }
                }
                current_data = fastcov_json["sources"][current_sf][current_test_name]
            elif line.startswith("FN:"):
                line_num, function_name = line[3:].strip().split(",")
                current_data["functions"][function_name] = {}
                current_data["functions"][function_name]["start_line"] = int(line_num)
            elif line.startswith("FNDA:"):
                count, function_name = line[5:].strip().split(",")
                current_data["functions"][function_name]["execution_count"] = int(count)
            elif line.startswith("DA:"):
                line_num, count = line[3:].strip().split(",")
                current_data["lines"][line_num] = int(count)
            elif line.startswith("BRDA:"):
                branch_tokens = line[5:].strip().split(",")
                line_num, count = branch_tokens[0], branch_tokens[-1]
                if line_num not in current_data["branches"]:
                    current_data["branches"][line_num] = []
                current_data["branches"][line_num].append(int(count))

    return fastcov_json

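# Note: parseInfo() is the rough inverse of dumpToLcovInfo(). For a BRDA
# record such as "BRDA:5,0,1,3", only the line number (5) and taken count (3)
# are kept; block and branch numbers are regenerated on the next serialization.
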
def convertKeysToInt(report):
    for source in report["sources"].keys():
        for test_name in report["sources"][source].keys():
            report_data = report["sources"][source][test_name]
            report_data["lines"] = {int(k): v for k, v in report_data["lines"].items()}
            report_data["branches"] = {int(k): v for k, v in report_data["branches"].items()}

def parseAndCombine(paths):
    base_report = {}

    for path in paths:
        if path.endswith(".json"):
            with open(path) as f:
                report = json.load(f)
        elif path.endswith(".info"):
            report = parseInfo(path)
        else:
            logging.error("Currently only fastcov .json and lcov .info are supported for combine operations, aborting due to %s...\n", path)
            sys.exit(EXIT_CODES["unsupported_coverage_format"])

        # In order for sorting to work later when we serialize,
        # make sure integer keys are int
        convertKeysToInt(report)

        if not base_report:
            base_report = report
            logging.info("Setting {} as base report".format(path))
        else:
            combineReports(base_report, report)
            logging.info("Adding {} to base report".format(path))

    return base_report

def getCombineCoverage(args):
    logging.info("Performing combine operation")
    fastcov_json = parseAndCombine(args.combine)
    filterFastcov(fastcov_json, args)
    return fastcov_json

def getGcovCoverage(args):
    # Need at least python 3.5 because of use of recursive glob
    checkPythonVersion(sys.version_info[0:2])

    # Need at least gcov 9.0.0 because that's when gcov JSON and stdout streaming were introduced
    checkGcovVersion(getGcovVersion(args.gcov))

    # Get the list of gcda files to process
    coverage_files = findCoverageFiles(args.directory, args.coverage_files, args.use_gcno)

    # If gcda/gcno filtering is enabled, filter them out now
    if args.excludepre:
        coverage_files = getFilteredCoverageFiles(coverage_files, args.excludepre)
        logging.info("Found {} coverage files after filtering".format(len(coverage_files)))

    # We "zero" the "counters" by simply deleting all gcda files
    if args.zerocounters:
        removeFiles(coverage_files)
        logging.info("Removed {} .gcda files".format(len(coverage_files)))
        sys.exit()

    # Fire up one gcov per cpu and start processing gcdas
    gcov_filter_options = getGcovFilterOptions(args)
    fastcov_json = processGcdas(args, coverage_files, gcov_filter_options)

    # Summarize processing results
    logging.info("Processed {} .gcov files ({} total, {} skipped)".format(GCOVS_TOTAL - GCOVS_SKIPPED, GCOVS_TOTAL, GCOVS_SKIPPED))
    logging.debug("Final report will contain coverage for the following %d source files:\n%s", len(fastcov_json["sources"]), "\n".join(fastcov_json["sources"]))

    return fastcov_json

def dumpFile(fastcov_json, args):
    if args.lcov:
        dumpToLcovInfo(fastcov_json, args.output)
        logging.info("Created lcov info file '{}'".format(args.output))
    else:
        dumpToJson(fastcov_json, args.output)
        logging.info("Created fastcov json file '{}'".format(args.output))

def tupleToDotted(tup):
    return ".".join(map(str, tup))

def parseArgs():
    parser = argparse.ArgumentParser(description='A parallel gcov wrapper for fast coverage report generation')
    parser.add_argument('-z', '--zerocounters', dest='zerocounters', action="store_true", help='Recursively delete all gcda files')

    # Enable Branch Coverage
    parser.add_argument('-b', '--branch-coverage', dest='branchcoverage', action="store_true", help='Include only the most useful branches in the coverage report.')
    parser.add_argument('-B', '--exceptional-branch-coverage', dest='xbranchcoverage', action="store_true", help='Include ALL branches in the coverage report (including potentially noisy exceptional branches).')
    parser.add_argument('-A', '--exclude-br-lines-starting-with', dest='exclude_branches_sw', nargs="+", metavar='', default=[], help='Exclude branches from lines starting with one of the provided strings (e.g. assert, return, etc.)')
    parser.add_argument('-a', '--include-br-lines-starting-with', dest='include_branches_sw', nargs="+", metavar='', default=[], help='Include only branches from lines starting with one of the provided strings (e.g. if, else, while, etc.)')
    parser.add_argument('-X', '--skip-exclusion-markers', dest='skip_exclusion_markers', action="store_true", help='Skip reading source files to search for lcov exclusion markers (such as "LCOV_EXCL_LINE")')
    parser.add_argument('-x', '--scan-exclusion-markers', dest='scan_exclusion_markers', action="store_true", help='(Combine operations) Force reading source files to search for lcov exclusion markers (such as "LCOV_EXCL_LINE")')

    # Capture untested file coverage as well via gcno
    parser.add_argument('-n', '--process-gcno', dest='use_gcno', action="store_true", help='Process both gcno and gcda coverage files. This option is useful for capturing untested files in the coverage report.')

    # Filtering Options
    parser.add_argument('-s', '--source-files', dest='sources', nargs="+", metavar='', default=[], help='Filter: Specify exactly which source files should be included in the final report. Paths must be either absolute or relative to the current directory.')
    parser.add_argument('-e', '--exclude', dest='excludepost', nargs="+", metavar='', default=[], help='Filter: Exclude source files from the final report if they contain one of the provided substrings (e.g. /usr/include test/, etc.)')
    parser.add_argument('-i', '--include', dest='includepost', nargs="+", metavar='', default=[], help='Filter: Only include source files in the final report that contain one of the provided substrings (e.g. src/, etc.)')
    parser.add_argument('-f', '--gcda-files', dest='coverage_files', nargs="+", metavar='', default=[], help='Filter: Specify exactly which gcda or gcno files should be processed. Note that specifying gcno causes both gcno and gcda to be processed.')
    parser.add_argument('-E', '--exclude-gcda', dest='excludepre', nargs="+", metavar='', default=[], help='Filter: Exclude gcda or gcno files from being processed via simple substring matching (not regex)')

    parser.add_argument('-g', '--gcov', dest='gcov', default='gcov', help='Which gcov binary to use')

    parser.add_argument('-d', '--search-directory', dest='directory', default=".", help='Base directory to recursively search for gcda files (default: .)')
    parser.add_argument('-c', '--compiler-directory', dest='cdirectory', default=".", help='Base directory the compiler was invoked from (default: .). This needs to be set if invoking fastcov from somewhere other than the base compiler directory.')

    parser.add_argument('-j', '--jobs', dest='jobs', type=int, default=multiprocessing.cpu_count(), help='Number of parallel gcov processes to spawn (default: {}).'.format(multiprocessing.cpu_count()))
    parser.add_argument('-m', '--minimum-chunk-size', dest='minimum_chunk', type=int, default=5, help='Minimum number of files a thread should process (default: 5). If you have only 4 gcda files but they are monstrously huge, you could change this value to 1 so that each thread will only process 1 gcda. Otherwise fastcov will spawn only 1 thread to process all of them.')

    parser.add_argument('-F', '--fallback-encodings', dest='fallback_encodings', nargs="+", metavar='', default=[], help='List of encodings to try if opening a source file with the default fails (e.g. latin1, etc.). This option is not usually needed.')

    parser.add_argument('-l', '--lcov', dest='lcov', action="store_true", help='Output in lcov info format instead of fastcov json')
    parser.add_argument('-o', '--output', dest='output', default="coverage.json", help='Name of output file (default: coverage.json)')
    parser.add_argument('-q', '--quiet', dest='quiet', action="store_true", help='Suppress output to stdout')
    parser.add_argument('-t', '--test-name', dest='test_name', default="", help='Specify a test name for the coverage. Equivalent to lcov\'s `-t`.')
    parser.add_argument('-C', '--add-tracefile', dest='combine', nargs="+", help='Combine multiple coverage files into one. If this flag is specified, fastcov will perform a combine operation instead of invoking gcov. Equivalent to lcov\'s `-a`.')
    parser.add_argument('-V', '--verbose', dest="verbose", action="store_true", help="Print more detailed information about what fastcov is doing")
    parser.add_argument('-v', '--version', action="version", version='%(prog)s {version}'.format(version=__version__), help="Show program's version number and exit")
    args = parser.parse_args()
    return args

def checkPythonVersion(version):
    """Exit if the provided python version is less than the supported version."""
    if version < MINIMUM_PYTHON:
        sys.stderr.write("Minimum python version {} required, found {}\n".format(tupleToDotted(MINIMUM_PYTHON), tupleToDotted(version)))
        sys.exit(EXIT_CODES["python_version"])

def checkGcovVersion(version):
    """Exit if the provided gcov version is less than the supported version."""
    if version < MINIMUM_GCOV:
        sys.stderr.write("Minimum gcov version {} required, found {}\n".format(tupleToDotted(MINIMUM_GCOV), tupleToDotted(version)))
        sys.exit(EXIT_CODES["gcov_version"])

def setupLogging(quiet, verbose):
    handler = logging.StreamHandler()
    handler.setFormatter(FastcovFormatter("[%(levelname)s]: %(message)s"))

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)

    if not quiet:
        logging.disable(level=logging.NOTSET)  # Re-enable logging

    if verbose:
        root.setLevel(logging.DEBUG)

def main():
    args = parseArgs()

    # Setup logging
    setupLogging(args.quiet, args.verbose)

    # Get report from the appropriate source
    if args.combine:
        fastcov_json = getCombineCoverage(args)
        skip_exclusion_markers = not args.scan_exclusion_markers
    else:
        fastcov_json = getGcovCoverage(args)
        skip_exclusion_markers = args.skip_exclusion_markers

    # Scan for exclusion markers
    if not skip_exclusion_markers:
        scanExclusionMarkers(fastcov_json, args.jobs, args.exclude_branches_sw, args.include_branches_sw, args.minimum_chunk, args.fallback_encodings)
        logging.info("Scanned {} source files for exclusion markers".format(len(fastcov_json["sources"])))

    # Dump to the desired file format
    dumpFile(fastcov_json, args)

    # If there was an error along the way, but we still completed the pipeline...
    if EXIT_CODE:
        sys.exit(EXIT_CODE)

# Set package version... it's way down here so that we can call tupleToDotted
__version__ = tupleToDotted(FASTCOV_VERSION)

if __name__ == '__main__':
    main()