Compare commits
4 Commits
main
...
feature/pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
47fe15177f
|
|||
|
0230cae852
|
|||
|
a89b00d2b5
|
|||
| ea3eb33e8d |
@@ -17,8 +17,7 @@ public class CacheConfig {
|
|||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public CacheManager cacheManager() {
|
public CacheManager cacheManager() {
|
||||||
CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent",
|
CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent");
|
||||||
"preprocessedVideo");
|
|
||||||
cacheManager.setCaffeine(Caffeine.newBuilder()
|
cacheManager.setCaffeine(Caffeine.newBuilder()
|
||||||
.expireAfterAccess(1, TimeUnit.HOURS)
|
.expireAfterAccess(1, TimeUnit.HOURS)
|
||||||
.weigher((Object key, Object value) -> {
|
.weigher((Object key, Object value) -> {
|
||||||
|
|||||||
@@ -1,19 +1,16 @@
|
|||||||
package com.backend.hls.proxy.config;
|
package com.backend.hls.proxy.config;
|
||||||
|
|
||||||
import org.springframework.cache.CacheManager;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
import com.backend.hls.proxy.service.mutator.AV1ConverterMutatorService;
|
import com.backend.hls.proxy.service.NoopPreprocessService;
|
||||||
import com.backend.hls.proxy.service.preprocess.NoopPreprocessService;
|
import com.backend.hls.proxy.service.PreprocessService;
|
||||||
import com.backend.hls.proxy.service.preprocess.PreprocessService;
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class ProcessConfig {
|
public class ProcessConfig {
|
||||||
@Bean
|
@Bean
|
||||||
public PreprocessService preprocessService(CacheManager cacheManager, AV1ConverterMutatorService av1Converter) {
|
public PreprocessService preprocessService() {
|
||||||
return new NoopPreprocessService();
|
return new NoopPreprocessService();
|
||||||
// return new PreprocessQueueService(cacheManager, List.of(av1Converter));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import java.util.Optional;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.cache.annotation.Cacheable;
|
||||||
|
import org.springframework.cache.annotation.Caching;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
@@ -21,7 +23,7 @@ import com.backend.hls.proxy.model.RangeRequest;
|
|||||||
import com.backend.hls.proxy.model.SimpleResponse;
|
import com.backend.hls.proxy.model.SimpleResponse;
|
||||||
import com.backend.hls.proxy.repository.LinkRepository;
|
import com.backend.hls.proxy.repository.LinkRepository;
|
||||||
import com.backend.hls.proxy.service.FetchService;
|
import com.backend.hls.proxy.service.FetchService;
|
||||||
import com.backend.hls.proxy.service.preprocess.PreprocessService;
|
import com.backend.hls.proxy.service.PreprocessService;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
public class ProxyServeController {
|
public class ProxyServeController {
|
||||||
@@ -80,8 +82,7 @@ public class ProxyServeController {
|
|||||||
headers.addAll(key, values);
|
headers.addAll(key, values);
|
||||||
});
|
});
|
||||||
|
|
||||||
var preprocessedData = preprocessService.preprocess(url, response.getBody());
|
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK);
|
||||||
return new ResponseEntity<>(preprocessedData, headers, HttpStatus.OK);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
|
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
|
||||||
@@ -116,7 +117,7 @@ public class ProxyServeController {
|
|||||||
headers.add("Accept-Ranges", "bytes");
|
headers.add("Accept-Ranges", "bytes");
|
||||||
headers.setContentLength(range.getLength());
|
headers.setContentLength(range.getLength());
|
||||||
|
|
||||||
return new ResponseEntity<>(preprocessService.preprocess(url, response.getBody()), headers,
|
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers,
|
||||||
HttpStatus.PARTIAL_CONTENT);
|
HttpStatus.PARTIAL_CONTENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,5 +126,4 @@ public class ProxyServeController {
|
|||||||
headers.setLocation(URI.create(target));
|
headers.setLocation(URI.create(target));
|
||||||
return new ResponseEntity<>(headers, HttpStatus.MOVED_PERMANENTLY);
|
return new ResponseEntity<>(headers, HttpStatus.MOVED_PERMANENTLY);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,21 +0,0 @@
|
|||||||
package com.backend.hls.proxy.exception;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class CompositeException extends Exception {
|
|
||||||
private final List<Exception> exceptions;
|
|
||||||
|
|
||||||
public CompositeException(Exception... exceptions) {
|
|
||||||
super("Multiple exceptions occurred: " +
|
|
||||||
Arrays.stream(exceptions)
|
|
||||||
.map(Exception::getMessage)
|
|
||||||
.collect(Collectors.joining("; ")));
|
|
||||||
this.exceptions = Arrays.asList(exceptions);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Exception> getExceptions() {
|
|
||||||
return exceptions;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.backend.hls.proxy.service.preprocess;
|
package com.backend.hls.proxy.service;
|
||||||
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -6,7 +6,7 @@ import org.springframework.stereotype.Service;
|
|||||||
public class NoopPreprocessService implements PreprocessService {
|
public class NoopPreprocessService implements PreprocessService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] preprocess(String url, byte[] data) {
|
public byte[] preprocess(byte[] data) {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
package com.backend.hls.proxy.service;
|
package com.backend.hls.proxy.service;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import com.backend.hls.proxy.exception.FetchFailException;
|
import com.backend.hls.proxy.exception.FetchFailException;
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import com.backend.hls.proxy.exception.CompositeException;
|
|
||||||
import com.backend.hls.proxy.exception.FetchFailException;
|
import com.backend.hls.proxy.exception.FetchFailException;
|
||||||
import com.backend.hls.proxy.exception.PlaylistParseException;
|
import com.backend.hls.proxy.exception.PlaylistParseException;
|
||||||
|
|
||||||
@@ -38,9 +37,6 @@ public class PlaylistProxyService {
|
|||||||
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
|
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
|
||||||
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
|
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
|
||||||
String url = uriResolveService.resolve(suffix, base);
|
String url = uriResolveService.resolve(suffix, base);
|
||||||
|
|
||||||
PlaylistParseException multivariantException = null;
|
|
||||||
PlaylistParseException mediaException = null;
|
|
||||||
try {
|
try {
|
||||||
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
|
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
|
||||||
|
|
||||||
@@ -60,7 +56,6 @@ public class PlaylistProxyService {
|
|||||||
.build();
|
.build();
|
||||||
return playlistParseService.writeAsString(proxiedPlaylist);
|
return playlistParseService.writeAsString(proxiedPlaylist);
|
||||||
} catch (PlaylistParseException e) {
|
} catch (PlaylistParseException e) {
|
||||||
multivariantException = e;
|
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
MediaPlaylist playlist = playlistParseService.readMediaPlaylist(url);
|
MediaPlaylist playlist = playlistParseService.readMediaPlaylist(url);
|
||||||
@@ -83,10 +78,9 @@ public class PlaylistProxyService {
|
|||||||
|
|
||||||
return playlistParseService.writeAsString(proxiedPlaylist);
|
return playlistParseService.writeAsString(proxiedPlaylist);
|
||||||
} catch (PlaylistParseException e) {
|
} catch (PlaylistParseException e) {
|
||||||
mediaException = e;
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
throw new IllegalStateException("Unknown playlist format",
|
throw new IllegalStateException("Unknown playlist format");
|
||||||
new CompositeException(multivariantException, mediaException));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
package com.backend.hls.proxy.service;
|
||||||
|
|
||||||
|
public interface PreprocessService {
|
||||||
|
byte[] preprocess(byte[] data);
|
||||||
|
}
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
package com.backend.hls.proxy.service;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import com.backend.hls.proxy.util.PipeUtil;
|
||||||
|
|
||||||
|
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class RandomEffectPreprocessService implements PreprocessService {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class);
|
||||||
|
|
||||||
|
public byte[] preprocess(byte[] data) {
|
||||||
|
try {
|
||||||
|
String format = findFormat(data);
|
||||||
|
logger.info("format is {}", format);
|
||||||
|
return randomEffectsHLS(data, format, "/usr/bin/ffmpeg");
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
|
||||||
|
try (InputStream inputStream = new ByteArrayInputStream(data)) {
|
||||||
|
String[] effects = {
|
||||||
|
"hue=s=10", // Color shift
|
||||||
|
"edgedetect=mode=colormix", // Edge detection
|
||||||
|
"boxblur=10:1", // Heavy blur
|
||||||
|
"noise=alls=20:allf=t", // Film grain noise
|
||||||
|
"colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage
|
||||||
|
"rotate=0.1*c", // Slight rotation
|
||||||
|
"scale=iw/2:ih/2" // Pixelate
|
||||||
|
};
|
||||||
|
|
||||||
|
Random random = new Random();
|
||||||
|
String randomEffect = effects[random.nextInt(effects.length)];
|
||||||
|
logger.info("applied effect {}", randomEffect);
|
||||||
|
|
||||||
|
String[] ffmpegArgs = {
|
||||||
|
"-vf", randomEffect,
|
||||||
|
"-f", inputFormat,
|
||||||
|
};
|
||||||
|
|
||||||
|
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String findFormat(byte[] data) throws IOException {
|
||||||
|
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
|
||||||
|
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
|
||||||
|
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
|
||||||
|
throw new IOException("No video stream found");
|
||||||
|
}
|
||||||
|
return result.format.format_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,66 +0,0 @@
|
|||||||
package com.backend.hls.proxy.service.mutator;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import com.backend.hls.proxy.util.PipeUtil;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class AV1ConverterMutatorService implements SegmentMutator {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AV1ConverterMutatorService.class);
|
|
||||||
private static final Set<String> SUPPORTED_INPUT_FORMATS = Set.of(
|
|
||||||
"h264", "avc", "h265", "hevc", "vp8", "vp9", "mpeg2", "mpeg4", "mpegts");
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte[] mutate(String url, String format, byte[] data) {
|
|
||||||
try {
|
|
||||||
logger.info("format is {}", format);
|
|
||||||
return convert(data, format, "/usr/bin/ffmpeg");
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean supports(String url, String format, byte[] data) {
|
|
||||||
if (format != null && SUPPORTED_INPUT_FORMATS.contains(format.toLowerCase())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return url != null && (url.endsWith(".ts") ||
|
|
||||||
url.endsWith(".m4s") ||
|
|
||||||
url.endsWith(".mp4") ||
|
|
||||||
url.endsWith(".mkv") ||
|
|
||||||
url.contains("/video/"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getPriority() {
|
|
||||||
return 100;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static byte[] convert(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
|
|
||||||
try (InputStream inputStream = new ByteArrayInputStream(data)) {
|
|
||||||
String[] ffmpegArgs = {
|
|
||||||
"-c:v", "libaom-av1", // AV1 codec
|
|
||||||
"-crf", "45", // Quality (0-63, lower is better)
|
|
||||||
"-strict", "experimental", // Required for AV1
|
|
||||||
"-cpu-used", "8", // Encoding speed (0-8, higher is faster but lower quality)
|
|
||||||
"-row-mt", "1",
|
|
||||||
"-f", "mp4", // Still MP4 container
|
|
||||||
"-movflags", "+frag_keyframe+empty_moov", // Enable fragmentation
|
|
||||||
"-fflags", "+genpts" // Generate PTS if missing
|
|
||||||
};
|
|
||||||
|
|
||||||
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
package com.backend.hls.proxy.service.mutator;
|
|
||||||
|
|
||||||
public interface SegmentMutator {
|
|
||||||
/**
|
|
||||||
* Mutated video data
|
|
||||||
*
|
|
||||||
* @param url The source URL
|
|
||||||
* @param format ffprobe format
|
|
||||||
* @param data The raw video data
|
|
||||||
* @return Mutated video data
|
|
||||||
*/
|
|
||||||
byte[] mutate(String url, String format, byte[] data);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if this mutator can handle the given URL/data
|
|
||||||
*/
|
|
||||||
boolean supports(String url, String format, byte[] data);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the priority of this mutator (higher priority first)
|
|
||||||
*/
|
|
||||||
default int getPriority() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,271 +0,0 @@
|
|||||||
package com.backend.hls.proxy.service.preprocess;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.cache.Cache;
|
|
||||||
import org.springframework.cache.CacheManager;
|
|
||||||
import org.springframework.cache.annotation.Cacheable;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import com.backend.hls.proxy.service.mutator.SegmentMutator;
|
|
||||||
import com.backend.hls.proxy.util.PipeUtil;
|
|
||||||
|
|
||||||
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class PreprocessQueueService implements PreprocessService {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PreprocessQueueService.class);
|
|
||||||
private final ExecutorService processingExecutor;
|
|
||||||
private final ConcurrentHashMap<String, CompletableFuture<byte[]>> processingTasks;
|
|
||||||
private final CacheManager cacheManager;
|
|
||||||
private final BlockingQueue<PreprocessTask> taskQueue;
|
|
||||||
private final int maxQueueSize;
|
|
||||||
|
|
||||||
private final List<SegmentMutator> mutators;
|
|
||||||
|
|
||||||
public PreprocessQueueService(
|
|
||||||
CacheManager cacheManager,
|
|
||||||
List<SegmentMutator> mutators) {
|
|
||||||
this.cacheManager = cacheManager;
|
|
||||||
this.mutators = mutators.stream()
|
|
||||||
.sorted(Comparator.comparingInt(SegmentMutator::getPriority).reversed())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
this.processingExecutor = Executors.newFixedThreadPool(5);
|
|
||||||
this.processingTasks = new ConcurrentHashMap<>();
|
|
||||||
this.taskQueue = new LinkedBlockingQueue<>(100);
|
|
||||||
this.maxQueueSize = 100;
|
|
||||||
|
|
||||||
startQueueProcessor();
|
|
||||||
|
|
||||||
logger.info("Initialized PreprocessQueueService with {} mutators: {}",
|
|
||||||
mutators.size(),
|
|
||||||
mutators.stream().map(m -> m.getClass().getSimpleName()).collect(Collectors.toList()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Main preprocessing method with queue support
|
|
||||||
*/
|
|
||||||
@Cacheable(value = "preprocessedVideo", key = "#url", unless = "#result == null")
|
|
||||||
@Override
|
|
||||||
public byte[] preprocess(String url, byte[] data) {
|
|
||||||
String format;
|
|
||||||
try {
|
|
||||||
format = findFormat(data);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
return processWithQueue(url, format, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String findFormat(byte[] data) throws IOException {
|
|
||||||
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
|
|
||||||
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
|
|
||||||
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
|
|
||||||
throw new IOException("No video stream found");
|
|
||||||
}
|
|
||||||
return result.format.format_name;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Overloaded method that accepts format information
|
|
||||||
*/
|
|
||||||
public byte[] preprocess(String url, String format, byte[] data) {
|
|
||||||
return processWithQueue(url, format, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] processWithQueue(String url, String format, byte[] data) {
|
|
||||||
logger.info("Processing segment URL: {}, format: {}", url, format);
|
|
||||||
SegmentMutator mutator = findMutator(url, format, data);
|
|
||||||
if (mutator == null) {
|
|
||||||
logger.debug("No mutator found for URL: {}, format: {}, returning original data", url, format);
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
CompletableFuture<byte[]> existingTask = processingTasks.get(url);
|
|
||||||
if (existingTask != null) {
|
|
||||||
try {
|
|
||||||
logger.debug("Waiting for existing preprocessing task for URL: {}", url);
|
|
||||||
return existingTask.get(30, TimeUnit.SECONDS);
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Preprocessing timeout for: " + url);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Interrupted while waiting for preprocessing", e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Preprocessing failed", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taskQueue.size() >= maxQueueSize) {
|
|
||||||
logger.warn("Preprocess queue is full (size: {}), rejecting URL: {}", taskQueue.size(), url);
|
|
||||||
throw new RuntimeException("Preprocess queue is full. Try again later.");
|
|
||||||
}
|
|
||||||
|
|
||||||
CompletableFuture<byte[]> future = new CompletableFuture<>();
|
|
||||||
PreprocessTask task = new PreprocessTask(url, format, data, future, mutator);
|
|
||||||
|
|
||||||
CompletableFuture<byte[]> existing = processingTasks.putIfAbsent(url, future);
|
|
||||||
if (existing != null) {
|
|
||||||
try {
|
|
||||||
return existing.get();
|
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Failed to wait for preprocessing", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (!taskQueue.offer(task, 5, TimeUnit.SECONDS)) {
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Failed to add task to queue (timeout)");
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug("Added task to queue for URL: {}, mutator: {}", url, mutator.getClass().getSimpleName());
|
|
||||||
|
|
||||||
var result = future.get(60, TimeUnit.SECONDS);
|
|
||||||
logger.info("returning task result...");
|
|
||||||
return result;
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Preprocessing timeout for: " + url);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Preprocessing interrupted", e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
processingTasks.remove(url);
|
|
||||||
throw new RuntimeException("Preprocessing failed: " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Find the appropriate mutator for this task
|
|
||||||
*/
|
|
||||||
private SegmentMutator findMutator(String url, String format, byte[] data) {
|
|
||||||
return mutators.stream()
|
|
||||||
.filter(m -> {
|
|
||||||
try {
|
|
||||||
return m.supports(url, format, data);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Error checking support for mutator: {}", m.getClass().getSimpleName(), e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.findFirst()
|
|
||||||
.orElse(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startQueueProcessor() {
|
|
||||||
Thread processorThread = new Thread(() -> {
|
|
||||||
while (!Thread.currentThread().isInterrupted()) {
|
|
||||||
try {
|
|
||||||
PreprocessTask task = taskQueue.take();
|
|
||||||
|
|
||||||
processingExecutor.submit(() -> {
|
|
||||||
String taskUrl = task.url;
|
|
||||||
String taskFormat = task.format;
|
|
||||||
|
|
||||||
try {
|
|
||||||
logger.debug("Processing task for URL: {} with mutator: {}",
|
|
||||||
taskUrl, task.mutator.getClass().getSimpleName());
|
|
||||||
byte[] result = task.mutator.mutate(taskUrl, task.format, task.data);
|
|
||||||
|
|
||||||
task.future.complete(result);
|
|
||||||
updateCache(taskUrl, result);
|
|
||||||
logger.debug("Successfully processed URL: {}", taskUrl);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to process task for URL: {}", taskUrl, e);
|
|
||||||
task.future.completeExceptionally(e);
|
|
||||||
} finally {
|
|
||||||
processingTasks.remove(taskUrl);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
logger.info("Queue processor interrupted");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
processorThread.setName("preprocess-queue-processor");
|
|
||||||
processorThread.setDaemon(true);
|
|
||||||
processorThread.start();
|
|
||||||
|
|
||||||
logger.info("Preprocess queue processor started");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateCache(String url, byte[] result) {
|
|
||||||
try {
|
|
||||||
Cache cache = cacheManager.getCache("preprocessedVideo");
|
|
||||||
if (cache != null && result != null) {
|
|
||||||
cache.put(url, result);
|
|
||||||
logger.debug("Updated cache for URL: {}", url);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to update cache for URL: {}", url, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inner class representing a preprocessing task
|
|
||||||
*/
|
|
||||||
private static class PreprocessTask {
|
|
||||||
final String url;
|
|
||||||
final String format;
|
|
||||||
final byte[] data;
|
|
||||||
final CompletableFuture<byte[]> future;
|
|
||||||
final SegmentMutator mutator;
|
|
||||||
|
|
||||||
PreprocessTask(String url, String format, byte[] data,
|
|
||||||
CompletableFuture<byte[]> future, SegmentMutator mutator) {
|
|
||||||
this.url = url;
|
|
||||||
this.format = format;
|
|
||||||
this.data = data;
|
|
||||||
this.future = future;
|
|
||||||
this.mutator = mutator;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get queue statistics
|
|
||||||
*/
|
|
||||||
public PreprocessStats getStats() {
|
|
||||||
return new PreprocessStats(
|
|
||||||
taskQueue.size(),
|
|
||||||
processingTasks.size(),
|
|
||||||
maxQueueSize,
|
|
||||||
mutators.stream()
|
|
||||||
.map(m -> m.getClass().getSimpleName())
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Statistics record
|
|
||||||
*/
|
|
||||||
public record PreprocessStats(
|
|
||||||
int queueSize,
|
|
||||||
int processingCount,
|
|
||||||
int maxQueueSize,
|
|
||||||
List<String> availableMutators) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,5 +0,0 @@
|
|||||||
package com.backend.hls.proxy.service.preprocess;
|
|
||||||
|
|
||||||
public interface PreprocessService {
|
|
||||||
byte[] preprocess(String url, byte[] data);
|
|
||||||
}
|
|
||||||
@@ -117,7 +117,6 @@ public class PipeUtil {
|
|||||||
|
|
||||||
ProcessBuilder pb = new ProcessBuilder();
|
ProcessBuilder pb = new ProcessBuilder();
|
||||||
pb.command().add(ffmpegPath);
|
pb.command().add(ffmpegPath);
|
||||||
pb.command().add("-report");
|
|
||||||
pb.command().add("-f");
|
pb.command().add("-f");
|
||||||
pb.command().add(inputFormat);
|
pb.command().add(inputFormat);
|
||||||
pb.command().add("-i");
|
pb.command().add("-i");
|
||||||
|
|||||||
Reference in New Issue
Block a user