diff options
Diffstat (limited to 'rtemsspec/archiver.py')
-rw-r--r-- | rtemsspec/archiver.py | 252 |
1 file changed, 252 insertions, 0 deletions
diff --git a/rtemsspec/archiver.py b/rtemsspec/archiver.py new file mode 100644 index 00000000..b6aa8f2f --- /dev/null +++ b/rtemsspec/archiver.py @@ -0,0 +1,252 @@ +# SPDX-License-Identifier: BSD-2-Clause +""" Build step to package deployed components into archive. """ + +# Copyright (C) 2021 EDISOFT (https://www.edisoft.pt/) +# Copyright (C) 2020, 2021 embedded brains GmbH & Co. KG +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. 
import logging
import os
import stat
import tarfile
from typing import cast, Dict, List

from rtemsspec.packagebuild import BuildItem
from rtemsspec.directorystate import DirectoryState


def _check_for_duplicates(uid: str, members: List[DirectoryState]) -> None:
    """
    Check the directory states for files provided by more than one state.

    Duplicate paths are reported at info level.  If two directory states
    provide the same path with different content hashes, an error is logged.

    :param uid: UID of the build item used as the log message prefix.
    :param members: The directory states to cross-check.
    """
    logging.info("%s: check for duplicate files", uid)
    for index, dir_state in enumerate(members):
        logging.debug("%s: get files of: %s", uid, dir_state.uid)
        files = dict(dir_state.files_and_hashes())
        paths = set(files.keys())
        for file_path in files:
            # Sanity check: a directory state shall consist of regular files
            # and symbolic links only.
            assert os.path.isfile(file_path) or os.path.islink(file_path)
        # Compare only with the states after this one; earlier pairs were
        # already checked in previous iterations.
        for dir_state_2 in members[index + 1:]:
            logging.debug("%s: compare with files of: %s", uid,
                          dir_state_2.uid)
            files_2 = dict(dir_state_2.files_and_hashes())
            paths_2 = set(files_2.keys())
            duplicates = paths.intersection(paths_2)
            if duplicates:
                logging.info(
                    "%s: duplicate files in directory states "
                    "%s and %s", uid, dir_state.uid, dir_state_2.uid)
                for file_path in duplicates:
                    logging.info("%s: duplicate file: %s", uid, file_path)
                    value = files[file_path]
                    value_2 = files_2[file_path]
                    if value == value_2:
                        continue
                    logging.error(
                        "%s: inconsistent file hashes for '%s': %s != %s",
                        uid, file_path, value, value_2)


# Head of the generated stand-alone package verification script.  The script
# is completed by appending one dictionary entry per archive file and then
# _SCRIPT_TAIL.
_SCRIPT_HEAD = """#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-2-Clause
\"\"\" Verifies the files of the package. \"\"\"

# Copyright (C) 2021, 2022 embedded brains GmbH & Co. KG
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import base64
import binascii
import argparse
import hashlib
import logging
import os
import sys
from typing import Dict, List


_FILES = {
"""

# Tail of the generated verification script.  NOTE(review): the two
# ``status |= _check_file(...)`` updates below fix a defect of the original
# generated script which used plain assignment, so a later good file reset a
# previously detected hash mismatch and the script reported success despite
# corrupted files.
_SCRIPT_TAIL = """}


def _hash_file(path: str) -> str:
    file_hash = hashlib.sha512()
    if os.path.islink(path):
        file_hash.update(os.readlink(path).encode("utf-8"))
    else:
        buf = bytearray(65536)
        memview = memoryview(buf)
        with open(path, "rb", buffering=0) as src:
            for size in iter(lambda: src.readinto(memview), 0):  # type: ignore
                file_hash.update(memview[:size])
    return base64.urlsafe_b64encode(file_hash.digest()).decode("ascii")


def _hex(digest: str) -> str:
    binary = base64.urlsafe_b64decode(digest)
    return binascii.hexlify(binary).decode('ascii')


def _check_file(file_path: str, expected_files: Dict[str, str]) -> int:
    expected_hash = expected_files[file_path]
    actual_hash = _hash_file(file_path)
    if expected_hash != actual_hash:
        logging.error(
            "expected hash is %s, actual hash is %s for file: %s",
            _hex(expected_hash), _hex(actual_hash), file_path)
        return 1
    return 0


def _verify_files(script: str, expected_files: Dict[str, str]) -> int:
    status = 0
    script = os.path.normpath(script)
    for path, dirs, files in os.walk("."):
        dirs.sort()
        for name in sorted(files):
            file_path = os.path.normpath(os.path.join(path, name))
            if file_path in expected_files:
                status |= _check_file(file_path, expected_files)
                del expected_files[file_path]
            elif file_path != script:
                logging.warning("unexpected file: %s", file_path)
    for maybe_missing in expected_files.keys():
        if os.path.islink(maybe_missing):
            status |= _check_file(maybe_missing, expected_files)
            continue
        logging.error("missing file: %s", maybe_missing)
        status = 1
    return status


def main(script: str, argv: List[str]) -> int:
    \"\"\" Verifies the files of the package. \"\"\"
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        type=str.upper,
        default="WARNING",
        help="log level")
    parser.add_argument("--log-file",
                        type=str,
                        default=None,
                        help="log to this file")
    parser.add_argument("--list-files",
                        action="store_true",
                        help="list the files of the package")
    parser.add_argument("--list-files-and-hashes",
                        action="store_true",
                        help="list the files of the package "
                        "with the SHA512 digest of each file")
    args = parser.parse_args(argv)
    logging.basicConfig(filename=args.log_file, level=args.log_level)
    expected_files = dict(
        zip(map(lambda x: os.path.normpath(x), _FILES.keys()),
            _FILES.values()))
    status = 0
    if args.list_files_and_hashes:
        for file_path, hash_value in expected_files.items():
            print(f"{file_path}\\t{_hex(hash_value)}")
    elif args.list_files:
        for file_path in expected_files.keys():
            print(file_path)
    else:
        status = _verify_files(script, expected_files)
    return status


if __name__ == "__main__":
    status = main(sys.argv[0], sys.argv[1:])
    sys.exit(status)
"""


def _create_verification_script(script: str,
                                archive_files: Dict[str, str]) -> None:
    """
    Write the stand-alone verification script for the archive files.

    :param script: Path of the script file to create.
    :param archive_files: Maps verification-relative file paths to the
        base64url encoded SHA512 digest of the file content.
    """
    with open(script, "w", encoding="utf-8") as out:
        out.write(_SCRIPT_HEAD)
        for file_path, hash_value in sorted(archive_files.items()):
            print(f"    \"{file_path}\": \"{hash_value}\",", file=out)
        out.write(_SCRIPT_TAIL)
    # Make the script executable for everyone.
    os.chmod(script, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)


class Archiver(BuildItem):
    """
    The archiver adds the file of its directory state dependencies to an
    archive file.
    """

    def run(self) -> None:
        """
        Create the archive from all member directory states and add a
        verification script covering the archived files.
        """
        archive_file = self["archive-file"]
        archive_state = self.output("archive")
        assert isinstance(archive_state, DirectoryState)
        archive_state.set_files([archive_file])
        archive_file = os.path.join(archive_state.directory, archive_file)
        script_file = self["verification-script"]
        script_state = self.output("verify-package")
        assert isinstance(script_state, DirectoryState)
        script_state.set_files([script_file])
        script_file = os.path.join(script_state.directory, script_file)
        script_dir = os.path.dirname(script_file)
        logging.info("%s: create archive: %s", self.uid, archive_file)
        os.makedirs(os.path.dirname(archive_file), exist_ok=True)
        with tarfile.open(archive_file, "w:xz") as tar_file:
            members = cast(List[DirectoryState], list(self.inputs("member")))
            _check_for_duplicates(self.uid, members)
            strip_prefix = self["archive-strip-prefix"]
            archive_files: Dict[str, str] = {}
            for dir_state in members:
                logging.info("%s: add files of directory state: %s", self.uid,
                             dir_state.uid)
                for file_path, hash_value in dir_state.files_and_hashes():
                    # The verification script checks paths relative to its
                    # own directory.
                    verify_path = os.path.relpath(file_path, script_dir)
                    assert hash_value
                    archive_files[verify_path] = hash_value
                    tar_file.add(file_path,
                                 os.path.relpath(file_path, strip_prefix))
            # The script itself is added to the archive, too, so the package
            # can be verified after extraction.
            _create_verification_script(script_file, archive_files)
            tar_file.add(script_file, os.path.relpath(script_file,
                                                      strip_prefix))
        logging.info("%s: finished to create archive: %s", self.uid,
                     archive_file)