summaryrefslogblamecommitdiffstats
path: root/rtemsspec/testoutputparser.py
blob: c18fa9c849254fa1e06ee4939eaed2a0c17e601c (plain) (tree)




















































                                                                             
                                  
















                                                         

                                                 
                                                               
 



                                                            








































                                                                  









                                                       



















































































                                                               
                                               






































































































































                                                                               
                              





                                               









                                                     


                                                             

                                     



























                                                                             
                    



















































































































                                                                               

                                                                 


                                                                  



                                                         
                               


                                                            

                                                       



                                                       
                             
                           
                                                                          






                                                                      







                                                        
                                                            
                                  






                                                          
                                








                                                                    
                                       






                                                               
                                     










                                                                         

                                        














                                                                          
# SPDX-License-Identifier: BSD-2-Clause
""" This module provides a test output parser. """

# Copyright (C) 2022 embedded brains GmbH (http://www.embedded-brains.de)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import base64
import hashlib
import re
from typing import Any, Dict, Iterable

# Patterns for the begin and end of test markers and the test information
# lines.  The capture group provides the value stored in the "info" dictionary
# by the parser.
_TEST_BEGIN = re.compile(r"\*\*\* BEGIN OF TEST ([^*]+) \*\*\*")
_TEST_VERSION = re.compile(r"\*\*\* TEST VERSION: (.+)")
_TEST_STATE = re.compile(r"\*\*\* TEST STATE: (.+)")
_TEST_BUILD = re.compile(r"\*\*\* TEST BUILD: ?(.*)")
_TEST_TOOLS = re.compile(r"\*\*\* TEST TOOLS: (.+)")
_TEST_END = re.compile(r"\*\*\* END OF TEST ([^*]+) \*\*\*")

# Patterns for the test suite report lines.  The parser uses the groups of the
# test suite end pattern as name, test case count, step count, failed steps
# count, and duration.  It uses the groups of the test case end pattern as
# name, step count, failed steps count, and duration.
_TS_SUITE_BEGIN = re.compile(r"A:(.+)")
_TS_SUITE_END = re.compile(r"Z:([^:]+):C:([^:]+):N:([^:]+):F:([^:]+):D:(.+)")
_TS_CASE_BEGIN = re.compile(r"B:(.+)")
_TS_CASE_END = re.compile(r"E:([^:]+):N:([^:]+):F:([^:]+):D:(.+)")
_TS_PLATFORM = re.compile(r"S:Platform:(.+)")
_TS_COMPILER = re.compile(r"S:Compiler:(.+)")
_TS_VERSION = re.compile(r"S:Version:(.+)")
_TS_BSP = re.compile(r"S:BSP:(.+)")
_TS_BUILD_LABEL = re.compile(r"S:BuildLabel:(.+)")
_TS_TARGET_HASH = re.compile(r"S:TargetHash:SHA256:(.*)")
# The build option lines accept only the values 0 and 1 up to the end of the
# line.
_TS_RTEMS_DEBUG = re.compile(r"S:RTEMS_DEBUG:([01])$")
_TS_RTEMS_MULTIPROCESSING = re.compile(r"S:RTEMS_MULTIPROCESSING:([01])$")
_TS_RTEMS_POSIX_API = re.compile(r"S:RTEMS_POSIX_API:([01])$")
_TS_RTEMS_PROFILING = re.compile(r"S:RTEMS_PROFILING:([01])$")
_TS_RTEMS_SMP = re.compile(r"S:RTEMS_SMP:([01])$")
_TS_REMARK = re.compile(r"R:(.+)")
_TS_REPORT_HASH = re.compile(r"Y:ReportHash:SHA256:(.+)")

# Patterns for the runtime measurement lines.  The parser uses the groups of
# the sample pattern (M:S) as a repeat count and the sample value.  The groups
# of the end pattern (M:E) are used as the measurement name and the total
# duration.
_M_BEGIN = re.compile(r"M:B:(.+)")
_M_END = re.compile(r"M:E:([^:]+):D:(.+)")
_M_V = re.compile(r"M:V:(.+)")
_M_N = re.compile(r"M:N:(.+)")
_M_S = re.compile(r"M:S:([^:]+):(.+)")
_M_MI = re.compile(r"M:MI:(.+)")
_M_P1 = re.compile(r"M:P1:(.+)")
_M_Q1 = re.compile(r"M:Q1:(.+)")
_M_Q2 = re.compile(r"M:Q2:(.+)")
_M_Q3 = re.compile(r"M:Q3:(.+)")
_M_P99 = re.compile(r"M:P99:(.+)")
_M_MX = re.compile(r"M:MX:(.+)")
_M_MAD = re.compile(r"M:MAD:(.+)")
_M_D = re.compile(r"M:D:(.+)")

# Markers and hash pattern of the base64 encoded gcov information section
_GCOV_BEGIN = "*** BEGIN OF GCOV INFO BASE64 ***"
_GCOV_END = "*** END OF GCOV INFO BASE64 ***"
_GCOV_HASH = re.compile(r"\*\*\* GCOV INFO SHA256 (.*) \*\*\*")

# Markers of the base64 encoded records sections (plain and zlib variants)
_RECORDS_BEGIN = "*** BEGIN OF RECORDS BASE64 ***"
_RECORDS_END = "*** END OF RECORDS BASE64 ***"
_RECORDS_ZLIB_BEGIN = "*** BEGIN OF RECORDS BASE64 ZLIB ***"
_RECORDS_ZLIB_END = "*** END OF RECORDS BASE64 ZLIB ***"


def _are_samples_valid(measurement) -> bool:
    """
    Checks that the samples of the measurement are consistent with its sample
    count, minimum, and maximum.

    The number of samples shall be equal to the "sample-count" value.  If
    there are samples, then the first sample shall be equal to the "min" value
    and the last sample shall be equal to the "max" value.
    """
    samples = measurement["samples"]
    if len(samples) != measurement["sample-count"]:
        return False
    if not samples:
        # There are no samples to check against the minimum and maximum
        return True
    return (measurement["min"] == samples[0]
            and measurement["max"] == samples[-1])


class TestOutputParser:
    """
    Provides a line by line parser of test output.

    The parser is a state machine.  The current state is the bound method
    stored in the ``consume`` attribute.  It is called with the line index and
    the line and returns true, if the line was recognized, otherwise false.
    The ``hash_line`` attribute is the bound method used to add a line to the
    current SHA256 hash state.  The ``level`` attribute is incremented by
    begin markers and decremented by end markers.  The results are stored in
    the data dictionary passed to the constructor.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, data) -> None:
        """
        Initializes the parser to store the results in the data dictionary.

        The data dictionary shall not contain the "info" and "test-suite"
        keys.  The "info" and "data-ranges" entries are created.
        """
        self.data = data
        self.consume = self._test_begin
        self.hash_line = self._hash_none
        assert "info" not in data
        self.data["info"] = {}
        self.data["data-ranges"] = []
        assert "test-suite" not in data
        self.level = 0
        self._hash_state = hashlib.sha256()
        self._measurement: Dict[str, Any] = {}
        self._test_case: Dict[str, Any] = {}

    def _error(self, index: int) -> bool:
        """
        Records the line index as the line parser error, continues with the
        extra data state, and returns false.
        """
        assert "line-parser-error" not in self.data
        self.data["line-parser-error"] = index
        self.consume = self._extra
        return False

    def _hash_none(self, line: str) -> None:
        """ Ignores the line. """
        # pylint: disable=unused-argument

    def _hash_sha256(self, line: str) -> None:
        """ Adds the line with a newline appended to the hash state. """
        self._hash_state.update(f"{line}\n".encode("ascii"))

    def _hash_sha256_skip_one(self, line: str) -> None:
        """ Ignores the line and hashes the lines following it. """
        # pylint: disable=unused-argument
        self.hash_line = self._hash_sha256

    def _hash_finalize(self) -> str:
        """
        Stops the line hashing and returns the URL-safe base64 encoded digest
        of the hash state.
        """
        self.hash_line = self._hash_none
        digest = base64.urlsafe_b64encode(
            self._hash_state.digest()).decode("ascii")
        return digest

    def _test_begin(self, index: int, line: str) -> bool:
        """ Tries to consume the begin of test marker. """
        mobj = _TEST_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["info"]["line-begin-of-test"] = index
            self.data["info"]["name"] = mobj.group(1)
            self.consume = self._test_version
            return True
        return self._extra(index, line)

    def _test_version(self, index: int, line: str) -> bool:
        """
        Tries to consume the test version line.

        If the line is not a test version line, then the line is processed by
        the test body state.
        """
        mobj = _TEST_VERSION.match(line)
        if mobj:
            self.data["info"]["version"] = mobj.group(1)
            self.data["info"]["line-version"] = index
            self.consume = self._test_state
            return True
        self.consume = self._test_body
        return self._test_body(index, line)

    def _test_state(self, index: int, line: str) -> bool:
        """ Consumes the test state line, otherwise records an error. """
        mobj = _TEST_STATE.match(line)
        if mobj:
            self.data["info"]["state"] = mobj.group(1)
            self.data["info"]["line-state"] = index
            self.consume = self._test_build
            return True
        return self._error(index)

    def _test_build(self, index: int, line: str) -> bool:
        """
        Consumes the test build line, otherwise records an error.

        The build value is split at ", " into a list.  An empty build value
        yields an empty list.
        """
        mobj = _TEST_BUILD.match(line)
        if mobj:
            build = mobj.group(1)
            if build:
                self.data["info"]["build"] = build.split(", ")
            else:
                self.data["info"]["build"] = []
            self.data["info"]["line-build"] = index
            self.consume = self._test_tools
            return True
        return self._error(index)

    def _test_tools(self, index: int, line: str) -> bool:
        """ Consumes the test tools line, otherwise records an error. """
        mobj = _TEST_TOOLS.match(line)
        if mobj:
            self.data["info"]["tools"] = mobj.group(1)
            self.data["info"]["line-tools"] = index
            self.consume = self._test_body
            return True
        return self._error(index)

    def _test_body(self, index: int, line: str) -> bool:
        """
        Tries to consume a test suite begin line, extra data, or the end of
        test marker.

        An end of test marker with a name not equal to the name of the begin
        of test marker is an error.
        """
        if self._test_suite_begin(index, line):
            return True
        mobj = _TEST_END.match(line)
        if mobj:
            self.level -= 1
            if self.data["info"]["name"] == mobj.group(1):
                self.data["info"]["line-end-of-test"] = index
                self.consume = self._extra
                return True
            return self._error(index)
        return self._extra(index, line)

    def _test_suite_begin(self, index: int, line: str) -> bool:
        """
        Tries to consume the test suite begin line.

        On success, the test suite data is initialized with placeholders and
        the hashing of the report lines is started.
        """
        mobj = _TS_SUITE_BEGIN.match(line)
        if mobj:
            self.level += 1
            self.data["test-suite"] = {
                "duration": "?",
                "failed-steps-count": "?",
                "line-begin": index,
                "line-duration": "?",
                "line-end": "?",
                "line-failed-steps-count": "?",
                "line-report-hash": "?",
                "line-step-count": "?",
                "name": mobj.group(1),
                "report-hash": "?",
                "report-hash-calculated": "?",
                "step-count": "?",
                "test-cases": []
            }
            self.consume = self._test_suite_platform
            self._hash_state = hashlib.sha256()
            self.hash_line = self._hash_sha256
            return True
        return self._extra(index, line)

    def _test_suite_platform(self, index: int, line: str) -> bool:
        """ Consumes the platform line, otherwise records an error. """
        mobj = _TS_PLATFORM.match(line)
        if mobj:
            self.data["test-suite"]["platform"] = mobj.group(1)
            self.data["test-suite"]["line-platform"] = index
            self.consume = self._test_suite_compiler
            return True
        return self._error(index)

    def _test_suite_compiler(self, index: int, line: str) -> bool:
        """ Consumes the compiler line, otherwise records an error. """
        mobj = _TS_COMPILER.match(line)
        if mobj:
            self.data["test-suite"]["compiler"] = mobj.group(1)
            self.data["test-suite"]["line-compiler"] = index
            self.consume = self._test_suite_version
            return True
        return self._error(index)

    def _test_suite_version(self, index: int, line: str) -> bool:
        """ Consumes the version line, otherwise records an error. """
        mobj = _TS_VERSION.match(line)
        if mobj:
            self.data["test-suite"]["version"] = mobj.group(1)
            self.data["test-suite"]["line-version"] = index
            self.consume = self._test_suite_bsp
            return True
        return self._error(index)

    def _test_suite_bsp(self, index: int, line: str) -> bool:
        """ Consumes the BSP line, otherwise records an error. """
        mobj = _TS_BSP.match(line)
        if mobj:
            self.data["test-suite"]["bsp"] = mobj.group(1)
            self.data["test-suite"]["line-bsp"] = index
            self.consume = self._test_suite_build_label
            return True
        return self._error(index)

    def _test_suite_build_label(self, index: int, line: str) -> bool:
        """ Consumes the build label line, otherwise records an error. """
        mobj = _TS_BUILD_LABEL.match(line)
        if mobj:
            self.data["test-suite"]["build-label"] = mobj.group(1)
            self.data["test-suite"]["line-build-label"] = index
            self.consume = self._test_suite_target_hash
            return True
        return self._error(index)

    def _test_suite_target_hash(self, index: int, line: str) -> bool:
        """ Consumes the target hash line, otherwise records an error. """
        mobj = _TS_TARGET_HASH.match(line)
        if mobj:
            self.data["test-suite"]["target-hash"] = mobj.group(1)
            self.data["test-suite"]["line-target-hash"] = index
            self.consume = self._test_suite_rtems_debug
            return True
        return self._error(index)

    def _test_suite_rtems_debug(self, index: int, line: str) -> bool:
        """ Consumes the RTEMS_DEBUG line, otherwise records an error. """
        mobj = _TS_RTEMS_DEBUG.match(line)
        if mobj:
            self.data["test-suite"]["rtems-debug"] = bool(int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-debug"] = index
            self.consume = self._test_suite_rtems_multiprocessing
            return True
        return self._error(index)

    def _test_suite_rtems_multiprocessing(self, index: int, line: str) -> bool:
        """
        Consumes the RTEMS_MULTIPROCESSING line, otherwise records an error.
        """
        mobj = _TS_RTEMS_MULTIPROCESSING.match(line)
        if mobj:
            self.data["test-suite"]["rtems-multiprocessing"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-multiprocessing"] = index
            self.consume = self._test_suite_rtems_posix_api
            return True
        return self._error(index)

    def _test_suite_rtems_posix_api(self, index: int, line: str) -> bool:
        """ Consumes the RTEMS_POSIX_API line, otherwise records an error. """
        mobj = _TS_RTEMS_POSIX_API.match(line)
        if mobj:
            self.data["test-suite"]["rtems-posix-api"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-posix-api"] = index
            self.consume = self._test_suite_rtems_profiling
            return True
        return self._error(index)

    def _test_suite_rtems_profiling(self, index: int, line: str) -> bool:
        """ Consumes the RTEMS_PROFILING line, otherwise records an error. """
        mobj = _TS_RTEMS_PROFILING.match(line)
        if mobj:
            self.data["test-suite"]["rtems-profiling"] = bool(
                int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-profiling"] = index
            self.consume = self._test_suite_rtems_smp
            return True
        return self._error(index)

    def _test_suite_rtems_smp(self, index: int, line: str) -> bool:
        """ Consumes the RTEMS_SMP line, otherwise records an error. """
        mobj = _TS_RTEMS_SMP.match(line)
        if mobj:
            self.data["test-suite"]["rtems-smp"] = bool(int(mobj.group(1)))
            self.data["test-suite"]["line-rtems-smp"] = index
            self.consume = self._test_suite_body
            return True
        return self._error(index)

    def _test_suite_body(self, index: int, line: str) -> bool:
        """
        Tries to consume a test case begin line, extra data, or the test suite
        end line.

        A test suite end line is an error, if its name is not equal to the
        name of the test suite begin line or its test case count is not equal
        to the count of parsed test cases.
        """
        if self._test_case_begin(index, line):
            return True
        mobj = _TS_SUITE_END.match(line)
        if mobj:
            self.level -= 1
            data = self.data["test-suite"]
            count = int(mobj.group(2))
            if data["name"] == mobj.group(1) and len(
                    data["test-cases"]) == count:
                data["line-end"] = index
                data["line-step-count"] = index
                data["line-failed-steps-count"] = index
                data["line-duration"] = index
                data["step-count"] = int(mobj.group(3))
                data["failed-steps-count"] = int(mobj.group(4))
                data["duration"] = float(mobj.group(5))
                self.consume = self._report_hash
                return True
            return self._error(index)
        return self._extra(index, line)

    def _test_case_begin(self, index: int, line: str) -> bool:
        """ Tries to consume the test case begin line. """
        mobj = _TS_CASE_BEGIN.match(line)
        if mobj:
            self.level += 1
            self._test_case = {
                "line-begin": index,
                "name": mobj.group(1),
                "remarks": [],
                "runtime-measurements": []
            }
            self.consume = self._test_case_body
            return True
        return self._extra(index, line)

    def _remark(self, index: int, line: str) -> bool:
        """
        Tries to consume a remark line and adds it to the current test case.
        """
        mobj = _TS_REMARK.match(line)
        if mobj:
            self._test_case["remarks"].append({
                "line": index,
                "remark": mobj.group(1)
            })
            return True
        return False

    def _test_case_body(self, index: int, line: str) -> bool:
        """
        Tries to consume a measurement begin line, a remark line, the test
        case end line, or extra data.

        A test case end line with a name not equal to the name of the test
        case begin line is an error.  Otherwise, the test case is added to the
        test cases of the test suite.
        """
        if self._measurement_begin(index, line):
            return True
        if self._remark(index, line):
            return True
        mobj = _TS_CASE_END.match(line)
        if mobj:
            self.level -= 1
            if self._test_case["name"] == mobj.group(1):
                self._test_case["line-end"] = index
                self._test_case["line-step-count"] = index
                self._test_case["line-failed-steps-count"] = index
                self._test_case["line-duration"] = index
                self._test_case["step-count"] = int(mobj.group(2))
                self._test_case["failed-steps-count"] = int(mobj.group(3))
                self._test_case["duration"] = float(mobj.group(4))
                self.data["test-suite"]["test-cases"].append(self._test_case)
                self.consume = self._test_suite_body
                return True
            return self._error(index)
        return self._extra(index, line)

    def _measurement_begin(self, index: int, line: str) -> bool:
        """ Tries to consume the measurement begin line. """
        mobj = _M_BEGIN.match(line)
        if mobj:
            self.level += 1
            self._measurement = {
                "line-begin": index,
                "name": mobj.group(1),
                "samples": []
            }
            self.consume = self._measurement_variant
            return True
        return False

    def _measurement_variant(self, index: int, line: str) -> bool:
        """
        Consumes the measurement variant line, otherwise records an error.
        """
        mobj = _M_V.match(line)
        if mobj:
            self._measurement["variant"] = mobj.group(1)
            self.consume = self._measurement_count
            return True
        return self._error(index)

    def _measurement_count(self, index: int, line: str) -> bool:
        """ Consumes the sample count line, otherwise records an error. """
        mobj = _M_N.match(line)
        if mobj:
            self._measurement["sample-count"] = int(mobj.group(1))
            self.consume = self._measurement_samples
            return True
        return self._error(index)

    def _measurement_samples(self, index: int, line: str) -> bool:
        """
        Consumes the minimum line, extra data, or a sample line, otherwise
        records an error.

        A sample line provides a repeat count and a value.  The value is added
        repeat count times to the samples of the measurement.
        """
        if self._measurement_min(index, line):
            return True
        mobj = _M_S.match(line)
        if mobj:
            self._measurement["samples"].extend(  # type: ignore
                int(mobj.group(1)) * [float(mobj.group(2))])
            return True
        return self._error(index)

    def _measurement_min(self, index: int, line: str) -> bool:
        """ Tries to consume the minimum line. """
        mobj = _M_MI.match(line)
        if mobj:
            self._measurement["min"] = float(mobj.group(1))
            self.consume = self._measurement_p1
            return True
        return self._extra(index, line)

    def _measurement_p1(self, index: int, line: str) -> bool:
        """ Consumes the M:P1 line, otherwise records an error. """
        mobj = _M_P1.match(line)
        if mobj:
            self._measurement["p1"] = float(mobj.group(1))
            self.consume = self._measurement_q1
            return True
        return self._error(index)

    def _measurement_q1(self, index: int, line: str) -> bool:
        """ Consumes the M:Q1 line, otherwise records an error. """
        mobj = _M_Q1.match(line)
        if mobj:
            self._measurement["q1"] = float(mobj.group(1))
            self.consume = self._measurement_q2
            return True
        return self._error(index)

    def _measurement_q2(self, index: int, line: str) -> bool:
        """ Consumes the M:Q2 line, otherwise records an error. """
        mobj = _M_Q2.match(line)
        if mobj:
            self._measurement["q2"] = float(mobj.group(1))
            self.consume = self._measurement_q3
            return True
        return self._error(index)

    def _measurement_q3(self, index: int, line: str) -> bool:
        """ Consumes the M:Q3 line, otherwise records an error. """
        mobj = _M_Q3.match(line)
        if mobj:
            self._measurement["q3"] = float(mobj.group(1))
            self.consume = self._measurement_p99
            return True
        return self._error(index)

    def _measurement_p99(self, index: int, line: str) -> bool:
        """ Consumes the M:P99 line, otherwise records an error. """
        mobj = _M_P99.match(line)
        if mobj:
            self._measurement["p99"] = float(mobj.group(1))
            self.consume = self._measurement_max
            return True
        return self._error(index)

    def _measurement_max(self, index: int, line: str) -> bool:
        """ Consumes the maximum line, otherwise records an error. """
        mobj = _M_MX.match(line)
        if mobj:
            self._measurement["max"] = float(mobj.group(1))
            self.consume = self._measurement_mad
            return True
        return self._error(index)

    def _measurement_mad(self, index: int, line: str) -> bool:
        """ Consumes the M:MAD line, otherwise records an error. """
        mobj = _M_MAD.match(line)
        if mobj:
            self._measurement["mad"] = float(mobj.group(1))
            self.consume = self._measurement_duration
            return True
        return self._error(index)

    def _measurement_duration(self, index: int, line: str) -> bool:
        """ Consumes the duration sum line, otherwise records an error. """
        mobj = _M_D.match(line)
        if mobj:
            self._measurement["duration-sum"] = float(mobj.group(1))
            self.consume = self._measurement_end
            return True
        return self._error(index)

    def _measurement_end(self, index: int, line: str) -> bool:
        """
        Consumes the measurement end line, otherwise records an error.

        It is also an error, if the name is not equal to the name of the
        measurement begin line or the samples are not valid.  Otherwise, the
        measurement is added to the runtime measurements of the test case.
        """
        mobj = _M_END.match(line)
        if mobj:
            self.level -= 1
            if self._measurement["name"] == mobj.group(
                    1) and _are_samples_valid(self._measurement):
                self._measurement["line-end"] = index
                self._measurement["duration-total"] = float(mobj.group(2))
                self._test_case["runtime-measurements"].append(  # type: ignore
                    self._measurement)
                self.consume = self._test_case_body
                return True
        return self._error(index)

    def _report_hash(self, index: int, line: str) -> bool:
        """
        Tries to consume the report hash line.

        On success, the calculated report hash is finalized and stored along
        with the reported hash.
        """
        mobj = _TS_REPORT_HASH.match(line)
        if mobj:
            self.data["test-suite"][
                "report-hash-calculated"] = self._hash_finalize()
            self.data["test-suite"]["report-hash"] = mobj.group(1)
            self.data["test-suite"]["line-report-hash"] = index
            self.consume = self._test_body
            return True
        return self._extra(index, line)

    def _gcov_begin(self, index: int, line: str) -> bool:
        """
        Tries to consume the begin marker of the gcov information.

        On success, a new hash is started which skips the marker line.
        """
        # Compare for equality.  A membership test on a string is a substring
        # test which would accept fragments of the marker such as "***".
        if line == _GCOV_BEGIN:
            self.level += 1
            self.data["line-gcov-info-base64-begin"] = index
            self.consume = self._gcov_end
            self._hash_state = hashlib.sha256()
            self.hash_line = self._hash_sha256_skip_one
            return True
        return False

    def _gcov_end(self, index: int, line: str) -> bool:
        """
        Tries to consume the end marker of the gcov information.

        On success, the calculated hash is stored and the range of the lines
        between the markers is added to the data ranges.
        """
        # A substring test would end the section at a data line which is a
        # fragment of the marker, for example "INFO"
        if line == _GCOV_END:
            self.level -= 1
            self.data["gcov-info-hash-calculated"] = self._hash_finalize()
            self.data["line-gcov-info-base64-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-gcov-info-base64-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _gcov_hash(self, index: int, line: str) -> bool:
        """ Tries to consume the gcov information hash line. """
        mobj = _GCOV_HASH.match(line)
        if mobj:
            self.data["gcov-info-hash"] = mobj.group(1)
            self.data["line-gcov-info-hash"] = index
            return True
        return False

    def _records_begin(self, index: int, line: str) -> bool:
        """ Tries to consume the begin marker of the records. """
        if line == _RECORDS_BEGIN:
            self.level += 1
            self.data["line-records-base64-begin"] = index
            self.consume = self._records_end
            return True
        return False

    def _records_end(self, index: int, line: str) -> bool:
        """
        Tries to consume the end marker of the records.

        On success, the range of the lines between the markers is added to the
        data ranges.
        """
        if line == _RECORDS_END:
            self.level -= 1
            self.data["line-records-base64-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-records-base64-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _records_zlib_begin(self, index: int, line: str) -> bool:
        """ Tries to consume the begin marker of the zlib records. """
        if line == _RECORDS_ZLIB_BEGIN:
            self.level += 1
            self.data["line-records-base64-zlib-begin"] = index
            self.consume = self._records_zlib_end
            return True
        return False

    def _records_zlib_end(self, index: int, line: str) -> bool:
        """
        Tries to consume the end marker of the zlib records.

        On success, the range of the lines between the markers is added to the
        data ranges.
        """
        if line == _RECORDS_ZLIB_END:
            self.level -= 1
            self.data["line-records-base64-zlib-end"] = index
            self.data["data-ranges"].append(
                (self.data["line-records-base64-zlib-begin"] + 1, index))
            self.consume = self._extra
            return True
        return False

    def _extra(self, index: int, line: str) -> bool:
        """
        Tries to consume the begin of a gcov information or records section
        or the gcov information hash line.
        """
        if self._gcov_begin(index, line):
            return True
        if self._gcov_hash(index, line):
            return True
        if self._records_begin(index, line):
            return True
        if self._records_zlib_begin(index, line):
            return True
        return False


def augment_report(report: Dict[str, Any], output: Iterable[str]) -> None:
    """
    Augments the report with the results of the parsed output.

    Each non-empty line of the output is first consumed by the current parser
    state and then passed to the current line hash method.  Empty lines are
    skipped, however, they are still counted by the line index.
    """
    parser = TestOutputParser(report)
    for number, text in enumerate(output):
        if text:
            # The consume and hash methods may change from line to line, so
            # look them up for each line
            parser.consume(number, text)
            parser.hash_line(text)