summaryrefslogtreecommitdiff
path: root/rtemsspec/testrunner.py
diff options
context:
space:
mode:
Diffstat (limited to 'rtemsspec/testrunner.py')
-rw-r--r--rtemsspec/testrunner.py179
1 files changed, 179 insertions, 0 deletions
diff --git a/rtemsspec/testrunner.py b/rtemsspec/testrunner.py
new file mode 100644
index 00000000..94bad5a4
--- /dev/null
+++ b/rtemsspec/testrunner.py
@@ -0,0 +1,179 @@
+# SPDX-License-Identifier: BSD-2-Clause
+""" This module provides a build item to run tests. """
+
+# Copyright (C) 2022, 2023 embedded brains GmbH & Co. KG
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import logging
+import multiprocessing
+import os
+import queue
+import subprocess
+from subprocess import run as subprocess_run
+import tarfile
+import time
+import threading
+from typing import Any, Dict, List, NamedTuple, Union
+
+from rtemsspec.items import Item, ItemGetValueContext
+from rtemsspec.packagebuild import BuildItem, PackageBuildDirector
+from rtemsspec.testoutputparser import augment_report
+
+Report = Dict[str, Union[str, List[str]]]
+
+
+class Executable(NamedTuple):
+ """ This data class represents a test executable. """
+ path: str
+ digest: str
+ timeout: int
+
+
+class TestRunner(BuildItem):
+ """ Runs the tests. """
+
+ def __init__(self, director: PackageBuildDirector, item: Item):
+ super().__init__(director, item)
+ self._executable = "/dev/null"
+ self._executables: List[Executable] = []
+ self.mapper.add_get_value(f"{self.item.type}:/test-executable",
+ self._get_test_executable)
+ self.mapper.add_get_value(f"{self.item.type}:/test-executables-grmon",
+ self._get_test_executables_grmon)
+
+ def _get_test_executable(self, _ctx: ItemGetValueContext) -> Any:
+ return self._executable
+
+ def _get_test_executables_grmon(self, _ctx: ItemGetValueContext) -> Any:
+ return " \\\n".join(
+ os.path.basename(executable.path)
+ for executable in self._executables)
+
+ def run_tests(self, executables: List[Executable]) -> List[Report]:
+ """
+ Runs the test executables and produces a log file of the test run.
+ """
+ self._executables = executables
+ return []
+
+
+class DummyTestRunner(TestRunner):
+ """ Cannot run the tests. """
+
+ def run_tests(self, _executables: List[Executable]) -> List[Report]:
+ """ Raises an exception. """
+ raise IOError("this test runner cannot run tests")
+
+
+class GRMONManualTestRunner(TestRunner):
+ """ Provides scripts to run the tests using GRMON. """
+
+ def run_tests(self, executables: List[Executable]) -> List[Report]:
+ super().run_tests(executables)
+ base = self["script-base-path"]
+ dir_name = os.path.basename(base)
+ grmon_name = f"{base}.grmon"
+ shell_name = f"{base}.sh"
+ tar_name = f"{base}.tar.xz"
+ os.makedirs(os.path.dirname(base), exist_ok=True)
+ with tarfile.open(tar_name, "w:xz") as tar_file:
+ with open(grmon_name, "w", encoding="utf-8") as grmon_file:
+ grmon_file.write(self["grmon-script"])
+ tar_file.add(grmon_name, os.path.join(dir_name, "run.grmon"))
+ with open(shell_name, "w", encoding="utf-8") as shell_file:
+ shell_file.write(self["shell-script"])
+ tar_file.add(shell_name, os.path.join(dir_name, "run.sh"))
+ for executable in executables:
+ tar_file.add(
+ executable.path,
+ os.path.join(dir_name, os.path.basename(executable.path)))
+ raise IOError(f"Run the tests provided by {tar_name}")
+
+
+def _now_utc() -> str:
+ return datetime.datetime.utcnow().isoformat()
+
+
+class _Job:
+ # pylint: disable=too-few-public-methods
+ def __init__(self, executable: Executable, command: List[str]):
+ self.report: Report = {
+ "executable": executable.path,
+ "executable-sha512": executable.digest,
+ "command-line": command
+ }
+ self.timeout = executable.timeout
+
+
+def _worker(work_queue: queue.Queue, item: BuildItem):
+ with open(os.devnull, "rb") as devnull:
+ while True:
+ try:
+ job = work_queue.get_nowait()
+ except queue.Empty:
+ return
+ logging.info("%s: run: %s", item.uid, job.report["command-line"])
+ job.report["start-time"] = _now_utc()
+ begin = time.monotonic()
+ try:
+ process = subprocess_run(job.report["command-line"],
+ check=False,
+ stdin=devnull,
+ stdout=subprocess.PIPE,
+ timeout=job.timeout)
+ stdout = process.stdout.decode("utf-8")
+ except subprocess.TimeoutExpired as timeout:
+ if timeout.stdout is not None:
+ stdout = timeout.stdout.decode("utf-8")
+ else:
+ stdout = ""
+ except Exception: # pylint: disable=broad-exception-caught
+ stdout = ""
+ output = stdout.rstrip().replace("\r\n", "\n").split("\n")
+ augment_report(job.report, output)
+ job.report["output"] = output
+ job.report["duration"] = time.monotonic() - begin
+ logging.debug("%s: done: %s", item.uid, job.report["executable"])
+ work_queue.task_done()
+
+
+class SubprocessTestRunner(TestRunner):
+ """ Runs the tests in subprocesses. """
+
+ def run_tests(self, executables: List[Executable]) -> List[Report]:
+ super().run_tests(executables)
+ work_queue: queue.Queue[_Job] = \
+ queue.Queue() # pylint: disable=unsubscriptable-object
+ jobs: List[_Job] = []
+ for executable in executables:
+ self._executable = executable.path
+ job = _Job(executable, self["command"])
+ jobs.append(job)
+ work_queue.put(job)
+ for _ in range(min(multiprocessing.cpu_count(), len(executables))):
+ threading.Thread(target=_worker,
+ args=(work_queue, self),
+ daemon=True).start()
+ work_queue.join()
+ return [job.report for job in jobs]