summaryrefslogtreecommitdiff
path: root/rtemsspec/directorystate.py
diff options
context:
space:
mode:
Diffstat (limited to 'rtemsspec/directorystate.py')
-rw-r--r--rtemsspec/directorystate.py460
1 files changed, 460 insertions, 0 deletions
diff --git a/rtemsspec/directorystate.py b/rtemsspec/directorystate.py
new file mode 100644
index 00000000..0a966fb7
--- /dev/null
+++ b/rtemsspec/directorystate.py
@@ -0,0 +1,460 @@
+# SPDX-License-Identifier: BSD-2-Clause
+""" This module provides support for directory states. """
+
+# Copyright (C) 2020, 2023 embedded brains GmbH & Co. KG
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import base64
+import fnmatch
+import hashlib
+import json
+import logging
+import os
+from pathlib import Path
+import shutil
+import tarfile
+from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, \
+ Set, Tuple, Union
+
+from rtemsspec.items import Item, ItemGetValueContext, Link
+from rtemsspec.packagebuild import BuildItem, BuildItemFactory, \
+ PackageBuildDirector
+from rtemsspec.util import hash_file
+
+_Path = Union[Path, str]
+
+
def _get_file_path(ctx: ItemGetValueContext) -> str:
    """
    Returns the path of the file selected by the context index.

    The path is the item directory joined with a "/" to the name of the
    selected file.  An index less than zero selects the first file.
    """
    item = ctx.item
    position = ctx.index if ctx.index > 0 else 0
    directory = item["directory"]
    file_name = item["files"][position]["file"]
    return f"{directory}/{file_name}"
+
+
def _get_file_path_without_extension(ctx: ItemGetValueContext) -> str:
    """
    Returns the path of the file selected by the context index with the
    file extension removed.
    """
    root, _extension = os.path.splitext(_get_file_path(ctx))
    return root
+
+
def _file_nop(_source: _Path, _target: _Path) -> None:
    """ Does nothing; file operation for trees which are only added. """
    return None
+
+
class DirectoryState(BuildItem):
    """
    Maintains a directory state.

    A directory state consists of a base directory, a set of file paths
    relative to the base directory with an optional hash for each file, and
    an overall hash.  The file set and the overall hash are mirrored into
    the "files" and "hash" attributes of the associated item.
    """

    # pylint: disable=too-many-public-methods
    @classmethod
    def prepare_factory(cls, factory: BuildItemFactory,
                        type_name: str) -> None:
        """
        Prepares the factory for the type name and registers the value
        getters for the "file" and "file-without-extension" paths.
        """
        BuildItem.prepare_factory(factory, type_name)
        factory.add_get_value(f"{type_name}:/file", _get_file_path)
        factory.add_get_value(f"{type_name}:/file-without-extension",
                              _get_file_path_without_extension)

    def __init__(self, director: PackageBuildDirector, item: Item):
        """
        Initializes the file set from the "files" attribute of the item.
        """
        super().__init__(director, item)
        self._discarded_files: Set[str] = set()
        # Maps the relative file path to the file hash; the hash is None if
        # it is not yet known
        self._files: Dict[str, Optional[str]] = {
            file_info["file"]: file_info["hash"]
            for file_info in item["files"]
        }

    def __iter__(self) -> Iterator[str]:
        """ Yields the file paths of the directory state. """
        yield from self.files()

    @property
    def directory(self) -> str:
        """ Returns the base directory of the directory state. """
        return self["directory"]

    @property
    def digest(self) -> str:
        """
        Returns the overall hash of the directory state.

        Raises a ValueError if the overall hash is not set.
        """
        the_digest = self.item["hash"]
        if the_digest is None:
            raise ValueError(f"{self.uid}: directory state hash is not set")
        return the_digest

    def _get_hash(self, _base: str, relative_file_path: str) -> str:
        """ Returns the already known hash of the file. """
        digest = self._files[relative_file_path]
        assert digest is not None
        return digest

    def _hash_file(self, base: str, relative_file_path: str) -> str:
        """ Hashes the file, stores the hash in the file set, returns it. """
        file_path = os.path.join(base, relative_file_path)
        digest = hash_file(file_path)
        logging.debug("%s: file '%s' hash is %s", self.uid, file_path, digest)
        self._files[relative_file_path] = digest
        return digest

    def _add_hashes(self, base: str, hash_file_handler: Callable[[str, str],
                                                                 str]) -> str:
        """
        Calculates the overall hash, stores it in the item, and returns it.

        The overall hash is a SHA-512 over the base directory followed by
        the relative path and hash of each file in sorted path order.  The
        hash of each file is provided by the handler.
        """
        overall_hash = hashlib.sha512()
        overall_hash.update(base.encode("utf-8"))
        for relative_file_path in sorted(self._files):
            digest = hash_file_handler(base, relative_file_path)
            overall_hash.update(relative_file_path.encode("utf-8"))
            overall_hash.update(digest.encode("utf-8"))
        # The handler may have updated the file hashes
        self._update_item_files()
        digest = base64.urlsafe_b64encode(
            overall_hash.digest()).decode("ascii")
        logging.info("%s: directory '%s' hash is %s", self.uid, base, digest)
        self.item["hash"] = digest
        return digest

    def _directory_state_exclude(self, base: str, files: Set[str]) -> None:
        """
        Removes the files of the directory states linked through the
        "directory-state-exclude" role from the file set in place.
        """
        for exclude_item in self.item.parents("directory-state-exclude"):
            exclude_state = self.director[exclude_item.uid]
            assert isinstance(exclude_state, DirectoryState)
            # The files of the other state are made relative to our base
            exclude_files = files.intersection(
                os.path.relpath(path, base) for path in exclude_state)
            logging.info(
                "%s: exclude files of directory state %s: %s", self.uid,
                exclude_item.uid,
                [os.path.join(base, path) for path in sorted(exclude_files)])
            files.difference_update(exclude_files)

    def _load_from_patterns(self, base: str,
                            patterns: List[Dict[str, Any]]) -> None:
        """
        Sets the file set to the files in the base directory selected by
        the include and exclude patterns.  All file hashes are reset.
        """
        logging.info("%s: load pattern defined directory state: %s", self.uid,
                     base)
        files: Set[str] = set()
        base_path = Path(base)
        for include_exclude in patterns:
            include = include_exclude["include"]
            logging.info("%s: add files matching '%s' in: %s", self.uid,
                         include, base)
            more = {
                os.path.relpath(path, base)
                for path in base_path.glob(include) if not path.is_dir()
            }
            for exclude in include_exclude["exclude"]:
                # Exclude patterns are matched against the relative path
                # with a leading "/"
                exclude_files = {
                    path
                    for path in more
                    if fnmatch.fnmatch(os.path.join("/", path), exclude)
                }
                logging.info("%s: exclude files for pattern '%s': %s",
                             self.uid, exclude, [
                                 os.path.join(base, path)
                                 for path in sorted(exclude_files)
                             ])
                more.difference_update(exclude_files)
            files.update(more)
        self._directory_state_exclude(base, files)
        self._files = dict.fromkeys(files, None)

    def load(self) -> str:
        """ Loads the directory state and returns the overall hash. """
        base = self.directory
        patterns = self.item["patterns"]
        if patterns:
            self._load_from_patterns(base, patterns)
        else:
            logging.info("%s: load explicit directory state: %s", self.uid,
                         base)
        return self._add_hashes(base, self._hash_file)

    def lazy_load(self) -> str:
        """
        Loads the directory state if the overall hash is not present and
        returns the overall hash.
        """
        digest = self.item["hash"]
        if digest is not None:
            return digest
        return self.load()

    @property
    def file(self) -> str:
        """
        Is the path of the first file of the file state.

        Raises a StopIteration if the file set is empty.
        """
        return next(self.files())

    def files(self, base: Optional[str] = None) -> Iterator[str]:
        """
        Yields the file paths of the directory state in sorted order.

        The paths are relative to the base; the base defaults to the base
        directory of the directory state.
        """
        if base is None:
            base = self.directory
        for file_path in sorted(self._files):
            yield os.path.join(base, file_path)

    def files_and_hashes(
            self,
            base: Optional[str] = None) -> Iterator[Tuple[str, Optional[str]]]:
        """
        Yields the file paths and hashes of the directory state in sorted
        order.

        The paths are relative to the base; the base defaults to the base
        directory of the directory state.
        """
        if base is None:
            base = self.directory
        for file_path, file_hash in sorted(self._files.items()):
            yield os.path.join(base, file_path), file_hash

    def compact(self) -> None:
        """
        Removes the common prefix from the files and adds it to the base
        directory.

        The common prefix is determined on path component boundaries.
        Nothing is done if there is no common prefix or if it is an
        absolute path.  The overall hash and the file hashes are reset if
        the directory state was changed.
        """
        try:
            # A character-wise common prefix such as "a/b" for the files
            # "a/b" and "a/bc" is not a common directory, so use the common
            # path instead
            prefix = os.path.commonpath(list(self._files))
        except ValueError:
            # The file set is empty or mixes absolute and relative paths
            prefix = ""
        if prefix and not os.path.isabs(prefix):
            self.item["directory"] = os.path.join(self.item["directory"],
                                                  prefix)
            self.item["hash"] = None
            self._files = {
                os.path.relpath(path, prefix): None
                for path in self._files
            }
            self._update_item_files()

    def _update_item_files(self) -> None:
        """ Mirrors the file set into the "files" attribute of the item. """
        self.item["files"] = [{
            "file": path,
            "hash": digest
        } for path, digest in sorted(self._files.items())]

    def clear(self) -> None:
        """ Clears the file set of the directory state. """
        logging.info("%s: clear directory state", self.uid)
        self.item["hash"] = None
        self._files.clear()
        self._update_item_files()

    def invalidate(self) -> None:
        """
        Invalidates the directory state.

        The overall hash is reset.  A pattern defined file set is cleared;
        an explicit file set is kept with the file hashes reset.
        """
        logging.info("%s: invalidate directory state", self.uid)
        self.item["hash"] = None
        if self.item["patterns"]:
            self._files.clear()
        else:
            self._files = dict.fromkeys(self._files, None)
        self._update_item_files()

    def remove_files(self) -> None:
        """
        Removes the files of the directory state.

        Files which do not exist are logged and otherwise ignored.
        """
        for file in self.files():
            try:
                logging.info("%s: remove: %s", self.uid, file)
                os.remove(file)
            except FileNotFoundError:
                if self.item["patterns"]:
                    logging.warning("%s: file not found: %s", self.uid, file)
                else:
                    logging.debug("%s: file not found: %s", self.uid, file)

    def add_files(self, files: Iterable[_Path]) -> None:
        """
        Adds the files to the file set of the directory state.

        The file paths are normalized.  Files of excluded directory states
        are not added.  The overall hash is reset.
        """
        self.item["hash"] = None
        more = {os.path.normpath(name) for name in files}
        self._directory_state_exclude(self.directory, more)
        self._files.update(dict.fromkeys(more, None))
        self._update_item_files()

    def set_files(self, files: Iterable[_Path]) -> None:
        """ Sets the file set of the directory state to the files. """
        self.clear()
        self.add_files(files)

    def _copy_file(self, source: _Path, target: _Path) -> None:
        """ Copies the source file to the target path. """
        logging.info("%s: copy '%s' to '%s'", self.uid, source, target)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.copy2(source, target)

    def _move_file(self, source: _Path, target: _Path) -> None:
        """ Moves the source file to the target path. """
        logging.info("%s: move '%s' to '%s'", self.uid, source, target)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        os.replace(source, target)

    def copy_file(self, source: _Path, target: _Path) -> None:
        """
        Copies the file from the source path to the target path.

        Adds the target file to the file set of the directory state.  The
        target path is relative to the base directory of the directory state.
        """
        self._copy_file(source, os.path.join(self.directory, target))
        self.add_files([target])

    def copy_files(self,
                   root_dir: _Path,
                   files: Iterable[_Path],
                   prefix: _Path = ".") -> None:
        """
        Copies the files relative to the root directory to the base directory
        of the directory state using the prefix.

        The base directory of the directory state and the prefix is prepended
        to the file path for each file before it is added to the directory
        state.  Adds the target files to the file set of the directory state.
        """
        file_list: List[str] = []
        base = self.directory
        for name in files:
            file_source = os.path.join(root_dir, name)
            file_list_path = os.path.join(prefix, name)
            file_list.append(file_list_path)
            file_target = os.path.join(base, file_list_path)
            self._copy_file(file_source, file_target)
        self.add_files(file_list)

    def _add_tree(self,
                  root_dir: _Path,
                  prefix: _Path,
                  file_op: Callable[[_Path, _Path], None],
                  excludes: Optional[List[str]] = None) -> None:
        """
        Walks the directory tree starting at the root directory, carries out
        the file operation for each file which is not excluded, and adds
        these files to the file set.

        The exclude patterns are matched against the normalized file path
        relative to the base directory with a leading "/".
        """
        file_list: List[str] = []
        base = self.directory
        for path, _, names in os.walk(os.path.abspath(root_dir)):
            for name in names:
                file_source = os.path.join(path, name)
                file_list_path = os.path.join(
                    prefix, os.path.relpath(file_source, root_dir))
                file_target = os.path.join(base, file_list_path)
                if excludes is not None:
                    match_path = os.path.normpath(
                        os.path.join("/", file_list_path))
                    # Only the first matching pattern is reported
                    matched = next(
                        (exclude for exclude in excludes
                         if fnmatch.fnmatch(match_path, exclude)), None)
                    if matched is not None:
                        logging.info("%s: exclude file for pattern '%s': %s",
                                     self.uid, matched, file_target)
                        continue
                file_list.append(file_list_path)
                file_op(file_source, file_target)
        self.add_files(file_list)

    def add_tree(self,
                 root_dir: _Path,
                 prefix: _Path = ".",
                 excludes: Optional[List[str]] = None) -> None:
        """
        Adds the files of the directory tree starting at the root directory
        to the file set of the directory state.

        The added file path is relative to the root directory.  The prefix is
        prepended to the file path for each file before it is added to the
        directory state.  The files are not copied or moved.
        """
        self._add_tree(root_dir, prefix, _file_nop, excludes)

    def copy_tree(self,
                  root_dir: _Path,
                  prefix: _Path = ".",
                  excludes: Optional[List[str]] = None) -> None:
        """
        Adds the files of the directory tree starting at the root directory
        to the file set of the directory state.

        The added file path is relative to the root directory.  The prefix is
        prepended to the file path for each file before it is added to the
        directory state.  The files are copied.
        """
        self._add_tree(root_dir, prefix, self._copy_file, excludes)

    def move_tree(self,
                  root_dir: _Path,
                  prefix: _Path = ".",
                  excludes: Optional[List[str]] = None) -> None:
        """
        Adds the files of the directory tree starting at the root directory
        to the file set of the directory state.

        The added file path is relative to the root directory.  The prefix is
        prepended to the file path for each file before it is added to the
        directory state.  The files are moved.
        """
        self._add_tree(root_dir, prefix, self._move_file, excludes)

    def add_tarfile_members(self, archive: _Path, prefix: _Path,
                            extract: bool) -> None:
        """
        Appends the members of the archive to the file list of the directory
        state.

        For each member the prefix path and the member path are joined and
        then added to the file list of the directory state relative to the
        base directory.  Directory members are not added.  If extract is
        true, then the members of the archive are extracted to the prefix
        path.
        """
        extract_info = "and extract " if extract else ""
        logging.info("%s: add %smembers of '%s' using prefix '%s'", self.uid,
                     extract_info, archive, prefix)
        with tarfile.open(archive, "r") as tar_file:
            base = self.directory
            file_list = [
                os.path.relpath(os.path.join(prefix, info.name), base)
                for info in tar_file.getmembers() if not info.isdir()
            ]
            if extract:
                # NOTE(review): the members are extracted without an
                # extraction filter; an archive from an untrusted source
                # could write outside of the prefix -- confirm that only
                # trusted archives are used here
                tar_file.extractall(prefix)
        self.add_files(file_list)

    def lazy_clone(self, other: "DirectoryState") -> str:
        """
        Lazily clones the directory state.

        Removes the files which are not present in the other directory
        state, copies the files which are new or have a different hash, and
        keeps the files with an equal hash.  Takes over the file set and
        file hashes of the other directory state and returns the overall
        hash.
        """
        logging.info("%s: lazy clone from: %s", self.uid, other.uid)
        # pylint: disable=protected-access
        current = set(self._files)
        new = set(other._files)
        base = self.directory
        other_base = other.directory
        for file in sorted(current.difference(new)):
            target = os.path.join(base, file)
            try:
                logging.info("%s: remove: %s", self.uid, target)
                os.remove(target)
            except FileNotFoundError:
                logging.warning("%s: file not found: %s", self.uid, target)
        for file in sorted(new.difference(current)):
            target = os.path.join(base, file)
            self._copy_file(os.path.join(other_base, file), target)
        for file in sorted(current.intersection(new)):
            target = os.path.join(base, file)
            if self._files[file] == other._files[file]:
                logging.info("%s: keep as is: %s", self.uid, target)
            else:
                self._copy_file(os.path.join(other_base, file), target)
        self._files = other._files.copy()
        # The file hashes are taken from the other state, not recalculated
        return self._add_hashes(base, self._get_hash)

    def json_dump(self, data: Any) -> None:
        """ Dumps the data into the file of the directory state. """
        file_path = self.file
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "w", encoding="utf-8") as file:
            json.dump(data, file, sort_keys=True, indent=2)

    def json_load(self) -> Any:
        """ Loads the data from the file of the directory state. """
        with open(self.file, "r", encoding="utf-8") as file:
            return json.load(file)

    def save(self) -> None:
        """ Saves the directory state to the item file. """
        self.item.save()

    def has_changed(self, link: Link) -> bool:
        """
        Returns true, if the link has no hash or the hash of the link is not
        equal to the overall hash of the directory state, otherwise false.
        """
        digest = self.digest
        return link["hash"] is None or digest != link["hash"]

    def discard(self) -> None:
        """
        Discards the directory state.

        Remembers the file set as discarded files, removes the files,
        invalidates the directory state, and saves it.
        """
        logging.info("%s: discard", self.uid)
        self._discarded_files = set(self._files)
        self.remove_files()
        self.invalidate()
        self.save()

    def refresh(self) -> None:
        """ Refreshes the directory state by loading and committing it. """
        logging.info("%s: refresh", self.uid)
        self.load()
        self.commit("Update directory state")