From d064f99dc5f3900578215208989cb982c57e00fc Mon Sep 17 00:00:00 2001 From: bivashy Date: Thu, 19 Feb 2026 16:50:05 +0500 Subject: [PATCH] Implement `SegmentMutator` and `PreprocessQueueService` --- .../backend/hls/proxy/config/CacheConfig.java | 3 +- .../hls/proxy/config/ProcessConfig.java | 9 +- .../controller/ProxyServeController.java | 10 +- .../proxy/exception/CompositeException.java | 21 ++ .../proxy/service/PlaylistParseService.java | 2 - .../proxy/service/PlaylistProxyService.java | 10 +- .../hls/proxy/service/PreprocessService.java | 5 - .../RandomEffectPreprocessService.java | 67 ----- .../mutator/AV1ConverterMutatorService.java | 66 +++++ .../proxy/service/mutator/SegmentMutator.java | 25 ++ .../NoopPreprocessService.java | 4 +- .../preprocess/PreprocessQueueService.java | 271 ++++++++++++++++++ .../service/preprocess/PreprocessService.java | 5 + .../com/backend/hls/proxy/util/PipeUtil.java | 1 + 14 files changed, 412 insertions(+), 87 deletions(-) create mode 100644 src/main/java/com/backend/hls/proxy/exception/CompositeException.java delete mode 100644 src/main/java/com/backend/hls/proxy/service/PreprocessService.java delete mode 100644 src/main/java/com/backend/hls/proxy/service/RandomEffectPreprocessService.java create mode 100644 src/main/java/com/backend/hls/proxy/service/mutator/AV1ConverterMutatorService.java create mode 100644 src/main/java/com/backend/hls/proxy/service/mutator/SegmentMutator.java rename src/main/java/com/backend/hls/proxy/service/{ => preprocess}/NoopPreprocessService.java (61%) create mode 100644 src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessQueueService.java create mode 100644 src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessService.java diff --git a/src/main/java/com/backend/hls/proxy/config/CacheConfig.java b/src/main/java/com/backend/hls/proxy/config/CacheConfig.java index 5a71470..dbbe92d 100644 --- a/src/main/java/com/backend/hls/proxy/config/CacheConfig.java +++ b/src/main/java/com/backend/hls/proxy/config/CacheConfig.java @@ -17,7 +17,8 @@ public class CacheConfig { @Bean public CacheManager cacheManager() { - CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent"); + CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent", + "preprocessedVideo"); cacheManager.setCaffeine(Caffeine.newBuilder() .expireAfterAccess(1, TimeUnit.HOURS) .weigher((Object key, Object value) -> { diff --git a/src/main/java/com/backend/hls/proxy/config/ProcessConfig.java b/src/main/java/com/backend/hls/proxy/config/ProcessConfig.java index 0914c7e..7516ade 100644 --- a/src/main/java/com/backend/hls/proxy/config/ProcessConfig.java +++ b/src/main/java/com/backend/hls/proxy/config/ProcessConfig.java @@ -1,16 +1,19 @@ package com.backend.hls.proxy.config; +import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.backend.hls.proxy.service.NoopPreprocessService; -import com.backend.hls.proxy.service.PreprocessService; +import com.backend.hls.proxy.service.mutator.AV1ConverterMutatorService; +import com.backend.hls.proxy.service.preprocess.NoopPreprocessService; +import com.backend.hls.proxy.service.preprocess.PreprocessService; @Configuration public class ProcessConfig { @Bean - public PreprocessService preprocessService() { + public PreprocessService preprocessService(CacheManager cacheManager, AV1ConverterMutatorService av1Converter) { return new NoopPreprocessService(); + // return new PreprocessQueueService(cacheManager, List.of(av1Converter)); } } diff --git a/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java b/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java index b775f4d..94b5295 100644 --- a/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java +++ b/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java @@ -6,8 +6,6 @@ import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.cache.annotation.Cacheable; -import org.springframework.cache.annotation.Caching; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -23,7 +21,7 @@ import com.backend.hls.proxy.model.RangeRequest; import com.backend.hls.proxy.model.SimpleResponse; import com.backend.hls.proxy.repository.LinkRepository; import com.backend.hls.proxy.service.FetchService; -import com.backend.hls.proxy.service.PreprocessService; +import com.backend.hls.proxy.service.preprocess.PreprocessService; @RestController public class ProxyServeController { @@ -82,7 +80,8 @@ public class ProxyServeController { headers.addAll(key, values); }); - return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK); + var preprocessedData = preprocessService.preprocess(url, response.getBody()); + return new ResponseEntity<>(preprocessedData, headers, HttpStatus.OK); } private ResponseEntity handleRangeRequest(String url, String rangeHeader) @@ -117,7 +116,7 @@ public class ProxyServeController { headers.add("Accept-Ranges", "bytes"); headers.setContentLength(range.getLength()); - return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, + return new ResponseEntity<>(preprocessService.preprocess(url, response.getBody()), headers, HttpStatus.PARTIAL_CONTENT); } @@ -126,4 +125,5 @@ public class ProxyServeController { headers.setLocation(URI.create(target)); return new ResponseEntity<>(headers, HttpStatus.MOVED_PERMANENTLY); } + } diff --git a/src/main/java/com/backend/hls/proxy/exception/CompositeException.java b/src/main/java/com/backend/hls/proxy/exception/CompositeException.java new file mode 100644 index 0000000..43e064d --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/exception/CompositeException.java @@ -0,0 +1,21 @@ +package com.backend.hls.proxy.exception; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class CompositeException extends Exception { + private final List exceptions; + + public CompositeException(Exception... exceptions) { + super("Multiple exceptions occurred: " + + Arrays.stream(exceptions) + .map(Exception::getMessage) + .collect(Collectors.joining("; "))); + this.exceptions = Arrays.asList(exceptions); + } + + public List getExceptions() { + return exceptions; + } +} diff --git a/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java b/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java index 8525221..292bd81 100644 --- a/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java +++ b/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java @@ -1,7 +1,5 @@ package com.backend.hls.proxy.service; -import java.io.IOException; - import org.springframework.stereotype.Service; import com.backend.hls.proxy.exception.FetchFailException; diff --git a/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java b/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java index c9cf376..7bb528a 100644 --- a/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java +++ b/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import com.backend.hls.proxy.exception.CompositeException; import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.PlaylistParseException; @@ -37,6 +38,9 @@ public class PlaylistProxyService { String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1); String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1); String url = uriResolveService.resolve(suffix, base); + + PlaylistParseException multivariantException = null; + PlaylistParseException mediaException = null; try { MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url); @@ -56,6 +60,7 @@ public class PlaylistProxyService { .build(); return playlistParseService.writeAsString(proxiedPlaylist); } catch (PlaylistParseException e) { + multivariantException = e; } try { MediaPlaylist playlist = playlistParseService.readMediaPlaylist(url); @@ -78,9 +83,10 @@ public class PlaylistProxyService { return playlistParseService.writeAsString(proxiedPlaylist); } catch (PlaylistParseException e) { - e.printStackTrace(); + mediaException = e; } - throw new IllegalStateException("Unknown playlist format"); + throw new IllegalStateException("Unknown playlist format", + new CompositeException(multivariantException, mediaException)); } /** diff --git a/src/main/java/com/backend/hls/proxy/service/PreprocessService.java b/src/main/java/com/backend/hls/proxy/service/PreprocessService.java deleted file mode 100644 index 42cd643..0000000 --- a/src/main/java/com/backend/hls/proxy/service/PreprocessService.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.backend.hls.proxy.service; - -public interface PreprocessService { - byte[] preprocess(byte[] data); -} diff --git a/src/main/java/com/backend/hls/proxy/service/RandomEffectPreprocessService.java b/src/main/java/com/backend/hls/proxy/service/RandomEffectPreprocessService.java deleted file mode 100644 index a65f169..0000000 --- a/src/main/java/com/backend/hls/proxy/service/RandomEffectPreprocessService.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.backend.hls.proxy.service; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Random; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import com.backend.hls.proxy.util.PipeUtil; - -import net.bramp.ffmpeg.probe.FFmpegProbeResult; - -@Service -public class RandomEffectPreprocessService implements PreprocessService { - private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class); - - public byte[] preprocess(byte[] data) { - try { - String format = findFormat(data); - logger.info("format is {}", format); - return randomEffectsHLS(data, format, "/usr/bin/ffmpeg"); - } catch (IOException e) { - e.printStackTrace(); - return data; - } - - } - - public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException { - try (InputStream inputStream = new ByteArrayInputStream(data)) { - String[] effects = { - "hue=s=10", // Color shift - "edgedetect=mode=colormix", // Edge detection - "boxblur=10:1", // Heavy blur - "noise=alls=20:allf=t", // Film grain noise - "colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage - "rotate=0.1*c", // Slight rotation - "scale=iw/2:ih/2" // Pixelate - }; - - Random random = new Random(); - String randomEffect = effects[random.nextInt(effects.length)]; - logger.info("applied effect {}", randomEffect); - - String[] ffmpegArgs = { - "-vf", randomEffect, - "-f", inputFormat, - }; - - return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs); - } - } - - private String findFormat(byte[] data) throws IOException { - FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data)); - logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList())); - if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) { - throw new IOException("No video stream found"); - } - return result.format.format_name; - } - -} diff --git a/src/main/java/com/backend/hls/proxy/service/mutator/AV1ConverterMutatorService.java b/src/main/java/com/backend/hls/proxy/service/mutator/AV1ConverterMutatorService.java new file mode 100644 index 0000000..45ce06f --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/mutator/AV1ConverterMutatorService.java @@ -0,0 +1,66 @@ +package com.backend.hls.proxy.service.mutator; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.backend.hls.proxy.util.PipeUtil; + +@Service +public class AV1ConverterMutatorService implements SegmentMutator { + private static final Logger logger = LoggerFactory.getLogger(AV1ConverterMutatorService.class); + private static final Set SUPPORTED_INPUT_FORMATS = Set.of( + "h264", "avc", "h265", "hevc", "vp8", "vp9", "mpeg2", "mpeg4", "mpegts"); + + @Override + public byte[] mutate(String url, String format, byte[] data) { + try { + logger.info("format is {}", format); + return convert(data, format, "/usr/bin/ffmpeg"); + } catch (IOException e) { + e.printStackTrace(); + return data; + } + } + + @Override + public boolean supports(String url, String format, byte[] data) { + if (format != null && SUPPORTED_INPUT_FORMATS.contains(format.toLowerCase())) { + return true; + } + + return url != null && (url.endsWith(".ts") || + url.endsWith(".m4s") || + url.endsWith(".mp4") || + url.endsWith(".mkv") || + url.contains("/video/")); + } + + @Override + public int getPriority() { + return 100; + } + + public static byte[] convert(byte[] data, String inputFormat, String ffmpegPath) throws IOException { + try (InputStream inputStream = new ByteArrayInputStream(data)) { + String[] ffmpegArgs = { + "-c:v", "libaom-av1", // AV1 codec + "-crf", "45", // Quality (0-63, lower is better) + "-strict", "experimental", // Required for AV1 + "-cpu-used", "8", // Encoding speed (0-8, higher is faster but lower quality) + "-row-mt", "1", + "-f", "mp4", // Still MP4 container + "-movflags", "+frag_keyframe+empty_moov", // Enable fragmentation + "-fflags", "+genpts" // Generate PTS if missing + }; + + return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs); + } + } + +} diff --git a/src/main/java/com/backend/hls/proxy/service/mutator/SegmentMutator.java b/src/main/java/com/backend/hls/proxy/service/mutator/SegmentMutator.java new file mode 100644 index 0000000..b4efaf6 --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/mutator/SegmentMutator.java @@ -0,0 +1,25 @@ +package com.backend.hls.proxy.service.mutator; + +public interface SegmentMutator { + /** + * Mutated video data + * + * @param url The source URL + * @param format ffprobe format + * @param data The raw video data + * @return Mutated video data + */ + byte[] mutate(String url, String format, byte[] data); + + /** + * Check if this mutator can handle the given URL/data + */ + boolean supports(String url, String format, byte[] data); + + /** + * Get the priority of this mutator (higher priority first) + */ + default int getPriority() { + return 0; + } +} diff --git a/src/main/java/com/backend/hls/proxy/service/NoopPreprocessService.java b/src/main/java/com/backend/hls/proxy/service/preprocess/NoopPreprocessService.java similarity index 61% rename from src/main/java/com/backend/hls/proxy/service/NoopPreprocessService.java rename to src/main/java/com/backend/hls/proxy/service/preprocess/NoopPreprocessService.java index d2bc7e1..9254937 100644 --- a/src/main/java/com/backend/hls/proxy/service/NoopPreprocessService.java +++ b/src/main/java/com/backend/hls/proxy/service/preprocess/NoopPreprocessService.java @@ -1,4 +1,4 @@ -package com.backend.hls.proxy.service; +package com.backend.hls.proxy.service.preprocess; import org.springframework.stereotype.Service; @@ -6,7 +6,7 @@ import org.springframework.stereotype.Service; public class NoopPreprocessService implements PreprocessService { @Override - public byte[] preprocess(byte[] data) { + public byte[] preprocess(String url, byte[] data) { return data; } diff --git a/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessQueueService.java b/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessQueueService.java new file mode 100644 index 0000000..782b3bc --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessQueueService.java @@ -0,0 +1,271 @@ +package com.backend.hls.proxy.service.preprocess; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Service; + +import com.backend.hls.proxy.service.mutator.SegmentMutator; +import com.backend.hls.proxy.util.PipeUtil; + +import net.bramp.ffmpeg.probe.FFmpegProbeResult; + +@Service +public class PreprocessQueueService implements PreprocessService { + private static final Logger logger = LoggerFactory.getLogger(PreprocessQueueService.class); + private final ExecutorService processingExecutor; + private final ConcurrentHashMap> processingTasks; + private final CacheManager cacheManager; + private final BlockingQueue taskQueue; + private final int maxQueueSize; + + private final List mutators; + + public PreprocessQueueService( + CacheManager cacheManager, + List mutators) { + this.cacheManager = cacheManager; + this.mutators = mutators.stream() + .sorted(Comparator.comparingInt(SegmentMutator::getPriority).reversed()) + .collect(Collectors.toList()); + + this.processingExecutor = Executors.newFixedThreadPool(5); + this.processingTasks = new ConcurrentHashMap<>(); + this.taskQueue = new LinkedBlockingQueue<>(100); + this.maxQueueSize = 100; + + startQueueProcessor(); + + logger.info("Initialized PreprocessQueueService with {} mutators: {}", + mutators.size(), + mutators.stream().map(m -> m.getClass().getSimpleName()).collect(Collectors.toList())); + } + + /** + * Main preprocessing method with queue support + */ + @Cacheable(value = "preprocessedVideo", key = "#url", unless = "#result == null") + @Override + public byte[] preprocess(String url, byte[] data) { + String format; + try { + format = findFormat(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + return processWithQueue(url, format, data); + } + + private String findFormat(byte[] data) throws IOException { + FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data)); + logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList())); + if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) { + throw new IOException("No video stream found"); + } + return result.format.format_name; + } + + /** + * Overloaded method that accepts format information + */ + public byte[] preprocess(String url, String format, byte[] data) { + return processWithQueue(url, format, data); + } + + private byte[] processWithQueue(String url, String format, byte[] data) { + logger.info("Processing segment URL: {}, format: {}", url, format); + SegmentMutator mutator = findMutator(url, format, data); + if (mutator == null) { + logger.debug("No mutator found for URL: {}, format: {}, returning original data", url, format); + return data; + } + + CompletableFuture existingTask = processingTasks.get(url); + if (existingTask != null) { + try { + logger.debug("Waiting for existing preprocessing task for URL: {}", url); + return existingTask.get(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + processingTasks.remove(url); + throw new RuntimeException("Preprocessing timeout for: " + url); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for preprocessing", e); + } catch (ExecutionException e) { + processingTasks.remove(url); + throw new RuntimeException("Preprocessing failed", e); + } + } + + if (taskQueue.size() >= maxQueueSize) { + logger.warn("Preprocess queue is full (size: {}), rejecting URL: {}", taskQueue.size(), url); + throw new RuntimeException("Preprocess queue is full. Try again later."); + } + + CompletableFuture future = new CompletableFuture<>(); + PreprocessTask task = new PreprocessTask(url, format, data, future, mutator); + + CompletableFuture existing = processingTasks.putIfAbsent(url, future); + if (existing != null) { + try { + return existing.get(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to wait for preprocessing", e); + } + } + + try { + if (!taskQueue.offer(task, 5, TimeUnit.SECONDS)) { + processingTasks.remove(url); + throw new RuntimeException("Failed to add task to queue (timeout)"); + } + + logger.debug("Added task to queue for URL: {}, mutator: {}", url, mutator.getClass().getSimpleName()); + + var result = future.get(60, TimeUnit.SECONDS); + logger.info("returning task result..."); + return result; + } catch (TimeoutException e) { + processingTasks.remove(url); + throw new RuntimeException("Preprocessing timeout for: " + url); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + processingTasks.remove(url); + throw new RuntimeException("Preprocessing interrupted", e); + } catch (ExecutionException e) { + processingTasks.remove(url); + throw new RuntimeException("Preprocessing failed: " + e.getMessage(), e); + } + } + + /** + * Find the appropriate mutator for this task + */ + private SegmentMutator findMutator(String url, String format, byte[] data) { + return mutators.stream() + .filter(m -> { + try { + return m.supports(url, format, data); + } catch (Exception e) { + logger.error("Error checking support for mutator: {}", m.getClass().getSimpleName(), e); + return false; + } + }) + .findFirst() + .orElse(null); + } + + private void startQueueProcessor() { + Thread processorThread = new Thread(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + PreprocessTask task = taskQueue.take(); + + processingExecutor.submit(() -> { + String taskUrl = task.url; + String taskFormat = task.format; + + try { + logger.debug("Processing task for URL: {} with mutator: {}", + taskUrl, task.mutator.getClass().getSimpleName()); + byte[] result = task.mutator.mutate(taskUrl, task.format, task.data); + + task.future.complete(result); + updateCache(taskUrl, result); + logger.debug("Successfully processed URL: {}", taskUrl); + } catch (Exception e) { + logger.error("Failed to process task for URL: {}", taskUrl, e); + task.future.completeExceptionally(e); + } finally { + processingTasks.remove(taskUrl); + } + }); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("Queue processor interrupted"); + break; + } + } + }); + + processorThread.setName("preprocess-queue-processor"); + processorThread.setDaemon(true); + processorThread.start(); + + logger.info("Preprocess queue processor started"); + } + + private void updateCache(String url, byte[] result) { + try { + Cache cache = cacheManager.getCache("preprocessedVideo"); + if (cache != null && result != null) { + cache.put(url, result); + logger.debug("Updated cache for URL: {}", url); + } + } catch (Exception e) { + logger.error("Failed to update cache for URL: {}", url, e); + } + } + + /** + * Inner class representing a preprocessing task + */ + private static class PreprocessTask { + final String url; + final String format; + final byte[] data; + final CompletableFuture future; + final SegmentMutator mutator; + + PreprocessTask(String url, String format, byte[] data, + CompletableFuture future, SegmentMutator mutator) { + this.url = url; + this.format = format; + this.data = data; + this.future = future; + this.mutator = mutator; + } + } + + /** + * Get queue statistics + */ + public PreprocessStats getStats() { + return new PreprocessStats( + taskQueue.size(), + processingTasks.size(), + maxQueueSize, + mutators.stream() + .map(m -> m.getClass().getSimpleName()) + .collect(Collectors.toList())); + } + + /** + * Statistics record + */ + public record PreprocessStats( + int queueSize, + int processingCount, + int maxQueueSize, + List availableMutators) { + } +} diff --git a/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessService.java b/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessService.java new file mode 100644 index 0000000..096d8f9 --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/preprocess/PreprocessService.java @@ -0,0 +1,5 @@ +package com.backend.hls.proxy.service.preprocess; + +public interface PreprocessService { + byte[] preprocess(String url, byte[] data); +} diff --git a/src/main/java/com/backend/hls/proxy/util/PipeUtil.java b/src/main/java/com/backend/hls/proxy/util/PipeUtil.java index 4339192..522c381 100644 --- a/src/main/java/com/backend/hls/proxy/util/PipeUtil.java +++ b/src/main/java/com/backend/hls/proxy/util/PipeUtil.java @@ -117,6 +117,7 @@ public class PipeUtil { ProcessBuilder pb = new ProcessBuilder(); pb.command().add(ffmpegPath); + pb.command().add("-report"); pb.command().add("-f"); pb.command().add(inputFormat); pb.command().add("-i");