"""Database engine, session management, and manual schema migrations.

Provides the shared SQLAlchemy engine (with QueuePool configuration and
optional PostgreSQL statement timeout), session factories, pool monitoring
hooks, transaction/savepoint helpers, and an idempotent in-code migration
runner used at startup.
"""

from sqlalchemy import create_engine, text, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from typing import Generator, Optional
from contextlib import contextmanager
import logging
import time

from .config import get_settings
from .models import Base

settings = get_settings()
logger = logging.getLogger(__name__)

# Build connect_args with query timeout if configured
connect_args = {}
if settings.database_query_timeout > 0:
    # PostgreSQL statement_timeout is in milliseconds; config value is seconds.
    connect_args["options"] = (
        f"-c statement_timeout={settings.database_query_timeout * 1000}"
    )

# Create engine with connection pool configuration
engine = create_engine(
    settings.database_url,
    pool_pre_ping=True,  # Check connection health before using
    poolclass=QueuePool,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
    pool_timeout=settings.database_pool_timeout,
    pool_recycle=settings.database_pool_recycle,
    connect_args=connect_args,
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# Connection pool monitoring
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    """Log when a connection is checked out from the pool."""
    logger.debug("Connection checked out from pool: %s", id(dbapi_connection))


@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
    """Log when a connection is returned to the pool."""
    logger.debug("Connection returned to pool: %s", id(dbapi_connection))


def get_pool_status() -> dict:
    """Get current connection pool status for monitoring.

    Returns:
        A dict with ``pool_size``, ``checked_out``, ``overflow`` and
        ``checked_in`` counters as reported by the engine's QueuePool.
    """
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "checked_in": pool.checkedin(),
    }


def init_db():
    """Create all tables and run migrations."""
    Base.metadata.create_all(bind=engine)
    # Run migrations for schema updates
    _run_migrations()


def _run_migrations():
    """Run manual migrations for schema updates.

    Each migration is written to be idempotent (guarded by
    ``IF NOT EXISTS`` checks or ``CREATE OR REPLACE``), so the whole list
    can be re-executed safely on every startup. Failures are logged and
    skipped rather than aborting startup, since a failure usually means
    the migration was already applied.
    """
    migrations = [
        # Add format_metadata column to artifacts table
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'artifacts' AND column_name = 'format_metadata'
            ) THEN
                ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}';
            END IF;
        END $$;
        """,
        # Add format column to packages table
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'packages' AND column_name = 'format'
            ) THEN
                ALTER TABLE packages ADD COLUMN format VARCHAR(50) DEFAULT 'generic' NOT NULL;
                CREATE INDEX IF NOT EXISTS idx_packages_format ON packages(format);
            END IF;
        END $$;
        """,
        # Add platform column to packages table
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'packages' AND column_name = 'platform'
            ) THEN
                ALTER TABLE packages ADD COLUMN platform VARCHAR(50) DEFAULT 'any' NOT NULL;
                CREATE INDEX IF NOT EXISTS idx_packages_platform ON packages(platform);
            END IF;
        END $$;
        """,
        # Add ref_count index and constraints for artifacts
        """
        DO $$
        BEGIN
            -- Add ref_count index
            IF NOT EXISTS (
                SELECT 1 FROM pg_indexes WHERE indexname = 'idx_artifacts_ref_count'
            ) THEN
                CREATE INDEX idx_artifacts_ref_count ON artifacts(ref_count);
            END IF;
            -- Add ref_count >= 0 constraint
            IF NOT EXISTS (
                SELECT 1 FROM pg_constraint WHERE conname = 'check_ref_count_non_negative'
            ) THEN
                ALTER TABLE artifacts ADD CONSTRAINT check_ref_count_non_negative CHECK (ref_count >= 0);
            END IF;
        END $$;
        """,
        # Add composite indexes for packages and tags
        """
        DO $$
        BEGIN
            -- Composite index for package lookup by project and name
            IF NOT EXISTS (
                SELECT 1 FROM pg_indexes WHERE indexname = 'idx_packages_project_name'
            ) THEN
                CREATE UNIQUE INDEX idx_packages_project_name ON packages(project_id, name);
            END IF;
            -- Composite index for tag lookup by package and name
            IF NOT EXISTS (
                SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_name'
            ) THEN
                CREATE UNIQUE INDEX idx_tags_package_name ON tags(package_id, name);
            END IF;
            -- Composite index for recent tags queries
            IF NOT EXISTS (
                SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tags_package_created_at'
            ) THEN
                CREATE INDEX idx_tags_package_created_at ON tags(package_id, created_at);
            END IF;
        END $$;
        """,
        # Add package_versions indexes and triggers (007_package_versions.sql)
        """
        DO $$
        BEGIN
            -- Create indexes for package_versions if table exists
            IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
                -- Indexes for common queries
                IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_id') THEN
                    CREATE INDEX idx_package_versions_package_id ON package_versions(package_id);
                END IF;
                IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_artifact_id') THEN
                    CREATE INDEX idx_package_versions_artifact_id ON package_versions(artifact_id);
                END IF;
                IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_package_versions_package_version') THEN
                    CREATE INDEX idx_package_versions_package_version ON package_versions(package_id, version);
                END IF;
            END IF;
        END $$;
        """,
        # Create ref_count trigger functions for tags (ensures triggers exist even if initial migration wasn't run)
        """
        CREATE OR REPLACE FUNCTION increment_artifact_ref_count()
        RETURNS TRIGGER AS $$
        BEGIN
            UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        """,
        """
        CREATE OR REPLACE FUNCTION decrement_artifact_ref_count()
        RETURNS TRIGGER AS $$
        BEGIN
            UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
            RETURN OLD;
        END;
        $$ LANGUAGE plpgsql;
        """,
        """
        CREATE OR REPLACE FUNCTION update_artifact_ref_count()
        RETURNS TRIGGER AS $$
        BEGIN
            IF OLD.artifact_id != NEW.artifact_id THEN
                UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
                UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
            END IF;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        """,
        # Create triggers for tags ref_count management
        """
        DO $$
        BEGIN
            -- Drop and recreate triggers to ensure they're current
            DROP TRIGGER IF EXISTS tags_ref_count_insert_trigger ON tags;
            CREATE TRIGGER tags_ref_count_insert_trigger
                AFTER INSERT ON tags
                FOR EACH ROW
                EXECUTE FUNCTION increment_artifact_ref_count();
            DROP TRIGGER IF EXISTS tags_ref_count_delete_trigger ON tags;
            CREATE TRIGGER tags_ref_count_delete_trigger
                AFTER DELETE ON tags
                FOR EACH ROW
                EXECUTE FUNCTION decrement_artifact_ref_count();
            DROP TRIGGER IF EXISTS tags_ref_count_update_trigger ON tags;
            CREATE TRIGGER tags_ref_count_update_trigger
                AFTER UPDATE ON tags
                FOR EACH ROW
                WHEN (OLD.artifact_id IS DISTINCT FROM NEW.artifact_id)
                EXECUTE FUNCTION update_artifact_ref_count();
        END $$;
        """,
        # Create ref_count trigger functions for package_versions
        """
        CREATE OR REPLACE FUNCTION increment_version_ref_count()
        RETURNS TRIGGER AS $$
        BEGIN
            UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        """,
        """
        CREATE OR REPLACE FUNCTION decrement_version_ref_count()
        RETURNS TRIGGER AS $$
        BEGIN
            UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
            RETURN OLD;
        END;
        $$ LANGUAGE plpgsql;
        """,
        # Create triggers for package_versions ref_count
        """
        DO $$
        BEGIN
            IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
                -- Drop and recreate triggers to ensure they're current
                DROP TRIGGER IF EXISTS package_versions_ref_count_insert ON package_versions;
                CREATE TRIGGER package_versions_ref_count_insert
                    AFTER INSERT ON package_versions
                    FOR EACH ROW
                    EXECUTE FUNCTION increment_version_ref_count();
                DROP TRIGGER IF EXISTS package_versions_ref_count_delete ON package_versions;
                CREATE TRIGGER package_versions_ref_count_delete
                    AFTER DELETE ON package_versions
                    FOR EACH ROW
                    EXECUTE FUNCTION decrement_version_ref_count();
            END IF;
        END $$;
        """,
        # Migrate existing semver tags to package_versions
        r"""
        DO $$
        BEGIN
            IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'package_versions') THEN
                -- Migrate tags that look like versions (v1.0.0, 1.2.3, 2.0.0-beta, etc.)
                INSERT INTO package_versions (package_id, artifact_id, version, version_source, created_by, created_at)
                SELECT
                    t.package_id,
                    t.artifact_id,
                    CASE WHEN t.name LIKE 'v%' THEN substring(t.name from 2) ELSE t.name END,
                    'migrated_from_tag',
                    t.created_by,
                    t.created_at
                FROM tags t
                WHERE t.name ~ '^v?[0-9]+\.[0-9]+(\.[0-9]+)?([-.][a-zA-Z0-9]+)?$'
                ON CONFLICT (package_id, version) DO NOTHING;
            END IF;
        END $$;
        """,
        # Teams and multi-tenancy migration (009_teams.sql)
        """
        CREATE TABLE IF NOT EXISTS teams (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(255) NOT NULL,
            slug VARCHAR(255) NOT NULL UNIQUE,
            description TEXT,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
            updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
            created_by VARCHAR(255) NOT NULL,
            settings JSONB DEFAULT '{}'
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS team_memberships (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
            user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            role VARCHAR(50) NOT NULL DEFAULT 'member',
            created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
            invited_by VARCHAR(255),
            CONSTRAINT team_memberships_unique UNIQUE (team_id, user_id),
            CONSTRAINT team_memberships_role_check CHECK (role IN ('owner', 'admin', 'member'))
        );
        """,
        """
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_name = 'projects' AND column_name = 'team_id'
            ) THEN
                ALTER TABLE projects ADD COLUMN team_id UUID REFERENCES teams(id) ON DELETE SET NULL;
                CREATE INDEX IF NOT EXISTS idx_projects_team_id ON projects(team_id);
            END IF;
        END $$;
        """,
        """
        DO $$
        BEGIN
            IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_slug') THEN
                CREATE INDEX idx_teams_slug ON teams(slug);
            END IF;
            IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_teams_created_by') THEN
                CREATE INDEX idx_teams_created_by ON teams(created_by);
            END IF;
            IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_team_id') THEN
                CREATE INDEX idx_team_memberships_team_id ON team_memberships(team_id);
            END IF;
            IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_team_memberships_user_id') THEN
                CREATE INDEX idx_team_memberships_user_id ON team_memberships(user_id);
            END IF;
        END $$;
        """,
    ]

    with engine.connect() as conn:
        for migration in migrations:
            try:
                conn.execute(text(migration))
                conn.commit()
            except Exception as e:
                # BUGFIX: roll back the failed transaction. Without this,
                # PostgreSQL leaves the connection in an aborted-transaction
                # state and every subsequent migration on this connection
                # would fail with "current transaction is aborted".
                conn.rollback()
                logger.warning(f"Migration failed (may already be applied): {e}")


def get_db() -> Generator[Session, None, None]:
    """Dependency for getting database sessions.

    Yields a session and guarantees it is closed (returned to the pool)
    when the caller is done, even on error.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@contextmanager
def transaction(db: Session):
    """Context manager for explicit transaction management.

    Usage:
        with transaction(db):
            # operations here
            # automatically commits on success, rolls back on exception
    """
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise


@contextmanager
def savepoint(db: Session, name: Optional[str] = None):
    """Create a savepoint for partial rollback support.

    Args:
        db: Active session to open the nested transaction on.
        name: Accepted for API compatibility; SQLAlchemy names the
            savepoint itself, so this value is currently unused.

    Usage:
        with savepoint(db, "my_savepoint"):
            # operations here
            # rolls back to savepoint on exception, without rolling back
            # the enclosing transaction
    """
    savepoint_obj = db.begin_nested()
    try:
        yield savepoint_obj
        savepoint_obj.commit()
    except Exception:
        savepoint_obj.rollback()
        raise


def retry_on_deadlock(func, max_retries: int = 3, delay: float = 0.1):
    """Decorator/wrapper to retry operations on deadlock detection.

    Args:
        func: Callable to wrap.
        max_retries: Maximum number of attempts before giving up.
        delay: Base sleep between attempts; grows linearly with the
            attempt number.

    Usage:
        @retry_on_deadlock
        def my_operation(db): ...

    Or:
        retry_on_deadlock(lambda: my_operation(db))()
    """
    import functools
    from sqlalchemy.exc import OperationalError

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        last_exception = None
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except OperationalError as e:
                # Check for deadlock error codes (PostgreSQL: 40P01, MySQL: 1213)
                error_str = str(e).lower()
                if "deadlock" in error_str or "40p01" in error_str:
                    last_exception = e
                    logger.warning(
                        f"Deadlock detected, retrying (attempt {attempt + 1}/{max_retries})"
                    )
                    # Linear backoff; skip the pointless sleep after the
                    # final attempt (it would only delay the re-raise).
                    if attempt + 1 < max_retries:
                        time.sleep(delay * (attempt + 1))
                else:
                    raise
        raise last_exception

    return wrapper