4 Commits

Author SHA1 Message Date
Mondo Diaz
9df50d0963 Add migration tracking with smart error detection
- Add _schema_migrations table to track applied migrations
- Each migration has a unique name and checksum
- Migrations are only run once (tracked by name)
- Checksum changes are detected and logged as warnings
- Smart error detection distinguishes "already applied" errors from real failures
- Real errors now fail hard with clear error messages
- Safe PostgreSQL error codes (42P07, 42701, 42710, 42P16) are recognized
- Fix semver migration to generate UUID for id column
2026-01-28 21:05:45 +00:00
Mondo Diaz
c60a7ba1ab Add rollback after failed migration to allow subsequent migrations to run
When a migration fails, the transaction is left in a failed state. Without
rollback, all subsequent migrations fail with "current transaction is aborted".
This was preventing the team_id column from being added to projects.
2026-01-28 20:27:46 +00:00
Mondo Diaz
aed48bb4a2 Merge branch 'fix/teams-migration-runtime-v2' into 'main'
Add teams migration to runtime migrations

See merge request esv/bsf/bsf-integration/orchard/orchard-mvp!50
2026-01-28 14:19:35 -06:00
Mondo Diaz
0e67ebf94f Add teams migration to runtime migrations 2026-01-28 14:19:35 -06:00

View File

@@ -1,10 +1,11 @@
from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from typing import Generator
from typing import Generator, NamedTuple
from contextlib import contextmanager
import logging
import time
import hashlib
from .config import get_settings
from .models import Base
@@ -12,6 +13,21 @@ from .models import Base
settings = get_settings()
logger = logging.getLogger(__name__)
class Migration(NamedTuple):
"""A database migration with a unique name and SQL to execute."""
name: str
sql: str
# PostgreSQL error codes that indicate "already exists" - safe to skip
SAFE_PG_ERROR_CODES = {
"42P07", # duplicate_table
"42701", # duplicate_column
"42710", # duplicate_object (index, constraint, etc.)
"42P16", # invalid_table_definition (e.g., column already exists)
}
# Build connect_args with query timeout if configured
connect_args = {}
if settings.database_query_timeout > 0:
@@ -65,235 +81,397 @@ def init_db():
_run_migrations()
def _ensure_migrations_table(conn) -> None:
"""Create the migrations tracking table if it doesn't exist."""
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _schema_migrations (
name VARCHAR(255) PRIMARY KEY,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""))
conn.commit()
def _get_applied_migrations(conn) -> dict[str, str]:
"""Get all applied migrations and their checksums."""
result = conn.execute(text(
"SELECT name, checksum FROM _schema_migrations"
))
return {row[0]: row[1] for row in result}
def _compute_checksum(sql: str) -> str:
"""Compute a checksum for migration SQL to detect changes."""
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
def _is_safe_error(exception: Exception) -> bool:
"""Check if the error indicates the migration was already applied."""
# Check for psycopg2 errors with pgcode attribute
original = getattr(exception, "orig", None)
if original is not None:
pgcode = getattr(original, "pgcode", None)
if pgcode in SAFE_PG_ERROR_CODES:
return True
# Fallback: check error message for common "already exists" patterns
error_str = str(exception).lower()
safe_patterns = [
"already exists",
"duplicate key",
"relation .* already exists",
"column .* already exists",
]
return any(pattern in error_str for pattern in safe_patterns)
def _record_migration(conn, name: str, checksum: str) -> None:
"""Record a migration as applied."""
conn.execute(text(
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
), {"name": name, "checksum": checksum})
conn.commit()
def _run_migrations():
"""Run manual migrations for schema updates"""
"""Run manual migrations for schema updates with tracking and error detection."""
migrations = [
# Add format_metadata column to artifacts table
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
) THEN
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
END IF;
END $$;
""",
# Add format column to packages table
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'format'
) THEN
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
END IF;
END $$;
""",
# Add platform column to packages table
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'platform'
) THEN
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
END IF;
END $$;
""",
# Add ref_count index and constraints for artifacts
"""
DO $$
BEGIN
-- Add ref_count index
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
-- Add ref_count >= 0 constraint
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
END IF;
END $$;
""",
# Add composite indexes for packages and tags
"""
DO $$
BEGIN
-- Composite index for package lookup by project and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
-- Composite index for tag lookup by package and name
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
-- Composite index for recent tags queries
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
END IF;
END $$;
""",
# Add package_versions indexes and triggers (007_package_versions.sql)
"""
DO $$
BEGIN
-- Create indexes for package_versions if table exists
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Indexes for common queries
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
Migration(
name="001_add_format_metadata",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
) THEN
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
END $$;
""",
),
Migration(
name="002_add_package_format",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'format'
) THEN
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
END $$;
""",
),
Migration(
name="003_add_package_platform",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'packages' AND column_name = 'platform'
) THEN
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
END IF;
END IF;
END $$;
""",
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
"""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.artifact_id != NEW.artifact_id THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
END $$;
""",
),
Migration(
name="004_add_ref_count_index_constraint",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
) THEN
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
) THEN
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
END IF;
END $$;
""",
),
Migration(
name="005_add_composite_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
) THEN
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
) THEN
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
) THEN
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
END IF;
END $$;
""",
),
Migration(
name="006_add_package_versions_indexes",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
END IF;
END IF;
END $$;
""",
),
Migration(
name="007_create_ref_count_trigger_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for tags ref_count management
"""
DO $$
BEGIN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW
EXECUTE FUNCTION increment_artifact_ref_count();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
CREATE TRIGGER tags_ref_count_delete_trigger
AFTER DELETE ON tags
FOR EACH ROW
EXECUTE FUNCTION decrement_artifact_ref_count();
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
CREATE TRIGGER tags_ref_count_update_trigger
AFTER UPDATE ON tags
FOR EACH ROW
WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
EXECUTE FUNCTION update_artifact_ref_count();
END $$;
""",
# Create ref_count trigger functions for package_versions
"""
CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
"""
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
# Create triggers for package_versions ref_count
"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Drop and recreate triggers to ensure they're current
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.artifact_id != NEW.artifact_id THEN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="008_create_tags_ref_count_triggers",
sql="""
DO $$
BEGIN
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW
EXECUTE FUNCTION increment_version_ref_count();
EXECUTE FUNCTION increment_artifact_ref_count();
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
CREATE TRIGGER package_versions_ref_count_delete
AFTER DELETE ON package_versions
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
CREATE TRIGGER tags_ref_count_delete_trigger
AFTER DELETE ON tags
FOR EACH ROW
EXECUTE FUNCTION decrement_version_ref_count();
END IF;
END $$;
""",
# Migrate existing semver tags to package_versions
r"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
SELECT
t.package_id,
t.artifact_id,
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
'migrated_from_tag',
t.created_by,
t.created_at
FROM tags t
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
ON CONFLICT (package_id, version) DO NOTHING;
END IF;
END $$;
""",
EXECUTE FUNCTION decrement_artifact_ref_count();
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
CREATE TRIGGER tags_ref_count_update_trigger
AFTER UPDATE ON tags
FOR EACH ROW
WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
EXECUTE FUNCTION update_artifact_ref_count();
END $$;
""",
),
Migration(
name="009_create_version_ref_count_functions",
sql="""
CREATE OR REPLACE FUNCTION increment_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
RETURNS TRIGGER AS $$
BEGIN
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
""",
),
Migration(
name="010_create_package_versions_triggers",
sql="""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
CREATE TRIGGER package_versions_ref_count_insert
AFTER INSERT ON package_versions
FOR EACH ROW
EXECUTE FUNCTION increment_version_ref_count();
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
CREATE TRIGGER package_versions_ref_count_delete
AFTER DELETE ON package_versions
FOR EACH ROW
EXECUTE FUNCTION decrement_version_ref_count();
END IF;
END $$;
""",
),
Migration(
name="011_migrate_semver_tags_to_versions",
sql=r"""
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
SELECT
gen_random_uuid(),
t.package_id,
t.artifact_id,
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
'migrated_from_tag',
t.created_by,
t.created_at
FROM tags t
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
ON CONFLICT (package_id, version) DO NOTHING;
END IF;
END $$;
""",
),
Migration(
name="012_create_teams_table",
sql="""
CREATE TABLE IF NOT EXISTS teams (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) NOT NULL UNIQUE,
description TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by VARCHAR(255) NOT NULL,
settings JSONB DEFAULT '{}'
);
""",
),
Migration(
name="013_create_team_memberships_table",
sql="""
CREATE TABLE IF NOT EXISTS team_memberships (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL DEFAULT 'member',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
invited_by VARCHAR(255),
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
);
""",
),
Migration(
name="014_add_team_id_to_projects",
sql="""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'projects' AND column_name = 'team_id'
) THEN
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
END IF;
END $$;
""",
),
Migration(
name="015_add_teams_indexes",
sql="""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
CREATE INDEX idx_teams_slug ON teams(slug);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
CREATE INDEX idx_teams_created_by ON teams(created_by);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
END IF;
END $$;
""",
),
]
with engine.connect() as conn:
# Ensure migrations tracking table exists
_ensure_migrations_table(conn)
# Get already-applied migrations
applied = _get_applied_migrations(conn)
for migration in migrations:
checksum = _compute_checksum(migration.sql)
# Check if migration was already applied
if migration.name in applied:
stored_checksum = applied[migration.name]
if stored_checksum != checksum:
logger.warning(
f"Migration '{migration.name}' has changed since it was applied! "
f"Stored checksum: {stored_checksum}, current: {checksum}"
)
continue
# Run the migration
try:
conn.execute(text(migration))
logger.info(f"Running migration: {migration.name}")
conn.execute(text(migration.sql))
conn.commit()
_record_migration(conn, migration.name, checksum)
logger.info(f"Migration '{migration.name}' applied successfully")
except Exception as e:
logger.warning(f"Migration failed (may already be applied): {e}")
conn.rollback()
if _is_safe_error(e):
# Migration was already applied (schema already exists)
logger.info(
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
)
_record_migration(conn, migration.name, checksum)
else:
# Real error - fail hard
logger.error(f"Migration '{migration.name}' failed: {e}")
raise RuntimeError(
f"Migration '{migration.name}' failed with error: {e}"
) from e
def get_db() -> Generator[Session, None, None]: