from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker, Session from typing import Generator import logging from .config import get_settings from .models import Base settings = get_settings() logger = logging.getLogger(__name__) engine = create_engine(settings.database_url, pool_pre_ping=True) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def init_db(): """Create all tables and run migrations""" Base.metadata.create_all(bind=engine) # Run migrations for schema updates _run_migrations() def _run_migrations(): """Run manual migrations for schema updates""" migrations = [ # Add format_metadata column to artifacts table """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'artifacts' AND column_name = 'format_metadata' ) THEN ALTER TABLE artifacts ADD COLUMN format_metadata JSONB DEFAULT '{}'; END IF; END $$; """, ] with engine.connect() as conn: for migration in migrations: try: conn.execute(text(migration)) conn.commit() except Exception as e: logger.warning(f"Migration failed (may already be applied): {e}") def get_db() -> Generator[Session, None, None]: """Dependency for getting database sessions""" db = SessionLocal() try: yield db finally: db.close()