Use CI variable for stage admin password
This commit is contained in:
@@ -406,14 +406,21 @@ def _create_or_update_tag(
|
||||
"""
|
||||
Create or update a tag, handling ref_count and history.
|
||||
|
||||
Uses SELECT FOR UPDATE to prevent race conditions during concurrent uploads.
|
||||
|
||||
Returns:
|
||||
tuple of (tag, is_new, old_artifact_id)
|
||||
- tag: The created/updated Tag object
|
||||
- is_new: True if tag was created, False if updated
|
||||
- old_artifact_id: Previous artifact_id if tag was updated, None otherwise
|
||||
"""
|
||||
# Use with_for_update() to lock the row and prevent race conditions
|
||||
# during concurrent uploads to the same tag
|
||||
existing_tag = (
|
||||
db.query(Tag).filter(Tag.package_id == package_id, Tag.name == tag_name).first()
|
||||
db.query(Tag)
|
||||
.filter(Tag.package_id == package_id, Tag.name == tag_name)
|
||||
.with_for_update()
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing_tag:
|
||||
@@ -447,7 +454,9 @@ def _create_or_update_tag(
|
||||
# Same artifact, no change needed
|
||||
return existing_tag, False, None
|
||||
else:
|
||||
# Create new tag
|
||||
# Create new tag with race condition handling
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
new_tag = Tag(
|
||||
package_id=package_id,
|
||||
name=tag_name,
|
||||
@@ -455,7 +464,15 @@ def _create_or_update_tag(
|
||||
created_by=user_id,
|
||||
)
|
||||
db.add(new_tag)
|
||||
db.flush() # Get the tag ID
|
||||
|
||||
try:
|
||||
db.flush() # Get the tag ID - may fail if concurrent insert happened
|
||||
except IntegrityError:
|
||||
# Another request created the tag concurrently
|
||||
# Rollback the failed insert and retry as update
|
||||
db.rollback()
|
||||
logger.info(f"Tag '{tag_name}' created concurrently, retrying as update")
|
||||
return _create_or_update_tag(db, package_id, tag_name, new_artifact_id, user_id)
|
||||
|
||||
# Record history for creation
|
||||
history = TagHistory(
|
||||
@@ -2608,6 +2625,8 @@ def upload_artifact(
|
||||
# Create new artifact with ref_count=0
|
||||
# NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE
|
||||
# When a tag is created for this artifact, the trigger will increment ref_count
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
artifact = Artifact(
|
||||
id=storage_result.sha256,
|
||||
size=storage_result.size,
|
||||
@@ -2623,6 +2642,22 @@ def upload_artifact(
|
||||
)
|
||||
db.add(artifact)
|
||||
|
||||
# Flush to detect concurrent artifact creation race condition
|
||||
try:
|
||||
db.flush()
|
||||
except IntegrityError:
|
||||
# Another request created the artifact concurrently - fetch and use it
|
||||
db.rollback()
|
||||
logger.info(f"Artifact {storage_result.sha256[:12]}... created concurrently, fetching existing")
|
||||
artifact = db.query(Artifact).filter(Artifact.id == storage_result.sha256).first()
|
||||
if not artifact:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Failed to create or fetch artifact record",
|
||||
)
|
||||
deduplicated = True
|
||||
saved_bytes = storage_result.size
|
||||
|
||||
# Calculate upload duration
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user