Try out chunking
This commit is contained in:
161
src/main/java/com/cfdeployer/service/ChunkedUploadService.java
Normal file
161
src/main/java/com/cfdeployer/service/ChunkedUploadService.java
Normal file
@@ -0,0 +1,161 @@
|
||||
package com.cfdeployer.service;
|
||||
|
||||
import com.cfdeployer.model.UploadSession;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ChunkedUploadService {
|
||||
|
||||
@Value("${cf.upload.session.timeout-minutes:30}")
|
||||
private int sessionTimeoutMinutes;
|
||||
|
||||
private final Map<String, UploadSession> activeSessions = new ConcurrentHashMap<>();
|
||||
|
||||
public String createUploadSession(String requestJson) throws IOException {
|
||||
String sessionId = UUID.randomUUID().toString();
|
||||
Path workingDir = Files.createTempDirectory("cf-upload-" + sessionId);
|
||||
|
||||
UploadSession session = new UploadSession(sessionId, requestJson, workingDir);
|
||||
activeSessions.put(sessionId, session);
|
||||
|
||||
log.info("Created upload session: {} at {}", sessionId, workingDir);
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
public synchronized void uploadChunk(String sessionId, String fileType, String fileName,
|
||||
int chunkIndex, int totalChunks, MultipartFile chunk) throws IOException {
|
||||
UploadSession session = activeSessions.get(sessionId);
|
||||
if (session == null) {
|
||||
throw new IllegalArgumentException("Upload session not found or expired: " + sessionId);
|
||||
}
|
||||
|
||||
session.updateLastAccessed();
|
||||
|
||||
// Get or create file upload state
|
||||
UploadSession.FileUploadState fileState = session.getFileStates()
|
||||
.computeIfAbsent(fileType, k -> {
|
||||
String targetFileName = fileType.equals("manifest") ? "manifest.yml" : fileName;
|
||||
Path targetPath = session.getWorkingDirectory().resolve(targetFileName);
|
||||
return new UploadSession.FileUploadState(fileName, totalChunks, targetPath);
|
||||
});
|
||||
|
||||
// Validate total chunks consistency
|
||||
if (fileState.getTotalChunks() != totalChunks) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("Total chunks mismatch for %s: expected %d, got %d",
|
||||
fileType, fileState.getTotalChunks(), totalChunks));
|
||||
}
|
||||
|
||||
// Write chunk to file
|
||||
Path targetPath = fileState.getTargetPath();
|
||||
long offset = (long) chunkIndex * getChunkSize();
|
||||
|
||||
try (RandomAccessFile raf = new RandomAccessFile(targetPath.toFile(), "rw")) {
|
||||
raf.seek(offset);
|
||||
byte[] data = chunk.getBytes();
|
||||
raf.write(data);
|
||||
log.debug("Wrote chunk {} ({} bytes) to {} at offset {}",
|
||||
chunkIndex, data.length, targetPath.getFileName(), offset);
|
||||
}
|
||||
|
||||
fileState.markChunkReceived(chunkIndex);
|
||||
log.info("Session {}: Received chunk {}/{} for {} ({} bytes)",
|
||||
sessionId, chunkIndex + 1, totalChunks, fileType, chunk.getSize());
|
||||
|
||||
if (fileState.isComplete()) {
|
||||
log.info("Session {}: File {} upload completed ({} chunks)",
|
||||
sessionId, fileType, totalChunks);
|
||||
}
|
||||
}
|
||||
|
||||
public UploadSession getSession(String sessionId) {
|
||||
UploadSession session = activeSessions.get(sessionId);
|
||||
if (session != null) {
|
||||
session.updateLastAccessed();
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
public boolean isSessionReady(String sessionId) {
|
||||
UploadSession session = activeSessions.get(sessionId);
|
||||
if (session == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if both jarFile and manifest are complete
|
||||
UploadSession.FileUploadState jarState = session.getFileStates().get("jarFile");
|
||||
UploadSession.FileUploadState manifestState = session.getFileStates().get("manifest");
|
||||
|
||||
return jarState != null && jarState.isComplete() &&
|
||||
manifestState != null && manifestState.isComplete();
|
||||
}
|
||||
|
||||
public void deleteSession(String sessionId) {
|
||||
UploadSession session = activeSessions.remove(sessionId);
|
||||
if (session != null) {
|
||||
cleanupSessionDirectory(session);
|
||||
log.info("Deleted upload session: {}", sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 300000) // Run every 5 minutes
|
||||
public void cleanupExpiredSessions() {
|
||||
LocalDateTime expirationTime = LocalDateTime.now().minusMinutes(sessionTimeoutMinutes);
|
||||
int cleanedCount = 0;
|
||||
|
||||
for (Map.Entry<String, UploadSession> entry : activeSessions.entrySet()) {
|
||||
if (entry.getValue().getLastAccessedAt().isBefore(expirationTime)) {
|
||||
deleteSession(entry.getKey());
|
||||
cleanedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (cleanedCount > 0) {
|
||||
log.info("Cleaned up {} expired upload sessions", cleanedCount);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupSessionDirectory(UploadSession session) {
|
||||
try {
|
||||
Path workingDir = session.getWorkingDirectory();
|
||||
if (Files.exists(workingDir)) {
|
||||
Files.walk(workingDir)
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.forEach(path -> {
|
||||
try {
|
||||
Files.delete(path);
|
||||
} catch (IOException e) {
|
||||
log.warn("Failed to delete file: {}", path, e);
|
||||
}
|
||||
});
|
||||
log.debug("Cleaned up session directory: {}", workingDir);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.warn("Failed to clean up session directory for session: {}", session.getSessionId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private int getChunkSize() {
|
||||
// Default chunk size - should match client-side
|
||||
return 5 * 1024 * 1024; // 5MB
|
||||
}
|
||||
|
||||
public int getActiveSessionCount() {
|
||||
return activeSessions.size();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user