# Source code for general_python.common.directories

"""Path and directory helpers built on :class:`pathlib.Path`.

The :class:`Directories` wrapper keeps legacy convenience methods available
while exposing a path-like object that can be passed to standard-library APIs.
It covers path joining, directory creation, file discovery, copying, and common
serialization helpers used by analysis scripts.
"""

import os
import random
import shutil
from pathlib import Path
from typing import Callable, Iterable, List, Optional, Union, Iterator, Any, Dict

# Type alias for path-like objects: either a plain string or a pathlib.Path.
PathLike    = Union[str, Path]
# Path separator of the running platform (os.sep), kept as a module-level shorthand.
kPS         = os.sep

#################################################################################

class staticproperty(property):
    """
    Descriptor exposing a zero-argument function as a static property.

    The wrapped getter is called with no arguments, regardless of whether the
    attribute is read from the class or from an instance.

    >>> class A:
    ...     @staticproperty
    ...     def answer():
    ...         return 42
    >>> A.answer
    42
    """

    def __get__(self, owner_self, owner_cls=None):
        # ``owner_cls`` defaults to None to match the descriptor protocol
        # (``__get__(self, instance, owner=None)``); neither argument is used
        # because the getter takes no parameters.
        return self.fget()
class classproperty(property):
    """
    Descriptor exposing a classmethod-like function as a property.

    The wrapped getter receives the owning class, both when the attribute is
    read from the class itself and when it is read from an instance.

    >>> class A:
    ...     tag = "a"
    ...     @classproperty
    ...     def name(cls):
    ...         return cls.tag
    >>> A.name
    'a'
    """

    def __get__(self, owner_self, owner_cls=None):
        # The descriptor protocol allows the owner to be omitted when an
        # instance is supplied; derive the class from the instance then.
        if owner_cls is None:
            owner_cls = type(owner_self)
        return self.fget(owner_cls)
#################################################################################
class Directories(object):
    """
    Directory handler wrapping a :class:`pathlib.Path`.

    The wrapped path is stored in :attr:`path`.  Instances implement
    ``__fspath__`` and can therefore be passed to standard-library functions
    that accept ``os.PathLike`` objects.

    Note that ``exists``, ``is_dir``, ``is_file``, ``is_symlink``, ``is_empty``,
    ``size``, ``size_human``, ``disk_usage`` and ``checksum`` are *properties*
    here (no call parentheses), unlike their :class:`pathlib.Path` namesakes.
    """

    def __init__(self, *parts: PathLike) -> None:
        """
        Initialize with one or more path components.

        >>> d = Directories("foo", "bar")   # -> Path("foo/bar")
        """
        self.path = Path(*parts)

    def __fspath__(self) -> str:
        """Return the path as a string (``os.PathLike`` protocol)."""
        return str(self.path)

    def __len__(self) -> int:
        """
        Return the number of entries if the path is a directory, otherwise the
        size in bytes reported by ``stat()``.

        Raises
        ------
        OSError
            Propagated from ``stat()`` when the path is not a directory and
            cannot be inspected (e.g. it does not exist).
        """
        if self.path.is_dir():
            return sum(1 for _ in self.path.iterdir())
        return self.path.stat().st_size

    def __bool__(self) -> bool:
        """
        A Directories object is always truthy.

        Without this, truthiness would fall back to ``__len__``, so a plain
        ``if d:`` check (typical for Optional return values) would touch the
        filesystem and report False for an empty directory.
        """
        return True

    #! Operators

    def __add__(self, other: PathLike) -> "Directories":
        """
        Concatenate with another path component.

        >>> d = Directories("foo") + "bar"  # -> Path("foo/bar")
        """
        return self.join(other)

    def __iadd__(self, other: PathLike) -> "Directories":
        """
        In-place concatenation with another path component.

        >>> d = Directories("foo"); d += "bar"  # -> Path("foo/bar")

        This rebinds ``self.path`` and therefore changes the object's hash.
        """
        self.path = self.path.joinpath(other)
        return self

    def __radd__(self, other: PathLike) -> "Directories":
        """
        Reflected concatenation: ``other`` becomes the leading component.

        >>> d = "foo" + Directories("bar")  # -> Path("foo/bar")
        """
        return Directories(Path(other) / self.path)

    def __truediv__(self, other: PathLike) -> "Directories":
        """
        Concatenate with another path component using the / operator.

        >>> d = Directories("foo") / "bar"  # -> Path("foo/bar")
        """
        return self.join(other)

    def __rtruediv__(self, other: PathLike) -> "Directories":
        """
        Reflected / operator: ``other`` becomes the leading component.

        >>> d = "foo" / Directories("bar")  # -> Path("foo/bar")
        """
        return Directories(Path(other) / self.path)

    def __iter__(self) -> Iterator[str]:
        """
        Iterate over the components of the path (``Path.parts``).

        >>> list(Directories("foo/bar"))    # -> ["foo", "bar"]
        """
        yield from self.path.parts

    #! Comparison

    def __eq__(self, other: Any) -> bool:
        """
        Compare with another Directories, a string or a Path.

        >>> Directories("foo") == "foo"                 # -> True
        >>> Directories("foo") == Directories("foo")    # -> True

        Any other type compares unequal.
        """
        if isinstance(other, Directories):
            # Keeps equality consistent with __hash__, which hashes the path.
            return self.path == other.path
        if isinstance(other, str):
            return str(self) == other
        if isinstance(other, Path):
            return self.path == other
        return False

    def __ne__(self, other: Any) -> bool:
        """
        Check inequality; the exact negation of ``__eq__``.

        >>> Directories("foo") != "bar"     # -> True
        """
        return not self.__eq__(other)

    #! Hashing

    def __hash__(self) -> int:
        """
        Hash of the wrapped path, for use in sets or as dictionary keys.

        >>> hash(Directories("foo")) == hash(Path("foo"))   # -> True
        """
        return hash(self.path)

    #! String representation

    def __repr__(self) -> str:
        """Return ``Directories(<repr of the wrapped path>)``."""
        return f"Directories({self.path!r})"

    def __str__(self) -> str:
        """
        Return the path as a plain string.

        >>> str(Directories("foo"))         # -> "foo"
        """
        return str(self.path)

    ################################################################################
    #! Some standard filters
    #
    # These helpers take a *list* of paths and return the matching paths as
    # strings.  They are list transformers, not the per-path predicates
    # expected by the ``filters`` argument of list_files()/list_dirs().
    ################################################################################

    @staticmethod
    def f_h5(p: List[Path]) -> List[str]:
        """Keep the paths whose string form ends with '.h5'."""
        return [str(x) for x in p if str(x).endswith('.h5')]

    @staticmethod
    def f_csv(p: List[Path]) -> List[str]:
        """Keep the paths whose string form ends with '.csv'."""
        return [str(x) for x in p if str(x).endswith('.csv')]

    @staticmethod
    def f_nonempty(p: List[Path]) -> List[str]:
        """Keep the paths whose ``stat().st_size`` is greater than zero."""
        return [str(x) for x in p if x.stat().st_size > 0]

    @staticmethod
    def f_contains(substr: str) -> Callable[[List[Path]], List[str]]:
        """Return a list filter keeping paths whose string form contains ``substr``."""
        def _filter(p: List[Path]) -> List[str]:
            return [str(x) for x in p if substr in str(x)]
        return _filter

    ################################################################################
    #! Construction / Navigation
    ################################################################################

    def join(self, *parts: PathLike, create: bool = False) -> "Directories":
        """
        Return a new Directories for ``self.path`` joined with ``parts``.

        If ``create`` is True the resulting directory is created with
        ``mkdir(parents=True, exist_ok=True)``.
        """
        new_path = self.path.joinpath(*parts)
        if create:
            new_path.mkdir(parents=True, exist_ok=True)
        return Directories(new_path)

    @property
    def parent(self) -> "Directories":
        """Return a Directories for the parent of the wrapped path."""
        return Directories(self.path.parent)

    @classmethod
    def win(cls, raw: str) -> "Directories":
        """Build an instance from a backslash-separated path string."""
        return cls(*raw.split("\\"))

    def format(self, *args, **kwargs) -> "Directories":
        """
        Apply ``str.format`` to the POSIX form of the path and wrap the result.

        >>> Directories("run_{}").format(3)     # -> Path("run_3")
        """
        formatted_path = self.path.as_posix().format(*args, **kwargs)
        return Directories(formatted_path)

    def resolve(self) -> "Directories":
        """Return a new Directories holding ``self.path.resolve()``."""
        return Directories(self.path.resolve())

    def endswith(self, suffix: str) -> bool:
        """Check whether the string form of the path ends with ``suffix``."""
        return str(self.path).endswith(suffix)

    ################################################################################
    #! Creation
    ################################################################################

    def mkdir(self, parents: bool = True, exist_ok: bool = True) -> "Directories":
        """Create this directory on disk. Returns self for chaining."""
        self.path.mkdir(parents=parents, exist_ok=exist_ok)
        return self

    @staticmethod
    def mkdirs(paths: Iterable[PathLike], parents: bool = True, exist_ok: bool = True) -> None:
        """Create every directory in ``paths``."""
        for p in paths:
            Path(p).mkdir(parents=parents, exist_ok=exist_ok)

    ################################################################################
    #! Listing & Clearing
    ################################################################################

    def _scan(self, keep: Callable[[Path], bool]) -> List[Path]:
        """
        Return the direct children of ``self.path`` for which ``keep`` is true.

        Listing is best effort: a missing directory yields an empty list
        silently, any other failure is printed and also yields an empty list.
        """
        try:
            return [p for p in self.path.iterdir() if keep(p)]
        except FileNotFoundError:
            return []
        except PermissionError:
            print(f"PermissionError: {self.path}")
            return []
        except OSError as e:
            print(f"OSError: {self.path} - {e}")
            return []
        except Exception as e:
            print(f"Unexpected error: {self.path} - {e}")
            return []

    @staticmethod
    def _apply_filters(entries: List[Path], filters: Any) -> List[Path]:
        """
        Apply per-path predicates in order; all must pass for a path to stay.

        ``filters`` may be None, a single callable or an iterable of callables.
        A predicate that raises is reported and skipped, leaving the entries
        as they were before that predicate.
        """
        if filters is None:
            return entries
        if callable(filters):
            filters = [filters]
        for f in filters:
            try:
                entries = list(filter(f, entries))
            except Exception as e:
                # Not every callable has __name__ (e.g. functools.partial).
                label = getattr(f, "__name__", repr(f))
                print(f"Error applying filter {label}: {e}")
        return entries

    def list_files(self,
                   *,
                   include_empty : bool = True,
                   filters       : Optional[Iterable[Callable[[Path], bool]]] = None,
                   sort_key      : Optional[Callable[[Path], Any]] = None) -> List[Path]:
        """
        List files (not directories) directly inside this directory.

        Parameters
        ----------
        include_empty : bool
            If False, skip files of size zero.
        filters : callable or iterable of callables Path -> bool, optional
            All must return True for a file to be included.
        sort_key : callable, optional
            Key function for sorting the result in place.

        Returns
        -------
        List[Path]
            Empty when the directory cannot be listed.
        """
        files = self._scan(Path.is_file)
        if not include_empty:
            files = [p for p in files if p.stat().st_size > 0]
        files = self._apply_filters(files, filters)
        if sort_key:
            files.sort(key=sort_key)
        return files

    def list_dirs(self,
                  *,
                  include_empty  : bool = True,
                  include_hidden : bool = True,
                  relative       : bool = False,
                  as_string      : bool = False,
                  filters        : Optional[Iterable[Callable[[Path], bool]]] = None,
                  sort_key       : Optional[Callable[[Path], Any]] = None) -> Union[List[Path], List[str]]:
        """
        List directories directly inside this directory.

        Parameters
        ----------
        include_empty : bool
            If False, skip directories that have no entries at all (files or
            sub-directories alike).
        include_hidden : bool
            If False, skip directories whose name starts with a dot.
        relative : bool
            If True, return paths relative to this directory.
        as_string : bool
            If True, return strings instead of Path objects.
        filters : callable or iterable of callables Path -> bool, optional
            All must return True for a directory to be included.
        sort_key : callable, optional
            Key function for sorting; applied before ``relative``/``as_string``.

        Returns
        -------
        list of Path (or of str when ``as_string`` is True)
            Empty when the directory cannot be listed.
        """
        dirs = self._scan(Path.is_dir)
        if not include_empty:
            dirs = [p for p in dirs if any(p.iterdir())]
        if not include_hidden:
            dirs = [p for p in dirs if not p.name.startswith('.')]
        dirs = self._apply_filters(dirs, filters)
        if sort_key:
            dirs.sort(key=sort_key)
        if relative:
            dirs = [p.relative_to(self.path) for p in dirs]
        if as_string:
            dirs = [str(p) for p in dirs]
        return dirs

    @staticmethod
    def list_data_roots(base: PathLike, *, sort: bool = True, as_dirs: bool = True) -> Union[List["Directories"], List[Path]]:
        """
        List all first-level directories inside ``base``.

        Parameters
        ----------
        base : PathLike
            Root directory (e.g. data_path).
        sort : bool
            Sort lexicographically (useful for YYYYMMDD-style names).
        as_dirs : bool
            Return Directories objects instead of Path objects.

        Returns
        -------
        List of directories; empty when ``base`` cannot be listed.
        """
        base = Path(base)
        try:
            dirs = [p for p in base.iterdir() if p.is_dir()]
        except OSError:
            # Missing / unreadable / non-directory base: nothing to report.
            return []
        if sort:
            dirs.sort()
        if as_dirs:
            return [Directories(p) for p in dirs]
        return dirs

    @staticmethod
    def expand_data_roots(base: PathLike, *subpath: PathLike, require_exist: bool = True) -> List["Directories"]:
        """
        Expand a relative subpath across all first-level directories of ``base``.

        Parameters
        ----------
        base : PathLike
            Root directory (e.g. data_path).
        subpath : PathLike
            Components appended to each root (e.g. hamil/occ/ns/sp).
        require_exist : bool
            If True, only include paths that exist on disk.

        Example
        -------
        ``expand_data_roots(data_path, 'data', hamil, ..., 'sp')`` returns a
        list of ``base/<root>/data/.../sp``.
        """
        out = []
        for root in Directories.list_data_roots(base):
            candidate = root.join(*subpath)
            if not require_exist or candidate.exists:
                out.append(candidate)
        return out

    @staticmethod
    def collect_files(dirs: Iterable["Directories"],
                      *,
                      prefix  : Optional[str] = None,
                      suffix  : Optional[str] = None,
                      filters : Optional[Iterable[Callable[[Path], bool]]] = None,
                      sort    : bool = False) -> List[Path]:
        """
        Collect files from multiple directories into one flat list.

        Parameters
        ----------
        dirs : iterable of Directories (plain path-likes are wrapped)
        prefix : str, optional
            Keep only files whose name starts with this prefix.
        suffix : str, optional
            Keep only files whose name ends with this suffix.
        filters : optional
            Additional Path -> bool predicates, forwarded to list_files().
        sort : bool
            Sort the combined list.

        Returns
        -------
        Flat list of Paths.
        """
        files: List[Path] = []
        for d in dirs:
            if not isinstance(d, Directories):
                d = Directories(d)
            # list_files() already restricts the result to regular files.
            local = d.list_files(filters=filters)
            if prefix is not None:
                local = [p for p in local if p.name.startswith(prefix)]
            if suffix is not None:
                local = [p for p in local if p.name.endswith(suffix)]
            files.extend(local)
        if sort:
            files.sort()
        return files

    ##############################################################################

    def clear_empty(self) -> List[Path]:
        """
        Remove all zero-length files directly inside this directory.

        Returns
        -------
        List[Path]
            Every entry that was not removed; this includes sub-directories
            and other non-file entries, not only the surviving files.
        """
        survivors: List[Path] = []
        for p in self.path.iterdir():
            if p.is_file() and p.stat().st_size == 0:
                p.unlink()
            else:
                survivors.append(p)
        return survivors

    def walk(self) -> Iterator[Path]:
        """Recursively yield every entry (files and directories) below this path."""
        yield from self.path.rglob('*')

    def glob(self, pattern: str) -> List[Path]:
        """Return a list of all entries matching ``pattern`` relative to this path."""
        return list(self.path.glob(pattern))

    ################################################################################
    #! Random file
    ################################################################################

    def random_file(self, condition: Callable[[Path], bool] = lambda _: True) -> Path:
        """
        Return a random file directly inside this directory satisfying ``condition``.

        Raises
        ------
        ValueError
            If no file matches.
        """
        candidates = [p for p in self.path.iterdir() if p.is_file() and condition(p)]
        if not candidates:
            raise ValueError(f"No file satisfying condition in {self.path}")
        return random.choice(candidates)

    ################################################################################
    #! Transfer
    ################################################################################

    def copy_files(self,
                   dest      : PathLike,
                   condition : Callable[[Path], bool],
                   overwrite : bool = False) -> None:
        """
        Copy all files satisfying ``condition`` from this directory to ``dest``.
        Creates ``dest`` if needed.

        Parameters
        ----------
        dest : PathLike
            Destination directory.
        condition : Callable[[Path], bool]
            Returns True for files that should be copied.
        overwrite : bool, optional
            If True, overwrite files already present in ``dest``.
            Default is False (existing files are skipped).
        """
        dest_path = Path(dest)
        dest_path.mkdir(parents=True, exist_ok=True)
        for p in self.path.iterdir():
            if p.is_file() and condition(p):
                target = dest_path / p.name
                if not overwrite and target.exists():
                    continue
                shutil.copy2(p, target)

    def transfer_files(self,
                       dest      : PathLike,
                       condition : Callable[[Path], bool]) -> None:
        """
        Move all files satisfying ``condition`` from this directory to ``dest``.
        Creates ``dest`` if needed.
        """
        dest_path = Path(dest)
        dest_path.mkdir(parents=True, exist_ok=True)
        for p in self.path.iterdir():
            if p.is_file() and condition(p):
                target = dest_path / p.name
                # shutil.move falls back to copy + delete when the destination
                # is on another filesystem, where a plain rename would fail.
                shutil.move(str(p), str(target))

    ################################################################################
    #! Convenience
    ################################################################################

    @property
    def exists(self) -> bool:
        """Check if the path exists."""
        return self.path.exists()

    @property
    def as_path(self) -> Path:
        """Return the wrapped Path object."""
        return self.path

    @property
    def is_empty(self) -> bool:
        """Check if the directory has no entries."""
        return not any(self.path.iterdir())

    @property
    def is_dir(self) -> bool:
        """Check if the path is a directory."""
        return self.path.is_dir()

    @property
    def is_file(self) -> bool:
        """Check if the path is a file."""
        return self.path.is_file()

    @property
    def is_symlink(self) -> bool:
        """Check if the path is a symlink."""
        return self.path.is_symlink()

    @property
    def size(self) -> int:
        """
        Total size in bytes of the files directly inside this directory
        (sub-directories are not descended into).
        """
        return sum(f.stat().st_size for f in self.path.glob('*') if f.is_file())

    @property
    def size_human(self) -> str:
        """Return :attr:`size` formatted with a binary-scaled unit (B ... PB)."""
        size = float(self.size)
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024:
                return f"{size:.2f} {unit}"
            size /= 1024
        return f"{size:.2f} PB"

    @property
    def disk_usage(self) -> str:
        """
        Return total/used/free space reported by ``shutil.disk_usage`` for
        this path, each as a whole number of 2**30-byte units labelled GB.
        """
        total, used, free = shutil.disk_usage(self.path)
        return f"Total: {total // (2**30)} GB, Used: {used // (2**30)} GB, Free: {free // (2**30)} GB"

    @property
    def checksum(self) -> str:
        """
        Return an MD5 hex digest over the contents of the files directly
        inside this directory.

        Files are hashed in sorted path order so that the digest does not
        depend on the order in which the filesystem lists them.  File names
        are not part of the digest.
        """
        import hashlib
        digest = hashlib.md5()
        for f in sorted(self.path.glob('*')):
            if f.is_file():
                with open(f, "rb") as handle:
                    for chunk in iter(lambda: handle.read(4096), b""):
                        digest.update(chunk)
        return digest.hexdigest()

    # --------------------------------------------------------------------------
    #! Convenience methods (static)
    # --------------------------------------------------------------------------

    @staticmethod
    def temp_dir(prefix: str = "tmp") -> "Directories":
        """Create a temporary directory via ``tempfile.mkdtemp`` and wrap it."""
        import tempfile
        temp_path = tempfile.mkdtemp(prefix=prefix)
        return Directories(temp_path)

    @staticproperty
    def current() -> "Directories":
        """Return the current working directory as a Directories object."""
        return Directories(Path.cwd())

    @staticproperty
    def home() -> "Directories":
        """Return the user's home directory as a Directories object."""
        return Directories(Path.home())

    @staticproperty
    def root() -> "Directories":
        """Return ``Path("/")`` wrapped as a Directories object."""
        return Directories(Path("/"))

    @staticmethod
    def from_env(var_name: str) -> Optional["Directories"]:
        """
        Create a Directories object from an environment variable.

        Returns None if the variable is unset/empty or the path does not exist.
        """
        value = os.environ.get(var_name)
        if value and Path(value).exists():
            return Directories(value)
        return None

    @staticmethod
    def from_config(config: dict, key: str) -> Optional["Directories"]:
        """
        Create a Directories object from a configuration dictionary.

        Returns None if the key is missing/empty or the path does not exist.
        """
        value = config.get(key)
        if value and Path(value).exists():
            return Directories(value)
        return None

    @staticmethod
    def from_string(s: str) -> "Directories":
        """Create a Directories object from a string path."""
        return Directories(s)

    @staticmethod
    def from_parts(*parts: PathLike) -> "Directories":
        """Create a Directories object from multiple path components."""
        return Directories(*parts)

    @staticmethod
    def from_path(p: PathLike) -> "Directories":
        """Create a Directories object from a path-like object."""
        return Directories(p)
################################################################################
class DirectoriesData:
    """
    Collects named directories, optionally tagged with the machine they live on.

    Attributes
    ----------
    all : Dict[str, Directories]
        Every registered directory, by name.
    existing : Dict[str, Directories]
        The subset of ``all`` whose path existed when it was registered.
    machines : Dict[str, List[str]]
        Machine name -> names of the directories registered for it.

    Example:
    >>> dirs = DirectoriesData(
    ...     klimak_um_only_f=("/media/.../klimak_um_only_f_t100000/uniform", "503"),
    ...     klimak_all=("/media/.../klimak_um_plrb_all_t100000/uniform", "503"),
    ...     locally=("data_project/uniform", "local")
    ... )
    """

    def __init__(self, **dirs: Any):
        """
        Initialize with named directory paths.

        Each value is either a path (machine ``"default"``) or a
        ``(path, machine)`` tuple.
        """
        self.all: Dict[str, Directories] = {}
        self.existing: Dict[str, Directories] = {}
        self.machines: Dict[str, List[str]] = {}
        for name, spec in dirs.items():
            if isinstance(spec, tuple):
                path, machine = spec
            else:
                path, machine = spec, "default"
            self.add(name, path, machine)

    ############################################################

    def _machine_of(self, name: str) -> Optional[str]:
        """Return the machine ``name`` is registered under, or None."""
        for machine, names in self.machines.items():
            if name in names:
                return machine
        return None

    def _drop_from_machine(self, machine: str, name: str) -> None:
        """Remove ``name`` from ``machine`` and forget the machine once it is empty."""
        names = self.machines[machine]
        names.remove(name)
        if not names:
            del self.machines[machine]

    ############################################################

    def get(self, name: str, only_existing: bool = True) -> Optional[Directories]:
        """
        Get a directory by name.

        Parameters
        ----------
        name : str
            The name of the directory to retrieve.
        only_existing : bool, optional
            If True, only return the directory if it is in ``existing``.
            Default is True.

        Returns
        -------
        The Directories object, or None when the name is not found.
        """
        if only_existing:
            return self.existing.get(name)
        return self.all.get(name)

    ############################################################

    def add(self, name: str, path: PathLike, machine: str = "default"):
        """
        Add a directory, replacing any previous entry with the same name.

        The entry is also recorded in ``existing`` when its path exists at the
        time of the call.
        """
        entry = Directories(path)
        self.all[name] = entry
        if entry.exists:
            self.existing[name] = entry
        else:
            # A replaced entry may have existed before; do not keep it stale.
            self.existing.pop(name, None)

        # Keep each name registered under exactly one machine.
        previous = self._machine_of(name)
        if previous != machine:
            if previous is not None:
                self._drop_from_machine(previous, name)
            self.machines.setdefault(machine, []).append(name)

    def remove(self, name: str) -> None:
        """Remove a directory entry by name; unknown names are ignored."""
        if name not in self.all:
            return
        machine = self._machine_of(name)
        del self.all[name]
        self.existing.pop(name, None)
        if machine is not None:
            self._drop_from_machine(machine, name)

    ############################################################

    def _match(self, name: str, filters: List[Union[str, Callable[[str], bool]]]) -> bool:
        """
        Check if ``name`` matches any filter.

        String filters match by substring; callables match when they return a
        truthy value for the name.  Other filter types never match.
        """
        for f in filters:
            if isinstance(f, str):
                if f in name:
                    return True
            elif callable(f):
                if f(name):
                    return True
        return False

    def filter_names(self, filters: List[Union[str, Callable[[str], bool]]], only_existing: bool = True) -> List[str]:
        """
        Return names that match any filter.
        Filters can be substrings or callables (e.g. regex matchers, lambdas).
        """
        source = self.existing if only_existing else self.all
        return [name for name in source if self._match(name, filters)]

    def filter_dirs(self, filters: List[Union[str, Callable[[str], bool]]], only_existing: bool = True) -> Dict[str, Directories]:
        """
        Return {name: Directories} for names matching any filter.
        Filters can be substrings or callables (e.g. regex matchers, lambdas).
        """
        source = self.existing if only_existing else self.all
        return {name: d for name, d in source.items() if self._match(name, filters)}

    ############################################################

    def list_existing(self) -> List[str]:
        """List names of existing directories."""
        return list(self.existing.keys())

    def list_existing_dirs(self) -> List[Directories]:
        """List existing Directories objects."""
        return list(self.existing.values())

    def list_all(self) -> List[str]:
        """List all directory names provided."""
        return list(self.all.keys())

    def list_all_dirs(self) -> List[Directories]:
        """List all Directories objects provided."""
        return list(self.all.values())

    def list_machines(self) -> List[str]:
        """List all machines."""
        return list(self.machines.keys())

    ############################################################

    def on(self, machine: str, only_existing: bool = True) -> Dict[str, Directories]:
        """
        Get directories for a specific machine.

        Parameters
        ----------
        machine : str
            The machine name to filter directories.
        only_existing : bool, optional
            If True, only return existing directories. Default is True.

        Returns
        -------
        Dict[str, Directories]
            Directory names mapped to Directories objects; empty when the
            machine is not known.
        """
        names = self.machines.get(machine, [])
        if only_existing:
            return {n: self.all[n] for n in names if n in self.existing}
        return {n: self.all[n] for n in names}

    ############################################################

    def register_machine(self, machine: str):
        """Ensure machine is known (for clarity, optional)."""
        self.machines.setdefault(machine, [])

    ############################################################

    def __repr__(self):
        return (
            f"DirectoriesData(\n"
            f"  machines={list(self.machines.keys())},\n"
            f"  all={list(self.all.keys())},\n"
            f"  existing={list(self.existing.keys())}\n"
            f")"
        )

    def __str__(self):
        lines = ["DirectoriesData:"]
        for name, d in self.all.items():
            # ``exists`` is a property on Directories; it is evaluated now.
            status = "exists" if d.exists else "missing"
            lines.append(f"  {name}: {d} [{status}]")
        return "\n".join(lines)

    def __len__(self):
        return len(self.all)

    def __getitem__(self, name: str) -> Directories:
        return self.all[name]

    def __contains__(self, name: str) -> bool:
        return name in self.all

    def __iter__(self):
        """Iterate over ``(name, Directories)`` pairs."""
        return iter(self.all.items())

    def _absorb(self, other: "DirectoriesData") -> None:
        """Add every entry of ``other`` to self; entries of ``other`` win on name clashes."""
        for name, d in other.all.items():
            self.add(name, str(d.path), other._machine_of(name) or "default")

    def __add__(self, other: "DirectoriesData") -> "DirectoriesData":
        """Return a new DirectoriesData with merged contents."""
        if not isinstance(other, DirectoriesData):
            return NotImplemented
        new = DirectoriesData()
        new._absorb(self)
        new._absorb(other)
        return new

    def __iadd__(self, other: "DirectoriesData") -> "DirectoriesData":
        """In-place merge of other into self."""
        if not isinstance(other, DirectoriesData):
            return NotImplemented
        self._absorb(other)
        return self

    def __radd__(self, other: Any) -> "DirectoriesData":
        """
        Allow ``sum([...])`` to work.

        ``sum`` starts from the integer 0, so ``0 + data`` yields a copy of
        ``data``; any other left operand is unsupported.
        """
        if isinstance(other, int) and other == 0:
            new = DirectoriesData()
            new._absorb(self)
            return new
        return NotImplemented
################################################################################ #! EOF