summaryrefslogtreecommitdiff
path: root/qdp_workspace.py
diff options
context:
space:
mode:
Diffstat (limited to 'qdp_workspace.py')
-rwxr-xr-xqdp_workspace.py526
1 files changed, 526 insertions, 0 deletions
diff --git a/qdp_workspace.py b/qdp_workspace.py
new file mode 100755
index 00000000..0f38429a
--- /dev/null
+++ b/qdp_workspace.py
@@ -0,0 +1,526 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: BSD-2-Clause
+""" Creates a QDP workspace directory according to the configuration file. """
+
+# Copyright (C) 2020, 2023 embedded brains GmbH & Co. KG
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import copy
+import logging
+import os
+import subprocess
+import sys
+from typing import Any, Callable, Dict, List, Optional, Set
+import uuid
+
+from rtemsspec.items import Item, ItemCache, JSONItemCache, is_enabled
+from rtemsspec.directorystate import DirectoryState
+from rtemsspec.packagebuild import BuildItem, BuildItemFactory, \
+ PackageBuildDirector
+from rtemsspec.packagebuildfactory import create_build_item_factory
+from rtemsspec.specverify import verify
+from rtemsspec.util import create_build_argument_parser, hash_file, \
+ init_logging, load_config, run_command
+
+
class _ConfigItem(BuildItem):
    """ Represents the workspace configuration. """

    def __init__(self, director: PackageBuildDirector, item: Item,
                 factory: BuildItemFactory):
        super().__init__(director, item)
        # The workspace gets its own item cache and build director which are
        # populated by the workspace actions.
        cache_config: Dict[str, Any] = {
            "enabled": [],
            "initialize-links": False,
            "paths": [],
            "resolve-proxies": False,
            "spec-type-root-uid": None
        }
        self.workspace_cache = JSONItemCache(cache_config)
        self.workspace_director = PackageBuildDirector(
            self.workspace_cache, factory)

    def is_enabled(self, enabled_by: Any) -> bool:
        """
        Returns true, if the enabled by expression evaluates to true for the
        enabled set of the configuration item, otherwise returns false.
        """
        try:
            enabled = self.enabled_set
        except KeyError:
            # Before the first load-workspace-items action there is no
            # enabled set; treat it as empty.
            enabled = []
        return is_enabled(self.substitute(enabled), enabled_by)

    def set_file(self, item: Item) -> None:
        """ Sets the file of the item below the spec directory. """
        path = os.path.join(self["spec-directory"], f"{item.uid[1:]}.json")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        item.file = path

    def try_save(self, build_item: BuildItem, _reason: str) -> None:
        """ Tries to set the file of the item and save it. """
        if "spec-directory" not in self:
            # Without a spec directory there is no place to store the item.
            logging.info("%s: cannot save item", build_item.uid)
            return
        self.set_file(build_item.item)
        build_item.item.save()
        # Mirror the saved data into the configuration cache item.
        cached = self.item.cache[build_item.uid].data
        cached.clear()
        cached.update(build_item.item.data)
+
+
def _create_item(
        config: _ConfigItem,
        action: Dict[str, Any],
        data: Dict[str, Any],
        type_name: str,
        prepare: Optional[Callable[[_ConfigItem, Dict[str, Any]], None]] = None
) -> Optional[BuildItem]:
    """
    Creates the item of the action in the configuration cache and in the
    workspace cache and returns the corresponding workspace build item.
    """
    uid = action["uid"]
    config_item = config.item.cache.create_item(uid, data)
    config_item["_type"] = type_name
    if prepare is not None:
        # Let the caller adjust the configuration side before the data is
        # duplicated for the workspace.
        prepare(config, action)
    workspace_item = config.workspace_cache.create_item(
        uid, copy.deepcopy(data))
    workspace_item["_type"] = type_name
    return config.workspace_director[uid]
+
+
def _make_root_data(config: _ConfigItem, type_name: str) -> Any:
    """ Returns the common data of a new QDP item of the given type. """
    variant = config.variant
    data: Dict[str, Any] = {}
    # The license and copyright information is inherited from the variant.
    data["SPDX-License-Identifier"] = variant["SPDX-License-Identifier"]
    data["copyrights"] = variant["copyrights"]
    data["enabled-by"] = True
    data["links"] = []
    data["qdp-type"] = type_name
    data["type"] = "qdp"
    return data
+
+
def _make_directory_state_data(config: _ConfigItem, directory: str,
                               directory_state_type: str) -> Any:
    """ Returns the data of a new directory state item. """
    data = _make_root_data(config, "directory-state")
    data.update({
        "copyrights-by-license": {},
        "directory": directory,
        "directory-state-type": directory_state_type,
        "files": [],
        "hash": None,
        "patterns": []
    })
    return data
+
+
def _load_directory(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Loads the directory state associated with the action. """
    state = config.director[action["uid"]]
    assert isinstance(state, DirectoryState)
    state.load()
    # Drop the patterns so that only the explicit file list remains.
    state["patterns"] = []
+
+
def _action_copy_directory(config: _ConfigItem,
                           action: Dict[str, Any]) -> None:
    """ Copies the source directory state to the destination directory. """
    source_directory = config.substitute(action["source-directory"])
    data = _make_directory_state_data(config, source_directory, "generic")
    for key in ("copyrights-by-license", "patterns", "files", "links"):
        data[key] = action[key]
    workspace_state = _create_item(config, action, data,
                                   "qdp/directory-state/generic",
                                   _load_directory)
    assert isinstance(workspace_state, DirectoryState)
    source_state = config.director[action["uid"]]
    assert isinstance(source_state, DirectoryState)
    # Redirect the workspace state to the destination and clone the files.
    workspace_state["directory"] = action["destination-directory"]
    workspace_state.clear()
    workspace_state.lazy_clone(source_state)
    config.try_save(workspace_state, "Update directory state")
+
+
def _copy_file(config: _ConfigItem, action: Dict[str, Any],
               the_type: str) -> None:
    """ Copies the source file of the action to the destination file. """
    source_file = config.substitute(action["source-file"])
    data = _make_directory_state_data(config, os.path.dirname(source_file),
                                      the_type)
    data["files"] = [{"file": os.path.basename(source_file), "hash": None}]
    data["links"] = action["links"]
    workspace_state = _create_item(config, action, data,
                                   f"qdp/directory-state/{the_type}",
                                   _load_directory)
    assert isinstance(workspace_state, DirectoryState)
    # Build the configuration side directory state as well.
    assert isinstance(config.director[action["uid"]], DirectoryState)
    workspace_state["directory"] = action["destination-directory"]
    workspace_state.clear()
    workspace_state.copy_file(source_file, action["destination-file"])
    workspace_state.load()
    config.try_save(workspace_state, "Update directory state")
+
+
def _action_copy_file(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Copies the source file of the action as a generic file. """
    _copy_file(config, action, "generic")
+
+
def _action_copy_test_log(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Copies the source file of the action as a test log. """
    _copy_file(config, action, "test-log")
+
+
def _git_clone(config: _ConfigItem, action: Dict[str, Any],
               repository: DirectoryState) -> None:
    """
    Clones the branch of the source Git repository into the repository
    directory state and optionally rewrites the origin remote.
    """

    def run_checked(args: Any, cwd: str) -> None:
        # Every Git invocation is expected to succeed.
        assert run_command(args, cwd) == 0

    source_directory = config.substitute(action["source-directory"])
    run_checked(["git", "fetch", "origin"], source_directory)
    branch = action["branch"]
    commit = action["commit"]
    # Force the branch to the requested commit before cloning it.
    run_checked(["git", "branch", "-f", branch, commit], source_directory)
    destination_directory = repository.directory
    run_checked([
        "git", "clone", "--branch", branch, "--single-branch",
        f"file://{source_directory}", destination_directory
    ], source_directory)
    origin_url = action["origin-url"]
    if origin_url:
        # Replace the local origin with the configured origin URL.
        run_checked(["git", "remote", "add", "tmp",
                     config.substitute(origin_url)], destination_directory)
        run_checked(["git", "remote", "remove", "origin"],
                    destination_directory)
        run_checked(["git", "remote", "rename", "tmp", "origin"],
                    destination_directory)
    for fetch in action["origin-fetch"]:
        run_checked(["git", "fetch", "origin", config.substitute(fetch)],
                    destination_directory)
    origin_branch = action["origin-branch"]
    origin_commit = action["origin-commit"]
    if origin_branch and origin_commit:
        # Materialize the origin branch and make it the remote HEAD.
        run_checked(["git", "checkout", "-b", origin_branch, origin_commit],
                    destination_directory)
        run_checked(["git", "checkout", branch], destination_directory)
        run_checked([
            "git", "symbolic-ref", "refs/remotes/origin/HEAD",
            f"refs/remotes/origin/{origin_branch}"
        ], destination_directory)
    for command in action["post-clone-commands"]:
        run_checked(config.substitute(command), destination_directory)
    repository.load()
    config.try_save(repository, "Clone Git repository")
+
+
def _action_git_clone(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Clones a Git repository into the destination directory. """
    data = _make_directory_state_data(config, action["destination-directory"],
                                      "repository")
    data["patterns"] = [{"include": "**/*", "exclude": []}]
    for key in ("branch", "commit", "copyrights-by-license", "description",
                "links", "origin-branch", "origin-commit", "origin-commit-url",
                "origin-url"):
        data[key] = action[key]
    repository = _create_item(config, action, data,
                              "qdp/directory-state/repository")
    assert isinstance(repository, DirectoryState)
    # The clone destination must not exist yet.
    assert not os.path.exists(repository.directory)
    _git_clone(config, action, repository)
+
+
def _add_item(config: _ConfigItem, action: Dict[str, Any],
              data: Dict[str, Any]) -> None:
    """ Adds an item with the data to the configuration item cache. """
    config.item.cache.create_item(action["uid"], data)
+
+
def _action_add_item(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Loads the item data from the source file and adds the item. """
    path = config.substitute(action["source"])
    item_data = config.item.cache.load_data(path, action["uid"])
    _add_item(config, action, item_data)
+
+
def _action_make_item(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Adds an item with the data embedded in the action. """
    _add_item(config, action, action["data"])
+
+
def _action_make_uuid_item(config: _ConfigItem,
                           action: Dict[str, Any]) -> None:
    """ Adds an item containing a freshly generated random UUID. """
    _add_item(config, action, {
        **_make_root_data(config, "uuid"), "uuid": str(uuid.uuid4())
    })
+
+
def _set_types(item_cache: ItemCache, action: Dict[str, Any]) -> None:
    """ Sets the item types requested by the action. """
    for entry in action["set-types"]:
        item_cache[entry["uid"]]["_type"] = entry["type"]
+
+
def _action_load_items(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Loads items from the paths of the action into the cache. """
    action_name = action["action-name"]
    item_cache = config.item.cache
    for raw_path in action["paths"]:
        path = config.substitute(raw_path)
        logging.info("%s: load items from: %s", action_name, path)
        item_cache.load_items(path)
    _set_types(item_cache, action)
    # The directors have to drop their cached build items.
    config.director.clear()
    config.workspace_director.clear()
+
+
def _new_workspace_items(config: _ConfigItem, action_name: str,
                         new: Set[str]) -> None:
    """ Adds the items to the workspace cache and saves them. """
    for uid in sorted(new):
        logging.debug("%s: new item: %s", action_name, uid)
        data = config.item.cache[uid].data
        workspace_item = config.workspace_cache.create_item(
            uid, copy.deepcopy(data))
        workspace_item["_type"] = data["_type"]
        config.set_file(workspace_item)
        workspace_item.save()
+
+
def _action_load_workspace_items(config: _ConfigItem,
                                 action: Dict[str, Any]) -> None:
    """ Adds the configuration items to the workspace spec directory. """
    action_name = action["action-name"]
    spec_directory = config.substitute(action["path"])
    config["spec-directory"] = spec_directory
    config.variant.item["enabled"] = config.variant["enabled"]
    logging.info("%s: add workspace items to: %s", action_name,
                 spec_directory)
    _new_workspace_items(config, action_name,
                         set(config.item.cache.keys()))
    _set_types(config.workspace_cache, action)
+
+
def _action_finalize_workspace_items(config: _ConfigItem,
                                     action: Dict[str, Any]) -> None:
    """
    Finalizes the workspace items.

    Saves the configuration items to the workspace, initializes the workspace
    item links, sets the enabled set and the item types, and optionally
    verifies the workspace items against the specification types.
    """
    action_name = action["action-name"]
    # NOTE(review): all configuration cache items are mirrored into the
    # workspace here; this presumably tolerates items which were already
    # added by a load-workspace-items action — confirm.
    new = set(config.item.cache.keys())
    _new_workspace_items(config, action_name, new)
    logging.info("%s: initialize workspace item links", action_name)
    config.workspace_cache.initialize_links()
    enabled_set = config.variant["enabled"]
    logging.info("%s: set workspace enabled: %s", action_name, enabled_set)
    config.workspace_cache.set_enabled(enabled_set)
    logging.info("%s: set workspace item types", action_name)
    config.workspace_cache.set_types(action["spec-type-root-uid"])
    # Drop the cached workspace build items since links and types changed.
    config.workspace_director.clear()
    if action["verify"] and config["spec-verify"]:
        logging.info("%s: verify workspace items", action_name)
        logger = logging.getLogger()
        level = logger.getEffectiveLevel()
        # Silence the verbose verification output; the log level is
        # restored after a successful verification.
        logger.setLevel(logging.ERROR)
        verify_config = {"root-type": action["spec-type-root-uid"]}
        status = verify(verify_config, config.workspace_cache)
        # The workspace is only usable if the verification found no
        # critical issues and no errors.
        assert status.critical == 0
        assert status.error == 0
        logger.setLevel(level)
        logging.debug("%s: finished verifying workspace items", action_name)
+
+
def _action_make_deployment_directory(config: _ConfigItem,
                                      _action: Dict[str, Any]) -> None:
    """ Creates the deployment directory of the variant. """
    directory = config.variant["deployment-directory"]
    # The deployment directory must not exist yet.
    assert not os.path.exists(directory)
    logging.info("workspace: create deployment directory: %s", directory)
    os.makedirs(directory)
+
+
def _create_symbolic_links(action: Dict[str, Any],
                           unpacked_archive: DirectoryState) -> None:
    """ Creates the symbolic links of the action in the unpacked archive. """
    for spec in action["archive-symbolic-links"]:
        link = unpacked_archive.substitute(spec["link"])
        link_path = os.path.join(unpacked_archive.directory, link)
        # Store the target relative to the link location so that the
        # directory tree stays relocatable.
        target = os.path.relpath(unpacked_archive.substitute(spec["target"]),
                                 os.path.dirname(link_path))
        logging.info("%s: create symbolic link from '%s' to '%s'",
                     action["action-name"], link_path, target)
        os.symlink(target, link_path)
        unpacked_archive.add_files([link])
+
+
def _apply_patches(config: _ConfigItem, action: Dict[str, Any],
                   unpacked_archive: DirectoryState) -> None:
    """
    Applies the patches of the action to the unpacked archive.

    Inline patches are fed to ``git apply`` through standard input, file
    patches are read by ``git apply`` from the file.  The files changed by
    each patch are added to the directory state.
    """
    for patch in action["archive-patches"]:
        if patch["type"] == "inline":
            # "-" makes "git apply" read the patch from standard input.
            source = "-"
            patch_input: Optional[bytes] = config.substitute(
                patch["patch"]).encode("utf-8")
            logging.info("%s: apply inline patch: %s", action["action-name"],
                         patch_input)
        else:
            assert patch["type"] == "file"
            source = patch["file"]
            patch_input = None
            logging.info("%s: apply patch: %s", action["action-name"], source)
        command = [
            "git", "apply", "--apply", "--numstat", "--directory",
            unpacked_archive.directory, "-p", "1", "--unsafe-paths", source
        ]
        # Pass the patch bytes via input=; check_output() does not accept
        # raw bytes for the stdin= parameter (stdin= must be None, PIPE, a
        # file descriptor, or a file object).
        output = subprocess.check_output(command,
                                         input=patch_input,
                                         cwd=config["toolchain-directory"])
        # "--numstat" prints "added<TAB>deleted<TAB>path" for each changed
        # file; record the paths in the directory state.
        unpacked_archive.add_files([
            line.decode("utf-8").split("\t")[2]
            for line in output.splitlines()
        ])
+
+
def _action_unpack_archive(config: _ConfigItem,
                           action: Dict[str, Any]) -> None:
    """ Unpacks the archive into the destination directory. """
    data = _make_directory_state_data(config, action["destination-directory"],
                                      "unpacked-archive")
    archive_file = config.substitute(action["archive-file"])
    data["archive-file"] = os.path.basename(archive_file)
    for key in ("archive-hash", "archive-patches", "archive-symbolic-links",
                "archive-url", "copyrights-by-license", "description",
                "enabled-by", "links"):
        data[key] = action[key]
    unpacked_archive = _create_item(config, action, data,
                                    "qdp/directory-state/unpacked-archive")
    assert isinstance(unpacked_archive, DirectoryState)
    unpacked_archive.add_tarfile_members(archive_file,
                                         unpacked_archive.directory, True)
    # The archive content must match the expected hash.
    assert hash_file(archive_file) == unpacked_archive["archive-hash"]
    unpacked_archive.compact()
    _create_symbolic_links(action, unpacked_archive)
    _apply_patches(config, action, unpacked_archive)
    unpacked_archive.load()
    config.try_save(unpacked_archive, "Unpack archive")
+
+
def _create_dummy(config: _ConfigItem, action: Dict[str, Any]) -> None:
    """ Creates a dummy item in place of a disabled action's item. """
    data = _make_root_data(config, "dummy")
    data["enabled-by"] = action["enabled-by"]
    dummy = _create_item(config, action, data, "qdp/dummy")
    if dummy is not None:
        config.try_save(dummy, "Add dummy item")
+
+
# Maps the "action-type" names used in the configuration files to the
# functions implementing them.
_ACTIONS = {
    "add-item": _action_add_item,
    "copy-directory": _action_copy_directory,
    "copy-file": _action_copy_file,
    "copy-test-log": _action_copy_test_log,
    "finalize-workspace-items": _action_finalize_workspace_items,
    "git-clone": _action_git_clone,
    "load-items": _action_load_items,
    "load-workspace-items": _action_load_workspace_items,
    "make-deployment-directory": _action_make_deployment_directory,
    "make-item": _action_make_item,
    "make-uuid-item": _action_make_uuid_item,
    "unpack-archive": _action_unpack_archive
}

# Action types which produce an item.  When such an action is disabled, a
# dummy item is created in its place, presumably so that references to the
# item still resolve — confirm against the link consumers.
_NEEDS_DUMMY = [
    "add-item", "copy-directory", "copy-file", "copy-test-log", "git-clone",
    "make-item", "unpack-archive"
]
+
+
def _run_actions(config: _ConfigItem, when: int,
                 actions: List[Dict[str, Any]]) -> None:
    """ Runs the actions scheduled for the point in time. """
    logging.info("workspace: run actions at: %i", when)
    for action in actions:
        name = action["action-name"]
        kind = action["action-type"]
        if not config.is_enabled(action["enabled-by"]):
            logging.info("%s: action is disabled", name)
            if kind in _NEEDS_DUMMY:
                _create_dummy(config, action)
            continue
        logging.info("%s: run action", name)
        _ACTIONS[kind](config, action)
+
+
def _toolchain_commit(toolchain_directory: str) -> str:
    """ Returns the HEAD commit of the toolchain Git repository. """
    stdout: List[str] = []
    assert run_command(["git", "rev-parse", "HEAD"], toolchain_directory,
                       stdout) == 0
    return stdout[0].strip()
+
+
def main(argv: List[str]) -> None:
    """
    Creates a QDP workspace directory according to the configuration files.
    """
    parser = create_build_argument_parser()
    parser.add_argument("--prefix",
                        help="the prefix directory",
                        default="/tmp/qdp")
    parser.add_argument("--cache-directory",
                        help="the config cache directory",
                        default="config-cache")
    parser.add_argument('config_files', nargs='+')
    args = parser.parse_args(argv)
    init_logging(args)
    # The configuration cache hosts the configuration item and the items
    # created or loaded by the workspace actions.
    config_cache_config: Dict[str, Any] = {
        "cache-directory": os.path.abspath(args.cache_directory),
        "enabled": [],
        "resolve-proxies": False,
        "initialize-links": False,
        "paths": [],
        "spec-type-root-uid": None,
    }
    config_cache = ItemCache(config_cache_config)
    factory = create_build_item_factory()
    director = PackageBuildDirector(config_cache, factory)
    config_item = Item(
        config_cache, "/qdp/config", {
            "SPDX-License-Identifier":
            "CC-BY-SA-4.0 OR BSD-2-Clause",
            "copyrights":
            ["Copyright (C) 2020, 2023 embedded brains GmbH & Co. KG"]
        })
    config_item["_type"] = "config"
    config = _ConfigItem(director, config_item, factory)
    config["prefix-directory"] = os.path.abspath(args.prefix)
    config["spec-verify"] = not args.no_spec_verify
    # The toolchain directory is the directory containing this script.
    toolchain_directory = os.path.abspath(os.path.dirname(__file__))
    logging.info("workspace: toolchain directory: %s", toolchain_directory)
    config["toolchain-directory"] = toolchain_directory
    config["toolchain-commit"] = _toolchain_commit(toolchain_directory)
    config["enabled"] = []
    # Gather the actions of all configuration files and group them by the
    # point in time ("action-when") at which they shall run.
    actions_at: Dict[int, List[Dict[str, Any]]] = {}
    for file in args.config_files:
        logging.info("workspace: load configuration from: %s", file)
        for action in load_config(file)["workspace-actions"]:
            actions_at.setdefault(action["action-when"], []).append(action)
    # Run the action groups in ascending "action-when" order.
    for when, actions in sorted(actions_at.items()):
        _run_actions(config, when, actions)
    # Finally build the package from the workspace items.
    config.workspace_director.build_package(args.only, args.force)
+
+
+if __name__ == "__main__": # pragma: no cover
+ main(sys.argv[1:])