Add robust PyPI dependency caching with task queue
Replace unbounded thread spawning with managed worker pool:
- New pypi_cache_tasks table tracks caching jobs
- Thread pool with 5 workers (configurable via ORCHARD_PYPI_CACHE_WORKERS)
- Automatic retries with exponential backoff (30s, 60s, then fail)
- Deduplication to prevent duplicate caching attempts
New API endpoints for visibility and control:
- GET /pypi/cache/status - queue health summary
- GET /pypi/cache/failed - list failed tasks with errors
- POST /pypi/cache/retry/{package} - retry single package
- POST /pypi/cache/retry-all - retry all failed packages
This fixes silent failures in background dependency caching where
packages would fail to cache without any tracking or retry mechanism.
This commit is contained in:
@@ -803,3 +803,70 @@ class CachedUrl(Base):
|
||||
return hashlib.sha256(url.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
class PyPICacheTask(Base):
    """A queued job to cache one PyPI package together with its dependencies.

    Every row records the state of a background caching operation (status,
    attempt counters, retry timing and outcome) so that the PyPI proxy can
    cache dependencies reliably and retry when needed.
    """

    __tablename__ = "pypi_cache_tasks"

    id = Column(UUID(as_uuid=True), default=uuid.uuid4, primary_key=True)

    # --- Target: which package (and optional version constraint) to cache ---
    package_name = Column(String(255), nullable=False)
    version_constraint = Column(String(255))

    # --- Provenance: where this task came from ---
    # Both foreign keys below use ON DELETE SET NULL, so removing the
    # referenced row clears the link instead of deleting this task.
    parent_task_id = Column(
        UUID(as_uuid=True), ForeignKey("pypi_cache_tasks.id", ondelete="SET NULL")
    )
    # Constrained to be >= 0 by check_depth_non_negative in __table_args__.
    depth = Column(Integer, default=0, nullable=False)
    triggered_by_artifact = Column(
        String(64), ForeignKey("artifacts.id", ondelete="SET NULL")
    )

    # --- State ---
    # Allowed status values are enforced by check_task_status in
    # __table_args__: pending, in_progress, completed, failed.
    status = Column(String(20), default="pending", nullable=False)
    attempts = Column(Integer, default=0, nullable=False)
    max_attempts = Column(Integer, default=3, nullable=False)

    # --- Outcome ---
    cached_artifact_id = Column(
        String(64), ForeignKey("artifacts.id", ondelete="SET NULL")
    )
    error_message = Column(Text)

    # --- Timestamps ---
    # NOTE(review): datetime.utcnow returns a naive datetime while this column
    # is declared timezone-aware - confirm this matches the other models.
    created_at = Column(
        DateTime(timezone=True), default=datetime.utcnow, nullable=False
    )
    started_at = Column(DateTime(timezone=True))
    completed_at = Column(DateTime(timezone=True))
    next_retry_at = Column(DateTime(timezone=True))

    # --- Relationships ---
    # Self-referential link: ``parent_task`` resolves to the task on the
    # remote ``id`` side, and the backref exposes ``child_tasks`` on it.
    parent_task = relationship(
        "PyPICacheTask", backref="child_tasks", remote_side=[id]
    )

    __table_args__ = (
        # Indexes
        Index("idx_pypi_cache_tasks_status_retry", "status", "next_retry_at"),
        Index("idx_pypi_cache_tasks_package_status", "package_name", "status"),
        Index("idx_pypi_cache_tasks_parent", "parent_task_id"),
        Index("idx_pypi_cache_tasks_triggered_by", "triggered_by_artifact"),
        Index("idx_pypi_cache_tasks_cached_artifact", "cached_artifact_id"),
        Index("idx_pypi_cache_tasks_depth_created", "depth", "created_at"),
        # Integrity checks
        CheckConstraint(
            "status IN ('pending', 'in_progress', 'completed', 'failed')",
            name="check_task_status",
        ),
        CheckConstraint("depth >= 0", name="check_depth_non_negative"),
        CheckConstraint("attempts >= 0", name="check_attempts_non_negative"),
    )
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user