Compare commits
2 Commits
fix/pypi-p
...
fix/migrat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9df50d0963 | ||
|
|
c60a7ba1ab |
@@ -1,10 +1,11 @@
|
||||
from sqlalchemy import create_engine, text, event
|
||||
from sqlalchemy.orm import sessionmaker, Session
|
||||
from sqlalchemy.pool import QueuePool
|
||||
from typing import Generator
|
||||
from typing import Generator, NamedTuple
|
||||
from contextlib import contextmanager
|
||||
import logging
|
||||
import time
|
||||
import hashlib
|
||||
|
||||
from .config import get_settings
|
||||
from .models import Base
|
||||
@@ -12,6 +13,21 @@ from .models import Base
|
||||
settings = get_settings()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Migration(NamedTuple):
|
||||
"""A database migration with a unique name and SQL to execute."""
|
||||
name: str
|
||||
sql: str
|
||||
|
||||
|
||||
# PostgreSQL error codes that indicate "already exists" - safe to skip
|
||||
SAFE_PG_ERROR_CODES = {
|
||||
"42P07", # duplicate_table
|
||||
"42701", # duplicate_column
|
||||
"42710", # duplicate_object (index, constraint, etc.)
|
||||
"42P16", # invalid_table_definition (e.g., column already exists)
|
||||
}
|
||||
|
||||
# Build connect_args with query timeout if configured
|
||||
connect_args = {}
|
||||
if settings.database_query_timeout > 0:
|
||||
@@ -65,11 +81,65 @@ def init_db():
|
||||
_run_migrations()
|
||||
|
||||
|
||||
def _ensure_migrations_table(conn) -> None:
|
||||
"""Create the migrations tracking table if it doesn't exist."""
|
||||
conn.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS _schema_migrations (
|
||||
name VARCHAR(255) PRIMARY KEY,
|
||||
checksum VARCHAR(64) NOT NULL,
|
||||
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _get_applied_migrations(conn) -> dict[str, str]:
|
||||
"""Get all applied migrations and their checksums."""
|
||||
result = conn.execute(text(
|
||||
"SELECT name, checksum FROM _schema_migrations"
|
||||
))
|
||||
return {row[0]: row[1] for row in result}
|
||||
|
||||
|
||||
def _compute_checksum(sql: str) -> str:
|
||||
"""Compute a checksum for migration SQL to detect changes."""
|
||||
return hashlib.sha256(sql.strip().encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def _is_safe_error(exception: Exception) -> bool:
|
||||
"""Check if the error indicates the migration was already applied."""
|
||||
# Check for psycopg2 errors with pgcode attribute
|
||||
original = getattr(exception, "orig", None)
|
||||
if original is not None:
|
||||
pgcode = getattr(original, "pgcode", None)
|
||||
if pgcode in SAFE_PG_ERROR_CODES:
|
||||
return True
|
||||
|
||||
# Fallback: check error message for common "already exists" patterns
|
||||
error_str = str(exception).lower()
|
||||
safe_patterns = [
|
||||
"already exists",
|
||||
"duplicate key",
|
||||
"relation .* already exists",
|
||||
"column .* already exists",
|
||||
]
|
||||
return any(pattern in error_str for pattern in safe_patterns)
|
||||
|
||||
|
||||
def _record_migration(conn, name: str, checksum: str) -> None:
|
||||
"""Record a migration as applied."""
|
||||
conn.execute(text(
|
||||
"INSERT INTO _schema_migrations (name, checksum) VALUES (:name, :checksum)"
|
||||
), {"name": name, "checksum": checksum})
|
||||
conn.commit()
|
||||
|
||||
|
||||
def _run_migrations():
|
||||
"""Run manual migrations for schema updates"""
|
||||
"""Run manual migrations for schema updates with tracking and error detection."""
|
||||
migrations = [
|
||||
# Add format_metadata column to artifacts table
|
||||
"""
|
||||
Migration(
|
||||
name="001_add_format_metadata",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
@@ -80,8 +150,10 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Add format column to packages table
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="002_add_package_format",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
@@ -93,8 +165,10 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Add platform column to packages table
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="003_add_package_platform",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
@@ -106,18 +180,18 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Add ref_count index and constraints for artifacts
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="004_add_ref_count_index_constraint",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
-- Add ref_count index
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
|
||||
) THEN
|
||||
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
|
||||
END IF;
|
||||
|
||||
-- Add ref_count >= 0 constraint
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
|
||||
) THEN
|
||||
@@ -125,25 +199,24 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Add composite indexes for packages and tags
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="005_add_composite_indexes",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
-- Composite index for package lookup by project and name
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
|
||||
) THEN
|
||||
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
|
||||
END IF;
|
||||
|
||||
-- Composite index for tag lookup by package and name
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
|
||||
) THEN
|
||||
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
|
||||
END IF;
|
||||
|
||||
-- Composite index for recent tags queries
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
|
||||
) THEN
|
||||
@@ -151,13 +224,13 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Add package_versions indexes and triggers (007_package_versions.sql)
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="006_add_package_versions_indexes",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
-- Create indexes for package_versions if table exists
|
||||
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
||||
-- Indexes for common queries
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
|
||||
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
|
||||
END IF;
|
||||
@@ -170,8 +243,10 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="007_create_ref_count_trigger_functions",
|
||||
sql="""
|
||||
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -179,8 +254,7 @@ def _run_migrations():
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""",
|
||||
"""
|
||||
|
||||
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -188,8 +262,7 @@ def _run_migrations():
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""",
|
||||
"""
|
||||
|
||||
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -201,11 +274,12 @@ def _run_migrations():
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""",
|
||||
# Create triggers for tags ref_count management
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="008_create_tags_ref_count_triggers",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
-- Drop and recreate triggers to ensure they're current
|
||||
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
|
||||
CREATE TRIGGER tags_ref_count_insert_trigger
|
||||
AFTER INSERT ON tags
|
||||
@@ -226,8 +300,10 @@ def _run_migrations():
|
||||
EXECUTE FUNCTION update_artifact_ref_count();
|
||||
END $$;
|
||||
""",
|
||||
# Create ref_count trigger functions for package_versions
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="009_create_version_ref_count_functions",
|
||||
sql="""
|
||||
CREATE OR REPLACE FUNCTION increment_version_ref_count()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -235,8 +311,7 @@ def _run_migrations():
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""",
|
||||
"""
|
||||
|
||||
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
@@ -245,12 +320,13 @@ def _run_migrations():
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""",
|
||||
# Create triggers for package_versions ref_count
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="010_create_package_versions_triggers",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
||||
-- Drop and recreate triggers to ensure they're current
|
||||
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
|
||||
CREATE TRIGGER package_versions_ref_count_insert
|
||||
AFTER INSERT ON package_versions
|
||||
@@ -265,14 +341,16 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Migrate existing semver tags to package_versions
|
||||
r"""
|
||||
),
|
||||
Migration(
|
||||
name="011_migrate_semver_tags_to_versions",
|
||||
sql=r"""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
||||
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
|
||||
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
|
||||
INSERT INTO package_versions (id, package_id, artifact_id, version, version_source, created_by, created_at)
|
||||
SELECT
|
||||
gen_random_uuid(),
|
||||
t.package_id,
|
||||
t.artifact_id,
|
||||
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
|
||||
@@ -285,8 +363,10 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
# Teams and multi-tenancy migration (009_teams.sql)
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="012_create_teams_table",
|
||||
sql="""
|
||||
CREATE TABLE IF NOT EXISTS teams (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
name VARCHAR(255) NOT NULL,
|
||||
@@ -298,7 +378,10 @@ def _run_migrations():
|
||||
settings JSONB DEFAULT '{}'
|
||||
);
|
||||
""",
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="013_create_team_memberships_table",
|
||||
sql="""
|
||||
CREATE TABLE IF NOT EXISTS team_memberships (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
||||
@@ -310,7 +393,10 @@ def _run_migrations():
|
||||
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
|
||||
);
|
||||
""",
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="014_add_team_id_to_projects",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
@@ -322,7 +408,10 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
"""
|
||||
),
|
||||
Migration(
|
||||
name="015_add_teams_indexes",
|
||||
sql="""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
|
||||
@@ -339,15 +428,50 @@ def _run_migrations():
|
||||
END IF;
|
||||
END $$;
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
with engine.connect() as conn:
|
||||
# Ensure migrations tracking table exists
|
||||
_ensure_migrations_table(conn)
|
||||
|
||||
# Get already-applied migrations
|
||||
applied = _get_applied_migrations(conn)
|
||||
|
||||
for migration in migrations:
|
||||
checksum = _compute_checksum(migration.sql)
|
||||
|
||||
# Check if migration was already applied
|
||||
if migration.name in applied:
|
||||
stored_checksum = applied[migration.name]
|
||||
if stored_checksum != checksum:
|
||||
logger.warning(
|
||||
f"Migration '{migration.name}' has changed since it was applied! "
|
||||
f"Stored checksum: {stored_checksum}, current: {checksum}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Run the migration
|
||||
try:
|
||||
conn.execute(text(migration))
|
||||
logger.info(f"Running migration: {migration.name}")
|
||||
conn.execute(text(migration.sql))
|
||||
conn.commit()
|
||||
_record_migration(conn, migration.name, checksum)
|
||||
logger.info(f"Migration '{migration.name}' applied successfully")
|
||||
except Exception as e:
|
||||
logger.warning(f"Migration failed (may already be applied): {e}")
|
||||
conn.rollback()
|
||||
if _is_safe_error(e):
|
||||
# Migration was already applied (schema already exists)
|
||||
logger.info(
|
||||
f"Migration '{migration.name}' already applied (schema exists), recording as complete"
|
||||
)
|
||||
_record_migration(conn, migration.name, checksum)
|
||||
else:
|
||||
# Real error - fail hard
|
||||
logger.error(f"Migration '{migration.name}' failed: {e}")
|
||||
raise RuntimeError(
|
||||
f"Migration '{migration.name}' failed with error: {e}"
|
||||
) from e
|
||||
|
||||
|
||||
def get_db() -> Generator[Session, None, None]:
|
||||
|
||||
Reference in New Issue
Block a user