Source code for onyo.lib.git

from __future__ import annotations

import logging
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING

import pathspec

from onyo.lib.exceptions import OnyoInvalidRepoError
from onyo.lib.ui import ui

if TYPE_CHECKING:
    from typing import (
        Generator,
        Iterable,
        Literal,
    )

log: logging.Logger = logging.getLogger('onyo.git')


[docs] class GitRepo(object): r"""Representation of a Git repository. Uses :py:mod:`subprocess` to execute git commands on a repository. Bare repositories are not supported. Attributes ---------- root The absolute Path of the root of the git repository. """
[docs] def __init__(self, path: Path, find_root: bool = False) -> None: r"""Instantiate a ``GitRepo`` object with ``path`` as the root directory. Parameters ---------- path The absolute Path of a git repository. find_root Replace ``path`` with the results of :py:func:`find_root`. Thus any directory of a git repository can be passed as ``path``, not just the repo root. """ self.root = GitRepo.find_root(path) if find_root else path.resolve() self._files: list[Path] | None = None
[docs] @staticmethod def find_root(path: Path) -> Path: r"""Return the absolute path of the git worktree root that ``path`` belongs to. Checks ``path`` itself and each of its parents. Parameters ---------- path The Path to find the git worktree root for. Raises ------ OnyoInvalidRepoError Neither ``path`` nor its parents are a git repository. """ try: ret = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=path, check=True, capture_output=True, text=True) root = Path(ret.stdout.strip()) except (subprocess.CalledProcessError, FileNotFoundError): raise OnyoInvalidRepoError(f"'{path}' is not a Git repository.") return root
def _git(self, args: list[str], *, cwd: Path | None = None, raise_error: bool = True) -> str: r"""Run git commands and return the output. Parameters ---------- args Arguments to append to the ``git`` command. e.g. ``args=['add', <file>]`` results in the system call ``git add <file>``. cwd Path to run commands from. Default to ``self.root``. raise_error Raise :py:exc:`subprocess.CalledProcessError` if the command returns with a non-zero exit code. """ cwd = cwd or self.root ui.log_debug(f"Running 'git {' '.join(args)}'") ret = subprocess.run(["git"] + args, cwd=cwd, check=raise_error, capture_output=True, text=True) return ret.stdout @property def files(self) -> list[Path]: r"""Get the absolute ``Path``\ s of all tracked files. This property is cached, and is reset automatically by :py:func:`commit`. """ if not self._files: self._files = self.get_files() return self._files
[docs] def clear_cache(self) -> None: r"""Clear the cache of this instance of GitRepo. When the repository is modified using only the public API functions, the cache is consistent. This method is only necessary if the repository is modified otherwise. """ self._files = None
[docs] def get_files(self, paths: Iterable[Path] | None = None) -> list[Path]: r"""Get the absolute ``Path``\ s of all tracked files under ``paths``. Parameters ---------- paths Paths to limit the scope of the search to. The entire repo by default. """ ui.log_debug("Looking up tracked files%s", f" underneath {', '.join([str(p) for p in paths])}" if paths else "") git_cmd = ['ls-tree', '-r', '--full-tree', '--name-only', '-z', 'HEAD'] if paths: git_cmd.extend([str(p) for p in paths]) try: tree = self._git(git_cmd) except subprocess.CalledProcessError as e_ls_tree: try: self._git(['rev-parse', 'HEAD', '--']) raise e_ls_tree except subprocess.CalledProcessError: # no HEAD -> empty repository tree = "" files = [self.root / x for x in tree.split('\0') if x] return files
[docs] def is_clean_worktree(self) -> bool: r"""Whether the git worktree is clean.""" return not bool(self._git(['status', '--porcelain']))
[docs] def init_without_reinit(self) -> None: r"""Initialize ``self.root`` as a git repo, but not if it's already one.""" # make sure target dir exists self.root.mkdir(exist_ok=True) if (self.root / '.git').exists(): log.info(f"'{self.root}' is already a git repository.") else: ret = self._git(['init']) ui.log_debug(ret.strip())
[docs] def commit(self, paths: Iterable[Path] | Path, message: str) -> None: r"""Stage and commit changes in git. Parameters ---------- paths Paths to commit. message The git commit message. """ from onyo.lib.utils import get_temp_file if isinstance(paths, Path): paths = [paths] # Pass paths and message as files to avoid exceeding the OS's maximum # command and argument length. # Detecting this accurately cross-platform is buggy and complicated for # no gain. pathspecs = [str(p) for p in paths] tmpfile_paths = get_temp_file(suffix='.commit-paths') tmpfile_paths.write_text('\x00'.join(pathspecs)) tmpfile_message = get_temp_file(suffix='.commit-message') tmpfile_message.write_text(message) # stage and commit self._git(['add', '--pathspec-file-nul', '--pathspec-from-file', str(tmpfile_paths)]) self._git(['commit', '--file', str(tmpfile_message), '--pathspec-file-nul', '--pathspec-from-file', str(tmpfile_paths)]) # clean up tmpfile_paths.unlink() tmpfile_message.unlink() self.clear_cache()
[docs] @staticmethod def is_git_path(path: Path) -> bool: r"""Whether ``path`` is a git file or directory. A "git path" is a path that is used by git itself (tracked or not) and therefore not valid for use by Onyo. Any path underneath a directory called ``.git`` and any basename starting with ``.git`` is considered a git path. e.g. ``.git/*``, ``.gitignore``, ``gitattributes``, ``.gitmodules``, etc. Parameters ---------- path The path to check. """ return '.git' in path.parts or path.name.startswith('.git')
[docs] def get_config(self, key: str, path: Path | None = None) -> str | None: r"""Get the value of a configuration key. If no ``path`` is given, the configuration key is acquired according to ``git-config``'s order of precedence (worktree, local, global, system). Parameters ---------- key Name of the configuration key to query. Follows Git's convention of "SECTION.NAME.KEY" to address a key in a git config file:: [SECTION "NAME"] KEY = VALUE path Path of a config file, rather than Git's default locations. """ value = None if path: try: value = self._git(['config', '--file', str(path), '--get', key]).strip() ui.log_debug(f"config '{key}' acquired from {path}: '{value}'") except subprocess.CalledProcessError: ui.log_debug(f"config '{key}' missing in {path}") else: # git-config (with its full stack of locations to check) try: value = self._git(['config', '--get', key]).strip() ui.log_debug(f"git config acquired '{key}': '{value}'") except subprocess.CalledProcessError: ui.log_debug(f"git config missed '{key}'") return value
[docs] def set_config(self, key: str, value: str, location: Literal['system', 'global', 'local', 'worktree'] | Path | None = None ) -> None: r"""Set the value of a configuration key. Parameters ---------- key The name of the configuration key to set. value The value to set the configuration key to. location The location to set the key/value in. Valid locations are standard git-config locations (``'system'``, ``'global'``, ``'local'``, and ``'worktree'``) or a Path of a file. ``None`` will use ``git-config``'s default location (``'local'``). Raises ------ ValueError ``location`` is invalid. """ location_options = { 'system': ['--system'], 'global': ['--global'], 'local': ['--local'], 'worktree': ['--worktree'], None: [] # use Git's default behavior } try: location_arg = ['--file', str(location)] if isinstance(location, Path) \ else location_options[location] except KeyError as e: raise ValueError("Invalid config location requested. Valid options are: {}" "".format(', '.join(str(location_options.keys())))) from e self._git(['config'] + location_arg + [key, value]) ui.log_debug(f"'config for '{location}' set '{key}': '{value}'")
[docs] def get_hexsha(self, commitish: str | None = None, short: bool = False) -> str | None: r"""Return the hexsha of a given commit-ish. Will return ``None`` if querying the mother of all commits (i.e. "HEAD" of an empty repository). Parameters ---------- commitish Any identifier that refers to a commit (defaults to "HEAD"). short Return the abbreviated form of the hexsha. Raises ------ ValueError ``commitish`` is unknown. """ # Use --quiet to suppress the 'Needed a single revision' error message # when running this on a repo with no commits. cmd = ['rev-parse', '--quiet', '--verify', '{}^{{commit}}'.format(commitish if commitish else 'HEAD')] if short: cmd.append('--short') try: return self._git(cmd).strip() except subprocess.CalledProcessError: if commitish is None: return None raise ValueError("Unknown commit identifier: %s" % commitish)
[docs] def get_commit_msg(self, commitish: str | None = None) -> str: r"""Return the full commit message of a commit-ish. Parameters ---------- commitish Any identifier that refers to a commit (defaults to "HEAD"). """ return self._git(['log', commitish or 'HEAD', '-n1', '--pretty=%B'])
[docs] def check_ignore(self, ignore: Path, paths: list[Path]) -> list[Path]: r"""Get the subset of ``paths`` that match patterns defined in ``ignore``. Parameters ---------- ignore Path to a file containing exclude patterns (in the style of ``.gitignore``). paths Paths to check Raises ------ ValueError Path is outside of the repository. """ # all paths must be in the repo for p in paths: if self.root not in p.parents: raise ValueError(f"{str(p)} is not under {str(self.root)}") # extract, load, and match the patterns ignore_patterns = ignore.read_text().splitlines() spec = pathspec.GitIgnoreSpec.from_lines(ignore_patterns) matches = list(spec.match_files(paths)) # build a list of the original Paths ignored = [p for p in paths if p in matches] return ignored
def _parse_log_output(self, lines: list[str]) -> dict: """Produce a commit dict (of one commit) from the output of ``git log``. A helper for :py:func:`history`. Parameters ---------- lines List of text lines from ``git log`` for a single commit. """ import datetime import re regex_author = re.compile(r"Author:\s+(?P<name>\b.*\b)\s+<(?P<email>[^\s]+)>$") regex_committer = re.compile(r"Commit:\s+(?P<name>\b.*\b)\s+<(?P<email>[^\s]+)>$") commit = dict() for line in lines: if line.startswith('commit '): commit['hexsha'] = line.split()[1] continue elif line.startswith('Author:'): try: commit['author'] = re.match(regex_author, line).groupdict() # pyre-ignore [16] AttributeError is caught except AttributeError as e: if str(e).endswith("'groupdict'"): raise RuntimeError(f"Unable to parse author-line:\n{line}") from e raise continue elif line.startswith('AuthorDate:'): continue elif line.startswith('Commit:'): try: commit['committer'] = re.match(regex_committer, line).groupdict() except AttributeError as e: if str(e).endswith("'groupdict'"): raise RuntimeError(f"Unable to parse committer-line:\n{line}") from e raise continue elif line.startswith('CommitDate:'): commit['time'] = datetime.datetime.fromisoformat(line.split(maxsplit=1)[1]) continue else: # This line is part of the message. if 'message' not in commit: commit['message'] = [line] else: commit['message'].append(line) return commit
[docs] def history(self, path: Path | None = None, n: int | None = None) -> Generator[dict, None, None]: """Yield commit dicts representing the history of ``path``. The history is acquired via ``git log`` (``git log --follow`` if a ``path`` is given). Parameters ---------- path The Path to get the history of. Defaults to the repo root. n Limit history to ``n`` commits. ``None`` for no limit (default). """ # TODO: Formatting the output with `--pretty` may simplify handling # multi-line strings in git-log's output (rather than splitting # lines and iterating), parsing git's metadata and Onyo's # operations record, and remove the leading spaces for the commit # message. # --pretty='format:commit: %H%nAuthor: %an (%ae)%nCommitter: %cn (%ce)%nCommitDate: %cI%nMessage:%n%B' limit = [f'-n{n}'] if n is not None else [] pathspec = ['--follow', '--', str(path)] if path else [] cmd = ['log', '--date=iso-strict', '--pretty=fuller'] + limit + pathspec output = self._git(cmd) # yield output on a per-commit-basis commit_output = [] for line in output.splitlines(): if line.startswith('commit '): # This is the first line of a new commit. # 1. store previous commit output if commit_output: # we just finished the previous commit. yield self._parse_log_output(commit_output) # 2. start new commit output commit_output = [line] else: # add to current commit output commit_output.append(line) if commit_output: yield self._parse_log_output(commit_output)