From 996e3ee4ce717147a2e5b6bf3147639947ad441c Mon Sep 17 00:00:00 2001
From: Mondo Diaz
Date: Tue, 27 Jan 2026 21:14:38 +0000
Subject: [PATCH] Fix concurrent upload race conditions and bump dev resources

- Add IntegrityError handling for concurrent artifact creation
- Add IntegrityError handling for concurrent tag creation with retry
- Add with_for_update() lock on tag lookup to prevent races
- Add database pool config env vars to Helm deployment template
- Bump dev environment resources (256Mi -> 512Mi memory for all services)
- Increase database pool settings for dev (10 connections, 20 overflow)
---
 backend/app/routes.py                  | 41 ++++++++++++++++++++++++--
 helm/orchard/templates/deployment.yaml | 12 ++++++++
 helm/orchard/values-dev.yaml           | 37 +++++++++++++----------
 3 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/backend/app/routes.py b/backend/app/routes.py
index 68121b2..afc65b9 100644
--- a/backend/app/routes.py
+++ b/backend/app/routes.py
@@ -406,14 +406,21 @@ def _create_or_update_tag(
     """
     Create or update a tag, handling ref_count and history.
 
+    Uses SELECT FOR UPDATE to prevent race conditions during concurrent uploads.
+
     Returns:
         tuple of (tag, is_new, old_artifact_id)
         - tag: The created/updated Tag object
         - is_new: True if tag was created, False if updated
        - old_artifact_id: Previous artifact_id if tag was updated, None otherwise
     """
+    # Use with_for_update() to lock the row and prevent race conditions
+    # during concurrent uploads to the same tag
     existing_tag = (
-        db.query(Tag).filter(Tag.package_id == package_id, Tag.name == tag_name).first()
+        db.query(Tag)
+        .filter(Tag.package_id == package_id, Tag.name == tag_name)
+        .with_for_update()
+        .first()
     )
 
     if existing_tag:
@@ -447,7 +454,9 @@
         # Same artifact, no change needed
         return existing_tag, False, None
     else:
-        # Create new tag
+        # Create new tag with race condition handling
+        from sqlalchemy.exc import IntegrityError
+
         new_tag = Tag(
             package_id=package_id,
             name=tag_name,
@@ -455,7 +464,15 @@
             created_by=user_id,
         )
         db.add(new_tag)
-        db.flush()  # Get the tag ID
+
+        try:
+            db.flush()  # Get the tag ID; may fail if a concurrent insert happened
+        except IntegrityError:
+            # Another request created the tag concurrently.
+            # Roll back the failed transaction and retry as an update.
+            db.rollback()
+            logger.info(f"Tag '{tag_name}' created concurrently, retrying as update")
+            return _create_or_update_tag(db, package_id, tag_name, new_artifact_id, user_id)
 
         # Record history for creation
         history = TagHistory(
@@ -2608,6 +2625,8 @@ def upload_artifact(
         # Create new artifact with ref_count=0
         # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE
         # When a tag is created for this artifact, the trigger will increment ref_count
+        from sqlalchemy.exc import IntegrityError
+
         artifact = Artifact(
             id=storage_result.sha256,
             size=storage_result.size,
@@ -2623,6 +2642,22 @@
         )
         db.add(artifact)
 
+        # Flush to detect a concurrent artifact creation race condition
+        try:
+            db.flush()
+        except IntegrityError:
+            # Another request created the artifact concurrently - fetch and use it
+            db.rollback()
+            logger.info(f"Artifact {storage_result.sha256[:12]}... created concurrently, fetching existing")
+            artifact = db.query(Artifact).filter(Artifact.id == storage_result.sha256).first()
+            if not artifact:
+                raise HTTPException(
+                    status_code=500,
+                    detail="Failed to create or fetch artifact record",
+                )
+            deduplicated = True
+            saved_bytes = storage_result.size
+
     # Calculate upload duration
     duration_ms = int((time.time() - start_time) * 1000)
 
diff --git a/helm/orchard/templates/deployment.yaml b/helm/orchard/templates/deployment.yaml
index ef5376c..582c738 100644
--- a/helm/orchard/templates/deployment.yaml
+++ b/helm/orchard/templates/deployment.yaml
@@ -128,6 +128,18 @@ spec:
             value: {{ .Values.orchard.rateLimit.login | quote }}
           {{- end }}
           {{- end }}
+          {{- if .Values.orchard.database.poolSize }}
+          - name: ORCHARD_DATABASE_POOL_SIZE
+            value: {{ .Values.orchard.database.poolSize | quote }}
+          {{- end }}
+          {{- if .Values.orchard.database.maxOverflow }}
+          - name: ORCHARD_DATABASE_MAX_OVERFLOW
+            value: {{ .Values.orchard.database.maxOverflow | quote }}
+          {{- end }}
+          {{- if .Values.orchard.database.poolTimeout }}
+          - name: ORCHARD_DATABASE_POOL_TIMEOUT
+            value: {{ .Values.orchard.database.poolTimeout | quote }}
+          {{- end }}
          {{- if .Values.orchard.auth }}
          {{- if or .Values.orchard.auth.secretsManager .Values.orchard.auth.existingSecret .Values.orchard.auth.adminPassword }}
          - name: ORCHARD_ADMIN_PASSWORD
diff --git a/helm/orchard/values-dev.yaml b/helm/orchard/values-dev.yaml
index 8aafb1d..d21257e 100644
--- a/helm/orchard/values-dev.yaml
+++ b/helm/orchard/values-dev.yaml
@@ -53,15 +53,16 @@ ingress:
   hosts:
     - orchard-dev.common.global.bsf.tools # Overridden by CI
 
-# Lighter resources for ephemeral environments
+# Resources for dev/feature environments
+# Bumped to handle concurrent integration tests
 # Note: memory requests must equal limits per cluster policy
 resources:
   limits:
-    cpu: 250m
-    memory: 256Mi
+    cpu: 500m
+    memory: 512Mi
   requests:
-    cpu: 100m
-    memory: 256Mi
+    cpu: 200m
+    memory: 512Mi
 
 livenessProbe:
   httpGet:
@@ -103,6 +104,10 @@ orchard:
     sslmode: disable
     existingSecret: ""
     existingSecretPasswordKey: "password"
+    # Increased pool settings for concurrent integration tests
+    poolSize: 10
+    maxOverflow: 20
+    poolTimeout: 60
 
   s3:
     endpoint: ""
@@ -138,15 +143,16 @@ postgresql:
   primary:
     persistence:
       enabled: false
-    # Resources with memory requests = limits per cluster policy
+    # Bumped resources for concurrent integration tests
+    # Note: memory requests must equal limits per cluster policy
     resourcesPreset: "none"
     resources:
       limits:
-        cpu: 250m
-        memory: 256Mi
+        cpu: 500m
+        memory: 512Mi
       requests:
-        cpu: 100m
-        memory: 256Mi
+        cpu: 200m
+        memory: 512Mi
     # Volume permissions init container
     volumePermissions:
       resourcesPreset: "none"
@@ -172,15 +178,16 @@ minio:
   defaultBuckets: "orchard-artifacts"
   persistence:
     enabled: false
-  # Resources with memory requests = limits per cluster policy
+  # Bumped resources for concurrent integration tests
+  # Note: memory requests must equal limits per cluster policy
   resourcesPreset: "none"  # Disable preset to use explicit resources
   resources:
     limits:
-      cpu: 250m
-      memory: 256Mi
+      cpu: 500m
+      memory: 512Mi
    requests:
-      cpu: 100m
-      memory: 256Mi
+      cpu: 200m
+      memory: 512Mi
   # Init container resources
   defaultInitContainers:
     volumePermissions:
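
Note on the retry paths above: SQLAlchemy's Session.rollback() discards the
entire in-progress transaction, not just the failed INSERT, so any earlier
uncommitted work from the same request (for example a newly created package
row) is also lost before the retry. Where that matters, a SAVEPOINT via
Session.begin_nested() confines the rollback to the failed insert. A minimal
sketch, assuming Tag, logger, and _create_or_update_tag as in routes.py and
an artifact_id column on Tag; this is an illustration of the alternative,
not the committed change, and it omits the TagHistory bookkeeping for
brevity:

    # Sketch only: SAVEPOINT-based variant of the tag-creation retry.
    from sqlalchemy.exc import IntegrityError

    def _insert_tag_with_savepoint(db, package_id, tag_name, new_artifact_id, user_id):
        new_tag = Tag(
            package_id=package_id,
            name=tag_name,
            artifact_id=new_artifact_id,  # assumed column name
            created_by=user_id,
        )
        try:
            # begin_nested() issues a SAVEPOINT; an IntegrityError raised
            # inside this block rolls back only to the savepoint, leaving
            # earlier work in the request's transaction intact.
            with db.begin_nested():
                db.add(new_tag)
                db.flush()  # Get the tag ID
            return new_tag, True, None
        except IntegrityError:
            # Another request won the race; only the savepoint was rolled back.
            logger.info(f"Tag '{tag_name}' created concurrently, retrying as update")
            return _create_or_update_tag(db, package_id, tag_name, new_artifact_id, user_id)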
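
The three new ORCHARD_DATABASE_* env vars map one-to-one onto SQLAlchemy's
QueuePool arguments (pool_size, max_overflow, pool_timeout). A minimal
sketch of how a backend might consume them; the env var names come from the
deployment template above, but the URL variable and the fallback defaults
(SQLAlchemy's own) are assumptions, not Orchard's actual config code:

    # Sketch only: wiring the Helm-provided pool env vars into the engine.
    import os

    from sqlalchemy import create_engine

    engine = create_engine(
        os.environ["ORCHARD_DATABASE_URL"],  # hypothetical variable name
        # values-dev.yaml sets poolSize=10 and maxOverflow=20, so each pod
        # may open up to 10 + 20 = 30 connections; pool_timeout=60 is how
        # long a request waits for a free connection before erroring.
        pool_size=int(os.environ.get("ORCHARD_DATABASE_POOL_SIZE", "5")),
        max_overflow=int(os.environ.get("ORCHARD_DATABASE_MAX_OVERFLOW", "10")),
        pool_timeout=int(os.environ.get("ORCHARD_DATABASE_POOL_TIMEOUT", "30")),
    )

At 30 connections per pod, a single dev replica stays well under PostgreSQL's
default max_connections of 100, but that headroom shrinks if the replica
count grows.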