orchard/backend/app/database.py
Mondo Diaz 45a48cc1ee Add inline migration for tag removal (024_remove_tags)
Adds the tag removal migration to the inline migrations in database.py:
- Drops tag-related triggers and functions
- Removes tag_constraint column from artifact_dependencies
- Makes version_constraint NOT NULL
- Drops tags and tag_history tables
- Renames uploads.tag_name to version
2026-02-03 17:22:40 -06:00


from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from typing import Generator, NamedTuple
from contextlib import contextmanager
import logging
import time
import hashlib
from .config import get_settings
from .models import Base
from .purge_seed_data import should_purge_seed_data, purge_seed_data
settings = get_settings()
logger = logging.getLogger(__name__)
class Migration(NamedTuple):
"""A database migration with a unique name and SQL to execute."""
name: str
sql: str
# PostgreSQL error codes that indicate "already exists" - safe to skip
SAFE_PG_ERROR_CODES = {
"42P07", # duplicate_table
"42701", # duplicate_column
"42710", # duplicate_object (index, constraint, etc.)
"42P16", # invalid_table_definition (e.g., column already exists)
}
# Build connect_args with query timeout if configured
connect_args = {}
if settings.database_query_timeout > 0:
# PostgreSQL statement_timeout is in milliseconds
connect_args["options"] = f"-c statement_timeout={settings.database_query_timeout * 1000}"
# Create engine with connection pool configuration
engine = create_engine(
settings.database_url,
pool_pre_ping=True, # Check connection health before using
poolclass=QueuePool,
pool_size=settings.database_pool_size,
max_overflow=settings.database_max_overflow,
pool_timeout=settings.database_pool_timeout,
pool_recycle=settings.database_pool_recycle,
connect_args=connect_args,
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Connection pool monitoring
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
"""Log when a connection is checked out from the pool"""
logger.debug(f"Connection checked out from pool: {id(dbapi_connection)}")
@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
"""Log when a connection is returned to the pool"""
logger.debug(f"Connection returned to pool: {id(dbapi_connection)}")
def get_pool_status() -> dict:
"""Get current connection pool status for monitoring"""
pool = engine.pool
return {
"pool_size": pool.size(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"checked_in": pool.checkedin(),
}
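# A minimal sketch of exposing the pool status for monitoring, assuming a
# FastAPI router elsewhere in the project (the route path is illustrative,
# not an existing endpoint):
#
#     from fastapi import APIRouter
#     router = APIRouter()
#
#     @router.get("/health/db-pool")
#     def db_pool_health() -> dict:
#         return get_pool_status()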
def init_db():
"""Create all tables and run migrations"""
Base.metadata.create_all(bind=engine)
# Run migrations for schema updates
_run_migrations()
# Purge seed data if requested (for transitioning to production-like environment)
if should_purge_seed_data():
db = SessionLocal()
try:
purge_seed_data(db)
finally:
db.close()
def _ensure_migrations_table(conn) -> None:
"""Create the migrations tracking table if it doesn't exist."""
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _schema_migrations (
name VARCHAR(255) PRIMARY KEY,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""))
conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
"""Get all applied migrations and their checksums."""
result = conn.execute(text(
"SELECT name, checksum FROM _schema_migrations"
))
return {row[0]: row[1] for row in result}
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
    # Fallback: check the error message for common "already exists" phrases.
    # Plain substring matching is used, so the patterns must be literal
    # fragments (regex-style patterns would never match via `in`).
    error_str = str(exception).lower()
    safe_patterns = [
        "already exists",
        "duplicate key",
    ]
    return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
"""Record a migration as applied."""
conn.execute(text(
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
), {"name": name, "checksum": checksum})
conn.commit()
def _run_migrations():
"""Run manual migrations for schema updates with tracking and error detection."""
migrations = [
Migration(
name="001_add_format_metadata",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
) THEN
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
END IF;
END $$;
""",
),
Migration(
name="002_add_package_format",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'format'
) THEN
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
END IF;
END $$;
""",
),
Migration(
name="003_add_package_platform",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'platform'
) THEN
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
END IF;
END $$;
""",
),
Migration(
name="004_add_ref_count_index_constraint",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
END IF;
END $$;
""",
),
Migration(
name="005_add_composite_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
-- Tag indexes removed: tags table no longer exists (removed in tag system removal)
END $$;
""",
),
Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
END IF;
END IF;
END $$;
""",
),
Migration(
name="007_create_ref_count_trigger_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.artifact_id != NEW.artifact_id THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="008_create_tags_ref_count_triggers",
sql="""
-- Tags table removed: triggers no longer needed (tag system removed)
DO $$ BEGIN NULL; END $$;
""",
),
Migration(
name="009_create_version_ref_count_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="010_create_package_versions_triggers",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
FOR EACH ROW
EXECUTE FUNCTION increment_version_ref_count();
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
CREATE TRIGGER package_versions_ref_count_delete
AFTER DELETE ON package_versions
FOR EACH ROW
EXECUTE FUNCTION decrement_version_ref_count();
END IF;
END $$;
""",
),
Migration(
name="011_migrate_semver_tags_to_versions",
sql=r"""
-- Migrate semver tags to versions (only if both tables exist - for existing databases)
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions')
AND EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'tags') THEN
INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
SELECT
gen_random_uuid(),
t.package_id,
t.artifact_id,
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
'migrated_from_tag',
t.created_by,
t.created_at
FROM tags t
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
ON CONFLICT (package_id, version) DO NOTHING;
END IF;
END $$;
""",
),
Migration(
name="012_create_teams_table",
sql="""
CREATE TABLE IF NOT EXISTS teams (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) NOT NULL UNIQUE,
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by VARCHAR(255) NOT NULL,
settings JSONB DEFAULT '{}'
);
""",
),
Migration(
name="013_create_team_memberships_table",
sql="""
CREATE TABLE IF NOT EXISTS team_memberships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL DEFAULT 'member',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
invited_by VARCHAR(255),
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
);
""",
),
Migration(
name="014_add_team_id_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'team_id'
) THEN
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
END IF;
END $$;
""",
),
Migration(
name="015_add_teams_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
CREATE INDEX idx_teams_slug ON teams(slug);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
CREATE INDEX idx_teams_created_by ON teams(created_by);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
END IF;
END $$;
""",
),
Migration(
name="016_add_is_system_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'is_system'
) THEN
ALTER TABLE projects ADD COLUMN is_system BOOLEAN NOT NULL DEFAULT FALSE;
CREATE INDEX IF NOT EXISTS idx_projects_is_system ON projects(is_system);
END IF;
END $$;
""",
),
Migration(
name="017_create_upstream_sources",
sql="""
CREATE TABLE IF NOT EXISTS upstream_sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL UNIQUE,
source_type VARCHAR(50) NOT NULL DEFAULT 'generic',
url VARCHAR(2048) NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT FALSE,
auth_type VARCHAR(20) NOT NULL DEFAULT 'none',
username VARCHAR(255),
password_encrypted BYTEA,
headers_encrypted BYTEA,
priority INTEGER NOT NULL DEFAULT 100,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
CONSTRAINT check_source_type CHECK (
source_type IN ('npm', 'pypi', 'maven', 'docker', 'helm', 'nuget', 'deb', 'rpm', 'generic')
),
CONSTRAINT check_auth_type CHECK (
auth_type IN ('none', 'basic', 'bearer', 'api_key')
),
CONSTRAINT check_priority_positive CHECK (priority > 0)
);
CREATE INDEX IF NOT EXISTS idx_upstream_sources_enabled ON upstream_sources(enabled);
CREATE INDEX IF NOT EXISTS idx_upstream_sources_source_type ON upstream_sources(source_type);
CREATE INDEX IF NOT EXISTS idx_upstream_sources_priority ON upstream_sources(priority);
""",
),
Migration(
name="018_create_cache_settings",
sql="""
CREATE TABLE IF NOT EXISTS cache_settings (
id INTEGER PRIMARY KEY DEFAULT 1,
auto_create_system_projects BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
CONSTRAINT check_cache_settings_singleton CHECK (id = 1)
);
INSERT INTO cache_settings (id, auto_create_system_projects)
VALUES (1, TRUE)
ON CONFLICT (id) DO NOTHING;
""",
),
Migration(
name="019_create_cached_urls",
sql="""
CREATE TABLE IF NOT EXISTS cached_urls (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
url VARCHAR(4096) NOT NULL,
url_hash VARCHAR(64) NOT NULL UNIQUE,
artifact_id VARCHAR(64) NOT NULL REFERENCES artifacts(id),
source_id UUID REFERENCES upstream_sources(id) ON DELETE SET NULL,
fetched_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
response_headers JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_cached_urls_url_hash ON cached_urls(url_hash);
CREATE INDEX IF NOT EXISTS idx_cached_urls_artifact_id ON cached_urls(artifact_id);
CREATE INDEX IF NOT EXISTS idx_cached_urls_source_id ON cached_urls(source_id);
CREATE INDEX IF NOT EXISTS idx_cached_urls_fetched_at ON cached_urls(fetched_at);
""",
),
Migration(
name="020_seed_default_upstream_sources",
sql="""
-- Originally seeded public sources, but these are no longer used.
-- Migration 023 deletes any previously seeded sources.
-- This migration is now a no-op for fresh installs.
SELECT 1;
""",
),
Migration(
name="021_remove_is_public_from_upstream_sources",
sql="""
DO $$
BEGIN
-- Drop the index if it exists
DROP INDEX IF EXISTS idx_upstream_sources_is_public;
-- Drop the column if it exists
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'upstream_sources' AND column_name = 'is_public'
) THEN
ALTER TABLE upstream_sources DROP COLUMN is_public;
END IF;
END $$;
""",
),
Migration(
name="022_remove_allow_public_internet_from_cache_settings",
sql="""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'cache_settings' AND column_name = 'allow_public_internet'
) THEN
ALTER TABLE cache_settings DROP COLUMN allow_public_internet;
END IF;
END $$;
""",
),
Migration(
name="023_delete_seeded_public_sources",
sql="""
-- Delete the seeded public sources that were added by migration 020
DELETE FROM upstream_sources
WHERE name IN ('npm-public', 'pypi-public', 'maven-central', 'docker-hub');
""",
),
Migration(
name="024_remove_tags",
sql="""
-- Remove tag system, keeping only versions for artifact references
DO $$
BEGIN
-- Drop triggers on tags table (if they exist)
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
DROP TRIGGER IF EXISTS tags_updated_at_trigger ON tags;
DROP TRIGGER IF EXISTS tag_changes_trigger ON tags;
-- Drop the tag change tracking function
DROP FUNCTION IF EXISTS track_tag_changes();
-- Remove tag_constraint from artifact_dependencies
IF EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE constraint_name = 'check_constraint_type'
AND table_name = 'artifact_dependencies'
) THEN
ALTER TABLE artifact_dependencies DROP CONSTRAINT check_constraint_type;
END IF;
-- Remove the tag_constraint column if it exists
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'artifact_dependencies' AND column_name = 'tag_constraint'
) THEN
ALTER TABLE artifact_dependencies DROP COLUMN tag_constraint;
END IF;
-- Make version_constraint NOT NULL
UPDATE artifact_dependencies SET version_constraint = '*' WHERE version_constraint IS NULL;
ALTER TABLE artifact_dependencies ALTER COLUMN version_constraint SET NOT NULL;
-- Drop tag_history table first (depends on tags)
DROP TABLE IF EXISTS tag_history;
-- Drop tags table
DROP TABLE IF EXISTS tags;
-- Rename uploads.tag_name to version if it exists and version doesn't
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'uploads' AND column_name = 'tag_name'
) AND NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'uploads' AND column_name = 'version'
) THEN
ALTER TABLE uploads RENAME COLUMN tag_name TO version;
END IF;
END $$;
""",
),
]
with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try:
logger.info(f"Running migration: {migration.name}")
conn.execute(text(migration.sql))
conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e:
conn.rollback()
if _is_safe_error(e):
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
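# New migrations are appended above with the next sequential name prefix;
# because of the checksum tracking, an entry's SQL should not be edited once
# it has been applied anywhere. A hypothetical entry, for illustration only:
#
#     Migration(
#         name="025_add_example_column",
#         sql="""
#         DO $$
#         BEGIN
#             IF NOT EXISTS (
#                 SELECT 1 FROM information_schema.columns
#                 WHERE table_name = 'artifacts' AND column_name = 'example'
#             ) THEN
#                 ALTER TABLE artifacts ADD COLUMN example TEXT;
#             END IF;
#         END $$;
#         """,
#     ),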
def get_db() -> Generator[Session, None, None]:
"""Dependency for getting database sessions"""
db = SessionLocal()
try:
yield db
finally:
db.close()
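# Typical FastAPI usage of this dependency (a sketch; the route and the
# Project model reference are illustrative, not part of this module):
#
#     from fastapi import Depends
#
#     @app.get("/projects")
#     def list_projects(db: Session = Depends(get_db)):
#         return db.query(Project).all()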
@contextmanager
def transaction(db: Session):
"""
Context manager for explicit transaction management with savepoint support.
Usage:
with transaction(db):
# operations here
# automatically commits on success, rolls back on exception
"""
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
@contextmanager
def savepoint(db: Session, name: str | None = None):
    """
    Create a savepoint for partial rollback support.
    The optional name is informational only; SQLAlchemy generates its own
    savepoint names internally.
    Usage:
        with savepoint(db, "my_savepoint"):
            # operations here
        # rolls back to the savepoint on exception without rolling back
        # the whole transaction
    """
savepoint_obj = db.begin_nested()
try:
yield savepoint_obj
savepoint_obj.commit()
except Exception:
savepoint_obj.rollback()
raise
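# A sketch of nesting a savepoint inside a wider transaction, assuming
# hypothetical rows and sqlalchemy.exc.IntegrityError:
#
#     with transaction(db):
#         db.add(parent_row)
#         try:
#             with savepoint(db):
#                 db.add(optional_child_row)
#                 db.flush()
#         except IntegrityError:
#             pass  # child insert rolled back; the parent commit proceeds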
def retry_on_deadlock(func, max_retries: int = 3, delay: float = 0.1):
"""
Decorator/wrapper to retry operations on deadlock detection.
Usage:
@retry_on_deadlock
def my_operation(db):
...
Or:
retry_on_deadlock(lambda: my_operation(db))()
"""
import functools
from sqlalchemy.exc import OperationalError
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except OperationalError as e:
# Check for deadlock error codes (PostgreSQL: 40P01, MySQL: 1213)
error_str = str(e).lower()
if "deadlock" in error_str or "40p01" in error_str:
last_exception = e
logger.warning(f"Deadlock detected, retrying (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay * (attempt + 1))  # Linear backoff between retries
else:
raise
raise last_exception
return wrapper
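# A usage sketch combining retry_on_deadlock with transaction(); the function
# and the ref_count update are hypothetical:
#
#     @retry_on_deadlock
#     def bump_ref_count(db: Session, artifact_id: str) -> None:
#         with transaction(db):
#             db.execute(
#                 text("UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = :id"),
#                 {"id": artifact_id},
#             )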