Implement SegmentMutator and PreprocessQueueService

This commit is contained in:
2026-02-19 16:50:05 +05:00
parent 8be1dd8a53
commit d064f99dc5
14 changed files with 412 additions and 87 deletions

View File

@@ -17,7 +17,8 @@ public class CacheConfig {
@Bean @Bean
public CacheManager cacheManager() { public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent"); CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent",
"preprocessedVideo");
cacheManager.setCaffeine(Caffeine.newBuilder() cacheManager.setCaffeine(Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS) .expireAfterAccess(1, TimeUnit.HOURS)
.weigher((Object key, Object value) -> { .weigher((Object key, Object value) -> {

View File

@@ -1,16 +1,19 @@
package com.backend.hls.proxy.config; package com.backend.hls.proxy.config;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import com.backend.hls.proxy.service.NoopPreprocessService; import com.backend.hls.proxy.service.mutator.AV1ConverterMutatorService;
import com.backend.hls.proxy.service.PreprocessService; import com.backend.hls.proxy.service.preprocess.NoopPreprocessService;
import com.backend.hls.proxy.service.preprocess.PreprocessService;
@Configuration @Configuration
public class ProcessConfig { public class ProcessConfig {
@Bean @Bean
public PreprocessService preprocessService() { public PreprocessService preprocessService(CacheManager cacheManager, AV1ConverterMutatorService av1Converter) {
return new NoopPreprocessService(); return new NoopPreprocessService();
// return new PreprocessQueueService(cacheManager, List.of(av1Converter));
} }
} }

View File

@@ -6,8 +6,6 @@ import java.util.Optional;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@@ -23,7 +21,7 @@ import com.backend.hls.proxy.model.RangeRequest;
import com.backend.hls.proxy.model.SimpleResponse; import com.backend.hls.proxy.model.SimpleResponse;
import com.backend.hls.proxy.repository.LinkRepository; import com.backend.hls.proxy.repository.LinkRepository;
import com.backend.hls.proxy.service.FetchService; import com.backend.hls.proxy.service.FetchService;
import com.backend.hls.proxy.service.PreprocessService; import com.backend.hls.proxy.service.preprocess.PreprocessService;
@RestController @RestController
public class ProxyServeController { public class ProxyServeController {
@@ -82,7 +80,8 @@ public class ProxyServeController {
headers.addAll(key, values); headers.addAll(key, values);
}); });
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK); var preprocessedData = preprocessService.preprocess(url, response.getBody());
return new ResponseEntity<>(preprocessedData, headers, HttpStatus.OK);
} }
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader) private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
@@ -117,7 +116,7 @@ public class ProxyServeController {
headers.add("Accept-Ranges", "bytes"); headers.add("Accept-Ranges", "bytes");
headers.setContentLength(range.getLength()); headers.setContentLength(range.getLength());
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, return new ResponseEntity<>(preprocessService.preprocess(url, response.getBody()), headers,
HttpStatus.PARTIAL_CONTENT); HttpStatus.PARTIAL_CONTENT);
} }
@@ -126,4 +125,5 @@ public class ProxyServeController {
headers.setLocation(URI.create(target)); headers.setLocation(URI.create(target));
return new ResponseEntity<>(headers, HttpStatus.MOVED_PERMANENTLY); return new ResponseEntity<>(headers, HttpStatus.MOVED_PERMANENTLY);
} }
} }

View File

@@ -0,0 +1,21 @@
package com.backend.hls.proxy.exception;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class CompositeException extends Exception {
private final List<Exception> exceptions;
public CompositeException(Exception... exceptions) {
super("Multiple exceptions occurred: " +
Arrays.stream(exceptions)
.map(Exception::getMessage)
.collect(Collectors.joining("; ")));
this.exceptions = Arrays.asList(exceptions);
}
public List<Exception> getExceptions() {
return exceptions;
}
}

View File

@@ -1,7 +1,5 @@
package com.backend.hls.proxy.service; package com.backend.hls.proxy.service;
import java.io.IOException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.FetchFailException;

View File

@@ -7,6 +7,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.backend.hls.proxy.exception.CompositeException;
import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.FetchFailException;
import com.backend.hls.proxy.exception.PlaylistParseException; import com.backend.hls.proxy.exception.PlaylistParseException;
@@ -37,6 +38,9 @@ public class PlaylistProxyService {
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1); String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1); String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
String url = uriResolveService.resolve(suffix, base); String url = uriResolveService.resolve(suffix, base);
PlaylistParseException multivariantException = null;
PlaylistParseException mediaException = null;
try { try {
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url); MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
@@ -56,6 +60,7 @@ public class PlaylistProxyService {
.build(); .build();
return playlistParseService.writeAsString(proxiedPlaylist); return playlistParseService.writeAsString(proxiedPlaylist);
} catch (PlaylistParseException e) { } catch (PlaylistParseException e) {
multivariantException = e;
} }
try { try {
MediaPlaylist playlist = playlistParseService.readMediaPlaylist(url); MediaPlaylist playlist = playlistParseService.readMediaPlaylist(url);
@@ -78,9 +83,10 @@ public class PlaylistProxyService {
return playlistParseService.writeAsString(proxiedPlaylist); return playlistParseService.writeAsString(proxiedPlaylist);
} catch (PlaylistParseException e) { } catch (PlaylistParseException e) {
e.printStackTrace(); mediaException = e;
} }
throw new IllegalStateException("Unknown playlist format"); throw new IllegalStateException("Unknown playlist format",
new CompositeException(multivariantException, mediaException));
} }
/** /**

View File

@@ -1,5 +0,0 @@
package com.backend.hls.proxy.service;
public interface PreprocessService {
byte[] preprocess(byte[] data);
}

View File

@@ -1,67 +0,0 @@
package com.backend.hls.proxy.service;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.backend.hls.proxy.util.PipeUtil;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@Service
public class RandomEffectPreprocessService implements PreprocessService {
private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class);
public byte[] preprocess(byte[] data) {
try {
String format = findFormat(data);
logger.info("format is {}", format);
return randomEffectsHLS(data, format, "/usr/bin/ffmpeg");
} catch (IOException e) {
e.printStackTrace();
return data;
}
}
public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(data)) {
String[] effects = {
"hue=s=10", // Color shift
"edgedetect=mode=colormix", // Edge detection
"boxblur=10:1", // Heavy blur
"noise=alls=20:allf=t", // Film grain noise
"colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage
"rotate=0.1*c", // Slight rotation
"scale=iw/2:ih/2" // Pixelate
};
Random random = new Random();
String randomEffect = effects[random.nextInt(effects.length)];
logger.info("applied effect {}", randomEffect);
String[] ffmpegArgs = {
"-vf", randomEffect,
"-f", inputFormat,
};
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
}
}
private String findFormat(byte[] data) throws IOException {
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
throw new IOException("No video stream found");
}
return result.format.format_name;
}
}

View File

@@ -0,0 +1,66 @@
package com.backend.hls.proxy.service.mutator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.backend.hls.proxy.util.PipeUtil;
@Service
public class AV1ConverterMutatorService implements SegmentMutator {
private static final Logger logger = LoggerFactory.getLogger(AV1ConverterMutatorService.class);
private static final Set<String> SUPPORTED_INPUT_FORMATS = Set.of(
"h264", "avc", "h265", "hevc", "vp8", "vp9", "mpeg2", "mpeg4", "mpegts");
@Override
public byte[] mutate(String url, String format, byte[] data) {
try {
logger.info("format is {}", format);
return convert(data, format, "/usr/bin/ffmpeg");
} catch (IOException e) {
e.printStackTrace();
return data;
}
}
@Override
public boolean supports(String url, String format, byte[] data) {
if (format != null && SUPPORTED_INPUT_FORMATS.contains(format.toLowerCase())) {
return true;
}
return url != null && (url.endsWith(".ts") ||
url.endsWith(".m4s") ||
url.endsWith(".mp4") ||
url.endsWith(".mkv") ||
url.contains("/video/"));
}
@Override
public int getPriority() {
return 100;
}
public static byte[] convert(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(data)) {
String[] ffmpegArgs = {
"-c:v", "libaom-av1", // AV1 codec
"-crf", "45", // Quality (0-63, lower is better)
"-strict", "experimental", // Required for AV1
"-cpu-used", "8", // Encoding speed (0-8, higher is faster but lower quality)
"-row-mt", "1",
"-f", "mp4", // Still MP4 container
"-movflags", "+frag_keyframe+empty_moov", // Enable fragmentation
"-fflags", "+genpts" // Generate PTS if missing
};
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
}
}
}

View File

@@ -0,0 +1,25 @@
package com.backend.hls.proxy.service.mutator;
public interface SegmentMutator {
/**
* Mutated video data
*
* @param url The source URL
* @param format ffprobe format
* @param data The raw video data
* @return Mutated video data
*/
byte[] mutate(String url, String format, byte[] data);
/**
* Check if this mutator can handle the given URL/data
*/
boolean supports(String url, String format, byte[] data);
/**
* Get the priority of this mutator (higher priority first)
*/
default int getPriority() {
return 0;
}
}

View File

@@ -1,4 +1,4 @@
package com.backend.hls.proxy.service; package com.backend.hls.proxy.service.preprocess;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -6,7 +6,7 @@ import org.springframework.stereotype.Service;
public class NoopPreprocessService implements PreprocessService { public class NoopPreprocessService implements PreprocessService {
@Override @Override
public byte[] preprocess(byte[] data) { public byte[] preprocess(String url, byte[] data) {
return data; return data;
} }

View File

@@ -0,0 +1,271 @@
package com.backend.hls.proxy.service.preprocess;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import com.backend.hls.proxy.service.mutator.SegmentMutator;
import com.backend.hls.proxy.util.PipeUtil;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@Service
public class PreprocessQueueService implements PreprocessService {
private static final Logger logger = LoggerFactory.getLogger(PreprocessQueueService.class);
private final ExecutorService processingExecutor;
private final ConcurrentHashMap<String, CompletableFuture<byte[]>> processingTasks;
private final CacheManager cacheManager;
private final BlockingQueue<PreprocessTask> taskQueue;
private final int maxQueueSize;
private final List<SegmentMutator> mutators;
public PreprocessQueueService(
CacheManager cacheManager,
List<SegmentMutator> mutators) {
this.cacheManager = cacheManager;
this.mutators = mutators.stream()
.sorted(Comparator.comparingInt(SegmentMutator::getPriority).reversed())
.collect(Collectors.toList());
this.processingExecutor = Executors.newFixedThreadPool(5);
this.processingTasks = new ConcurrentHashMap<>();
this.taskQueue = new LinkedBlockingQueue<>(100);
this.maxQueueSize = 100;
startQueueProcessor();
logger.info("Initialized PreprocessQueueService with {} mutators: {}",
mutators.size(),
mutators.stream().map(m -> m.getClass().getSimpleName()).collect(Collectors.toList()));
}
/**
* Main preprocessing method with queue support
*/
@Cacheable(value = "preprocessedVideo", key = "#url", unless = "#result == null")
@Override
public byte[] preprocess(String url, byte[] data) {
String format;
try {
format = findFormat(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
return processWithQueue(url, format, data);
}
private String findFormat(byte[] data) throws IOException {
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
throw new IOException("No video stream found");
}
return result.format.format_name;
}
/**
* Overloaded method that accepts format information
*/
public byte[] preprocess(String url, String format, byte[] data) {
return processWithQueue(url, format, data);
}
private byte[] processWithQueue(String url, String format, byte[] data) {
logger.info("Processing segment URL: {}, format: {}", url, format);
SegmentMutator mutator = findMutator(url, format, data);
if (mutator == null) {
logger.debug("No mutator found for URL: {}, format: {}, returning original data", url, format);
return data;
}
CompletableFuture<byte[]> existingTask = processingTasks.get(url);
if (existingTask != null) {
try {
logger.debug("Waiting for existing preprocessing task for URL: {}", url);
return existingTask.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
processingTasks.remove(url);
throw new RuntimeException("Preprocessing timeout for: " + url);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for preprocessing", e);
} catch (ExecutionException e) {
processingTasks.remove(url);
throw new RuntimeException("Preprocessing failed", e);
}
}
if (taskQueue.size() >= maxQueueSize) {
logger.warn("Preprocess queue is full (size: {}), rejecting URL: {}", taskQueue.size(), url);
throw new RuntimeException("Preprocess queue is full. Try again later.");
}
CompletableFuture<byte[]> future = new CompletableFuture<>();
PreprocessTask task = new PreprocessTask(url, format, data, future, mutator);
CompletableFuture<byte[]> existing = processingTasks.putIfAbsent(url, future);
if (existing != null) {
try {
return existing.get();
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to wait for preprocessing", e);
}
}
try {
if (!taskQueue.offer(task, 5, TimeUnit.SECONDS)) {
processingTasks.remove(url);
throw new RuntimeException("Failed to add task to queue (timeout)");
}
logger.debug("Added task to queue for URL: {}, mutator: {}", url, mutator.getClass().getSimpleName());
var result = future.get(60, TimeUnit.SECONDS);
logger.info("returning task result...");
return result;
} catch (TimeoutException e) {
processingTasks.remove(url);
throw new RuntimeException("Preprocessing timeout for: " + url);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
processingTasks.remove(url);
throw new RuntimeException("Preprocessing interrupted", e);
} catch (ExecutionException e) {
processingTasks.remove(url);
throw new RuntimeException("Preprocessing failed: " + e.getMessage(), e);
}
}
/**
* Find the appropriate mutator for this task
*/
private SegmentMutator findMutator(String url, String format, byte[] data) {
return mutators.stream()
.filter(m -> {
try {
return m.supports(url, format, data);
} catch (Exception e) {
logger.error("Error checking support for mutator: {}", m.getClass().getSimpleName(), e);
return false;
}
})
.findFirst()
.orElse(null);
}
private void startQueueProcessor() {
Thread processorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
PreprocessTask task = taskQueue.take();
processingExecutor.submit(() -> {
String taskUrl = task.url;
String taskFormat = task.format;
try {
logger.debug("Processing task for URL: {} with mutator: {}",
taskUrl, task.mutator.getClass().getSimpleName());
byte[] result = task.mutator.mutate(taskUrl, task.format, task.data);
task.future.complete(result);
updateCache(taskUrl, result);
logger.debug("Successfully processed URL: {}", taskUrl);
} catch (Exception e) {
logger.error("Failed to process task for URL: {}", taskUrl, e);
task.future.completeExceptionally(e);
} finally {
processingTasks.remove(taskUrl);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Queue processor interrupted");
break;
}
}
});
processorThread.setName("preprocess-queue-processor");
processorThread.setDaemon(true);
processorThread.start();
logger.info("Preprocess queue processor started");
}
private void updateCache(String url, byte[] result) {
try {
Cache cache = cacheManager.getCache("preprocessedVideo");
if (cache != null && result != null) {
cache.put(url, result);
logger.debug("Updated cache for URL: {}", url);
}
} catch (Exception e) {
logger.error("Failed to update cache for URL: {}", url, e);
}
}
/**
* Inner class representing a preprocessing task
*/
private static class PreprocessTask {
final String url;
final String format;
final byte[] data;
final CompletableFuture<byte[]> future;
final SegmentMutator mutator;
PreprocessTask(String url, String format, byte[] data,
CompletableFuture<byte[]> future, SegmentMutator mutator) {
this.url = url;
this.format = format;
this.data = data;
this.future = future;
this.mutator = mutator;
}
}
/**
* Get queue statistics
*/
public PreprocessStats getStats() {
return new PreprocessStats(
taskQueue.size(),
processingTasks.size(),
maxQueueSize,
mutators.stream()
.map(m -> m.getClass().getSimpleName())
.collect(Collectors.toList()));
}
/**
* Statistics record
*/
public record PreprocessStats(
int queueSize,
int processingCount,
int maxQueueSize,
List<String> availableMutators) {
}
}

View File

@@ -0,0 +1,5 @@
package com.backend.hls.proxy.service.preprocess;
public interface PreprocessService {
byte[] preprocess(String url, byte[] data);
}

View File

@@ -117,6 +117,7 @@ public class PipeUtil {
ProcessBuilder pb = new ProcessBuilder(); ProcessBuilder pb = new ProcessBuilder();
pb.command().add(ffmpegPath); pb.command().add(ffmpegPath);
pb.command().add("-report");
pb.command().add("-f"); pb.command().add("-f");
pb.command().add(inputFormat); pb.command().add(inputFormat);
pb.command().add("-i"); pb.command().add("-i");