Add migration tracking with smart error detection

- Add _schema_migrations table to track applied migrations
- Each migration has a unique name and checksum
- Migrations are only run once (tracked by name)
- Checksum changes are detected and logged as warnings
- Smart error detection distinguishes "already applied" errors from real failures
- Real errors now fail hard with clear error messages
- Safe PostgreSQL error codes (42P07, 42701, 42710, 42P16) are recognized
- Fix semver migration to generate UUID for id column
This commit is contained in:
Mondo Diaz
2026-01-28 21:05:45 +00:00
parent c60a7ba1ab
commit 9df50d0963

View File

@@ -1,10 +1,11 @@
from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from typing import Generator
from typing import Generator, NamedTuple
from contextlib import contextmanager
import logging
import time
import hashlib
from .config import get_settings
from .models import Base
@@ -12,6 +13,21 @@ from .models import Base
settings = get_settings()
logger = logging.getLogger(__name__)
class Migration(NamedTuple):
"""A database migration with a unique name and SQL to execute."""
name: str
sql: str
# PostgreSQL error codes that indicate "already exists" - safe to skip
SAFE_PG_ERROR_CODES = {
"42P07", # duplicate_table
"42701", # duplicate_column
"42710", # duplicate_object (index, constraint, etc.)
"42P16", # invalid_table_definition (e.g., column already exists)
}
# Build connect_args with query timeout if configured
connect_args = {}
if settings.database_query_timeout > 0:
@@ -65,11 +81,65 @@ def init_db():
_run_migrations()
def _ensure_migrations_table(conn) -> None:
"""Create the migrations tracking table if it doesn't exist."""
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _schema_migrations (
name VARCHAR(255) PRIMARY KEY,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""))
conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
"""Get all applied migrations and their checksums."""
result = conn.execute(text(
"SELECT name, checksum FROM _schema_migrations"
))
return {row[0]: row[1] for row in result}
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
# Fallback: check error message for common "already exists" patterns
error_str = str(exception).lower()
safe_patterns = [
"already exists",
"duplicate key",
"relation .* already exists",
"column .* already exists",
]
return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
"""Record a migration as applied."""
conn.execute(text(
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
), {"name": name, "checksum": checksum})
conn.commit()
def _run_migrations():
"""Run manual migrations for schema updates"""
"""Run manual migrations for schema updates with tracking and error detection."""
migrations = [
# Add format_metadata column to artifacts table
"""
Migration(
name="001_add_format_metadata",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
@@ -80,8 +150,10 @@ def _run_migrations():
END IF;
END $$;
""",
# Add format column to packages table
"""
),
Migration(
name="002_add_package_format",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
@@ -93,8 +165,10 @@ def _run_migrations():
END IF;
END $$;
""",
# Add platform column to packages table
"""
),
Migration(
name="003_add_package_platform",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
@@ -106,18 +180,18 @@ def _run_migrations():
END IF;
END $$;
""",
# Add ref_count index and constraints for artifacts
"""
),
Migration(
name="004_add_ref_count_index_constraint",
sql="""
DO $$
BEGIN
-- Add ref_count index
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
-- Add ref_count >= 0 constraint
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN
@@ -125,25 +199,24 @@ def _run_migrations():
END IF;
END $$;
""",
# Add composite indexes for packages and tags
"""
),
Migration(
name="005_add_composite_indexes",
sql="""
DO $$
BEGIN
-- Composite index for package lookup by project and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
-- Composite index for tag lookup by package and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
-- Composite index for recent tags queries
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN
@@ -151,13 +224,13 @@ def _run_migrations():
END IF;
END $$;
""",
# Add package_versions indexes and triggers (007_package_versions.sql)
"""
),
Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$
BEGIN
-- Create indexes for package_versions if table exists
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Indexes for common queries
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF;
@@ -170,8 +243,10 @@ def _run_migrations():
END IF;
END $$;
""",
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
"""
),
Migration(
name="007_create_ref_count_trigger_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
@@ -179,8 +254,7 @@ def _run_migrations():
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
@@ -188,8 +262,7 @@ def _run_migrations():
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
@@ -201,11 +274,12 @@ def _run_migrations():
END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for tags ref_count management
"""
),
Migration(
name="008_create_tags_ref_count_triggers",
sql="""
DO $$
BEGIN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
@@ -226,8 +300,10 @@ def _run_migrations():
EXECUTE FUNCTION update_artifact_ref_count();
END $$;
""",
# Create ref_count trigger functions for package_versions
"""
),
Migration(
name="009_create_version_ref_count_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
@@ -235,8 +311,7 @@ def _run_migrations():
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
@@ -245,12 +320,13 @@ def _run_migrations():
END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for package_versions ref_count
"""
),
Migration(
name="010_create_package_versions_triggers",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
@@ -265,14 +341,16 @@ def _run_migrations():
END IF;
END $$;
""",
# Migrate existing semver tags to package_versions
r"""
),
Migration(
name="011_migrate_semver_tags_to_versions",
sql=r"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
SELECT
gen_random_uuid(),
t.package_id,
t.artifact_id,
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
@@ -285,8 +363,10 @@ def _run_migrations():
END IF;
END $$;
""",
# Teams and multi-tenancy migration (009_teams.sql)
"""
),
Migration(
name="012_create_teams_table",
sql="""
CREATE TABLE IF NOT EXISTS teams (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
@@ -298,7 +378,10 @@ def _run_migrations():
settings JSONB DEFAULT '{}'
);
""",
"""
),
Migration(
name="013_create_team_memberships_table",
sql="""
CREATE TABLE IF NOT EXISTS team_memberships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
@@ -310,7 +393,10 @@ def _run_migrations():
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
);
""",
"""
),
Migration(
name="014_add_team_id_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
@@ -322,7 +408,10 @@ def _run_migrations():
END IF;
END $$;
""",
"""
),
Migration(
name="015_add_teams_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
@@ -339,16 +428,50 @@ def _run_migrations():
END IF;
END $$;
""",
),
]
with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try:
conn.execute(text(migration))
logger.info(f"Running migration: {migration.name}")
conn.execute(text(migration.sql))
conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e:
conn.rollback()
logger.warning(f"Migration failed (may already be applied): {e}")
if _is_safe_error(e):
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
def get_db() -> Generator[Session, None, None]: