"""Helpful utilities for the AiiDAlab tools."""
from __future__ import annotations
import json
import logging
import subprocess
import sys
import time
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import asdict
from functools import wraps
from pathlib import Path
from subprocess import run
from threading import Lock
from typing import TYPE_CHECKING, Any, Callable, TypeVar
from urllib.parse import urlsplit, urlunsplit
import requests
from cachetools import TTLCache, cached
from packaging.utils import NormalizedName, canonicalize_name
from .config import AIIDALAB_REGISTRY
from .environment import Environment
from .fetch import fetch_from_url
from .metadata import Metadata
if TYPE_CHECKING:
from packaging.requirements import Requirement
R = TypeVar("R")
logger = logging.getLogger(__name__)
FIND_INSTALLED_PACKAGES_CACHE = TTLCache(maxsize=32, ttl=60) # type: ignore[var-annotated]
# NOTE: try-except is a fix for Quantum Mobile release v19.03.0 where
# requests_cache is not installed.
try:
# The cache is configured to avoid spamming the app registry server with requests
# that are made in rapid succession and also serves as a fallback in case
# that the index server is temporarily not reachable.
from requests_cache import CachedSession
_session = CachedSession(
"aiidalab_registry",
use_cache_dir=True, # store cache in ~/.cache/
backend="sqlite",
expire_after=60, # seconds
stale_if_error=True,
)
except ImportError:
logger.warning(
"The requests_cache package is missing. "
"Requests made to the app registry will not be cached."
)
_session = requests.Session() # type: ignore[assignment]
[docs]def load_app_registry_index() -> Any:
"""Load apps' information from the AiiDAlab registry."""
try:
return _session.get(f"{AIIDALAB_REGISTRY}/apps_index.json").json()
except (ValueError, requests.ConnectionError) as error:
raise RuntimeError("Unable to load registry index") from error
[docs]def load_app_registry_entry(app_id: str) -> Any:
"""Load registry entry for app with app_id."""
try:
return _session.get(f"{AIIDALAB_REGISTRY}/apps/{app_id}.json").json()
except (ValueError, requests.ConnectionError):
logger.debug(f"Unable to load registry entry for app with id '{app_id}'.")
return None
[docs]class PEP508CompliantUrl(str):
"""Represents a PEP 508 compliant URL."""
pass
_ParseAppCallable = Callable[[str], dict[str, Any]]
[docs]def parse_app_repo(
url: str, metadata_fallback: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Parse an app repo for metadata and other information.
Use this function to parse a local or remote app repository for the app
metadata and environment specification.
Examples:
For a local app repository, provide the absolute or relative path:
url="/path/to/aiidalab-hello-world"
For a remote app repository, provide a PEP 508 compliant URL, for example:
url="git+https://github.com/aiidalab/aiidalab-hello-world.git@v1.0.0"
"""
with fetch_from_url(url) as repo:
try:
metadata = asdict(Metadata.parse(repo))
except TypeError as error:
logger.debug(f"Failed to parse metadata for '{url}': {error}")
metadata = metadata_fallback # type: ignore[assignment]
return {
"metadata": metadata,
"environment": asdict(Environment.scan(repo)),
}
[docs]class throttled: # noqa: N801
"""Decorator to throttle calls to a function to a specified rate.
The throttle is specific to the first argument of the wrapped
function. That means for class methods it is specific to each
instance.
Adapted from: https://gist.github.com/gregburek/1441055
"""
[docs] def __init__(self, calls_per_second: int = 1):
self.calls_per_second = calls_per_second
self.last_start: dict[int, float] = defaultdict(lambda: -1.0)
self.locks: dict[int, Lock] = defaultdict(Lock)
[docs] def __call__(self, func: Callable[..., R]) -> Callable[..., R | None]:
"""Return decorator function."""
@wraps(func)
def wrapped(instance, *args, **kwargs): # type: ignore[no-untyped-def]
if self.last_start[hash(instance)] >= 0:
elapsed = time.perf_counter() - self.last_start[hash(instance)]
to_wait = 1.0 / self.calls_per_second - elapsed
if to_wait > 0:
locked = self.locks[hash(instance)].acquire(blocking=False)
if locked:
try:
time.sleep(to_wait)
finally:
self.locks[hash(instance)].release()
else:
return None # drop
self.last_start[hash(instance)] = time.perf_counter()
return func(instance, *args, **kwargs)
return wrapped
[docs]class Package:
"""Helper class to check whether a given package fulfills a requirement."""
[docs] def __init__(self, name: str, version: str | None = None):
"""If version is None, means not confinement for the version therefore
the package always fulfill."""
self._name = name # underscore to avoid name clash with property canonical_name
self.version = version
[docs] def __repr__(self) -> str:
return f"{type(self).__name__}({self.canonical_name}, {self.version})"
[docs] def __str__(self) -> str:
return f"{self.canonical_name}=={self.version}"
@property
def canonical_name(self) -> NormalizedName:
"""Return the cananicalized name of the package."""
return canonicalize_name(self._name)
[docs] def fulfills(self, requirement: Requirement) -> bool:
"""Returns True if this entry fulfills the requirement."""
return self.canonical_name == canonicalize_name(requirement.name) and (
self.version is None or self.version in requirement.specifier
)
[docs]@cached(cache=FIND_INSTALLED_PACKAGES_CACHE)
def find_installed_packages(python_bin: str | None = None) -> dict[str, Package]:
"""Return all currently installed packages."""
return {
canonicalize_name(package["name"]): Package(
name=canonicalize_name(package["name"]), version=package["version"]
)
for package in _pip_list(python_bin)
}
[docs]def _pip_list(python_bin: str | None = None) -> Any:
"""Return all currently installed packages."""
if python_bin is None:
python_bin = sys.executable
output = run(
[python_bin, "-m", "pip", "list", "--format=json"],
encoding="utf-8",
capture_output=True,
check=True,
).stdout
return json.loads(output)
[docs]def get_package_by_name(packages: dict[str, Package], name: str) -> Package | None:
"""Return the package with the given name from the list of packages.
The name can be the canonicalized name or the requirement name which may not canonicalized.
We try to convert the name to the canonicalized in both side and compare them.
For example, the requirement name is 'jupyter-client' and the package name is 'jupyter_client'.
The implementation of this method is inspired by https://github.com/pypa/pip/pull/8054
"""
for package in packages.values():
if package.canonical_name == canonicalize_name(name):
return package
return None
[docs]def is_valid_version(version: str) -> bool:
"""Return True if given string is a PEP440-compliant version, False otherwise."""
from packaging.version import InvalidVersion, parse
try:
parse(version)
except InvalidVersion:
return False
else:
return True
[docs]def sort_semantic(
versions: Iterable[str], reverse: bool = True, prereleases: bool = False
) -> list[str]:
"""Sort a list of app version strings semantically according to semver.
By default, sorting is performed in descending order (i.e. latest versions first).
By default, pre-release versions are filtured out.
:raises: packaging.version.InvalidVersion if the input list contains invalid version.
"""
from packaging.version import parse
return [
version
for version in sorted(versions, key=parse, reverse=reverse)
if prereleases or not parse(version).is_prerelease
]
[docs]def split_git_url(git_url: str) -> tuple[str, str | None]:
"""Split the base url and the ref pointer of a git url.
For example: git+https://example.com/app.git@v1 is split into and returned
as tuple: (git+https://example.com/app.git, v1)
"""
parsed_url = urlsplit(git_url)
if "@" in parsed_url.path:
path, ref = parsed_url.path.rsplit("@", 1)
else:
path, ref = parsed_url.path, None
return urlunsplit(parsed_url._replace(path=path)), ref
[docs]def this_or_only_subdir(path: Path) -> Path:
members = list(path.iterdir())
return members[0] if len(members) == 1 and members[0].is_dir() else path
[docs]def run_pip_install(*args: Any, python_bin: str) -> Any:
return subprocess.Popen(
[python_bin, "-m", "pip", "install", "--user", *args],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
[docs]def run_verdi_daemon_restart() -> Any:
return subprocess.Popen(
["verdi", "daemon", "restart"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
[docs]def run_post_install_script(post_install_script_path: Path) -> Any:
return subprocess.Popen(
f"./{post_install_script_path.resolve().stem}",
cwd=post_install_script_path.resolve().parent,
# TODO: We should redirect to a file (maybe post_install.out?"
# Otherwise any errors are impossible to debug.
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
)