# Source code for gitconductor.gitlab

"""GitLab-related constructs."""

from __future__ import annotations

import os
import pathlib
import pickle
import shlex
import subprocess
import typing

import git
import gitlab
import rich
import rich.console
import tomllib
from pydantic import BaseModel, Field

from gitconductor import output, settings

# Name of the file that ``GitlabGroup.dump`` pickles the group object into,
# inside the group's top level work directory.
GROUP_FNAME = pathlib.Path(".gitconductor.pkl")


class GitlabInstance(BaseModel):
    """Shared base model for objects that live on a GitLab server.

    Attributes:
        gitlab_url (str): URL of the GitLab server.
        gitlab_key (str): Access key for the GitLab server.
        server (typing.Any | None): Server connection object; not set by this class.
        root (pathlib.Path): Root work directory. The default is the current working
            directory, resolved once when the class body is evaluated.
        flat (bool): Whether the root itself is the top level work directory.
        cfg (settings.Settings | None): Optional settings object.
    """

    gitlab_url: str = "https://gitlab.com"
    gitlab_key: str
    server: typing.Any | None = None
    root: pathlib.Path = pathlib.Path().resolve()
    flat: bool = False
    cfg: settings.Settings | None = None

    @property
    def toplevel_dir(self) -> pathlib.Path:
        """Top level work directory.

        Relies on a ``path`` attribute, which this class does not define itself.

        Returns:
            pathlib.Path: ``root`` when ``flat`` is set, otherwise ``root`` joined with the
                first component of ``path`` relative to ``root``.
        """
        if not self.flat:
            first_component = self.path.relative_to(self.root).parts[0]
            return self.root / first_component
        return self.root
class GitlabGroup(GitlabInstance):
    """A GitLab group convenience class.

    Attributes:
        name (str): Short name of the group.
        gitlab_key (str): Private token passed to ``gitlab.Gitlab``.
        gitlab_url (str): URL of the GitLab server.
        fullname (str): Full path of the group, used to look the group up on the server.
        group (typing.Any | None): Group object fetched from the server in ``model_post_init``.
        projects (list): ``GitlabProject`` objects, filled in by ``build``.
        subgroups (list): ``GitlabGroup`` objects, filled in by ``build``.
        subgroup (bool): ``True`` when this group was created by ``build`` as a subgroup.
        cfg (settings.Settings | None): Optional settings; ``cfg.gitlab`` is forwarded as
            keyword arguments to ``gitlab.Gitlab``.
    """

    name: str
    gitlab_key: str
    gitlab_url: str = "https://gitlab.com"
    fullname: str = ""
    group: typing.Any | None = None
    # These lists hold GitlabProject / GitlabGroup objects (see ``build``), not strings.
    projects: list[typing.Any] = Field(default_factory=list)
    subgroups: list[typing.Any] = Field(default_factory=list)
    subgroup: bool = False
    cfg: settings.Settings | None = None

    def model_post_init(self, __context: str | None = None) -> None:
        """Connect to the server, fetch the group and build its project/subgroup tree."""
        kwargs = self.cfg.gitlab if self.cfg else {}
        self.server = gitlab.Gitlab(self.gitlab_url, private_token=self.gitlab_key, **kwargs)
        self.root = self.root.resolve()
        self.group = self.server.groups.get(self.fullname)
        self.build()

    @property
    def path(self) -> pathlib.Path:
        """Auto-generated path.

        Returns:
            pathlib.Path: Group path below ``root``. In flat mode the components of
                ``fullname`` are joined with ``-`` into a single directory name.
        """
        if self.flat:
            # GitLab full paths are always "/"-separated, whatever the local OS separator is.
            return self.root / self.fullname.replace("/", "-")
        return self.root / self.fullname

    @property
    def visibility(self) -> str:
        """Group visibility.

        Returns:
            str: Value of the ``visibility`` attribute of the group.
        """
        return self.group.attributes["visibility"]

    @property
    def members(self) -> list:
        """Get members.

        The group object is fetched again from the server before listing.

        Returns:
            list: Everything returned by the group's ``members_all`` manager.
        """
        self.group = self.server.groups.get(self.group.id)
        return self.group.members_all.list(all=True)

    def build(self) -> None:
        """Build a ``GitlabProject`` per project and a ``GitlabGroup`` per subgroup.

        Returns:
            None
        """
        # Projects are visited in order of their path.
        for project in sorted(self.group.projects.list(all=True), key=lambda x: x.path):
            proj = GitlabProject(
                gitlab_url=self.gitlab_url,
                gitlab_key=self.gitlab_key,
                project=project,
                root=self.root,
                flat=self.flat,
                cfg=self.cfg,
            )
            if self.flat:
                # Flat layout: "<group dir name>-<project>" next to the group directory.
                fullname = self.path.parent / f"{self.path.name}-{project.path}"
            else:
                fullname = self.path / project.path
            proj.fullname = str(fullname.relative_to(self.root))
            self.projects.append(proj)

        # Constructing a subgroup runs its own model_post_init, so the tree is built recursively.
        for group in self.group.subgroups.list(all=True):
            grp = GitlabGroup(
                gitlab_url=self.gitlab_url,
                gitlab_key=self.gitlab_key,
                fullname=group.full_path,
                name=pathlib.Path(group.full_path).name,
                root=self.root,
                flat=self.flat,
                subgroup=True,
                cfg=self.cfg,
            )
            self.subgroups.append(grp)

    def rebuild(self, cfg: pathlib.Path) -> None:
        """Rebuild the objects when reloaded (new directories, etc.).

        Arguments:
            cfg (pathlib.Path): Serialised .pkl file.

        Returns:
            None
        """
        # ``dump`` writes the file into ``toplevel_dir``: that is ``root`` itself in flat
        # mode and a direct child of ``root`` otherwise.
        cfg_dir = cfg.resolve().parent
        self.root = cfg_dir if self.flat else cfg_dir.parent
        for project in self.projects:
            project.root = self.root
            project.git = git.Repo(project.path)
        for group in self.subgroups:
            group.root = self.root
            group.rebuild(cfg)

    @property
    def count(self) -> int:
        """How many repositories are in the full group structure?

        Returns:
            int: Total number of projects in all recursive subgroups.
        """
        return len(self.projects) + sum(subgroup.count for subgroup in self.subgroups)

    def recursive_command(self, command: str, **kwargs: typing.Any) -> list:
        """Recursively walk down group tree, finding projects and executing commands.

        Arguments:
            command (str): Name of the project method to execute.
            kwargs (*): Keyword args to each recursive command.

        Returns:
            list: List of returns from each command execution.

        Raises:
            Exception: If a project has no attribute called ``command``.
        """
        returns = []
        if self.subgroup:
            output.PROGRESS_TOTAL.update(output.TASK_TOTAL, description=self.name)
        else:
            # Only the top-level group sets the total for the whole tree.
            output.PROGRESS_TOTAL.update(output.TASK_TOTAL, description=self.name, total=self.count)

        for project in self.projects:
            project.rows = []
            if not hasattr(project, command):
                raise Exception(f'Command "{command}" not recognised.')
            returns.append(getattr(project, command)(**kwargs))
            for row in project.rows:
                output.TABLE.add_row(*row)
            output.PROGRESS_TOTAL.update(output.TASK_TOTAL, description=project.name)
            output.PROGRESS_TOTAL.advance(output.TASK_TOTAL)
            if command == "clone":
                renderables = (output.TABLE, output.PROGRESS_PROJECT, output.PROGRESS_TOTAL)
            else:
                renderables = (output.TABLE,)
            output.LIVE.update(rich.console.Group(*renderables), refresh=True)

        for subgroup in self.subgroups:
            returns.extend(subgroup.recursive_command(command, **kwargs))

        if not self.subgroup:
            output.LIVE.update(rich.console.Group(output.TABLE), refresh=True)
            # Every group in the tree resolves to the same top level directory, so the
            # state is saved once, from the top-level group, after the whole walk.
            self.dump()
        return returns

    def dump(self) -> None:
        """Pickle this object to ``GROUP_FNAME`` inside the top level work directory."""
        with open(self.toplevel_dir / GROUP_FNAME, "wb") as fobj:
            pickle.dump(self, fobj)
class GitlabProject(GitlabInstance):
    """A GitLab project convenience class.

    Attributes:
        project (typing.Any): Project object as obtained from the server.
        gitlab_key (str): Private token passed to ``gitlab.Gitlab``.
        gitlab_url (str): URL of the GitLab server.
        name (str): Name of the working tree directory; set by ``clone``.
        git (typing.Any | None): ``git.Repo`` of the local clone, once available.
        rows (list): Table rows produced by the most recent command.
        fullname (str): Project path relative to ``root``.
    """

    project: typing.Any
    gitlab_key: str
    gitlab_url: str = "https://gitlab.com"
    name: str = ""
    git: typing.Any | None = None
    rows: list = Field(default_factory=list)
    _path: pathlib.Path = pathlib.Path()
    fullname: str = ""

    def model_post_init(self, __context: str | None = None) -> None:
        """Connect to the server."""
        kwargs = self.cfg.gitlab if self.cfg else {}
        self.server = gitlab.Gitlab(self.gitlab_url, private_token=self.gitlab_key, **kwargs)

    @property
    def path(self) -> pathlib.Path:
        """Auto-generated path.

        Returns:
            pathlib.Path: Project path (``root`` joined with ``fullname``).
        """
        return self.root / self.fullname

    @property
    def visibility(self) -> str:
        """Project visibility.

        Returns:
            str: Value of the ``visibility`` attribute of the project.
        """
        return self.project.attributes["visibility"]

    @property
    def members(self) -> list:
        """Get members.

        The project object is fetched again from the server before listing.

        Returns:
            list: Everything returned by the project's ``members_all`` manager.
        """
        self.project = self.server.projects.get(self.project.id)
        return self.project.members_all.list(all=True)

    @property
    def is_python_package(self) -> bool:
        """Is this project a Python package?

        Returns:
            bool: ``True`` if the project directory holds a setup.py or a pyproject.toml.
        """
        return (self.path / "setup.py").exists() or (self.path / "pyproject.toml").exists()

    @property
    def python_package_name(self) -> str:
        """What is the name of the Python package?

        setup.py is scanned first for a line starting with a ``name=`` assignment; if it is
        absent or has no such line, the ``project.name`` entry of pyproject.toml is used.

        Returns:
            str: Name of the Python package, or an empty string if none is declared.

        Raises:
            Exception: If the project has neither a setup.py nor a pyproject.toml.
        """
        setup_py = self.path / "setup.py"
        pyproject_toml = self.path / "pyproject.toml"
        has_setup = setup_py.exists()
        has_pyproject = pyproject_toml.exists()
        if not (has_setup or has_pyproject):
            raise Exception(f'Cannot determine Python package name for project "{self.name}".')

        if has_setup:
            with open(setup_py, encoding="utf-8") as fobj:
                for line in fobj:
                    key, sep, value = line.strip().partition("=")
                    # Accept both ``name="x"`` and ``name = "x"``.
                    if sep and key.strip() == "name":
                        return value.split(",")[0].strip().strip('"').strip("'")

        if has_pyproject:
            with open(pyproject_toml, "rb") as fobj:
                pyproject = tomllib.load(fobj)
            return pyproject.get("project", {}).get("name", "")
        return ""

    def _current_branch(self) -> str:
        """Return the output of ``git rev-parse --abbrev-ref HEAD`` for the local clone."""
        return self.git.git.rev_parse("--abbrev-ref", "HEAD")

    def recursive_command(self, command: str, **kwargs: typing.Any) -> typing.Any:
        """Trick to execute a command recursively on a project to keep code the same elsewhere.

        Arguments:
            command (str): Name of the method to execute.
            kwargs (*): Keyword args passed to the method.

        Returns:
            typing.Any: Whatever the executed method returns.

        Raises:
            Exception: If this object has no attribute called ``command``.
        """
        if not hasattr(self, command):
            raise Exception(f'Command "{command}" not recognised.')
        return getattr(self, command)(**kwargs)

    def clone(self) -> None:
        """Clone a repository.

        Returns:
            None
        """
        self.git = git.Repo.clone_from(self.project.ssh_url_to_repo, self.path, progress=output.CloneProgress())
        self.name = pathlib.Path(self.git.working_tree_dir).name
        self.rows.append(
            [
                self.name,
                str(self.fullname),
                self._current_branch(),
                str(self.path.relative_to(self.root)),
                self.git.remote(name="origin").url,
            ]
        )

    def branch(self, name: str | None = None) -> None:
        """Make branch in a repository.

        Arguments:
            name (str | None): Name of the branch to create.

        Returns:
            None

        Raises:
            Exception: If the project directory does not exist.
        """
        # A pathlib.Path is always truthy, so existence has to be checked explicitly.
        if not self.path.exists():
            raise Exception(f'Cannot find repository @ "{self.path}"')
        self.git = git.Repo(self.path)
        self.rows.append(
            [
                self.name,
                str(self.path.relative_to(self.root)),
                self._current_branch(),
                name,
            ]
        )
        self.git.create_head(name)

    def checkout(self, name: str | None = None) -> None:
        """Checkout a branch in a repository.

        Arguments:
            name (str | None): Name of the branch to check out.

        Returns:
            None

        Raises:
            Exception: If the project directory does not exist.
        """
        # A pathlib.Path is always truthy, so existence has to be checked explicitly.
        if not self.path.exists():
            raise Exception(f'Cannot find repository @ "{self.path}"')
        self.git = git.Repo(self.path)
        # The row records the branch that was current before the checkout.
        self.rows.append(
            [
                self.name,
                str(self.path.relative_to(self.root)),
                self._current_branch(),
                name,
            ]
        )
        self.git.heads[name].checkout()

    def add(self, fnames: tuple, all_: bool) -> None:
        """Add files to staging area.

        Only files inside this project that are untracked or have unstaged changes are
        added; everything else in ``fnames`` is silently skipped.

        Arguments:
            fnames (tuple): File names to add.
            all_ (bool): Ignore ``fnames`` and add every untracked or modified file.

        Returns:
            None
        """
        # Query git once instead of once per file.
        untracked = list(self.git.untracked_files)
        modified = [diff.a_path for diff in self.git.index.diff(None)]
        pending = {*untracked, *modified}
        if all_:
            fnames = [self.path / rel for rel in (*untracked, *modified)]

        branch = self._current_branch()
        for fname in fnames:
            fname = pathlib.Path(fname).resolve()
            if not fname.is_relative_to(self.path):
                continue
            relative = fname.relative_to(self.path)
            # git reports "/"-separated paths on every platform.
            key = relative.as_posix()
            if key not in pending:
                continue
            self.git.index.add([relative])
            # Once staged, the file no longer counts as untracked/modified.
            pending.discard(key)
            self.rows.append([self.name, branch, str(fname)])

    def commit(self, message: str) -> None:
        """Commit the staged changes, if there are any.

        Arguments:
            message (str): Commit message.

        Returns:
            None
        """
        staged = [diff.a_path for diff in self.git.index.diff("HEAD")]
        if not staged:
            return
        self.git.index.commit(message)
        branch = self._current_branch()
        for fname in staged:
            self.rows.append([self.name, branch, str(fname), message])

    def status(self) -> None:
        """Collect git status as coloured table rows.

        Returns:
            None
        """
        untracked = self.git.untracked_files
        modified = [diff.a_path for diff in self.git.index.diff(None)]
        added = [diff.a_path for diff in self.git.index.diff("HEAD")]
        sections = (
            ("green", added, "Changes to be committed"),
            ("red", modified, "Changes not staged for commit"),
            ("magenta", untracked, "Untracked files"),
        )
        for colour, fnames, label in sections:
            for fname in sorted(fnames):
                self.rows.append([f"[{colour}]{string}[/]" for string in (self.fullname, fname, label)])

    def push(self) -> None:
        """Push to the ``origin`` remote.

        Returns:
            None
        """
        self.git.remotes.origin.push()
        self.rows.append([self.name, self._current_branch(), self.git.remote(name="origin").url])

    def pyinstall(self, pm: str = "uv pip", editable: bool = False, index: str | None = None) -> None:
        """Install Python package.

        Does nothing when the project is not a Python package.

        Arguments:
            pm (str): Package manager command; split with ``shlex`` and followed by ``install``.
            editable (bool): Add ``--editable`` to the command.
            index (str | None): Value passed with ``--index``, if given.

        Returns:
            None

        Raises:
            Exception: If the install command exits with a non-zero return code.
        """
        if not self.is_python_package:
            return
        cmd = [*shlex.split(pm), "install"]
        if editable:
            cmd.append("--editable")
        if index:
            cmd.extend(["--index", index])
        cmd.append(str(self.path))
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode:
            raise Exception(f'Failed to install Python package at "{self.path}". Command: {" ".join(cmd)}')
        self.rows.append([self.python_package_name, str(self.path.relative_to(self.root))])

    def pyreqs(self) -> str:
        """Generate a requirements.txt line for the Python package.

        Returns:
            str: ``<package name> @ <origin url>@<HEAD commit sha>``, or an empty string when
                the project is not a Python package.
        """
        if not self.is_python_package:
            return ""
        self.rows.append(
            [
                self.name,
                str(self.path.relative_to(self.root)),
                self.python_package_name,
            ]
        )
        return f"{self.python_package_name} @ {self.git.remotes.origin.url}@{self.git.head.commit.hexsha}"

    def pywheel(self) -> None:
        """Generate wheel file for Python package.

        Runs ``uv build`` and writes into the project's ``dist`` directory. Does nothing when
        the project is not a Python package.

        Returns:
            None

        Raises:
            Exception: If the build command exits with a non-zero return code.
        """
        if not self.is_python_package:
            return
        cmd = [*shlex.split("uv build"), str(self.path), "--wheel", "--out-dir", str(self.path / "dist")]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode:
            print(result.stdout)
            print(result.stderr)
            raise Exception(f'Failed to build Python package at "{self.path}". Command: {" ".join(cmd)}')
        self.rows.append([self.python_package_name, str(self.path.relative_to(self.root))])