Fix concurrent upload race conditions and bump dev resources

- Add IntegrityError handling for concurrent artifact creation
- Add IntegrityError handling for concurrent tag creation with retry
- Add with_for_update() lock on tag lookup to prevent races
- Add database pool config env vars to Helm deployment template
- Bump dev environment resources (256Mi -> 512Mi memory for all services)
- Increase database pool settings for dev (10 connections, 20 overflow)
Author: Mondo Diaz
Date: 2026-01-27 21:14:38 +00:00
parent aa853b5b32
commit 996e3ee4ce
3 changed files with 72 additions and 18 deletions

View File

@@ -406,14 +406,21 @@ def _create_or_update_tag(
     """
     Create or update a tag, handling ref_count and history.
+    Uses SELECT FOR UPDATE to prevent race conditions during concurrent uploads.
+
     Returns:
         tuple of (tag, is_new, old_artifact_id)
         - tag: The created/updated Tag object
        - is_new: True if tag was created, False if updated
        - old_artifact_id: Previous artifact_id if tag was updated, None otherwise
     """
+    # Use with_for_update() to lock the row and prevent race conditions
+    # during concurrent uploads to the same tag
     existing_tag = (
-        db.query(Tag).filter(Tag.package_id == package_id, Tag.name == tag_name).first()
+        db.query(Tag)
+        .filter(Tag.package_id == package_id, Tag.name == tag_name)
+        .with_for_update()
+        .first()
     )

     if existing_tag:
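For reference, with_for_update() maps to a SELECT ... FOR UPDATE, so the lock only exists inside an open transaction and is released on commit or rollback. A minimal sketch of the pattern in isolation, assuming the session/engine setup and the import path for Tag (neither appears in this diff):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from orchard.models import Tag  # assumed import path for the model used above

engine = create_engine("postgresql+psycopg2://orchard:orchard@localhost/orchard")  # assumed DSN
Session = sessionmaker(bind=engine)

def move_tag(package_id: int, tag_name: str, new_artifact_id: str) -> None:
    # Session.begin() opens a transaction and commits (or rolls back) on exit
    with Session.begin() as db:
        # Blocks here if another transaction already holds FOR UPDATE on this row
        tag = (
            db.query(Tag)
            .filter(Tag.package_id == package_id, Tag.name == tag_name)
            .with_for_update()
            .first()
        )
        if tag is not None:
            tag.artifact_id = new_artifact_id
        # Row lock is released when the transaction commits on exiting the block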
@@ -447,7 +454,9 @@ def _create_or_update_tag(
             # Same artifact, no change needed
             return existing_tag, False, None
     else:
-        # Create new tag
+        # Create new tag with race condition handling
+        from sqlalchemy.exc import IntegrityError
+
         new_tag = Tag(
             package_id=package_id,
             name=tag_name,
@@ -455,7 +464,15 @@ def _create_or_update_tag(
             created_by=user_id,
         )
         db.add(new_tag)
-        db.flush()  # Get the tag ID
+
+        try:
+            db.flush()  # Get the tag ID - may fail if concurrent insert happened
+        except IntegrityError:
+            # Another request created the tag concurrently
+            # Rollback the failed insert and retry as update
+            db.rollback()
+            logger.info(f"Tag '{tag_name}' created concurrently, retrying as update")
+            return _create_or_update_tag(db, package_id, tag_name, new_artifact_id, user_id)

         # Record history for creation
         history = TagHistory(
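The retry-as-update path only works if the database enforces uniqueness on (package_id, name); otherwise a concurrent INSERT would create a duplicate row instead of raising IntegrityError. Note also that Session.rollback() discards the whole in-progress transaction, not just the failed INSERT, so the recursive call re-reads state in a fresh transaction. The actual Tag schema is not part of this diff; a hypothetical model showing the kind of constraint assumed:

from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Tag(Base):
    __tablename__ = "tags"  # table name is an assumption

    id = Column(Integer, primary_key=True)
    package_id = Column(Integer, nullable=False)
    name = Column(String, nullable=False)
    artifact_id = Column(String, nullable=False)
    created_by = Column(String)

    # Without this constraint, the concurrent insert would not raise IntegrityError
    __table_args__ = (
        UniqueConstraint("package_id", "name", name="uq_tags_package_name"),
    )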
@@ -2608,6 +2625,8 @@ def upload_artifact(
         # Create new artifact with ref_count=0
         # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE
         # When a tag is created for this artifact, the trigger will increment ref_count
+        from sqlalchemy.exc import IntegrityError
+
         artifact = Artifact(
             id=storage_result.sha256,
             size=storage_result.size,
@@ -2623,6 +2642,22 @@ def upload_artifact(
         )
         db.add(artifact)

+        # Flush to detect concurrent artifact creation race condition
+        try:
+            db.flush()
+        except IntegrityError:
+            # Another request created the artifact concurrently - fetch and use it
+            db.rollback()
+            logger.info(f"Artifact {storage_result.sha256[:12]}... created concurrently, fetching existing")
+            artifact = db.query(Artifact).filter(Artifact.id == storage_result.sha256).first()
+            if not artifact:
+                raise HTTPException(
+                    status_code=500,
+                    detail="Failed to create or fetch artifact record",
+                )
+            deduplicated = True
+            saved_bytes = storage_result.size
+
     # Calculate upload duration
     duration_ms = int((time.time() - start_time) * 1000)
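One way to exercise both new branches is to fire identical uploads concurrently and check that only one artifact row results. A hypothetical integration-test sketch; the endpoint path, port, and form fields are assumptions, not taken from this diff:

from concurrent.futures import ThreadPoolExecutor

import requests

URL = "http://localhost:8080/api/v1/packages/demo/upload"  # assumed endpoint
PAYLOAD = b"identical artifact bytes"  # same content -> same sha256 -> same artifact id

def upload(tag: str) -> int:
    resp = requests.post(URL, files={"file": ("demo.bin", PAYLOAD)}, data={"tag": tag})
    return resp.status_code

with ThreadPoolExecutor(max_workers=2) as pool:
    statuses = list(pool.map(upload, ["v1", "v1"]))

# Both requests should succeed; exactly one artifact row should exist afterwards,
# and one response should report the upload as deduplicated.
assert all(code < 400 for code in statuses), statuses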

View File

@@ -128,6 +128,18 @@ spec:
             value: {{ .Values.orchard.rateLimit.login | quote }}
             {{- end }}
           {{- end }}
+          {{- if .Values.orchard.database.poolSize }}
+          - name: ORCHARD_DATABASE_POOL_SIZE
+            value: {{ .Values.orchard.database.poolSize | quote }}
+          {{- end }}
+          {{- if .Values.orchard.database.maxOverflow }}
+          - name: ORCHARD_DATABASE_MAX_OVERFLOW
+            value: {{ .Values.orchard.database.maxOverflow | quote }}
+          {{- end }}
+          {{- if .Values.orchard.database.poolTimeout }}
+          - name: ORCHARD_DATABASE_POOL_TIMEOUT
+            value: {{ .Values.orchard.database.poolTimeout | quote }}
+          {{- end }}
           {{- if .Values.orchard.auth }}
           {{- if or .Values.orchard.auth.secretsManager .Values.orchard.auth.existingSecret .Values.orchard.auth.adminPassword }}
           - name: ORCHARD_ADMIN_PASSWORD
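The chart only injects these environment variables; the names come from the template above, but how the service consumes them is not shown in this commit. A sketch of the likely wiring, assuming SQLAlchemy's standard QueuePool arguments and a hypothetical ORCHARD_DATABASE_URL variable for the DSN:

import os

from sqlalchemy import create_engine

# Fallback values mirror SQLAlchemy's own defaults (pool_size=5, max_overflow=10, pool_timeout=30)
engine = create_engine(
    os.environ["ORCHARD_DATABASE_URL"],  # DSN variable name is an assumption
    pool_size=int(os.getenv("ORCHARD_DATABASE_POOL_SIZE", "5")),
    max_overflow=int(os.getenv("ORCHARD_DATABASE_MAX_OVERFLOW", "10")),
    pool_timeout=int(os.getenv("ORCHARD_DATABASE_POOL_TIMEOUT", "30")),
)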

View File

@@ -53,15 +53,16 @@ ingress:
   hosts:
     - orchard-dev.common.global.bsf.tools  # Overridden by CI

-# Lighter resources for ephemeral environments
+# Resources for dev/feature environments
+# Bumped to handle concurrent integration tests
 # Note: memory requests must equal limits per cluster policy
 resources:
   limits:
-    cpu: 250m
-    memory: 256Mi
+    cpu: 500m
+    memory: 512Mi
   requests:
-    cpu: 100m
-    memory: 256Mi
+    cpu: 200m
+    memory: 512Mi

 livenessProbe:
   httpGet:
@@ -103,6 +104,10 @@ orchard:
     sslmode: disable
     existingSecret: ""
     existingSecretPasswordKey: "password"
+    # Increased pool settings for concurrent integration tests
+    poolSize: 10
+    maxOverflow: 20
+    poolTimeout: 60
   s3:
     endpoint: ""
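A quick sizing sanity check for these numbers: each replica can hold up to poolSize + maxOverflow connections at peak, so the dev PostgreSQL instance must accommodate that times the replica count. A rough back-of-the-envelope sketch, assuming a single dev replica and PostgreSQL's default max_connections of 100 with 3 reserved superuser slots:

pool_size, max_overflow, replicas = 10, 20, 1   # pool values from this file; replica count assumed
max_connections, reserved = 100, 3              # assumed PostgreSQL defaults

peak_app_connections = replicas * (pool_size + max_overflow)
assert peak_app_connections <= max_connections - reserved, "pool can exhaust Postgres connections"
print(f"peak app connections: {peak_app_connections} of {max_connections - reserved} usable")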
@@ -138,15 +143,16 @@ postgresql:
   primary:
     persistence:
       enabled: false
-    # Resources with memory requests = limits per cluster policy
+    # Bumped resources for concurrent integration tests
+    # Note: memory requests must equal limits per cluster policy
     resourcesPreset: "none"
     resources:
       limits:
-        cpu: 250m
-        memory: 256Mi
+        cpu: 500m
+        memory: 512Mi
       requests:
-        cpu: 100m
-        memory: 256Mi
+        cpu: 200m
+        memory: 512Mi
   # Volume permissions init container
   volumePermissions:
     resourcesPreset: "none"
@@ -172,15 +178,16 @@ minio:
   defaultBuckets: "orchard-artifacts"
   persistence:
     enabled: false
-  # Resources with memory requests = limits per cluster policy
+  # Bumped resources for concurrent integration tests
+  # Note: memory requests must equal limits per cluster policy
   resourcesPreset: "none"  # Disable preset to use explicit resources
   resources:
     limits:
-      cpu: 250m
-      memory: 256Mi
+      cpu: 500m
+      memory: 512Mi
     requests:
-      cpu: 100m
-      memory: 256Mi
+      cpu: 200m
+      memory: 512Mi
   # Init container resources
   defaultInitContainers:
     volumePermissions: