"""
Dependency management module for artifact dependencies.

Handles:
- Parsing orchard.ensure files
- Storing dependencies in the database
- Querying dependencies and reverse dependencies
- Dependency resolution with topological sorting
- Circular dependency detection
- Conflict detection
"""

import re
import yaml
from typing import List, Dict, Any, Optional, Set, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_

# Import packaging for PEP 440 version matching
try:
    from packaging.specifiers import SpecifierSet, InvalidSpecifier
    from packaging.version import Version, InvalidVersion
    HAS_PACKAGING = True
except ImportError:
    HAS_PACKAGING = False

from .models import (
    Project,
    Package,
    Artifact,
    Tag,
    ArtifactDependency,
    PackageVersion,
)
from .schemas import (
    EnsureFileContent,
    EnsureFileDependency,
    DependencyResponse,
    ArtifactDependenciesResponse,
    DependentInfo,
    ReverseDependenciesResponse,
    ResolvedArtifact,
    DependencyResolutionResponse,
    DependencyConflict,
    PaginationMeta,
)


class DependencyError(Exception):
    """Base exception for dependency errors."""
    pass


class CircularDependencyError(DependencyError):
    """Raised when a circular dependency is detected."""

    def __init__(self, cycle: List[str]):
        # Keep the raw path so callers can render it themselves.
        self.cycle = cycle
        super().__init__(f"Circular dependency detected: {' -> '.join(cycle)}")


class DependencyConflictError(DependencyError):
    """Raised when conflicting dependency versions are detected."""

    def __init__(self, conflicts: List[DependencyConflict]):
        self.conflicts = conflicts
        super().__init__(f"Dependency conflicts detected: {len(conflicts)} conflict(s)")


class DependencyNotFoundError(DependencyError):
    """Raised when a dependency cannot be resolved."""

    def __init__(self, project: str, package: str, constraint: str):
        self.project = project
        self.package = package
        self.constraint = constraint
        super().__init__(f"Dependency not found: {project}/{package}@{constraint}")


class InvalidEnsureFileError(DependencyError):
    """Raised when the ensure file is invalid."""
    pass


class DependencyDepthExceededError(DependencyError):
    """Raised when dependency resolution exceeds max depth."""

    def __init__(self, max_depth: int):
        self.max_depth = max_depth
        super().__init__(f"Dependency resolution exceeded maximum depth of {max_depth}")


# Safety limits to prevent DoS attacks
MAX_DEPENDENCY_DEPTH = 50  # Maximum levels of nested dependencies
MAX_DEPENDENCIES_PER_ARTIFACT = 200  # Maximum direct dependencies per artifact


def parse_ensure_file(content: bytes) -> EnsureFileContent:
    """
    Parse an orchard.ensure file.

    The file is decoded as UTF-8 and loaded with ``yaml.safe_load``. An empty
    document yields an EnsureFileContent with no dependencies. Each entry of
    the top-level ``dependencies`` list must be a mapping with ``project``,
    ``package`` and exactly one of ``version`` or ``tag``.

    Args:
        content: Raw bytes of the ensure file

    Returns:
        Parsed EnsureFileContent

    Raises:
        InvalidEnsureFileError: If the file is invalid YAML or has wrong structure
    """
    try:
        data = yaml.safe_load(content.decode('utf-8'))
    except yaml.YAMLError as e:
        # Chain the cause so the original parser error stays in the traceback.
        raise InvalidEnsureFileError(f"Invalid YAML: {e}") from e
    except UnicodeDecodeError as e:
        raise InvalidEnsureFileError(f"Invalid encoding: {e}") from e

    # An empty document parses to None; treat it as "no dependencies".
    if data is None:
        return EnsureFileContent(dependencies=[])

    if not isinstance(data, dict):
        raise InvalidEnsureFileError("Ensure file must be a YAML dictionary")

    deps_data = data.get('dependencies', [])
    if not isinstance(deps_data, list):
        raise InvalidEnsureFileError("'dependencies' must be a list")

    # Safety limit: prevent DoS through excessive dependencies
    if len(deps_data) > MAX_DEPENDENCIES_PER_ARTIFACT:
        raise InvalidEnsureFileError(
            f"Too many dependencies: {len(deps_data)} exceeds maximum of {MAX_DEPENDENCIES_PER_ARTIFACT}"
        )

    dependencies = []
    for i, dep in enumerate(deps_data):
        if not isinstance(dep, dict):
            raise InvalidEnsureFileError(f"Dependency {i} must be a dictionary")

        project = dep.get('project')
        package = dep.get('package')
        version = dep.get('version')
        tag = dep.get('tag')

        if not project:
            raise InvalidEnsureFileError(f"Dependency {i} missing 'project'")
        if not package:
            raise InvalidEnsureFileError(f"Dependency {i} missing 'package'")
        # Exactly one of version/tag must be supplied.
        if not version and not tag:
            raise InvalidEnsureFileError(
                f"Dependency {i} must have either 'version' or 'tag'"
            )
        if version and tag:
            raise InvalidEnsureFileError(
                f"Dependency {i} cannot have both 'version' and 'tag'"
            )

        dependencies.append(EnsureFileDependency(
            project=project,
            package=package,
            version=version,
            tag=tag,
        ))

    return EnsureFileContent(dependencies=dependencies)


def validate_dependencies(
    db: Session,
    dependencies: List[EnsureFileDependency],
) -> List[str]:
    """
    Validate that all dependency projects exist.

    Only the project is checked; packages, versions and tags are not looked
    up here. Each distinct project name is queried once, so a missing project
    referenced by several dependencies is reported a single time.

    Args:
        db: Database session
        dependencies: List of dependencies to validate

    Returns:
        List of error messages (empty if all valid)
    """
    errors = []
    checked: Set[str] = set()
    for dep in dependencies:
        # Skip projects already looked up; preserves first-seen order.
        if dep.project in checked:
            continue
        checked.add(dep.project)

        project = db.query(Project).filter(Project.name == dep.project).first()
        if not project:
            errors.append(f"Project '{dep.project}' not found")
    return errors


def store_dependencies(
    db: Session,
    artifact_id: str,
    dependencies: List[EnsureFileDependency],
) -> List[ArtifactDependency]:
    """
    Store dependencies for an artifact.

    The rows are added to the session only; this function does not flush or
    commit.

    Args:
        db: Database session
        artifact_id: The artifact ID that has these dependencies
        dependencies: List of dependencies to store

    Returns:
        List of created ArtifactDependency objects
    """
    created = []
    for dep in dependencies:
        artifact_dep = ArtifactDependency(
            artifact_id=artifact_id,
            dependency_project=dep.project,
            dependency_package=dep.package,
            version_constraint=dep.version,
            tag_constraint=dep.tag,
        )
        db.add(artifact_dep)
        created.append(artifact_dep)
    return created


def get_artifact_dependencies(
    db: Session,
    artifact_id: str,
) -> List[DependencyResponse]:
    """
    Get all dependencies for an artifact.

    Args:
        db: Database session
        artifact_id: The artifact ID

    Returns:
        List of DependencyResponse objects
    """
    deps = db.query(ArtifactDependency).filter(
        ArtifactDependency.artifact_id == artifact_id
    ).all()
    return [DependencyResponse.from_orm_model(dep) for dep in deps]


def get_reverse_dependencies(
    db: Session,
    project_name: str,
    package_name: str,
    page: int = 1,
    limit: int = 50,
) -> ReverseDependenciesResponse:
    """
    Get all artifacts that depend on a given package.

    ``total`` in the pagination metadata counts matching dependency rows.
    A row whose artifact cannot be mapped back to a project/package is
    omitted from ``dependents``, so a page may hold fewer entries than
    ``limit``.

    Args:
        db: Database session
        project_name: Target project name
        package_name: Target package name
        page: Page number (1-indexed); values below 1 are treated as 1
        limit: Results per page; values below 1 are treated as 1

    Returns:
        ReverseDependenciesResponse with dependents and pagination
    """
    # Guard against a negative OFFSET (page < 1) and a division by zero in
    # the total_pages computation (limit == 0).
    page = max(page, 1)
    limit = max(limit, 1)

    # Query dependencies that point to this project/package
    query = db.query(ArtifactDependency).filter(
        ArtifactDependency.dependency_project == project_name,
        ArtifactDependency.dependency_package == package_name,
    )

    total = query.count()
    offset = (page - 1) * limit
    deps = query.offset(offset).limit(limit).all()

    dependents = []
    for dep in deps:
        # Get artifact info to find the project/package/version
        artifact = db.query(Artifact).filter(Artifact.id == dep.artifact_id).first()
        if not artifact:
            continue

        # Find which package this artifact belongs to via tags or versions.
        # Tags are consulted first; a version record is the fallback so that
        # untagged (version-only) artifacts are still reported.
        owner_version = None
        tag = db.query(Tag).filter(Tag.artifact_id == dep.artifact_id).first()
        if tag:
            package_id = tag.package_id
        else:
            owner_version = db.query(PackageVersion).filter(
                PackageVersion.artifact_id == dep.artifact_id,
            ).first()
            if owner_version is None:
                continue
            package_id = owner_version.package_id

        pkg = db.query(Package).filter(Package.id == package_id).first()
        if not pkg:
            continue
        proj = db.query(Project).filter(Project.id == pkg.project_id).first()
        if not proj:
            continue

        # Get version if available (reuse the fallback row when we have it)
        version_record = owner_version
        if version_record is None:
            version_record = db.query(PackageVersion).filter(
                PackageVersion.artifact_id == dep.artifact_id,
                PackageVersion.package_id == pkg.id,
            ).first()

        dependents.append(DependentInfo(
            artifact_id=dep.artifact_id,
            project=proj.name,
            package=pkg.name,
            version=version_record.version if version_record else None,
            constraint_type="version" if dep.version_constraint else "tag",
            constraint_value=dep.version_constraint or dep.tag_constraint,
        ))

    # Ceiling division
    total_pages = (total + limit - 1) // limit

    return ReverseDependenciesResponse(
        project=project_name,
        package=package_name,
        dependents=dependents,
        pagination=PaginationMeta(
            page=page,
            limit=limit,
            total=total,
            total_pages=total_pages,
            has_more=page < total_pages,
        ),
    )


def _is_version_constraint(version_str: str) -> bool:
    """Check if a version string contains constraint operators."""
    if not version_str:
        return False
    # Check for common constraint operators
    return any(
        op in version_str
        for op in ('>=', '<=', '!=', '~=', '>', '<', '==', '*')
    )


def _latest_version_artifact(
    db: Session,
    package: Package,
) -> Optional[Tuple[str, str, int]]:
    """
    Return the most recently created version of a package.

    "Latest" is decided by ``PackageVersion.created_at``, not by comparing
    version numbers.

    Args:
        db: Database session
        package: Package to search versions in

    Returns:
        Tuple of (artifact_id, version, size) or None if the package has no
        version or the version's artifact row is missing
    """
    latest = db.query(PackageVersion).filter(
        PackageVersion.package_id == package.id,
    ).order_by(PackageVersion.created_at.desc()).first()
    if not latest:
        return None

    artifact = db.query(Artifact).filter(Artifact.id == latest.artifact_id).first()
    if not artifact:
        return None
    return (artifact.id, latest.version, artifact.size)


def _resolve_version_constraint(
    db: Session,
    package: Package,
    constraint: str,
) -> Optional[Tuple[str, str, int]]:
    """
    Resolve a version constraint (e.g., '>=1.9') to a specific version.

    Uses PEP 440 version matching to find the best matching version.
    The bare wildcard '*' and constraints that fail to parse both resolve to
    the most recently created version of the package.

    Args:
        db: Database session
        package: Package to search versions in
        constraint: Version constraint string (e.g., '>=1.9', '<2.0,>=1.5')

    Returns:
        Tuple of (artifact_id, resolved_version, size) or None if not found
    """
    if not HAS_PACKAGING:
        # Fallback: if packaging not available, can't do constraint matching
        return None

    # Handle wildcard - return latest version
    if constraint == '*':
        return _latest_version_artifact(db, package)

    try:
        specifier = SpecifierSet(constraint)
    except InvalidSpecifier:
        # Invalid constraint (e.g., ">=" without version) - treat as wildcard
        # This can happen with malformed metadata from PyPI packages
        return _latest_version_artifact(db, package)

    # Get all versions for this package
    all_versions = db.query(PackageVersion).filter(
        PackageVersion.package_id == package.id,
    ).all()
    if not all_versions:
        return None

    # Find matching versions
    matching = []
    for pv in all_versions:
        try:
            v = Version(pv.version)
        except InvalidVersion:
            # Skip versions that are not valid PEP 440
            continue
        if v in specifier:
            matching.append((pv, v))

    if not matching:
        return None

    # Pick the highest matching version
    best_match, _ = max(matching, key=lambda x: x[1])

    artifact = db.query(Artifact).filter(Artifact.id == best_match.artifact_id).first()
    if artifact:
        return (artifact.id, best_match.version, artifact.size)
    return None


def _resolve_dependency_to_artifact(
    db: Session,
    project_name: str,
    package_name: str,
    version: Optional[str],
    tag: Optional[str],
) -> Optional[Tuple[str, str, int]]:
    """
    Resolve a dependency constraint to an artifact ID.

    Supports:
    - Exact version matching (e.g., '1.2.3')
    - Version constraints (e.g., '>=1.9', '<2.0,>=1.5')
    - Tag matching
    - Wildcard ('*' for any version)

    Lookup order: ``version`` is tried first (as a constraint, or as an exact
    version and then as a tag of the same name); ``tag`` is tried only if
    that produced nothing.

    Args:
        db: Database session
        project_name: Project name
        package_name: Package name
        version: Version or version constraint
        tag: Tag constraint

    Returns:
        Tuple of (artifact_id, resolved_version_or_tag, size) or None if not found
    """
    # Get project and package
    project = db.query(Project).filter(Project.name == project_name).first()
    if not project:
        return None

    package = db.query(Package).filter(
        Package.project_id == project.id,
        Package.name == package_name,
    ).first()
    if not package:
        return None

    if version:
        # Check if this is a version constraint (>=, <, etc.) or exact version
        if _is_version_constraint(version):
            result = _resolve_version_constraint(db, package, version)
            if result:
                return result
        else:
            # Look up by exact version
            pkg_version = db.query(PackageVersion).filter(
                PackageVersion.package_id == package.id,
                PackageVersion.version == version,
            ).first()
            if pkg_version:
                artifact = db.query(Artifact).filter(
                    Artifact.id == pkg_version.artifact_id
                ).first()
                if artifact:
                    return (artifact.id, version, artifact.size)

            # Also check if there's a tag with this exact name
            tag_record = db.query(Tag).filter(
                Tag.package_id == package.id,
                Tag.name == version,
            ).first()
            if tag_record:
                artifact = db.query(Artifact).filter(
                    Artifact.id == tag_record.artifact_id
                ).first()
                if artifact:
                    return (artifact.id, version, artifact.size)

    if tag:
        # Look up by tag
        tag_record = db.query(Tag).filter(
            Tag.package_id == package.id,
            Tag.name == tag,
        ).first()
        if tag_record:
            artifact = db.query(Artifact).filter(
                Artifact.id == tag_record.artifact_id
            ).first()
            if artifact:
                return (artifact.id, tag, artifact.size)

    return None


def _detect_package_cycle(
    db: Session,
    project_name: str,
    package_name: str,
    target_project: str,
    target_package: str,
    visiting: Set[str],
    visited: Set[str],
    path: List[str],
) -> Optional[List[str]]:
    """
    Detect cycles at the package level using DFS.

    The search follows the stored dependencies of every tagged artifact of
    the current package and stops as soon as the target package is reached.
    ``visiting``, ``visited`` and ``path`` are mutated in place; when a cycle
    is returned they are left in their mid-search state.

    Args:
        db: Database session
        project_name: Current project being visited
        package_name: Current package being visited
        target_project: The project we're checking for cycles back to
        target_package: The package we're checking for cycles back to
        visiting: Set of package keys currently in the recursion stack
        visited: Set of fully processed package keys
        path: Current path for cycle reporting

    Returns:
        Cycle path if detected, None otherwise
    """
    pkg_key = f"{project_name}/{package_name}"

    # Check if we've reached the target package (cycle detected)
    if project_name == target_project and package_name == target_package:
        return path + [pkg_key]

    if pkg_key in visiting:
        # Unexpected internal cycle (one that does not involve the target)
        return None

    if pkg_key in visited:
        return None

    visiting.add(pkg_key)
    path.append(pkg_key)

    # Get the package and find any artifacts with dependencies
    project = db.query(Project).filter(Project.name == project_name).first()
    if project:
        package = db.query(Package).filter(
            Package.project_id == project.id,
            Package.name == package_name,
        ).first()
        if package:
            # Find all artifacts in this package via tags
            tags = db.query(Tag).filter(Tag.package_id == package.id).all()
            artifact_ids = {t.artifact_id for t in tags}

            # Get dependencies from all artifacts in this package
            for artifact_id in artifact_ids:
                deps = db.query(ArtifactDependency).filter(
                    ArtifactDependency.artifact_id == artifact_id
                ).all()
                for dep in deps:
                    cycle = _detect_package_cycle(
                        db,
                        dep.dependency_project,
                        dep.dependency_package,
                        target_project,
                        target_package,
                        visiting,
                        visited,
                        path,
                    )
                    if cycle:
                        return cycle

    path.pop()
    visiting.remove(pkg_key)
    visited.add(pkg_key)
    return None


def check_circular_dependencies(
    db: Session,
    artifact_id: str,
    new_dependencies: List[EnsureFileDependency],
    project_name: Optional[str] = None,
    package_name: Optional[str] = None,
) -> Optional[List[str]]:
    """
    Check if adding the new dependencies would create a circular dependency.

    The check is done at the package level: a cycle is reported when any new
    dependency's package leads back, through stored dependencies, to the
    package that owns ``artifact_id``. If the owning package cannot be
    determined, no check is performed and None is returned.

    Args:
        db: Database session
        artifact_id: The artifact that will have these dependencies
        new_dependencies: Dependencies to be added
        project_name: Project name (optional, will try to look up from tag if not provided)
        package_name: Package name (optional, will try to look up from tag if not provided)

    Returns:
        Cycle path if detected, None otherwise
    """
    # First, get the package info for this artifact to build path labels
    if project_name and package_name:
        target_project, target_package = project_name, package_name
    else:
        # Try to look up from tag
        artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
        if not artifact:
            return None

        # Find package for this artifact
        tag = db.query(Tag).filter(Tag.artifact_id == artifact_id).first()
        if not tag:
            return None

        package = db.query(Package).filter(Package.id == tag.package_id).first()
        if not package:
            return None

        project = db.query(Project).filter(Project.id == package.project_id).first()
        if not project:
            return None

        target_project, target_package = project.name, package.name

    # Label used as the first element of any reported cycle path.
    current_path = f"{target_project}/{target_package}"

    # For each new dependency, check if it would create a cycle back to our package
    for dep in new_dependencies:
        # Check if this dependency (transitively) depends on us at the package level
        visiting: Set[str] = set()
        visited: Set[str] = set()
        path: List[str] = [current_path]

        # Check from the dependency's package
        cycle = _detect_package_cycle(
            db,
            dep.project,
            dep.package,
            target_project,
            target_package,
            visiting,
            visited,
            path,
        )
        if cycle:
            return cycle

    return None


def resolve_dependencies(
    db: Session,
    project_name: str,
    package_name: str,
    ref: str,
    base_url: str,
) -> DependencyResolutionResponse:
    """
    Resolve all dependencies for an artifact recursively.

    The result lists every required artifact once, dependencies before the
    artifacts that need them, with the requested artifact last.

    Args:
        db: Database session
        project_name: Project name
        package_name: Package name
        ref: Tag or version reference
        base_url: Base URL for download URLs

    Returns:
        DependencyResolutionResponse with all resolved artifacts

    Raises:
        DependencyNotFoundError: If a dependency cannot be resolved
        CircularDependencyError: If circular dependencies are detected
        DependencyConflictError: If conflicting versions are required
        DependencyDepthExceededError: If nesting exceeds MAX_DEPENDENCY_DEPTH
    """
    # Resolve the initial artifact. ``ref`` is passed as both version and tag
    # so either kind of reference works. The helper returns None when the
    # project or package does not exist, so no separate lookup is needed.
    resolved = _resolve_dependency_to_artifact(
        db, project_name, package_name, ref, ref
    )
    if not resolved:
        raise DependencyNotFoundError(project_name, package_name, ref)

    root_artifact_id, root_version, root_size = resolved

    # Track resolved artifacts and their versions
    resolved_artifacts: Dict[str, ResolvedArtifact] = {}

    # Track version requirements for conflict detection
    version_requirements: Dict[str, List[Dict[str, Any]]] = {}  # pkg_key -> [(version, required_by)]

    # Track visiting/visited for cycle detection
    visiting: Set[str] = set()
    visited: Set[str] = set()

    # Active DFS branch as (artifact_id, pkg_key), used to report cycle paths
    active_path: List[Tuple[str, str]] = []

    # Resolution order (topological)
    resolution_order: List[str] = []

    def _resolve_recursive(
        artifact_id: str,
        proj_name: str,
        pkg_name: str,
        version_or_tag: str,
        size: int,
        required_by: Optional[str],
        depth: int = 0,
    ):
        """Recursively resolve dependencies with cycle/conflict detection."""
        # Safety limit: prevent DoS through deeply nested dependencies
        if depth > MAX_DEPENDENCY_DEPTH:
            raise DependencyDepthExceededError(MAX_DEPENDENCY_DEPTH)

        pkg_key = f"{proj_name}/{pkg_name}"

        # Cycle detection (at artifact level)
        if artifact_id in visiting:
            # Build cycle path: from the first occurrence of this artifact on
            # the active branch back around to itself.
            start = next(
                i for i, (aid, _) in enumerate(active_path)
                if aid == artifact_id
            )
            cycle = [key for _, key in active_path[start:]] + [pkg_key]
            raise CircularDependencyError(cycle)

        # Conflict detection - check if we've seen this package before with a different version
        if pkg_key in version_requirements:
            existing_versions = {r["version"] for r in version_requirements[pkg_key]}
            if version_or_tag not in existing_versions:
                # Conflict detected - same package, different version
                requirements = version_requirements[pkg_key] + [
                    {"version": version_or_tag, "required_by": required_by}
                ]
                raise DependencyConflictError([
                    DependencyConflict(
                        project=proj_name,
                        package=pkg_name,
                        requirements=[
                            {
                                "version": r["version"],
                                "required_by": [{"path": r["required_by"]}] if r["required_by"] else []
                            }
                            for r in requirements
                        ],
                    )
                ])

        # Already resolved - skip
        if artifact_id in visited:
            return

        visiting.add(artifact_id)
        active_path.append((artifact_id, pkg_key))

        # Track version requirement
        version_requirements.setdefault(pkg_key, []).append({
            "version": version_or_tag,
            "required_by": required_by,
        })

        # Get dependencies
        deps = db.query(ArtifactDependency).filter(
            ArtifactDependency.artifact_id == artifact_id
        ).all()

        # Resolve each dependency first (depth-first)
        for dep in deps:
            resolved_dep = _resolve_dependency_to_artifact(
                db,
                dep.dependency_project,
                dep.dependency_package,
                dep.version_constraint,
                dep.tag_constraint,
            )

            if not resolved_dep:
                constraint = dep.version_constraint or dep.tag_constraint
                raise DependencyNotFoundError(
                    dep.dependency_project,
                    dep.dependency_package,
                    constraint,
                )

            dep_artifact_id, dep_version, dep_size = resolved_dep
            _resolve_recursive(
                dep_artifact_id,
                dep.dependency_project,
                dep.dependency_package,
                dep_version,
                dep_size,
                pkg_key,
                depth + 1,
            )

        active_path.pop()
        visiting.remove(artifact_id)
        visited.add(artifact_id)

        # Add to resolution order (dependencies before dependents)
        resolution_order.append(artifact_id)

        # Store resolved artifact info
        resolved_artifacts[artifact_id] = ResolvedArtifact(
            artifact_id=artifact_id,
            project=proj_name,
            package=pkg_name,
            version=version_or_tag,
            size=size,
            download_url=f"{base_url}/api/v1/project/{proj_name}/{pkg_name}/+/{version_or_tag}",
        )

    # Start resolution from root
    _resolve_recursive(
        root_artifact_id,
        project_name,
        package_name,
        root_version,
        root_size,
        None,
    )

    # Build response in topological order
    resolved_list = [resolved_artifacts[aid] for aid in resolution_order]
    total_size = sum(r.size for r in resolved_list)

    return DependencyResolutionResponse(
        requested={
            "project": project_name,
            "package": package_name,
            "ref": ref,
        },
        resolved=resolved_list,
        total_size=total_size,
        artifact_count=len(resolved_list),
    )