From 9df50d09634359eb4021e5f338735af34d88f757 Mon Sep 17 00:00:00 2001 From: Mondo Diaz Date: Wed, 28 Jan 2026 21:05:45 +0000 Subject: [PATCH] Add migration tracking with smart error detection - Add _schema_migrations table to track applied migrations - Each migration has a unique name and checksum - Migrations are only run once (tracked by name) - Checksum changes are detected and logged as warnings - Smart error detection distinguishes "already applied" errors from real failures - Real errors now fail hard with clear error messages - Safe PostgreSQL error codes (42P07, 42701, 42710, 42P16) are recognized - Fix semver migration to generate UUID for id column --- backend/app/database.py | 655 ++++++++++++++++++++++++---------------- 1 file changed, 389 insertions(+), 266 deletions(-) diff --git a/backend/app/database.py b/backend/app/database.py index 1e6a3c7..ef82a90 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -1,10 +1,11 @@ from sqlalchemy import create_engine, text, event from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.pool import QueuePool -from typing import Generator +from typing import Generator, NamedTuple from contextlib import contextmanager import logging import time +import hashlib from .config import get_settings from .models import Base @@ -12,6 +13,21 @@ from .models import Base settings = get_settings() logger = logging.getLogger(__name__) + +class Migration(NamedTuple): + """A database migration with a unique name and SQL to execute.""" + name: str + sql: str + + +# PostgreSQL error codes that indicate "already exists" - safe to skip +SAFE_PG_ERROR_CODES = { + "42P07", # duplicate_table + "42701", # duplicate_column + "42710", # duplicate_object (index, constraint, etc.) + "42P16", # invalid_table_definition (e.g., column already exists) +} + # Build connect_args with query timeout if configured connect_args = {} if settings.database_query_timeout > 0: @@ -65,290 +81,397 @@ def init_db(): _run_migrations() +def _ensure_migrations_table(conn) -> None: + """Create the migrations tracking table if it doesn't exist.""" + conn.execute(text(""" + CREATE TABLE IF NOT EXISTS _schema_migrations ( + name VARCHAR(255) PRIMARY KEY, + checksum VARCHAR(64) NOT NULL, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + ); + """)) + conn.commit() + + +def _get_applied_migrations(conn) -> dict[str, str]: + """Get all applied migrations and their checksums.""" + result = conn.execute(text( + "SELECT name, checksum FROM _schema_migrations" + )) + return {row[0]: row[1] for row in result} + + +def _compute_checksum(sql: str) -> str: + """Compute a checksum for migration SQL to detect changes.""" + return hashlib.sha256(sql.strip().encode()).hexdigest()[:16] + + +def _is_safe_error(exception: Exception) -> bool: + """Check if the error indicates the migration was already applied.""" + # Check for psycopg2 errors with pgcode attribute + original = getattr(exception, "orig", None) + if original is not None: + pgcode = getattr(original, "pgcode", None) + if pgcode in SAFE_PG_ERROR_CODES: + return True + + # Fallback: check error message for common "already exists" patterns + error_str = str(exception).lower() + safe_patterns = [ + "already exists", + "duplicate key", + "relation .* already exists", + "column .* already exists", + ] + return any(pattern in error_str for pattern in safe_patterns) + + +def _record_migration(conn, name: str, checksum: str) -> None: + """Record a migration as applied.""" + conn.execute(text( + "INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)" + ), {"name": name, "checksum": checksum}) + conn.commit() + + def _run_migrations(): - """Run manual migrations for schema updates""" + """Run manual migrations for schema updates with tracking and error detection.""" migrations = [ - # Add format_metadata column to artifacts table - """ - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'artifacts' AND column_name = 'format_metadata' - ) THEN - ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}'; - END IF; - END $$; - """, - # Add format column to packages table - """ - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'packages' AND column_name = 'format' - ) THEN - ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL; - CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format); - END IF; - END $$; - """, - # Add platform column to packages table - """ - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'packages' AND column_name = 'platform' - ) THEN - ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL; - CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform); - END IF; - END $$; - """, - # Add ref_count index and constraints for artifacts - """ - DO $$ - BEGIN - -- Add ref_count index - IF NOT EXISTS ( - SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count' - ) THEN - CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count); - END IF; - - -- Add ref_count >= 0 constraint - IF NOT EXISTS ( - SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative' - ) THEN - ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0); - END IF; - END $$; - """, - # Add composite indexes for packages and tags - """ - DO $$ - BEGIN - -- Composite index for package lookup by project and name - IF NOT EXISTS ( - SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name' - ) THEN - CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name); - END IF; - - -- Composite index for tag lookup by package and name - IF NOT EXISTS ( - SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name' - ) THEN - CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name); - END IF; - - -- Composite index for recent tags queries - IF NOT EXISTS ( - SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at' - ) THEN - CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at); - END IF; - END $$; - """, - # Add package_versions indexes and triggers (007_package_versions.sql) - """ - DO $$ - BEGIN - -- Create indexes for package_versions if table exists - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN - -- Indexes for common queries - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN - CREATE INDEX idx_package_versions_package_id ON package_versions(package_id); + Migration( + name="001_add_format_metadata", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'artifacts' AND column_name = 'format_metadata' + ) THEN + ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}'; END IF; - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN - CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id); + END $$; + """, + ), + Migration( + name="002_add_package_format", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'packages' AND column_name = 'format' + ) THEN + ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL; + CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format); END IF; - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN - CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version); + END $$; + """, + ), + Migration( + name="003_add_package_platform", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'packages' AND column_name = 'platform' + ) THEN + ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL; + CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform); END IF; - END IF; - END $$; - """, - # Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run) - """ - CREATE OR REPLACE FUNCTION increment_artifact_ref_count() - RETURNS TRIGGER AS $$ - BEGIN - UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - """, - """ - CREATE OR REPLACE FUNCTION decrement_artifact_ref_count() - RETURNS TRIGGER AS $$ - BEGIN - UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; - RETURN OLD; - END; - $$ LANGUAGE plpgsql; - """, - """ - CREATE OR REPLACE FUNCTION update_artifact_ref_count() - RETURNS TRIGGER AS $$ - BEGIN - IF OLD.artifact_id != NEW.artifact_id THEN - UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; + END $$; + """, + ), + Migration( + name="004_add_ref_count_index_constraint", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count' + ) THEN + CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative' + ) THEN + ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0); + END IF; + END $$; + """, + ), + Migration( + name="005_add_composite_indexes", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name' + ) THEN + CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name' + ) THEN + CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name); + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at' + ) THEN + CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at); + END IF; + END $$; + """, + ), + Migration( + name="006_add_package_versions_indexes", + sql=""" + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN + CREATE INDEX idx_package_versions_package_id ON package_versions(package_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN + CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN + CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version); + END IF; + END IF; + END $$; + """, + ), + Migration( + name="007_create_ref_count_trigger_functions", + sql=""" + CREATE OR REPLACE FUNCTION increment_artifact_ref_count() + RETURNS TRIGGER AS $$ + BEGIN UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; - END IF; - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - """, - # Create triggers for tags ref_count management - """ - DO $$ - BEGIN - -- Drop and recreate triggers to ensure they're current - DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags; - CREATE TRIGGER tags_ref_count_insert_trigger - AFTER INSERT ON tags - FOR EACH ROW - EXECUTE FUNCTION increment_artifact_ref_count(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; - DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags; - CREATE TRIGGER tags_ref_count_delete_trigger - AFTER DELETE ON tags - FOR EACH ROW - EXECUTE FUNCTION decrement_artifact_ref_count(); + CREATE OR REPLACE FUNCTION decrement_artifact_ref_count() + RETURNS TRIGGER AS $$ + BEGIN + UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; + RETURN OLD; + END; + $$ LANGUAGE plpgsql; - DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags; - CREATE TRIGGER tags_ref_count_update_trigger - AFTER UPDATE ON tags - FOR EACH ROW - WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id) - EXECUTE FUNCTION update_artifact_ref_count(); - END $$; - """, - # Create ref_count trigger functions for package_versions - """ - CREATE OR REPLACE FUNCTION increment_version_ref_count() - RETURNS TRIGGER AS $$ - BEGIN - UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - """, - """ - CREATE OR REPLACE FUNCTION decrement_version_ref_count() - RETURNS TRIGGER AS $$ - BEGIN - UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; - RETURN OLD; - END; - $$ LANGUAGE plpgsql; - """, - # Create triggers for package_versions ref_count - """ - DO $$ - BEGIN - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN - -- Drop and recreate triggers to ensure they're current - DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions; - CREATE TRIGGER package_versions_ref_count_insert - AFTER INSERT ON package_versions + CREATE OR REPLACE FUNCTION update_artifact_ref_count() + RETURNS TRIGGER AS $$ + BEGIN + IF OLD.artifact_id != NEW.artifact_id THEN + UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; + UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """, + ), + Migration( + name="008_create_tags_ref_count_triggers", + sql=""" + DO $$ + BEGIN + DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags; + CREATE TRIGGER tags_ref_count_insert_trigger + AFTER INSERT ON tags FOR EACH ROW - EXECUTE FUNCTION increment_version_ref_count(); + EXECUTE FUNCTION increment_artifact_ref_count(); - DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions; - CREATE TRIGGER package_versions_ref_count_delete - AFTER DELETE ON package_versions + DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags; + CREATE TRIGGER tags_ref_count_delete_trigger + AFTER DELETE ON tags FOR EACH ROW - EXECUTE FUNCTION decrement_version_ref_count(); - END IF; - END $$; - """, - # Migrate existing semver tags to package_versions - r""" - DO $$ - BEGIN - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN - -- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.) - INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at) - SELECT - t.package_id, - t.artifact_id, - CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END, - 'migrated_from_tag', - t.created_by, - t.created_at - FROM tags t - WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$' - ON CONFLICT (package_id, version) DO NOTHING; - END IF; - END $$; - """, - # Teams and multi-tenancy migration (009_teams.sql) - """ - CREATE TABLE IF NOT EXISTS teams ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - name VARCHAR(255) NOT NULL, - slug VARCHAR(255) NOT NULL UNIQUE, - description TEXT, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - created_by VARCHAR(255) NOT NULL, - settings JSONB DEFAULT '{}' - ); - """, - """ - CREATE TABLE IF NOT EXISTS team_memberships ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE, - user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, - role VARCHAR(50) NOT NULL DEFAULT 'member', - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - invited_by VARCHAR(255), - CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id), - CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member')) - ); - """, - """ - DO $$ - BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.columns - WHERE table_name = 'projects' AND column_name = 'team_id' - ) THEN - ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL; - CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id); - END IF; - END $$; - """, - """ - DO $$ - BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN - CREATE INDEX idx_teams_slug ON teams(slug); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN - CREATE INDEX idx_teams_created_by ON teams(created_by); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN - CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN - CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id); - END IF; - END $$; - """, + EXECUTE FUNCTION decrement_artifact_ref_count(); + + DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags; + CREATE TRIGGER tags_ref_count_update_trigger + AFTER UPDATE ON tags + FOR EACH ROW + WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id) + EXECUTE FUNCTION update_artifact_ref_count(); + END $$; + """, + ), + Migration( + name="009_create_version_ref_count_functions", + sql=""" + CREATE OR REPLACE FUNCTION increment_version_ref_count() + RETURNS TRIGGER AS $$ + BEGIN + UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + CREATE OR REPLACE FUNCTION decrement_version_ref_count() + RETURNS TRIGGER AS $$ + BEGIN + UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; + RETURN OLD; + END; + $$ LANGUAGE plpgsql; + """, + ), + Migration( + name="010_create_package_versions_triggers", + sql=""" + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN + DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions; + CREATE TRIGGER package_versions_ref_count_insert + AFTER INSERT ON package_versions + FOR EACH ROW + EXECUTE FUNCTION increment_version_ref_count(); + + DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions; + CREATE TRIGGER package_versions_ref_count_delete + AFTER DELETE ON package_versions + FOR EACH ROW + EXECUTE FUNCTION decrement_version_ref_count(); + END IF; + END $$; + """, + ), + Migration( + name="011_migrate_semver_tags_to_versions", + sql=r""" + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN + INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at) + SELECT + gen_random_uuid(), + t.package_id, + t.artifact_id, + CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END, + 'migrated_from_tag', + t.created_by, + t.created_at + FROM tags t + WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$' + ON CONFLICT (package_id, version) DO NOTHING; + END IF; + END $$; + """, + ), + Migration( + name="012_create_teams_table", + sql=""" + CREATE TABLE IF NOT EXISTS teams ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(255) NOT NULL, + slug VARCHAR(255) NOT NULL UNIQUE, + description TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + created_by VARCHAR(255) NOT NULL, + settings JSONB DEFAULT '{}' + ); + """, + ), + Migration( + name="013_create_team_memberships_table", + sql=""" + CREATE TABLE IF NOT EXISTS team_memberships ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + role VARCHAR(50) NOT NULL DEFAULT 'member', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + invited_by VARCHAR(255), + CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id), + CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member')) + ); + """, + ), + Migration( + name="014_add_team_id_to_projects", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'projects' AND column_name = 'team_id' + ) THEN + ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL; + CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id); + END IF; + END $$; + """, + ), + Migration( + name="015_add_teams_indexes", + sql=""" + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN + CREATE INDEX idx_teams_slug ON teams(slug); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN + CREATE INDEX idx_teams_created_by ON teams(created_by); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN + CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id); + END IF; + IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN + CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id); + END IF; + END $$; + """, + ), ] with engine.connect() as conn: + # Ensure migrations tracking table exists + _ensure_migrations_table(conn) + + # Get already-applied migrations + applied = _get_applied_migrations(conn) + for migration in migrations: + checksum = _compute_checksum(migration.sql) + + # Check if migration was already applied + if migration.name in applied: + stored_checksum = applied[migration.name] + if stored_checksum != checksum: + logger.warning( + f"Migration '{migration.name}' has changed since it was applied! " + f"Stored checksum: {stored_checksum}, current: {checksum}" + ) + continue + + # Run the migration try: - conn.execute(text(migration)) + logger.info(f"Running migration: {migration.name}") + conn.execute(text(migration.sql)) conn.commit() + _record_migration(conn, migration.name, checksum) + logger.info(f"Migration '{migration.name}' applied successfully") except Exception as e: conn.rollback() - logger.warning(f"Migration failed (may already be applied): {e}") + if _is_safe_error(e): + # Migration was already applied (schema already exists) + logger.info( + f"Migration '{migration.name}' already applied (schema exists), recording as complete" + ) + _record_migration(conn, migration.name, checksum) + else: + # Real error - fail hard + logger.error(f"Migration '{migration.name}' failed: {e}") + raise RuntimeError( + f"Migration '{migration.name}' failed with error: {e}" + ) from e def get_db() -> Generator[Session, None, None]: