Files
cf-uploader/src/main/java/com/cfdeployer/service/ChunkedUploadService.java
2025-10-21 18:48:35 -05:00

175 lines
6.8 KiB
Java

package com.cfdeployer.service;
import com.cfdeployer.model.UploadSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class ChunkedUploadService {
@Value("${cf.upload.session.timeout-minutes:30}")
private int sessionTimeoutMinutes;
private final Map<String, UploadSession> activeSessions = new ConcurrentHashMap<>();
public String createUploadSession(String requestJson) throws IOException {
String sessionId = UUID.randomUUID().toString();
Path workingDir = Files.createTempDirectory("cf-upload-" + sessionId);
UploadSession session = new UploadSession(sessionId, requestJson, workingDir);
activeSessions.put(sessionId, session);
log.info("Created upload session: {} at {}", sessionId, workingDir);
return sessionId;
}
public synchronized void uploadChunk(String sessionId, String fileType, String fileName,
int chunkIndex, int totalChunks, MultipartFile chunk) throws IOException {
UploadSession session = activeSessions.get(sessionId);
if (session == null) {
throw new IllegalArgumentException("Upload session not found or expired: " + sessionId);
}
session.updateLastAccessed();
// Get or create file upload state
UploadSession.FileUploadState fileState = session.getFileStates()
.computeIfAbsent(fileType, k -> {
String targetFileName = fileType.equals("manifest") ? "manifest.yml" : fileName;
Path targetPath = session.getWorkingDirectory().resolve(targetFileName);
return new UploadSession.FileUploadState(fileName, totalChunks, targetPath);
});
// Validate total chunks consistency
if (fileState.getTotalChunks() != totalChunks) {
throw new IllegalArgumentException(
String.format("Total chunks mismatch for %s: expected %d, got %d",
fileType, fileState.getTotalChunks(), totalChunks));
}
// Write chunk to file using sequential append mode
// This supports variable chunk sizes - chunks MUST be uploaded in order (0, 1, 2, ...)
Path targetPath = fileState.getTargetPath();
// Verify chunks are uploaded in order
if (chunkIndex != fileState.getReceivedChunkCount()) {
throw new IllegalArgumentException(
String.format("Chunks must be uploaded in order. Expected chunk %d but received %d",
fileState.getReceivedChunkCount(), chunkIndex));
}
try (var inputStream = chunk.getInputStream();
var outputStream = Files.newOutputStream(targetPath,
java.nio.file.StandardOpenOption.CREATE,
java.nio.file.StandardOpenOption.APPEND)) {
// Stream chunk data in smaller buffers to reduce memory pressure
byte[] buffer = new byte[8192]; // 8KB buffer
int bytesRead;
long totalWritten = 0;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
totalWritten += bytesRead;
}
log.debug("Appended chunk {} ({} bytes) to {}",
chunkIndex, totalWritten, targetPath.getFileName());
}
fileState.markChunkReceived(chunkIndex);
log.info("Session {}: Received chunk {}/{} for {} ({} bytes)",
sessionId, chunkIndex + 1, totalChunks, fileType, chunk.getSize());
if (fileState.isComplete()) {
log.info("Session {}: File {} upload completed ({} chunks)",
sessionId, fileType, totalChunks);
}
}
public UploadSession getSession(String sessionId) {
UploadSession session = activeSessions.get(sessionId);
if (session != null) {
session.updateLastAccessed();
}
return session;
}
public boolean isSessionReady(String sessionId) {
UploadSession session = activeSessions.get(sessionId);
if (session == null) {
return false;
}
// Check if both jarFile and manifest are complete
UploadSession.FileUploadState jarState = session.getFileStates().get("jarFile");
UploadSession.FileUploadState manifestState = session.getFileStates().get("manifest");
return jarState != null && jarState.isComplete() &&
manifestState != null && manifestState.isComplete();
}
public void deleteSession(String sessionId) {
UploadSession session = activeSessions.remove(sessionId);
if (session != null) {
cleanupSessionDirectory(session);
log.info("Deleted upload session: {}", sessionId);
}
}
@Scheduled(fixedRate = 300000) // Run every 5 minutes
public void cleanupExpiredSessions() {
LocalDateTime expirationTime = LocalDateTime.now().minusMinutes(sessionTimeoutMinutes);
int cleanedCount = 0;
for (Map.Entry<String, UploadSession> entry : activeSessions.entrySet()) {
if (entry.getValue().getLastAccessedAt().isBefore(expirationTime)) {
deleteSession(entry.getKey());
cleanedCount++;
}
}
if (cleanedCount > 0) {
log.info("Cleaned up {} expired upload sessions", cleanedCount);
}
}
private void cleanupSessionDirectory(UploadSession session) {
try {
Path workingDir = session.getWorkingDirectory();
if (Files.exists(workingDir)) {
Files.walk(workingDir)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
log.warn("Failed to delete file: {}", path, e);
}
});
log.debug("Cleaned up session directory: {}", workingDir);
}
} catch (IOException e) {
log.warn("Failed to clean up session directory for session: {}", session.getSessionId(), e);
}
}
public int getActiveSessionCount() {
return activeSessions.size();
}
}