Files
orchard/backend/app/metadata.py
Mondo Diaz 6eb2f9db7b Implement backend upload/download API enhancements
- Add S3 multipart upload support for files > 100MB
- Add resumable upload API endpoints (init, upload part, complete, abort, status)
- Add HTTP range request support for partial downloads
- Add HEAD request endpoint for artifact metadata
- Add format-specific metadata extraction (deb, rpm, tar.gz, wheel, jar, zip)
- Add format_metadata column to artifacts table
- Add database migration for schema updates
- Add deduplication indicator in upload response
- Set Accept-Ranges header on downloads
- Return Content-Length header on all downloads
2025-12-11 17:07:10 -06:00

355 lines
12 KiB
Python

"""
Format-specific metadata extraction for uploaded artifacts.
Supports extracting version info and other metadata from package formats.
"""
import struct
import gzip
import tarfile
import io
import re
import logging
from typing import Dict, Any, Optional, BinaryIO
logger = logging.getLogger(__name__)
def extract_metadata(file: BinaryIO, filename: str, content_type: Optional[str] = None) -> Dict[str, Any]:
    """
    Pick a format-specific extractor from the filename suffix and run it.

    The suffix comparison is case-insensitive. ``content_type`` is accepted
    but not consulted. A failure inside an extractor is logged as a warning
    and yields an empty dict. The stream is rewound to offset 0 before
    returning; a failed rewind is ignored.

    Returns the dict produced by the matching extractor, or ``{}`` when no
    suffix matches.
    """
    suffix_source = (filename or "").lower()

    # Extractors are wrapped in lambdas so each one is only looked up and
    # invoked when its suffix actually matches.
    dispatch = (
        ((".deb",), lambda: extract_deb_metadata(file)),
        ((".rpm",), lambda: extract_rpm_metadata(file)),
        ((".tar.gz", ".tgz"), lambda: extract_tarball_metadata(file, filename)),
        ((".whl",), lambda: extract_wheel_metadata(file)),
        ((".jar",), lambda: extract_jar_metadata(file)),
        ((".zip",), lambda: extract_zip_metadata(file)),
    )

    extracted = {}
    try:
        for suffixes, run_extractor in dispatch:
            if suffix_source.endswith(suffixes):
                extracted = run_extractor()
                break
    except Exception as e:
        logger.warning(f"Failed to extract metadata from {filename}: {e}")

    # Leave the stream at the start for whoever reads it next.
    try:
        file.seek(0)
    except Exception:
        pass
    return extracted
def extract_deb_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Read the control fields out of a Debian .deb package.

    A .deb is an ar archive. Its members are walked until one whose name
    starts with ``control.tar`` is found; that member is opened as a tar
    archive and its ``control`` file is handed to ``parse_deb_control``.

    Returns an empty dict when the ar magic is missing, when no control
    member is found, or when the control archive cannot be parsed.
    """
    if file.read(8) != b"!<arch>\n":
        return {}

    parsed = {}
    while True:
        # Every ar member starts with a fixed 60-byte header.
        entry_header = file.read(60)
        if len(entry_header) < 60:
            return parsed

        entry_name = entry_header[0:16].decode("ascii").strip()
        size_field = entry_header[48:58].decode("ascii").strip()
        try:
            entry_size = int(size_field)
        except ValueError:
            return parsed

        if not entry_name.startswith("control.tar"):
            # Member data is padded to an even length.
            file.seek(entry_size + (entry_size % 2), 1)
            continue

        payload = file.read(entry_size)
        try:
            if entry_name.endswith(".gz"):
                payload = gzip.decompress(payload)
            with tarfile.open(fileobj=io.BytesIO(payload), mode="r:*") as archive:
                for member in archive.getmembers():
                    if member.name not in ("./control", "control"):
                        continue
                    handle = archive.extractfile(member)
                    if handle:
                        text = handle.read().decode("utf-8", errors="replace")
                        parsed = parse_deb_control(text)
                    break
        except Exception as e:
            logger.debug(f"Failed to parse deb control: {e}")
        # Only the first control member is ever inspected.
        return parsed
def parse_deb_control(content: str) -> Dict[str, Any]:
    """
    Parse Debian control-file text and return a selected set of fields.

    Field names are lower-cased. Lines beginning with a space or tab
    continue the current field and are joined to it with newlines. A line
    that is neither a continuation nor ``key: value`` closes the current
    field.

    The result holds whichever of ``package_name``, ``version``,
    ``architecture``, ``maintainer``, ``description`` (first line only) and
    ``depends`` were present, plus ``format`` set to ``"deb"``.
    """
    fields = {}
    pending_key = None
    pending_lines = []

    for raw_line in content.split("\n"):
        if raw_line.startswith((" ", "\t")):
            # A continuation with no open field is dropped.
            if pending_key:
                pending_lines.append(raw_line.strip())
            continue

        # Any other line closes the field that was being collected.
        if pending_key:
            fields[pending_key] = "\n".join(pending_lines)

        if ":" in raw_line:
            name_part, _, value_part = raw_line.partition(":")
            pending_key = name_part.strip().lower()
            pending_lines = [value_part.strip()]
        else:
            pending_key = None
            pending_lines = []

    if pending_key:
        fields[pending_key] = "\n".join(pending_lines)

    summary = {}
    direct_copies = (
        ("package", "package_name"),
        ("version", "version"),
        ("architecture", "architecture"),
        ("maintainer", "maintainer"),
    )
    for source_key, target_key in direct_copies:
        if source_key in fields:
            summary[target_key] = fields[source_key]
    if "description" in fields:
        # Only the first line of the description is kept.
        summary["description"] = fields["description"].split("\n")[0]
    if "depends" in fields:
        summary["depends"] = fields["depends"]
    summary["format"] = "deb"
    return summary
# Defensive caps on the header length fields, so a corrupt value cannot
# request an enormous read.
_RPM_MAX_INDEX_ENTRIES = 0xFFFF
_RPM_MAX_STORE_BYTES = 256 * 1024 * 1024

# Header tags copied into the result, keyed by RPM tag number.
_RPM_TAG_FIELDS = {
    1000: "package_name",   # Name
    1001: "version",        # Version
    1002: "release",        # Release
    1004: "description",    # Summary
    1022: "architecture",   # Arch
}

# Type 6 is STRING; type 9 is I18NSTRING (used by Summary), whose first
# entry is read the same way as a plain NUL-terminated string.
_RPM_STRING_TYPES = (6, 9)


def _read_rpm_header_section(file: BinaryIO) -> Optional[tuple]:
    """
    Read one RPM header structure starting at the current position.

    The structure is a 16-byte intro (3-byte magic, version byte, 4 reserved
    bytes, big-endian index count, big-endian store size), then 16 bytes per
    index entry, then the data store.

    Returns ``(entries, store)`` where ``entries`` is a list of
    ``(tag, type, offset, count)`` tuples, or ``None`` when the magic is
    missing, a length field exceeds the caps, or the index is truncated.
    A truncated store is returned as-is.
    """
    intro = file.read(16)
    if len(intro) < 16 or intro[0:3] != b"\x8e\xad\xe8":
        return None
    index_count, store_size = struct.unpack(">II", intro[8:16])
    if index_count > _RPM_MAX_INDEX_ENTRIES or store_size > _RPM_MAX_STORE_BYTES:
        return None
    index_bytes = file.read(16 * index_count)
    if len(index_bytes) < 16 * index_count:
        return None
    entries = list(struct.iter_unpack(">IIII", index_bytes))
    store = file.read(store_size)
    return entries, store


def extract_rpm_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from an RPM package.

    An RPM starts with a 96-byte lead, followed by a signature header, then
    the main header that carries the package tags. The name stored in the
    lead is used as a fallback ``package_name``; name, version, release,
    summary and architecture are then read from the main header when it can
    be parsed.

    Returns a dict that always contains ``format`` set to ``"rpm"``. Parsing
    problems are logged at debug level and never raised.
    """
    metadata = {"format": "rpm"}

    lead = file.read(96)
    if len(lead) < 96:
        return metadata
    if lead[0:4] != b"\xed\xab\xee\xdb":
        return metadata

    # The lead holds a NUL-terminated name at offset 10 (66 bytes max).
    name_bytes = lead[10:76]
    null_idx = name_bytes.find(b"\x00")
    if null_idx > 0:
        metadata["package_name"] = name_bytes[:null_idx].decode("ascii", errors="replace")

    try:
        # The signature header comes first and uses the same layout as the
        # main header; it has to be consumed to reach the package tags.
        signature = _read_rpm_header_section(file)
        if signature is None:
            return metadata
        _, signature_store = signature

        # The signature is padded so the main header starts on an 8-byte
        # boundary. The lead, intro and index are all multiples of 8, so
        # only the store length matters.
        padding = -len(signature_store) % 8
        if padding and len(file.read(padding)) < padding:
            return metadata

        header = _read_rpm_header_section(file)
        if header is None:
            return metadata
        entries, store = header

        for tag, type_, offset, _count in entries:
            field = _RPM_TAG_FIELDS.get(tag)
            if field is None or type_ not in _RPM_STRING_TYPES:
                continue
            end = store.find(b"\x00", offset)
            # Skips empty strings and offsets with no terminator.
            if end > offset:
                metadata[field] = store[offset:end].decode("utf-8", errors="replace")
    except Exception as e:
        logger.debug(f"Failed to parse RPM header: {e}")
    return metadata
def extract_tarball_metadata(file: BinaryIO, filename: str) -> Dict[str, Any]:
    """
    Derive a package name and version from a tarball's filename.

    The archive contents are not read; ``file`` is unused. A recognised
    archive suffix is stripped (case-insensitively) and the remainder is
    matched as ``name-version`` first, then ``name_version``.

    Returns a dict with ``format`` set to ``"tarball"``, plus
    ``package_name`` and ``version`` when the filename matches.
    """
    known_suffixes = (".tar.gz", ".tgz", ".tar.bz2", ".tar.xz")
    lowered = filename.lower()
    found_suffix = next((s for s in known_suffixes if lowered.endswith(s)), None)
    stem = filename[:-len(found_suffix)] if found_suffix else filename

    # The dash-separated form takes precedence over the underscore form.
    version_patterns = (
        r"^(.+)-(\d+\.\d+(?:\.\d+)?(?:[-._]\w+)?)$",  # name-version
        r"^(.+)_(\d+\.\d+(?:\.\d+)?(?:[-._]\w+)?)$",  # name_version
    )
    for candidate in version_patterns:
        hit = re.match(candidate, stem)
        if hit:
            package_name, version = hit.groups()
            return {"format": "tarball", "package_name": package_name, "version": version}
    return {"format": "tarball"}
def extract_wheel_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from a Python wheel (.whl) file.

    Reads the ``Name``, ``Version``, ``Summary`` and ``Author`` headers from
    the wheel's ``*.dist-info/METADATA`` file. The root-level ``.dist-info``
    directory is preferred, so a vendored package that ships its own
    ``.dist-info`` deeper in the archive is not mistaken for the wheel
    itself. If no root-level one exists, the first nested match is used.

    Returns a dict with ``format`` set to ``"wheel"``, plus
    ``package_name``, ``version``, ``description`` and ``author`` for the
    headers that were found. Unreadable archives are logged at debug level
    and never raised.
    """
    import zipfile

    metadata = {"format": "wheel"}
    header_fields = (
        ("Name:", "package_name"),
        ("Version:", "version"),
        ("Summary:", "description"),
        ("Author:", "author"),
    )

    try:
        with zipfile.ZipFile(file, "r") as zf:
            candidates = [
                name for name in zf.namelist()
                if name.endswith("/METADATA") and ".dist-info/" in name
            ]
            if not candidates:
                return metadata
            # Exactly one "/" means "<dist>.dist-info/METADATA" at the root.
            top_level = [name for name in candidates if name.count("/") == 1]
            chosen = (top_level or candidates)[0]
            content = zf.read(chosen).decode("utf-8", errors="replace")
    except Exception as e:
        logger.debug(f"Failed to parse wheel: {e}")
        return metadata

    # Email-style headers end at the first blank line.
    for raw_line in content.split("\n"):
        # Drop the "\r" of CRLF endings so the blank separator is detected
        # and the description body is never parsed as headers.
        line = raw_line.rstrip("\r")
        if not line:
            break
        for prefix, field in header_fields:
            if line.startswith(prefix):
                metadata[field] = line[len(prefix):].strip()
                break
    return metadata
def extract_jar_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Collect identifying fields from a Java JAR archive.

    Reads ``META-INF/MANIFEST.MF`` for the implementation title/version and
    bundle name/version, then the first ``pom.properties`` found in the
    archive for ``artifactId``, ``groupId`` and ``version``. The pom version
    is only used when the manifest did not already provide one.

    Returns a dict that always contains ``format`` set to ``"jar"``.
    Unreadable archives are logged at debug level and never raised.
    """
    import zipfile

    manifest_fields = (
        ("Implementation-Title:", "package_name"),
        ("Implementation-Version:", "version"),
        ("Bundle-Name:", "bundle_name"),
        ("Bundle-Version:", "bundle_version"),
    )
    metadata = {"format": "jar"}
    try:
        with zipfile.ZipFile(file, "r") as archive:
            members = archive.namelist()

            if "META-INF/MANIFEST.MF" in members:
                manifest = archive.read("META-INF/MANIFEST.MF").decode("utf-8", errors="replace")
                for raw_line in manifest.split("\n"):
                    entry = raw_line.strip()
                    for prefix, field in manifest_fields:
                        if entry.startswith(prefix):
                            metadata[field] = entry[len(prefix):].strip()
                            break

            # Only the first pom.properties in archive order is read.
            pom_path = next((m for m in members if m.endswith("/pom.properties")), None)
            if pom_path is not None:
                properties = archive.read(pom_path).decode("utf-8", errors="replace")
                for prop in properties.split("\n"):
                    if prop.startswith("artifactId="):
                        metadata["artifact_id"] = prop[len("artifactId="):].strip()
                    elif prop.startswith("groupId="):
                        metadata["group_id"] = prop[len("groupId="):].strip()
                    elif prop.startswith("version="):
                        # A version taken from the manifest wins.
                        metadata.setdefault("version", prop[len("version="):].strip())
    except Exception as e:
        logger.debug(f"Failed to parse JAR: {e}")
    return metadata
def extract_zip_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Summarise a ZIP archive by entry count and total uncompressed size.

    Returns a dict with ``format`` set to ``"zip"``, plus ``file_count`` and
    ``uncompressed_size`` when the archive can be opened. Unreadable
    archives are logged at debug level and never raised.
    """
    import zipfile

    summary = {"format": "zip"}
    try:
        with zipfile.ZipFile(file, "r") as archive:
            entries = archive.infolist()
            summary["file_count"] = len(entries)
            # Sizes come from the archive directory; nothing is decompressed.
            summary["uncompressed_size"] = sum(entry.file_size for entry in entries)
    except Exception as e:
        logger.debug(f"Failed to parse ZIP: {e}")
    return summary