4 Commits

Author SHA1 Message Date
Mondo Diaz
c92895ffe9 Merge branch 'fix/migration-rollback' into 'main'
Add rollback after failed migration to allow subsequent migrations to run

See merge request esv/bsf/bsf-integration/orchard/orchard-mvp!51
2026-01-28 15:23:51 -06:00
Mondo Diaz
b147af43d2 Add rollback after failed migration to allow subsequent migrations to run 2026-01-28 15:23:51 -06:00
Mondo Diaz
aed48bb4a2 Merge branch 'fix/teams-migration-runtime-v2' into 'main'
Add teams migration to runtime migrations

See merge request esv/bsf/bsf-integration/orchard/orchard-mvp!50
2026-01-28 14:19:35 -06:00
Mondo Diaz
0e67ebf94f Add teams migration to runtime migrations 2026-01-28 14:19:35 -06:00

View File

@@ -1,10 +1,11 @@
from sqlalchemy import create_engine, text, event from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool from sqlalchemy.pool import QueuePool
from typing import Generator from typing import Generator, NamedTuple
from contextlib import contextmanager from contextlib import contextmanager
import logging import logging
import time import time
import hashlib
from .config import get_settings from .config import get_settings
from .models import Base from .models import Base
@@ -12,6 +13,21 @@ from .models import Base
settings = get_settings() settings = get_settings()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Migration(NamedTuple):
    """One schema migration step: a unique identifier paired with its SQL."""

    # Unique identifier of the migration, e.g. "012_create_teams_table".
    name: str
    # SQL text that is executed when the migration is applied.
    sql: str
# PostgreSQL SQLSTATE codes treated as "the schema object is already there":
# a migration that fails with one of these is considered safe to skip.
# Kept as a frozenset because it is a lookup-only module constant.
SAFE_PG_ERROR_CODES = frozenset({
    "42P07",  # duplicate_table
    "42701",  # duplicate_column
    "42710",  # duplicate_object (index, constraint, etc.)
    # invalid_table_definition. NOTE(review): this is not a "duplicate" code
    # (a duplicate column is 42701 above); PostgreSQL reports it for invalid
    # definitions such as adding a second primary key to a table. Confirm
    # that every error in this class is really safe to skip.
    "42P16",
})
# Build connect_args with query timeout if configured # Build connect_args with query timeout if configured
connect_args = {} connect_args = {}
if settings.database_query_timeout > 0: if settings.database_query_timeout > 0:
@@ -65,11 +81,65 @@ def init_db():
_run_migrations() _run_migrations()
def _ensure_migrations_table(conn) -> None:
    """Make sure the ``_schema_migrations`` tracking table is present.

    Args:
        conn: Open database connection; the DDL is executed and committed on it.
    """
    # CREATE TABLE IF NOT EXISTS keeps this a no-op when the table is
    # already there, so it can run on every startup.
    create_tracking_table = (
        "\n"
        "CREATE TABLE IF NOT EXISTS _schema_migrations (\n"
        "name VARCHAR(255) PRIMARY KEY,\n"
        "checksum VARCHAR(64) NOT NULL,\n"
        "applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()\n"
        ");\n"
    )
    statement = text(create_tracking_table)
    conn.execute(statement)
    conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
    """Load the recorded migrations from ``_schema_migrations``.

    Args:
        conn: Open database connection used to run the query.

    Returns:
        Mapping of migration name to the checksum stored for it.
    """
    query = text("SELECT name, checksum FROM _schema_migrations")
    applied: dict[str, str] = {}
    for record in conn.execute(query):
        # Column order follows the SELECT list: name first, then checksum.
        applied[record[0]] = record[1]
    return applied
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
# Fallback: check error message for common "already exists" patterns
error_str = str(exception).lower()
safe_patterns = [
"already exists",
"duplicate key",
"relation .* already exists",
"column .* already exists",
]
return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
    """Insert a row into ``_schema_migrations`` marking a migration as applied.

    Args:
        conn: Open database connection; the insert is committed on it.
        name: Migration name stored in the ``name`` column.
        checksum: Checksum stored alongside the name.
    """
    insert_stmt = text(
        "INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
    )
    # Values go in as bound parameters rather than being formatted into the SQL.
    params = {"name": name, "checksum": checksum}
    conn.execute(insert_stmt, params)
    conn.commit()
def _run_migrations(): def _run_migrations():
"""Run manual migrations for schema updates""" """Run manual migrations for schema updates with tracking and error detection."""
migrations = [ migrations = [
# Add format_metadata column to artifacts table Migration(
""" name="001_add_format_metadata",
sql="""
DO $$ DO $$
BEGIN BEGIN
IF NOT EXISTS ( IF NOT EXISTS (
@@ -80,8 +150,10 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Add format column to packages table ),
""" Migration(
name="002_add_package_format",
sql="""
DO $$ DO $$
BEGIN BEGIN
IF NOT EXISTS ( IF NOT EXISTS (
@@ -93,8 +165,10 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Add platform column to packages table ),
""" Migration(
name="003_add_package_platform",
sql="""
DO $$ DO $$
BEGIN BEGIN
IF NOT EXISTS ( IF NOT EXISTS (
@@ -106,18 +180,18 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Add ref_count index and constraints for artifacts ),
""" Migration(
name="004_add_ref_count_index_constraint",
sql="""
DO $$ DO $$
BEGIN BEGIN
-- Add ref_count index
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count' SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN ) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count); CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF; END IF;
-- Add ref_count >= 0 constraint
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative' SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN ) THEN
@@ -125,25 +199,24 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Add composite indexes for packages and tags ),
""" Migration(
name="005_add_composite_indexes",
sql="""
DO $$ DO $$
BEGIN BEGIN
-- Composite index for package lookup by project and name
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name' SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN ) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name); CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF; END IF;
-- Composite index for tag lookup by package and name
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name' SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN ) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name); CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF; END IF;
-- Composite index for recent tags queries
IF NOT EXISTS ( IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at' SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN ) THEN
@@ -151,13 +224,13 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Add package_versions indexes and triggers (007_package_versions.sql) ),
""" Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$ DO $$
BEGIN BEGIN
-- Create indexes for package_versions if table exists
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Indexes for common queries
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id); CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF; END IF;
@@ -170,8 +243,10 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run) ),
""" Migration(
name="007_create_ref_count_trigger_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count() CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@@ -179,8 +254,7 @@ def _run_migrations():
RETURN NEW; RETURN NEW;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count() CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@@ -188,8 +262,7 @@ def _run_migrations():
RETURN OLD; RETURN OLD;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION update_artifact_ref_count() CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@@ -201,11 +274,12 @@ def _run_migrations():
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""", """,
# Create triggers for tags ref_count management ),
""" Migration(
name="008_create_tags_ref_count_triggers",
sql="""
DO $$ DO $$
BEGIN BEGIN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags; DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags AFTER INSERT ON tags
@@ -226,8 +300,10 @@ def _run_migrations():
EXECUTE FUNCTION update_artifact_ref_count(); EXECUTE FUNCTION update_artifact_ref_count();
END $$; END $$;
""", """,
# Create ref_count trigger functions for package_versions ),
""" Migration(
name="009_create_version_ref_count_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_version_ref_count() CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@@ -235,8 +311,7 @@ def _run_migrations():
RETURN NEW; RETURN NEW;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_version_ref_count() CREATE OR REPLACE FUNCTION decrement_version_ref_count()
RETURNS TRIGGER AS $$ RETURNS TRIGGER AS $$
BEGIN BEGIN
@@ -245,12 +320,13 @@ def _run_migrations():
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
""", """,
# Create triggers for package_versions ref_count ),
""" Migration(
name="010_create_package_versions_triggers",
sql="""
DO $$ DO $$
BEGIN BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions; DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions AFTER INSERT ON package_versions
@@ -265,14 +341,16 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
# Migrate existing semver tags to package_versions ),
r""" Migration(
name="011_migrate_semver_tags_to_versions",
sql=r"""
DO $$ DO $$
BEGIN BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.) INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
SELECT SELECT
gen_random_uuid(),
t.package_id, t.package_id,
t.artifact_id, t.artifact_id,
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END, CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
@@ -285,15 +363,115 @@ def _run_migrations():
END IF; END IF;
END $$; END $$;
""", """,
),
Migration(
name="012_create_teams_table",
sql="""
CREATE TABLE IF NOT EXISTS teams (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) NOT NULL UNIQUE,
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by VARCHAR(255) NOT NULL,
settings JSONB DEFAULT '{}'
);
""",
),
Migration(
name="013_create_team_memberships_table",
sql="""
CREATE TABLE IF NOT EXISTS team_memberships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL DEFAULT 'member',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
invited_by VARCHAR(255),
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
);
""",
),
Migration(
name="014_add_team_id_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'team_id'
) THEN
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
END IF;
END $$;
""",
),
Migration(
name="015_add_teams_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
CREATE INDEX idx_teams_slug ON teams(slug);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
CREATE INDEX idx_teams_created_by ON teams(created_by);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
END IF;
END $$;
""",
),
] ]
with engine.connect() as conn: with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations: for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try: try:
conn.execute(text(migration)) logger.info(f"Running migration: {migration.name}")
conn.execute(text(migration.sql))
conn.commit() conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e: except Exception as e:
logger.warning(f"Migration failed (may already be applied): {e}") conn.rollback()
if _is_safe_error(e):
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
def get_db() -> Generator[Session, None, None]: def get_db() -> Generator[Session, None, None]: