1 Commits

Author SHA1 Message Date
Mondo Diaz
832e4b27a8 Add teams migration to runtime migrations
Add teams table, team_memberships table, and team_id column to projects
in the runtime migrations. This fixes startup failures on existing databases
that don't have the teams schema from the migration file.
2026-01-28 20:25:57 +00:00

View File

@@ -1,11 +1,10 @@
from sqlalchemy import create_engine, text, event from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool from sqlalchemy.pool import QueuePool
from typing import Generator, NamedTuple from typing import Generator
from contextlib import contextmanager from contextlib import contextmanager
import logging import logging
import time import time
import hashlib
from .config import get_settings from .config import get_settings
from .models import Base from .models import Base
@@ -13,21 +12,6 @@ from .models import Base
settings = get_settings() settings = get_settings()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Migration(NamedTuple):
"""A database migration with a unique name and SQL to execute."""
name: str
sql: str
# PostgreSQL error codes that indicate "already exists" - safe to skip
SAFE_PG_ERROR_CODES = {
"42P07", # duplicate_table
"42701", # duplicate_column
"42710", # duplicate_object (index, constraint, etc.)
"42P16", # invalid_table_definition (e.g., column already exists)
}
# Build connect_args with query timeout if configured # Build connect_args with query timeout if configured
connect_args = {} connect_args = {}
if settings.database_query_timeout > 0: if settings.database_query_timeout > 0:
@@ -81,397 +65,290 @@ def init_db():
_run_migrations() _run_migrations()
def _ensure_migrations_table(conn) -> None:
"""Create the migrations tracking table if it doesn't exist."""
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _schema_migrations (
name VARCHAR(255) PRIMARY KEY,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""))
conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
"""Get all applied migrations and their checksums."""
result = conn.execute(text(
"SELECT name, checksum FROM _schema_migrations"
))
return {row[0]: row[1] for row in result}
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
# Fallback: check error message for common "already exists" patterns
error_str = str(exception).lower()
safe_patterns = [
"already exists",
"duplicate key",
"relation .* already exists",
"column .* already exists",
]
return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
"""Record a migration as applied."""
conn.execute(text(
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
), {"name": name, "checksum": checksum})
conn.commit()
def _run_migrations(): def _run_migrations():
"""Run manual migrations for schema updates with tracking and error detection.""" """Run manual migrations for schema updates"""
migrations = [ migrations = [
Migration( # Add format_metadata column to artifacts table
name="001_add_format_metadata", """
sql=""" DO $$
DO $$ BEGIN
BEGIN IF NOT EXISTS (
IF NOT EXISTS ( SELECT 1 FROM information_schema.columns
SELECT 1 FROM information_schema.columns WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
WHERE table_name = 'artifacts' AND column_name = 'format_metadata' ) THEN
) THEN ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}'; END IF;
END IF; END $$;
END $$; """,
""", # Add format column to packages table
), """
Migration( DO $$
name="002_add_package_format", BEGIN
sql=""" IF NOT EXISTS (
DO $$ SELECT 1 FROM information_schema.columns
BEGIN WHERE table_name = 'packages' AND column_name = 'format'
IF NOT EXISTS ( ) THEN
SELECT 1 FROM information_schema.columns ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
WHERE table_name = 'packages' AND column_name = 'format' CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
) THEN END IF;
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL; END $$;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format); """,
END IF; # Add platform column to packages table
END $$; """
""", DO $$
), BEGIN
Migration( IF NOT EXISTS (
name="003_add_package_platform", SELECT 1 FROM information_schema.columns
sql=""" WHERE table_name = 'packages' AND column_name = 'platform'
DO $$ ) THEN
BEGIN ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
IF NOT EXISTS ( CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
SELECT 1 FROM information_schema.columns END IF;
WHERE table_name = 'packages' AND column_name = 'platform' END $$;
) THEN """,
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL; # Add ref_count index and constraints for artifacts
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform); """
END IF; DO $$
END $$; BEGIN
""", -- Add ref_count index
), IF NOT EXISTS (
Migration( SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
name="004_add_ref_count_index_constraint", ) THEN
sql=""" CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
DO $$ END IF;
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
IF NOT EXISTS ( -- Add ref_count >= 0 constraint
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative' IF NOT EXISTS (
) THEN SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0); ) THEN
END IF; ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
END $$; END IF;
""", END $$;
), """,
Migration( # Add composite indexes for packages and tags
name="005_add_composite_indexes", """
sql=""" DO $$
DO $$ BEGIN
BEGIN -- Composite index for package lookup by project and name
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name' SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN ) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name); CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF; END IF;
IF NOT EXISTS ( -- Composite index for tag lookup by package and name
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name' IF NOT EXISTS (
) THEN SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name); ) THEN
END IF; CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
IF NOT EXISTS ( -- Composite index for recent tags queries
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at' IF NOT EXISTS (
) THEN SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at); ) THEN
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
END IF;
END $$;
""",
# Add package_versions indexes and triggers (007_package_versions.sql)
"""
DO $$
BEGIN
-- Create indexes for package_versions if table exists
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Indexes for common queries
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF; END IF;
END $$; IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
""", CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
),
Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
END IF;
END IF; END IF;
END $$; IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
""", CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
), END IF;
Migration( END IF;
name="007_create_ref_count_trigger_functions", END $$;
sql=""" """,
CREATE OR REPLACE FUNCTION increment_artifact_ref_count() # Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
RETURNS TRIGGER AS $$ """
BEGIN CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; RETURNS TRIGGER AS $$
RETURN NEW; BEGIN
END; UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
$$ LANGUAGE plpgsql; RETURN NEW;
END;
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count() $$ LANGUAGE plpgsql;
RETURNS TRIGGER AS $$ """,
BEGIN """
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.artifact_id != NEW.artifact_id THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.artifact_id != NEW.artifact_id THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="008_create_tags_ref_count_triggers",
sql="""
DO $$
BEGIN
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW
EXECUTE FUNCTION increment_artifact_ref_count();
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
CREATE TRIGGER tags_ref_count_delete_trigger
AFTER DELETE ON tags
FOR EACH ROW
EXECUTE FUNCTION decrement_artifact_ref_count();
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
CREATE TRIGGER tags_ref_count_update_trigger
AFTER UPDATE ON tags
FOR EACH ROW
WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
EXECUTE FUNCTION update_artifact_ref_count();
END $$;
""",
),
Migration(
name="009_create_version_ref_count_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id; UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW; END IF;
END; RETURN NEW;
$$ LANGUAGE plpgsql; END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for tags ref_count management
"""
DO $$
BEGIN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW
EXECUTE FUNCTION increment_artifact_ref_count();
CREATE OR REPLACE FUNCTION decrement_version_ref_count() DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
RETURNS TRIGGER AS $$ CREATE TRIGGER tags_ref_count_delete_trigger
BEGIN AFTER DELETE ON tags
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id; FOR EACH ROW
RETURN OLD; EXECUTE FUNCTION decrement_artifact_ref_count();
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="010_create_package_versions_triggers",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
FOR EACH ROW
EXECUTE FUNCTION increment_version_ref_count();
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions; DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
CREATE TRIGGER package_versions_ref_count_delete CREATE TRIGGER tags_ref_count_update_trigger
AFTER DELETE ON package_versions AFTER UPDATE ON tags
FOR EACH ROW FOR EACH ROW
EXECUTE FUNCTION decrement_version_ref_count(); WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
END IF; EXECUTE FUNCTION update_artifact_ref_count();
END $$; END $$;
""", """,
), # Create ref_count trigger functions for package_versions
Migration( """
name="011_migrate_semver_tags_to_versions", CREATE OR REPLACE FUNCTION increment_version_ref_count()
sql=r""" RETURNS TRIGGER AS $$
DO $$ BEGIN
BEGIN UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN RETURN NEW;
INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at) END;
SELECT $$ LANGUAGE plpgsql;
gen_random_uuid(), """,
t.package_id, """
t.artifact_id, CREATE OR REPLACE FUNCTION decrement_version_ref_count()
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END, RETURNS TRIGGER AS $$
'migrated_from_tag', BEGIN
t.created_by, UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
t.created_at RETURN OLD;
FROM tags t END;
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$' $$ LANGUAGE plpgsql;
ON CONFLICT (package_id, version) DO NOTHING; """,
END IF; # Create triggers for package_versions ref_count
END $$; """
""", DO $$
), BEGIN
Migration( IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
name="012_create_teams_table", -- Drop and recreate triggers to ensure they're current
sql=""" DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TABLE IF NOT EXISTS teams ( CREATE TRIGGER package_versions_ref_count_insert
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), AFTER INSERT ON package_versions
name VARCHAR(255) NOT NULL, FOR EACH ROW
slug VARCHAR(255) NOT NULL UNIQUE, EXECUTE FUNCTION increment_version_ref_count();
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), CREATE TRIGGER package_versions_ref_count_delete
created_by VARCHAR(255) NOT NULL, AFTER DELETE ON package_versions
settings JSONB DEFAULT '{}' FOR EACH ROW
); EXECUTE FUNCTION decrement_version_ref_count();
""", END IF;
), END $$;
Migration( """,
name="013_create_team_memberships_table", # Migrate existing semver tags to package_versions
sql=""" r"""
CREATE TABLE IF NOT EXISTS team_memberships ( DO $$
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), BEGIN
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE, IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, -- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
role VARCHAR(50) NOT NULL DEFAULT 'member', INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), SELECT
invited_by VARCHAR(255), t.package_id,
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id), t.artifact_id,
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member')) CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
); 'migrated_from_tag',
""", t.created_by,
), t.created_at
Migration( FROM tags t
name="014_add_team_id_to_projects", WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
sql=""" ON CONFLICT (package_id, version) DO NOTHING;
DO $$ END IF;
BEGIN END $$;
IF NOT EXISTS ( """,
SELECT 1 FROM information_schema.columns # Teams and multi-tenancy migration (009_teams.sql)
WHERE table_name = 'projects' AND column_name = 'team_id' """
) THEN CREATE TABLE IF NOT EXISTS teams (
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL; id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id); name VARCHAR(255) NOT NULL,
END IF; slug VARCHAR(255) NOT NULL UNIQUE,
END $$; description TEXT,
""", created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
Migration( created_by VARCHAR(255) NOT NULL,
name="015_add_teams_indexes", settings JSONB DEFAULT '{}'
sql=""" );
DO $$ """,
BEGIN """
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN CREATE TABLE IF NOT EXISTS team_memberships (
CREATE INDEX idx_teams_slug ON teams(slug); id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
END IF; team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
CREATE INDEX idx_teams_created_by ON teams(created_by); role VARCHAR(50) NOT NULL DEFAULT 'member',
END IF; created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN invited_by VARCHAR(255),
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id); CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
END IF; CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN );
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id); """,
END IF; """
END $$; DO $$
""", BEGIN
), IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'team_id'
) THEN
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
END IF;
END $$;
""",
"""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
CREATE INDEX idx_teams_slug ON teams(slug);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
CREATE INDEX idx_teams_created_by ON teams(created_by);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
END IF;
END $$;
""",
] ]
with engine.connect() as conn: with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations: for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try: try:
logger.info(f"Running migration: {migration.name}") conn.execute(text(migration))
conn.execute(text(migration.sql))
conn.commit() conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e: except Exception as e:
conn.rollback() conn.rollback()
if _is_safe_error(e): logger.warning(f"Migration failed (may already be applied): {e}")
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
def get_db() -> Generator[Session, None, None]: def get_db() -> Generator[Session, None, None]: