Add teams table, team_memberships table, and team_id column to projects in the runtime migrations. This fixes startup failures on existing databases that don't have the teams schema from the migration file.
433 lines
16 KiB
Python
433 lines
16 KiB
Python
from sqlalchemy import create_engine, text, event
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from sqlalchemy.pool import QueuePool
|
|
from typing import Generator
|
|
from contextlib import contextmanager
|
|
import logging
|
|
import time
|
|
|
|
from .config import get_settings
|
|
from .models import Base
|
|
|
|
settings = get_settings()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Build connect_args with query timeout if configured
|
|
connect_args = {}
|
|
if settings.database_query_timeout > 0:
|
|
# PostgreSQL statement_timeout is in milliseconds
|
|
connect_args["options"] = f"-c statement_timeout={settings.database_query_timeout * 1000}"
|
|
|
|
# Create engine with connection pool configuration
|
|
engine = create_engine(
|
|
settings.database_url,
|
|
pool_pre_ping=True, # Check connection health before using
|
|
poolclass=QueuePool,
|
|
pool_size=settings.database_pool_size,
|
|
max_overflow=settings.database_max_overflow,
|
|
pool_timeout=settings.database_pool_timeout,
|
|
pool_recycle=settings.database_pool_recycle,
|
|
connect_args=connect_args,
|
|
)
|
|
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
|
|
# Connection pool monitoring
|
|
@event.listens_for(engine, "checkout")
|
|
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
|
|
"""Log when a connection is checked out from the pool"""
|
|
logger.debug(f"Connection checked out from pool: {id(dbapi_connection)}")
|
|
|
|
|
|
@event.listens_for(engine, "checkin")
|
|
def receive_checkin(dbapi_connection, connection_record):
|
|
"""Log when a connection is returned to the pool"""
|
|
logger.debug(f"Connection returned to pool: {id(dbapi_connection)}")
|
|
|
|
|
|
def get_pool_status() -> dict:
|
|
"""Get current connection pool status for monitoring"""
|
|
pool = engine.pool
|
|
return {
|
|
"pool_size": pool.size(),
|
|
"checked_out": pool.checkedout(),
|
|
"overflow": pool.overflow(),
|
|
"checked_in": pool.checkedin(),
|
|
}
|
|
|
|
|
|
def init_db():
|
|
"""Create all tables and run migrations"""
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
# Run migrations for schema updates
|
|
_run_migrations()
|
|
|
|
|
|
def _run_migrations():
|
|
"""Run manual migrations for schema updates"""
|
|
migrations = [
|
|
# Add format_metadata column to artifacts table
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
|
|
) THEN
|
|
ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Add format column to packages table
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_name = 'packages' AND column_name = 'format'
|
|
) THEN
|
|
ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Add platform column to packages table
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_name = 'packages' AND column_name = 'platform'
|
|
) THEN
|
|
ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Add ref_count index and constraints for artifacts
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
-- Add ref_count index
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
|
|
) THEN
|
|
CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
|
|
END IF;
|
|
|
|
-- Add ref_count >= 0 constraint
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
|
|
) THEN
|
|
ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Add composite indexes for packages and tags
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
-- Composite index for package lookup by project and name
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
|
|
) THEN
|
|
CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
|
|
END IF;
|
|
|
|
-- Composite index for tag lookup by package and name
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
|
|
) THEN
|
|
CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
|
|
END IF;
|
|
|
|
-- Composite index for recent tags queries
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
|
|
) THEN
|
|
CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Add package_versions indexes and triggers (007_package_versions.sql)
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
-- Create indexes for package_versions if table exists
|
|
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
|
-- Indexes for common queries
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
|
|
CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
|
|
END IF;
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
|
|
CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
|
|
END IF;
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
|
|
CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
|
|
END IF;
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
|
|
"""
|
|
CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
|
|
RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
|
|
RETURN NEW;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
""",
|
|
"""
|
|
CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
|
|
RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
|
|
RETURN OLD;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
""",
|
|
"""
|
|
CREATE OR REPLACE FUNCTION update_artifact_ref_count()
|
|
RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
IF OLD.artifact_id != NEW.artifact_id THEN
|
|
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
|
|
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
|
|
END IF;
|
|
RETURN NEW;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
""",
|
|
# Create triggers for tags ref_count management
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
-- Drop and recreate triggers to ensure they're current
|
|
DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
|
|
CREATE TRIGGER tags_ref_count_insert_trigger
|
|
AFTER INSERT ON tags
|
|
FOR EACH ROW
|
|
EXECUTE FUNCTION increment_artifact_ref_count();
|
|
|
|
DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
|
|
CREATE TRIGGER tags_ref_count_delete_trigger
|
|
AFTER DELETE ON tags
|
|
FOR EACH ROW
|
|
EXECUTE FUNCTION decrement_artifact_ref_count();
|
|
|
|
DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
|
|
CREATE TRIGGER tags_ref_count_update_trigger
|
|
AFTER UPDATE ON tags
|
|
FOR EACH ROW
|
|
WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
|
|
EXECUTE FUNCTION update_artifact_ref_count();
|
|
END $$;
|
|
""",
|
|
# Create ref_count trigger functions for package_versions
|
|
"""
|
|
CREATE OR REPLACE FUNCTION increment_version_ref_count()
|
|
RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
|
|
RETURN NEW;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
""",
|
|
"""
|
|
CREATE OR REPLACE FUNCTION decrement_version_ref_count()
|
|
RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
|
|
RETURN OLD;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
""",
|
|
# Create triggers for package_versions ref_count
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
|
-- Drop and recreate triggers to ensure they're current
|
|
DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
|
|
CREATE TRIGGER package_versions_ref_count_insert
|
|
AFTER INSERT ON package_versions
|
|
FOR EACH ROW
|
|
EXECUTE FUNCTION increment_version_ref_count();
|
|
|
|
DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
|
|
CREATE TRIGGER package_versions_ref_count_delete
|
|
AFTER DELETE ON package_versions
|
|
FOR EACH ROW
|
|
EXECUTE FUNCTION decrement_version_ref_count();
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Migrate existing semver tags to package_versions
|
|
r"""
|
|
DO $$
|
|
BEGIN
|
|
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
|
|
-- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
|
|
INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
|
|
SELECT
|
|
t.package_id,
|
|
t.artifact_id,
|
|
CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
|
|
'migrated_from_tag',
|
|
t.created_by,
|
|
t.created_at
|
|
FROM tags t
|
|
WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
|
|
ON CONFLICT (package_id, version) DO NOTHING;
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
# Teams and multi-tenancy migration (009_teams.sql)
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS teams (
|
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
|
name VARCHAR(255) NOT NULL,
|
|
slug VARCHAR(255) NOT NULL UNIQUE,
|
|
description TEXT,
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
|
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
|
created_by VARCHAR(255) NOT NULL,
|
|
settings JSONB DEFAULT '{}'
|
|
);
|
|
""",
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS team_memberships (
|
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
|
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
|
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
|
role VARCHAR(50) NOT NULL DEFAULT 'member',
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
|
invited_by VARCHAR(255),
|
|
CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
|
|
CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
|
|
);
|
|
""",
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1 FROM information_schema.columns
|
|
WHERE table_name = 'projects' AND column_name = 'team_id'
|
|
) THEN
|
|
ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
|
|
CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
|
|
CREATE INDEX idx_teams_slug ON teams(slug);
|
|
END IF;
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
|
|
CREATE INDEX idx_teams_created_by ON teams(created_by);
|
|
END IF;
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
|
|
CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
|
|
END IF;
|
|
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
|
|
CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
|
|
END IF;
|
|
END $$;
|
|
""",
|
|
]
|
|
|
|
with engine.connect() as conn:
|
|
for migration in migrations:
|
|
try:
|
|
conn.execute(text(migration))
|
|
conn.commit()
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.warning(f"Migration failed (may already be applied): {e}")
|
|
|
|
|
|
def get_db() -> Generator[Session, None, None]:
|
|
"""Dependency for getting database sessions"""
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@contextmanager
|
|
def transaction(db: Session):
|
|
"""
|
|
Context manager for explicit transaction management with savepoint support.
|
|
|
|
Usage:
|
|
with transaction(db):
|
|
# operations here
|
|
# automatically commits on success, rolls back on exception
|
|
"""
|
|
try:
|
|
yield db
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
raise
|
|
|
|
|
|
@contextmanager
|
|
def savepoint(db: Session, name: str = None):
|
|
"""
|
|
Create a savepoint for partial rollback support.
|
|
|
|
Usage:
|
|
with savepoint(db, "my_savepoint"):
|
|
# operations here
|
|
# rolls back to savepoint on exception, but doesn't rollback whole transaction
|
|
"""
|
|
savepoint_obj = db.begin_nested()
|
|
try:
|
|
yield savepoint_obj
|
|
savepoint_obj.commit()
|
|
except Exception:
|
|
savepoint_obj.rollback()
|
|
raise
|
|
|
|
|
|
def retry_on_deadlock(func, max_retries: int = 3, delay: float = 0.1):
|
|
"""
|
|
Decorator/wrapper to retry operations on deadlock detection.
|
|
|
|
Usage:
|
|
@retry_on_deadlock
|
|
def my_operation(db):
|
|
...
|
|
|
|
Or:
|
|
retry_on_deadlock(lambda: my_operation(db))()
|
|
"""
|
|
import functools
|
|
from sqlalchemy.exc import OperationalError
|
|
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
last_exception = None
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except OperationalError as e:
|
|
# Check for deadlock error codes (PostgreSQL: 40P01, MySQL: 1213)
|
|
error_str = str(e).lower()
|
|
if "deadlock" in error_str or "40p01" in error_str:
|
|
last_exception = e
|
|
logger.warning(f"Deadlock detected, retrying (attempt {attempt + 1}/{max_retries})")
|
|
time.sleep(delay * (attempt + 1)) # Exponential backoff
|
|
else:
|
|
raise
|
|
raise last_exception
|
|
|
|
return wrapper
|