from __future__ import annotations
import logging
import os
import re
from collections.abc import Generator
from dataclasses import dataclass, replace
from urllib.parse import urlsplit, urlunsplit
from dulwich.refs import Ref
from ..environment import Environment
from ..fetch import fetch_from_url
from ..git_util import GitRepo
from ..metadata import Metadata
from ..utils import _ParseAppCallable, is_valid_version
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class Release:
    """A single app release.

    Bundles the environment and metadata of a release together with the
    URL that identifies where the release was obtained from.
    """

    # Environment specification associated with this release.
    environment: Environment
    # Metadata associated with this release.
    metadata: Metadata
    # URL identifying the source of this release.
    url: str
# Release line syntax: ``<rev>[:<rev_selection>]``.
# ``rev`` is everything before the first colon (it may be empty);
# ``rev_selection`` is everything after that colon and is ``None`` when the
# release line contains no colon at all.
RELEASE_LINE_PATTERN = r"^(?P<rev>[^:]*?)(:(?P<rev_selection>.*))?$"
[docs]def _split_release_line(url: str) -> tuple[str, str | None]:
parsed_url = urlsplit(url)
if "@" in parsed_url.path:
path, release_line = parsed_url.path.rsplit("@", 1)
return urlunsplit(parsed_url._replace(path=path)), release_line
return url, None
def _get_release_commits(
    repo: GitRepo, release_line: str
) -> Generator[tuple[str, str]]:
    """Yield ``(name, commit)`` pairs for the commits selected by a release line.

    A release line has the form ``<rev>[:<rev_selection>]`` (see
    ``RELEASE_LINE_PATTERN``). An empty ``<rev>`` falls back to the current
    branch of the repository. Four cases are distinguished:

    * ``*`` (with or without a selection): every ref found below
      ``refs/remotes/origin/`` is handed to ``_get_tags`` together with the
      selection; each tag is yielded only once, even if several branches
      report it.
    * ``<rev>`` (no colon): select exactly this revision. It is looked up as
      a local branch, then a remote branch, then a tag; the first hit is
      yielded with its peeled commit. If none exists, ``rev`` is assumed to
      be a commit-ish itself and is yielded as both name and commit.
    * ``<rev>:<rev_selection>``: yield whatever ``_get_tags`` returns for the
      revision and the selection. For example ``main:v1.0.0..`` is meant to
      select all tagged commits on ``main`` after the commit tagged
      ``v1.0.0``.
    * ``<rev>:`` (empty selection): yield every tag merged into ``rev``
      together with the commit it points to.

    :param repo: Git repository object.
    :param release_line: Release line specification as described above.
    :raises ValueError: If the release line does not match
        ``RELEASE_LINE_PATTERN``.
    """
    match = re.match(RELEASE_LINE_PATTERN, release_line)
    if not match:
        raise ValueError(f"Invalid release line specification: {release_line}")

    groups = match.groupdict()
    rev_spec = groups["rev"]
    rev_selection = groups["rev_selection"]
    rev = rev_spec or repo.get_current_branch()

    if rev_spec == "*":
        # Loop over all remote branches and yield the tags for their commits,
        # skipping tags that were already reported for an earlier branch.
        seen_tags = set()
        remote_prefix = Ref(b"refs/remotes/origin/")
        for branch in repo.refs.as_dict(remote_prefix):
            for tag, commit in _get_tags(repo, branch.decode(), rev_selection):
                if tag not in seen_tags:
                    seen_tags.add(tag)
                    yield tag, commit
        return

    if rev_selection is None:
        # No rev_selection means to select this and only this specific
        # revision. For example: '@main' means, simply checkout 'main' (could
        # be a branch or a tag, however branches have priority).
        candidates = (
            f"refs/heads/{rev}",
            f"refs/remotes/origin/{rev}",
            f"refs/tags/{rev}",
        )
        for candidate in candidates:
            ref = Ref(candidate.encode())
            if ref in repo.refs:
                yield rev, repo.get_peeled(ref).decode()
                return
        # Not a known branch or tag: rev is likely a commit-ish (commit).
        yield rev, rev
    elif rev_selection:
        # A rev selection is provided, we fetch the full rev list for the
        # given selection. For example: '@main:v1..v2' means all commits from
        # v1 (exclusive) to v2 (inclusive).
        for tag, commit in _get_tags(repo, rev, rev_selection):
            yield tag, commit
    else:
        # The rev selection is empty, select all tagged commits for the
        # selected revision. For example: '@main:' means all tagged commits
        # on the main branch.
        for tag in repo.get_merged_tags(rev):
            yield tag, repo.get_commit_for_tag(tag)
def _gather_releases(
    release_specs: list[str | dict],
    scan_app_repository: _ParseAppCallable,
    app_metadata: dict | None,
) -> Generator[tuple[str | None, Release]]:
    """Yield ``(version, release)`` pairs for a list of release specifications.

    Each specification is either a plain URL string or a dict with a
    mandatory ``"url"`` key and optional ``"environment"``, ``"metadata"``
    and ``"version"`` overrides. For dict specifications without a
    ``"metadata"`` key, ``app_metadata`` is used as the metadata override;
    plain string specifications carry no overrides.

    For URLs whose scheme starts with ``git+`` one release is yielded per
    commit selected by the release line (the part after the last ``@`` in the
    URL path, defaulting to the current branch), and the yielded release URL
    is pinned to that commit. Commits for which no metadata could be parsed
    and no metadata override exists are skipped with a warning. For any other
    scheme a single release with version ``None`` (unless overridden) is
    yielded.

    :param release_specs: Release specifications (URL strings or dicts).
    :param scan_app_repository: Callable that is given a URL and returns a
        mapping with ``"metadata"`` and ``"environment"`` entries.
    :param app_metadata: App-level metadata used as the default metadata
        override for dict specifications.
    :return: Generator of ``(version, release)``; ``version`` may be ``None``.
    """
    for release_spec in release_specs:
        if isinstance(release_spec, str):
            url = release_spec
            environment_override = None
            metadata_override = None
            version_override = None
        else:
            url = release_spec["url"]
            environment_override = release_spec.get("environment")
            metadata_override = release_spec.get("metadata", app_metadata)
            version_override = release_spec.get("version")

        # The closure reads the loop variables above. This is safe because it
        # is only ever called within the same loop iteration.
        def _set_overrides(
            version: str | None, release: Release
        ) -> tuple[str | None, Release]:
            return version_override or version, replace(  # noqa: B023
                release,
                environment=environment_override or release.environment,  # noqa: B023
                metadata=metadata_override or release.metadata,  # noqa: B023
            )

        # The way that an app is retrieved is determined by the scheme of the
        # release url. For example, "git+https://example.com/my-app.git" means
        # that the app is located at a remote git repository from which it can
        # be downloaded (cloned) via https.
        base_url, release_line = _split_release_line(url)
        parsed_url = urlsplit(base_url)

        with fetch_from_url(base_url) as repo_path:
            if not parsed_url.scheme.startswith("git+"):
                release = Release(
                    url=url,
                    **scan_app_repository(f"file:{os.fspath(repo_path.resolve())}"),
                )
                yield _set_overrides(None, release)
                continue

            repo = GitRepo(os.fspath(repo_path))
            for ref, sha in _get_release_commits(
                repo, release_line or repo.get_current_branch()
            ):
                # Parse environment from local copy of repository.
                metadata_and_environment = scan_app_repository(
                    f"git+file:{os.fspath(repo_path.resolve())}@{sha}"
                )
                if (
                    metadata_and_environment["metadata"] is None
                    and metadata_override is None
                ):
                    logger.warning(
                        f"Failed to parse metadata for {base_url}@{ref} and no override specified, skipping release!"
                    )
                    continue

                # Replace release specifier to point to specific commit.
                # ``parsed_url`` comes from ``base_url``, whose release line
                # has already been stripped, so the commit is appended to the
                # path as-is. Splitting on '@' again would truncate paths
                # that legitimately contain an '@'.
                path = f"{parsed_url.path}@{sha}"
                release = Release(
                    url=urlunsplit(parsed_url._replace(path=path)),
                    **metadata_and_environment,
                )
                yield _set_overrides(ref, release)
def gather_releases(
    app_data: dict, scan_app_repository: _ParseAppCallable
) -> Generator[tuple[str, Release]]:
    """Yield ``(version, release)`` pairs for all valid releases of an app.

    The release specifications are read from ``app_data["releases"]``
    (default: no releases) and the app-level metadata from
    ``app_data["metadata"]`` (default: ``None``). Releases whose version is
    rejected by ``is_valid_version`` are skipped with a warning.

    :param app_data: App description with optional ``"releases"`` and
        ``"metadata"`` entries.
    :param scan_app_repository: Callable used to parse metadata and
        environment from a repository URL; passed on to ``_gather_releases``.
    :raises ValueError: If no version could be determined for a release.
    """
    gathered = _gather_releases(
        app_data.get("releases", []),
        scan_app_repository,
        app_metadata=app_data.get("metadata"),
    )
    for version, release in gathered:
        if version is None:
            raise ValueError(f"Unable to determine version for: {release}")
        if not is_valid_version(version):
            logger.warning(f"{release.url}: Skipping invalid version '{version}'")
            continue
        yield version, release