"""
Format-specific metadata extraction for uploaded artifacts.

Supports extracting version info and other metadata from package formats.
"""

import gzip
import io
import logging
import re
import struct
import tarfile
import zipfile
from typing import Any, BinaryIO, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# --- ar / deb constants -----------------------------------------------------
# Global magic that opens every ar archive (and therefore every .deb).
_AR_MAGIC = b"!<arch>\n"
# Fixed size of each ar member header.
_AR_HEADER_SIZE = 60

# (control-file field, output key) pairs, in the order they are emitted.
_DEB_CONTROL_FIELDS = (
    ("package", "package_name"),
    ("version", "version"),
    ("architecture", "architecture"),
    ("maintainer", "maintainer"),
    ("description", "description"),
    ("depends", "depends"),
)

# --- RPM constants ----------------------------------------------------------
_RPM_LEAD_SIZE = 96
_RPM_LEAD_MAGIC = b"\xed\xab\xee\xdb"
_RPM_HEADER_MAGIC = b"\x8e\xad\xe8"
# magic(3) + version(1) + reserved(4) + index count(4) + data size(4)
_RPM_HEADER_INTRO_SIZE = 16
_RPM_INDEX_ENTRY_SIZE = 16
# STRING (6) and I18NSTRING (9); Summary is stored as I18NSTRING.
_RPM_STRING_TYPES = (6, 9)
# Header tag number -> output key.
_RPM_TAG_FIELDS = {
    1000: "package_name",  # Name
    1001: "version",  # Version
    1002: "release",  # Release
    1004: "description",  # Summary
    1022: "architecture",  # Arch
}

# --- wheel / jar constants --------------------------------------------------
_WHEEL_HEADER_FIELDS = (
    ("Name:", "package_name"),
    ("Version:", "version"),
    ("Summary:", "description"),
    ("Author:", "author"),
)

_JAR_MANIFEST_FIELDS = (
    ("Implementation-Title:", "package_name"),
    ("Implementation-Version:", "version"),
    ("Bundle-Name:", "bundle_name"),
    ("Bundle-Version:", "bundle_version"),
)

_TARBALL_SUFFIXES = (".tar.gz", ".tgz", ".tar.bz2", ".tar.xz")
_TARBALL_NAME_PATTERNS = (
    r"^(.+)-(\d+\.\d+(?:\.\d+)?(?:[-._]\w+)?)$",  # name-version
    r"^(.+)_(\d+\.\d+(?:\.\d+)?(?:[-._]\w+)?)$",  # name_version
)


def extract_metadata(
    file: BinaryIO, filename: str, content_type: Optional[str] = None
) -> Dict[str, Any]:
    """
    Extract format-specific metadata from an uploaded file.

    The format is chosen purely from the (case-insensitive) filename
    extension. Extraction is best-effort: any failure is logged and an empty
    (or partial) dict is returned instead of raising.

    Args:
        file: Seekable binary stream positioned at the start of the artifact.
        filename: Original filename; may be empty or None.
        content_type: Accepted for interface compatibility; currently unused.

    Returns:
        A dict with extracted metadata fields; empty for unknown formats.
    """
    metadata: Dict[str, Any] = {}

    # Determine format from filename extension
    lower_filename = filename.lower() if filename else ""

    try:
        if lower_filename.endswith(".deb"):
            metadata = extract_deb_metadata(file)
        elif lower_filename.endswith(".rpm"):
            metadata = extract_rpm_metadata(file)
        elif lower_filename.endswith((".tar.gz", ".tgz")):
            metadata = extract_tarball_metadata(file, filename)
        elif lower_filename.endswith(".whl"):
            metadata = extract_wheel_metadata(file)
        elif lower_filename.endswith(".jar"):
            metadata = extract_jar_metadata(file)
        elif lower_filename.endswith(".zip"):
            metadata = extract_zip_metadata(file)
    except Exception as e:
        # Top-level boundary: metadata is optional, never fail the upload.
        logger.warning("Failed to extract metadata from %s: %s", filename, e)

    # Always seek back to start after reading so callers can re-read the file.
    try:
        file.seek(0)
    except Exception as e:
        logger.debug("Could not rewind %s after metadata extraction: %s", filename, e)

    return metadata


def extract_deb_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from a Debian .deb package.

    Deb files are ar archives containing a control.tar.* member whose
    ``control`` file holds the package info.

    Args:
        file: Binary stream positioned at the start of the .deb.

    Returns:
        The dict produced by ``parse_deb_control``, or an empty dict when the
        stream is not an ar archive or no readable control file is found.
    """
    metadata: Dict[str, Any] = {}

    # Read and verify the 8-byte ar global header.
    if file.read(len(_AR_MAGIC)) != _AR_MAGIC:
        return metadata

    # Walk the ar members until the control archive is found.
    while True:
        header = file.read(_AR_HEADER_SIZE)
        if len(header) < _AR_HEADER_SIZE:
            break

        try:
            # Name is bytes 0-15, decimal size is bytes 48-57. Some ar
            # writers terminate the member name with "/".
            name = header[0:16].decode("ascii").strip().rstrip("/")
            size = int(header[48:58].decode("ascii").strip())
        except ValueError:
            # Covers both a non-numeric size and non-ASCII header bytes.
            break
        if size < 0:
            break

        if name.startswith("control.tar"):
            control_data = file.read(size)
            try:
                if name.endswith(".gz"):
                    control_data = gzip.decompress(control_data)

                # "r:*" lets tarfile detect any remaining compression itself.
                with tarfile.open(fileobj=io.BytesIO(control_data), mode="r:*") as tar:
                    for member in tar.getmembers():
                        if member.name not in ("./control", "control"):
                            continue
                        extracted = tar.extractfile(member)
                        if extracted:
                            control_content = extracted.read().decode(
                                "utf-8", errors="replace"
                            )
                            metadata = parse_deb_control(control_content)
                        break
            except Exception as e:
                logger.debug("Failed to parse deb control: %s", e)
            break

        # Skip to next entry; member data is padded to an even length.
        file.seek(size + (size % 2), 1)

    return metadata


def parse_deb_control(content: str) -> Dict[str, Any]:
    """
    Parse Debian control file format.

    Field names are matched case-insensitively. Lines starting with a space
    or tab continue the previous field. Only the first line of the
    description is kept.

    Args:
        content: Decoded text of the control file.

    Returns:
        A dict with whichever of package_name, version, architecture,
        maintainer, description and depends are present, plus
        ``"format": "deb"``.
    """
    fields: Dict[str, str] = {}
    current_key: Optional[str] = None
    current_value: List[str] = []

    for line in content.split("\n"):
        if line.startswith((" ", "\t")):
            # Continuation line
            if current_key:
                current_value.append(line.strip())
            continue

        # Any non-continuation line terminates the field being collected.
        if current_key:
            fields[current_key] = "\n".join(current_value)

        if ":" in line:
            key, _, value = line.partition(":")
            current_key = key.strip().lower()
            current_value = [value.strip()]
        else:
            # Empty line or malformed
            current_key = None
            current_value = []

    # Don't forget the last field
    if current_key:
        fields[current_key] = "\n".join(current_value)

    # Extract key fields
    result: Dict[str, Any] = {}
    for source_key, target_key in _DEB_CONTROL_FIELDS:
        if source_key not in fields:
            continue
        value = fields[source_key]
        if source_key == "description":
            value = value.split("\n")[0]  # First line only
        result[target_key] = value

    result["format"] = "deb"
    return result


def _read_rpm_header(
    file: BinaryIO,
) -> Optional[Tuple[List[Tuple[int, int, int, int]], bytes]]:
    """Read one RPM header structure; return (index entries, data store) or None."""
    intro = file.read(_RPM_HEADER_INTRO_SIZE)
    if len(intro) < _RPM_HEADER_INTRO_SIZE or intro[0:3] != _RPM_HEADER_MAGIC:
        return None

    # Bytes 4-7 are reserved; the entry count and data size follow them.
    index_count, data_size = struct.unpack(">II", intro[8:16])

    entries: List[Tuple[int, int, int, int]] = []
    for _ in range(index_count):
        raw_entry = file.read(_RPM_INDEX_ENTRY_SIZE)
        if len(raw_entry) < _RPM_INDEX_ENTRY_SIZE:
            break  # Truncated file; keep what was read so far.
        # Each entry is (tag, type, offset, count), all big-endian uint32.
        entries.append(struct.unpack(">IIII", raw_entry))

    return entries, file.read(data_size)


def extract_rpm_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from an RPM package.

    RPM files have a lead, a signature header, and a main header with
    metadata. The package name is taken from the lead first and then
    overridden by the main header's Name tag when available.

    Args:
        file: Binary stream positioned at the start of the .rpm.

    Returns:
        A dict that always contains ``"format": "rpm"`` and, when found,
        package_name, version, release, description and architecture.
    """
    metadata: Dict[str, Any] = {"format": "rpm"}

    # Read RPM lead (96 bytes)
    lead = file.read(_RPM_LEAD_SIZE)
    if len(lead) < _RPM_LEAD_SIZE:
        return metadata

    # Check magic number
    if lead[0:4] != _RPM_LEAD_MAGIC:
        return metadata

    # Read NUL-terminated name from lead (offset 10, max 66 bytes)
    name_bytes = lead[10:76]
    null_idx = name_bytes.find(b"\x00")
    if null_idx > 0:
        metadata["package_name"] = name_bytes[:null_idx].decode(
            "ascii", errors="replace"
        )

    try:
        # The signature header comes first and uses the same layout as the
        # main header; it must be consumed, not mistaken for the main one.
        signature = _read_rpm_header(file)
        if signature is None:
            return metadata
        _, signature_data = signature

        # The signature data store is padded so the main header starts on
        # an 8-byte boundary.
        file.read((-len(signature_data)) % 8)

        main_header = _read_rpm_header(file)
        if main_header is None:
            return metadata
        entries, header_data = main_header

        for tag, type_, offset, _count in entries:
            field = _RPM_TAG_FIELDS.get(tag)
            if field is None or type_ not in _RPM_STRING_TYPES:
                continue
            # Values are NUL-terminated; for I18NSTRING the first string is used.
            end = header_data.find(b"\x00", offset)
            if end > offset:
                metadata[field] = header_data[offset:end].decode(
                    "utf-8", errors="replace"
                )
    except Exception as e:
        logger.debug("Failed to parse RPM header: %s", e)

    return metadata


def extract_tarball_metadata(file: BinaryIO, filename: str) -> Dict[str, Any]:
    """
    Extract metadata from a tarball (name and version from filename).

    The archive contents are not read; only the filename is inspected.
    Common patterns: package-1.0.0.tar.gz, package_1.0.0.tar.gz

    Args:
        file: Unused; accepted for a uniform extractor signature.
        filename: Name of the tarball.

    Returns:
        A dict with ``"format": "tarball"`` plus package_name and version
        when the filename matches a known pattern.
    """
    metadata: Dict[str, Any] = {"format": "tarball"}

    # Strip the (first matching) archive suffix.
    basename = filename
    for suffix in _TARBALL_SUFFIXES:
        if basename.lower().endswith(suffix):
            basename = basename[: -len(suffix)]
            break

    # Try to split name and version
    for pattern in _TARBALL_NAME_PATTERNS:
        match = re.match(pattern, basename)
        if match:
            metadata["package_name"] = match.group(1)
            metadata["version"] = match.group(2)
            break

    return metadata


def extract_wheel_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from a Python wheel (.whl) file.

    Reads the first ``*.dist-info/METADATA`` member and parses its
    email-style header block (everything before the first blank line).

    Args:
        file: Seekable binary stream containing the wheel.

    Returns:
        A dict with ``"format": "wheel"`` plus package_name, version,
        description and author when present.
    """
    metadata: Dict[str, Any] = {"format": "wheel"}

    try:
        with zipfile.ZipFile(file, "r") as zf:
            # Find METADATA file in .dist-info directory
            for name in zf.namelist():
                if not (name.endswith("/METADATA") and ".dist-info/" in name):
                    continue
                with zf.open(name) as f:
                    content = f.read().decode("utf-8", errors="replace")

                # splitlines() handles CRLF, so the blank separator line is
                # detected and the description body is never scanned.
                for line in content.splitlines():
                    if line == "":
                        break  # End of headers
                    for prefix, field in _WHEEL_HEADER_FIELDS:
                        if line.startswith(prefix):
                            metadata[field] = line[len(prefix):].strip()
                            break
                break
    except Exception as e:
        logger.debug("Failed to parse wheel: %s", e)

    return metadata


def _unfold_manifest_lines(content: str) -> List[str]:
    """Join JAR manifest continuation lines (leading space) onto their header."""
    logical_lines: List[str] = []
    for raw_line in content.splitlines():
        if raw_line.startswith(" ") and logical_lines:
            logical_lines[-1] += raw_line[1:]
        else:
            logical_lines.append(raw_line)
    return logical_lines


def extract_jar_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract metadata from a Java JAR file.

    Reads ``META-INF/MANIFEST.MF`` and, for Maven-built JARs, the first
    ``pom.properties`` found. The manifest version wins over the pom version.

    Args:
        file: Seekable binary stream containing the JAR.

    Returns:
        A dict with ``"format": "jar"`` plus any of package_name, version,
        bundle_name, bundle_version, artifact_id and group_id.
    """
    metadata: Dict[str, Any] = {"format": "jar"}

    try:
        with zipfile.ZipFile(file, "r") as zf:
            member_names = zf.namelist()

            # Look for MANIFEST.MF
            if "META-INF/MANIFEST.MF" in member_names:
                with zf.open("META-INF/MANIFEST.MF") as f:
                    content = f.read().decode("utf-8", errors="replace")
                for line in _unfold_manifest_lines(content):
                    line = line.strip()
                    for prefix, field in _JAR_MANIFEST_FIELDS:
                        if line.startswith(prefix):
                            metadata[field] = line[len(prefix):].strip()
                            break

            # Also look for pom.properties in Maven JARs
            for name in member_names:
                if not name.endswith("/pom.properties"):
                    continue
                with zf.open(name) as f:
                    content = f.read().decode("utf-8", errors="replace")
                for line in content.splitlines():
                    if line.startswith("artifactId="):
                        metadata["artifact_id"] = line[len("artifactId="):].strip()
                    elif line.startswith("groupId="):
                        metadata["group_id"] = line[len("groupId="):].strip()
                    elif line.startswith("version="):
                        # Keep the manifest's version if it provided one.
                        if "version" not in metadata:
                            metadata["version"] = line[len("version="):].strip()
                break
    except Exception as e:
        logger.debug("Failed to parse JAR: %s", e)

    return metadata


def extract_zip_metadata(file: BinaryIO) -> Dict[str, Any]:
    """
    Extract basic metadata from a ZIP file.

    Args:
        file: Seekable binary stream containing the ZIP archive.

    Returns:
        A dict with ``"format": "zip"`` plus file_count and
        uncompressed_size (sum of member sizes) when the archive is readable.
    """
    metadata: Dict[str, Any] = {"format": "zip"}

    try:
        with zipfile.ZipFile(file, "r") as zf:
            infos = zf.infolist()
            metadata["file_count"] = len(infos)
            # Calculate total uncompressed size
            metadata["uncompressed_size"] = sum(info.file_size for info in infos)
    except Exception as e:
        logger.debug("Failed to parse ZIP: %s", e)

    return metadata