""" Cache service for upstream artifact caching. Provides URL parsing, system project management, and caching logic for the upstream caching feature. """ import logging import re from dataclasses import dataclass from typing import Optional from urllib.parse import urlparse, unquote logger = logging.getLogger(__name__) # System project names for each source type SYSTEM_PROJECT_NAMES = { "npm": "_npm", "pypi": "_pypi", "maven": "_maven", "docker": "_docker", "helm": "_helm", "nuget": "_nuget", "deb": "_deb", "rpm": "_rpm", "generic": "_generic", } # System project descriptions SYSTEM_PROJECT_DESCRIPTIONS = { "npm": "System cache for npm packages", "pypi": "System cache for PyPI packages", "maven": "System cache for Maven packages", "docker": "System cache for Docker images", "helm": "System cache for Helm charts", "nuget": "System cache for NuGet packages", "deb": "System cache for Debian packages", "rpm": "System cache for RPM packages", "generic": "System cache for generic artifacts", } @dataclass class ParsedUrl: """Parsed URL information for caching.""" package_name: str version: Optional[str] = None filename: Optional[str] = None def parse_npm_url(url: str) -> Optional[ParsedUrl]: """ Parse npm registry URL to extract package name and version. Formats: - https://registry.npmjs.org/{package}/-/{package}-{version}.tgz - https://registry.npmjs.org/@{scope}/{package}/-/{package}-{version}.tgz Examples: - https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz - https://registry.npmjs.org/@types/node/-/node-18.0.0.tgz """ parsed = urlparse(url) path = unquote(parsed.path) # Pattern for scoped packages: /@scope/package/-/package-version.tgz scoped_pattern = r"^/@([^/]+)/([^/]+)/-/\2-(.+)\.tgz$" match = re.match(scoped_pattern, path) if match: scope, name, version = match.groups() return ParsedUrl( package_name=f"@{scope}/{name}", version=version, filename=f"{name}-{version}.tgz", ) # Pattern for unscoped packages: /package/-/package-version.tgz unscoped_pattern = r"^/([^/@]+)/-/\1-(.+)\.tgz$" match = re.match(unscoped_pattern, path) if match: name, version = match.groups() return ParsedUrl( package_name=name, version=version, filename=f"{name}-{version}.tgz", ) return None def parse_pypi_url(url: str) -> Optional[ParsedUrl]: """ Parse PyPI URL to extract package name and version. Formats: - https://files.pythonhosted.org/packages/.../package-version.tar.gz - https://files.pythonhosted.org/packages/.../package-version-py3-none-any.whl - https://pypi.org/packages/.../package-version.tar.gz Examples: - https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0.tar.gz - https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0-py3-none-any.whl """ parsed = urlparse(url) path = unquote(parsed.path) # Get the filename from the path filename = path.split("/")[-1] if not filename: return None # Handle wheel files: package-version-py3-none-any.whl wheel_pattern = r"^([a-zA-Z0-9_-]+)-(\d+[^-]*)-.*\.whl$" match = re.match(wheel_pattern, filename) if match: name, version = match.groups() # Normalize package name (PyPI uses underscores internally) name = name.replace("_", "-").lower() return ParsedUrl( package_name=name, version=version, filename=filename, ) # Handle source distributions: package-version.tar.gz or package-version.zip sdist_pattern = r"^([a-zA-Z0-9_-]+)-(\d+(?:\.\d+)*(?:[a-zA-Z0-9_.+-]*)?)(?:\.tar\.gz|\.zip|\.tar\.bz2)$" match = re.match(sdist_pattern, filename) if match: name, version = match.groups() name = name.replace("_", "-").lower() return ParsedUrl( package_name=name, version=version, filename=filename, ) return None def parse_maven_url(url: str) -> Optional[ParsedUrl]: """ Parse Maven repository URL to extract artifact info. Format: - https://repo1.maven.org/maven2/{group}/{artifact}/{version}/{artifact}-{version}.jar Examples: - https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.12.0/commons-lang3-3.12.0.jar - https://repo1.maven.org/maven2/com/google/guava/guava/31.1-jre/guava-31.1-jre.jar """ parsed = urlparse(url) path = unquote(parsed.path) # Find /maven2/ or similar repository path maven2_idx = path.find("/maven2/") if maven2_idx >= 0: path = path[maven2_idx + 8:] # Remove /maven2/ elif path.startswith("/"): path = path[1:] parts = path.split("/") if len(parts) < 4: return None # Last part is filename, before that is version, before that is artifact filename = parts[-1] version = parts[-2] artifact = parts[-3] group = ".".join(parts[:-3]) # Verify filename matches expected pattern if not filename.startswith(f"{artifact}-{version}"): return None return ParsedUrl( package_name=f"{group}:{artifact}", version=version, filename=filename, ) def parse_docker_url(url: str) -> Optional[ParsedUrl]: """ Parse Docker registry URL to extract image info. Note: Docker registries are more complex (manifests, blobs, etc.) This handles basic blob/manifest URLs. Examples: - https://registry-1.docker.io/v2/library/nginx/blobs/sha256:abc123 - https://registry-1.docker.io/v2/myuser/myimage/manifests/latest """ parsed = urlparse(url) path = unquote(parsed.path) # Pattern: /v2/{namespace}/{image}/blobs/{digest} or /manifests/{tag} pattern = r"^/v2/([^/]+(?:/[^/]+)?)/([^/]+)/(blobs|manifests)/(.+)$" match = re.match(pattern, path) if match: namespace, image, artifact_type, reference = match.groups() if namespace == "library": package_name = image else: package_name = f"{namespace}/{image}" # For manifests, the reference is the tag version = reference if artifact_type == "manifests" else None return ParsedUrl( package_name=package_name, version=version, filename=f"{image}-{reference}" if version else reference, ) return None def parse_generic_url(url: str) -> ParsedUrl: """ Parse a generic URL to extract filename. Attempts to extract meaningful package name and version from filename. Examples: - https://example.com/downloads/myapp-1.2.3.tar.gz - https://github.com/user/repo/releases/download/v1.0/release.zip """ parsed = urlparse(url) path = unquote(parsed.path) filename = path.split("/")[-1] or "artifact" # List of known compound and simple extensions known_extensions = [ ".tar.gz", ".tar.bz2", ".tar.xz", ".zip", ".tgz", ".gz", ".jar", ".war", ".deb", ".rpm" ] # Strip extension from filename first base_name = filename matched_ext = None for ext in known_extensions: if filename.endswith(ext): base_name = filename[:-len(ext)] matched_ext = ext break if matched_ext is None: # Unknown extension, return filename as package name return ParsedUrl( package_name=filename, version=None, filename=filename, ) # Try to extract version from base_name # Pattern: name-version or name_version # Version starts with digit(s) and can include dots, dashes, and alphanumeric suffixes version_pattern = r"^(.+?)[-_](v?\d+(?:\.\d+)*(?:[-_][a-zA-Z0-9]+)?)$" match = re.match(version_pattern, base_name) if match: name, version = match.groups() return ParsedUrl( package_name=name, version=version, filename=filename, ) # No version found, use base_name as package name return ParsedUrl( package_name=base_name, version=None, filename=filename, ) def parse_url(url: str, source_type: str) -> ParsedUrl: """ Parse URL to extract package name and version based on source type. Args: url: The URL to parse. source_type: The source type (npm, pypi, maven, docker, etc.) Returns: ParsedUrl with extracted information. """ parsed = None if source_type == "npm": parsed = parse_npm_url(url) elif source_type == "pypi": parsed = parse_pypi_url(url) elif source_type == "maven": parsed = parse_maven_url(url) elif source_type == "docker": parsed = parse_docker_url(url) # Fall back to generic parsing if type-specific parsing fails if parsed is None: parsed = parse_generic_url(url) return parsed def get_system_project_name(source_type: str) -> str: """Get the system project name for a source type.""" return SYSTEM_PROJECT_NAMES.get(source_type, "_generic") def get_system_project_description(source_type: str) -> str: """Get the system project description for a source type.""" return SYSTEM_PROJECT_DESCRIPTIONS.get( source_type, "System cache for artifacts" )