2 Commits

Author SHA1 Message Date
Mondo Diaz
9df50d0963 Add migration tracking with smart error detection
- Add _schema_migrations table to track applied migrations
- Each migration has a unique name and checksum
- Migrations are only run once (tracked by name)
- Checksum changes are detected and logged as warnings
- Smart error detection distinguishes "already applied" errors from real failures
- Real errors now fail hard with clear error messages
- Safe PostgreSQL error codes (42P07, 42701, 42710, 42P16) are recognized
- Fix semver migration to generate UUID for id column
2026-01-28 21:05:45 +00:00
Mondo Diaz
c60a7ba1ab Add rollback after failed migration to allow subsequent migrations to run
When a migration fails, the transaction is left in a failed state. Without
rollback, all subsequent migrations fail with "current transaction is aborted".
This was preventing the team_id column from being added to projects.
2026-01-28 20:27:46 +00:00

View File

@@ -1,10 +1,11 @@
from sqlalchemy import create_engine, text, event from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool from sqlalchemy.pool import QueuePool
from typing import Generator from typing import Generator, NamedTuple
from contextlib import contextmanager from contextlib import contextmanager
import logging import logging
import time import time
import hashlib
from .config import get_settings from .config import get_settings
from .models import Base from .models import Base
@@ -12,6 +13,21 @@ from .models import Base
settings = get_settings() settings = get_settings()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Migration(NamedTuple):
"""A database migration with a unique name and SQL to execute."""
name: str
sql: str
# PostgreSQL error codes that indicate "already exists" - safe to skip
SAFE_PG_ERROR_CODES = {
"42P07", # duplicate_table
"42701", # duplicate_column
"42710", # duplicate_object (index, constraint, etc.)
"42P16", # invalid_table_definition (e.g., column already exists)
}
# Build connect_args with query timeout if configured # Build connect_args with query timeout if configured
connect_args = {} connect_args = {}
if settings.database_query_timeout > 0: if settings.database_query_timeout > 0:
@@ -65,289 +81,397 @@ def init_db():
_run_migrations() _run_migrations()
def _ensure_migrations_table(conn) -> None:
"""Create the migrations tracking table if it doesn't exist."""
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _schema_migrations (
name VARCHAR(255) PRIMARY KEY,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""))
conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
"""Get all applied migrations and their checksums."""
result = conn.execute(text(
"SELECT name, checksum FROM _schema_migrations"
))
return {row[0]: row[1] for row in result}
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
# Fallback: check error message for common "already exists" patterns
error_str = str(exception).lower()
safe_patterns = [
"already exists",
"duplicate key",
"relation .* already exists",
"column .* already exists",
]
return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
"""Record a migration as applied."""
conn.execute(text(
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
), {"name": name, "checksum": checksum})
conn.commit()
def _run_migrations(): def _run_migrations():
"""Run manual migrations for schema updates""" """Run manual migrations for schema updates with tracking and error detection."""
migrations = [ migrations = [
# Add format_metadata column to artifacts table Migration(
""" name="001_add_format_metadata",
DO $$ sql="""
BEGIN DO $$
IF NOT EXISTS ( BEGIN
SELECT 1 FROM information_schema.columns IF NOT EXISTS (
WHERE table_name = 'artifacts' AND column_name = 'format_metadata' SELECT 1 FROM information_schema.columns
) THEN WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}'; ) THEN
END IF; ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
END $$;
""",
# Add format column to packages table
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'format'
) THEN
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
END IF;
END $$;
""",
# Add platform column to packages table
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'platform'
) THEN
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
END IF;
END $$;
""",
# Add ref_count index and constraints for artifacts
"""
DO $$
BEGIN
-- Add ref_count index
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
-- Add ref_count >= 0 constraint
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
END IF;
END $$;
""",
# Add composite indexes for packages and tags
"""
DO $$
BEGIN
-- Composite index for package lookup by project and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
-- Composite index for tag lookup by package and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
-- Composite index for recent tags queries
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
END IF;
END $$;
""",
# Add package_versions indexes and triggers (007_package_versions.sql)
"""
DO $$
BEGIN
-- Create indexes for package_versions if table exists
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Indexes for common queries
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF; END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN END $$;
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id); """,
),
Migration(
name="002_add_package_format",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'format'
) THEN
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
END IF; END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN END $$;
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version); """,
),
Migration(
name="003_add_package_platform",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'platform'
) THEN
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
END IF; END IF;
END IF; END $$;
END $$; """,
""", ),
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run) Migration(
""" name="004_add_ref_count_index_constraint",
CREATE OR REPLACE FUNCTION increment_artifact_ref_count() sql="""
RETURNS TRIGGER AS $$ DO $$
BEGIN BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; IF NOT EXISTS (
RETURN NEW; SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
END; ) THEN
$$ LANGUAGE plpgsql; CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
""", END IF;
"""
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count() IF NOT EXISTS (
RETURNS TRIGGER AS $$ SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
BEGIN ) THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
RETURN OLD; END IF;
END; END $$;
$$ LANGUAGE plpgsql; """,
""", ),
""" Migration(
CREATE OR REPLACE FUNCTION update_artifact_ref_count() name="005_add_composite_indexes",
RETURNS TRIGGER AS $$ sql="""
BEGIN DO $$
IF OLD.artifact_id != NEW.artifact_id THEN BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
END IF;
END $$;
""",
),
Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
END IF;
END IF;
END $$;
""",
),
Migration(
name="007_create_ref_count_trigger_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END IF; RETURN NEW;
RETURN NEW; END;
END; $$ LANGUAGE plpgsql;
$$ LANGUAGE plpgsql;
""",
# Create triggers for tags ref_count management
"""
DO $$
BEGIN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW
EXECUTE FUNCTION increment_artifact_ref_count();
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags; CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
CREATE TRIGGER tags_ref_count_delete_trigger RETURNS TRIGGER AS $$
AFTER DELETE ON tags BEGIN
FOR EACH ROW UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
EXECUTE FUNCTION decrement_artifact_ref_count(); RETURN OLD;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags; CREATE OR REPLACE FUNCTION update_artifact_ref_count()
CREATE TRIGGER tags_ref_count_update_trigger RETURNS TRIGGER AS $$
AFTER UPDATE ON tags BEGIN
FOR EACH ROW IF OLD.artifact_id != NEW.artifact_id THEN
WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id) UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
EXECUTE FUNCTION update_artifact_ref_count(); UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END $$; END IF;
""", RETURN NEW;
# Create ref_count trigger functions for package_versions END;
""" $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION increment_version_ref_count() """,
RETURNS TRIGGER AS $$ ),
BEGIN Migration(
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; name="008_create_tags_ref_count_triggers",
RETURN NEW; sql="""
END; DO $$
$$ LANGUAGE plpgsql; BEGIN
""", DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
""" CREATE TRIGGER tags_ref_count_insert_trigger
CREATE OR REPLACE FUNCTION decrement_version_ref_count() AFTER INSERT ON tags
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for package_versions ref_count
"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
FOR EACH ROW FOR EACH ROW
EXECUTE FUNCTION increment_version_ref_count(); EXECUTE FUNCTION increment_artifact_ref_count();
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions; DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
CREATE TRIGGER package_versions_ref_count_delete CREATE TRIGGER tags_ref_count_delete_trigger
AFTER DELETE ON package_versions AFTER DELETE ON tags
FOR EACH ROW FOR EACH ROW
EXECUTE FUNCTION decrement_version_ref_count(); EXECUTE FUNCTION decrement_artifact_ref_count();
END IF;
END $$; DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
""", CREATE TRIGGER tags_ref_count_update_trigger
# Migrate existing semver tags to package_versions AFTER UPDATE ON tags
r""" FOR EACH ROW
DO $$ WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
BEGIN EXECUTE FUNCTION update_artifact_ref_count();
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN END $$;
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.) """,
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at) ),
SELECT Migration(
t.package_id, name="009_create_version_ref_count_functions",
t.artifact_id, sql="""
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END, CREATE OR REPLACE FUNCTION increment_version_ref_count()
'migrated_from_tag', RETURNS TRIGGER AS $$
t.created_by, BEGIN
t.created_at UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
FROM tags t RETURN NEW;
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$' END;
ON CONFLICT (package_id, version) DO NOTHING; $$ LANGUAGE plpgsql;
END IF;
END $$; CREATE OR REPLACE FUNCTION decrement_version_ref_count()
""", RETURNS TRIGGER AS $$
# Teams and multi-tenancy migration (009_teams.sql) BEGIN
""" UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
CREATE TABLE IF NOT EXISTS teams ( RETURN OLD;
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), END;
name VARCHAR(255) NOT NULL, $$ LANGUAGE plpgsql;
slug VARCHAR(255) NOT NULL UNIQUE, """,
description TEXT, ),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), Migration(
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), name="010_create_package_versions_triggers",
created_by VARCHAR(255) NOT NULL, sql="""
settings JSONB DEFAULT '{}' DO $$
); BEGIN
""", IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
""" DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TABLE IF NOT EXISTS team_memberships ( CREATE TRIGGER package_versions_ref_count_insert
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), AFTER INSERT ON package_versions
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE, FOR EACH ROW
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, EXECUTE FUNCTION increment_version_ref_count();
role VARCHAR(50) NOT NULL DEFAULT 'member',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
invited_by VARCHAR(255), CREATE TRIGGER package_versions_ref_count_delete
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id), AFTER DELETE ON package_versions
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member')) FOR EACH ROW
); EXECUTE FUNCTION decrement_version_ref_count();
""", END IF;
""" END $$;
DO $$ """,
BEGIN ),
IF NOT EXISTS ( Migration(
SELECT 1 FROM information_schema.columns name="011_migrate_semver_tags_to_versions",
WHERE table_name = 'projects' AND column_name = 'team_id' sql=r"""
) THEN DO $$
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL; BEGIN
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id); IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
END IF; INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
END $$; SELECT
""", gen_random_uuid(),
""" t.package_id,
DO $$ t.artifact_id,
BEGIN CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN 'migrated_from_tag',
CREATE INDEX idx_teams_slug ON teams(slug); t.created_by,
END IF; t.created_at
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN FROM tags t
CREATE INDEX idx_teams_created_by ON teams(created_by); WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
END IF; ON CONFLICT (package_id, version) DO NOTHING;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN END IF;
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id); END $$;
END IF; """,
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN ),
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id); Migration(
END IF; name="012_create_teams_table",
END $$; sql="""
""", CREATE TABLE IF NOT EXISTS teams (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) NOT NULL UNIQUE,
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by VARCHAR(255) NOT NULL,
settings JSONB DEFAULT '{}'
);
""",
),
Migration(
name="013_create_team_memberships_table",
sql="""
CREATE TABLE IF NOT EXISTS team_memberships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL DEFAULT 'member',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
invited_by VARCHAR(255),
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
);
""",
),
Migration(
name="014_add_team_id_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'team_id'
) THEN
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
END IF;
END $$;
""",
),
Migration(
name="015_add_teams_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
CREATE INDEX idx_teams_slug ON teams(slug);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
CREATE INDEX idx_teams_created_by ON teams(created_by);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
END IF;
END $$;
""",
),
] ]
with engine.connect() as conn: with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations: for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try: try:
conn.execute(text(migration)) logger.info(f"Running migration: {migration.name}")
conn.execute(text(migration.sql))
conn.commit() conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e: except Exception as e:
logger.warning(f"Migration failed (may already be applied): {e}") conn.rollback()
if _is_safe_error(e):
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
def get_db() -> Generator[Session, None, None]: def get_db() -> Generator[Session, None, None]: