Apply preprocessing into HLS stream

This commit is contained in:
2025-10-22 01:14:05 +05:00
parent 7c85e05e02
commit ea3eb33e8d
14 changed files with 505 additions and 40 deletions

View File

@ -33,7 +33,9 @@ RUN groupadd --gid 1000 spring-app \
&& useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app && useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \ apt-get install -y --no-install-recommends \
ca-certificates \
ffmpeg && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
USER spring-app:spring-app USER spring-app:spring-app

View File

@ -90,6 +90,12 @@
<version>${spring-context-support.version}</version> <version>${spring-context-support.version}</version>
</dependency> </dependency>
<dependency>
<groupId>net.bramp.ffmpeg</groupId>
<artifactId>ffmpeg</artifactId>
<version>0.8.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

View File

@ -2,7 +2,9 @@ package com.backend.hls.proxy;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDiscoveryClient
@SpringBootApplication @SpringBootApplication
public class HlsProxyApplication { public class HlsProxyApplication {

View File

@ -1,30 +1,49 @@
package com.backend.hls.proxy.controller; package com.backend.hls.proxy.controller;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder; import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
import com.backend.hls.proxy.dto.CreateProxyDTO;
import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.FetchFailException;
import com.backend.hls.proxy.exception.PlaylistParseException; import com.backend.hls.proxy.exception.PlaylistParseException;
import com.backend.hls.proxy.service.PlaylistProxyService; import com.backend.hls.proxy.service.PlaylistProxyService;
import com.backend.hls.proxy.service.URLForwardService;
@RestController @RestController
public class ProxyController { public class ProxyController {
private static final Logger logger = LoggerFactory.getLogger(ProxyController.class);
private final PlaylistProxyService playlistProxyService; private final PlaylistProxyService playlistProxyService;
private final URLForwardService forwardService;
public ProxyController(PlaylistProxyService playlistProxyService) { public ProxyController(PlaylistProxyService playlistProxyService, URLForwardService forwardService) {
this.playlistProxyService = playlistProxyService; this.playlistProxyService = playlistProxyService;
this.forwardService = forwardService;
} }
@GetMapping("/proxy") @GetMapping("/proxy")
public ResponseEntity<?> proxy(@RequestParam("url") String url) throws FetchFailException, PlaylistParseException { public ResponseEntity<?> proxyPlaylist(@RequestParam("url") String url)
throws FetchFailException, PlaylistParseException {
logger.info("Proxying playlist: {}", url);
String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString(); String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString();
String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8)); String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8));
logger.info("Full URL: {}, base URL: {}", fullUrl, baseUrl);
String result = playlistProxyService.proxyPlaylist(url, baseUrl); String result = playlistProxyService.proxyPlaylist(url, baseUrl);
return ResponseEntity.ok(result); return ResponseEntity.ok(result);
} }
@PostMapping("/proxy")
public ResponseEntity<?> createProxy(@RequestBody CreateProxyDTO url) {
String location = forwardService.createForwarded(url.getUrl());
return ResponseEntity.created(URI.create(location)).build();
}
} }

View File

@ -4,6 +4,8 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -19,18 +21,23 @@ import com.backend.hls.proxy.model.RangeRequest;
import com.backend.hls.proxy.model.SimpleResponse; import com.backend.hls.proxy.model.SimpleResponse;
import com.backend.hls.proxy.repository.LinkRepository; import com.backend.hls.proxy.repository.LinkRepository;
import com.backend.hls.proxy.service.FetchService; import com.backend.hls.proxy.service.FetchService;
import com.backend.hls.proxy.service.PreprocessService;
@RestController @RestController
public class ProxyServeController { public class ProxyServeController {
private static final Logger logger = LoggerFactory.getLogger(ProxyServeController.class);
private final LinkRepository linkRepository; private final LinkRepository linkRepository;
private final ProxyController proxyController; private final ProxyController proxyController;
private final FetchService fetchService; private final FetchService fetchService;
private final PreprocessService preprocessService;
public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController, public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController,
FetchService fetchService) { FetchService fetchService, PreprocessService preprocessService) {
this.linkRepository = linkRepository; this.linkRepository = linkRepository;
this.proxyController = proxyController; this.proxyController = proxyController;
this.fetchService = fetchService; this.fetchService = fetchService;
this.preprocessService = preprocessService;
} }
@GetMapping("/proxy/{id}") @GetMapping("/proxy/{id}")
@ -40,13 +47,14 @@ public class ProxyServeController {
if (id.contains(".")) if (id.contains("."))
id = id.substring(0, id.lastIndexOf(".")); id = id.substring(0, id.lastIndexOf("."));
Optional<Link> link = linkRepository.findById(id); Optional<Link> link = linkRepository.findById(id);
logger.info("id {}, link is {}", id, link.map(l -> l.getUrl()).orElse("<null>"));
if (link.isEmpty()) { if (link.isEmpty()) {
return ResponseEntity.notFound().build(); return ResponseEntity.notFound().build();
} }
String url = link.get().getUrl(); String url = link.get().getUrl();
if (url.contains(".m3u8")) { if (url.contains(".m3u8")) {
try { try {
return proxyController.proxy(url); return proxyController.proxyPlaylist(url);
} catch (FetchFailException | PlaylistParseException e) { } catch (FetchFailException | PlaylistParseException e) {
e.printStackTrace(); e.printStackTrace();
return redirect(url); return redirect(url);
@ -71,7 +79,8 @@ public class ProxyServeController {
response.getHeaders().map().forEach((key, values) -> { response.getHeaders().map().forEach((key, values) -> {
headers.addAll(key, values); headers.addAll(key, values);
}); });
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.OK);
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK);
} }
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader) private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
@ -107,7 +116,8 @@ public class ProxyServeController {
headers.add("Accept-Ranges", "bytes"); headers.add("Accept-Ranges", "bytes");
headers.setContentLength(range.getLength()); headers.setContentLength(range.getLength());
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.PARTIAL_CONTENT); return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers,
HttpStatus.PARTIAL_CONTENT);
} }
private ResponseEntity<?> redirect(String target) { private ResponseEntity<?> redirect(String target) {

View File

@ -0,0 +1,19 @@
package com.backend.hls.proxy.dto;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
public class CreateProxyDTO {
@NotNull
@NotEmpty
private final String url;
public CreateProxyDTO(String url) {
this.url = url;
}
public String getUrl() {
return url;
}
}

View File

@ -3,13 +3,17 @@ package com.backend.hls.proxy.exception;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
public class FetchFailException extends Exception { public class FetchFailException extends Exception {
private final HttpResponse<?> response; private HttpResponse<?> response;
public FetchFailException(String message, HttpResponse<?> response) { public FetchFailException(String message, HttpResponse<?> response) {
super(message); super(message);
this.response = response; this.response = response;
} }
public FetchFailException(String message, Exception e) {
super(message, e);
}
public HttpResponse<?> getResponse() { public HttpResponse<?> getResponse() {
return response; return response;
} }

View File

@ -27,19 +27,22 @@ public class FetchService {
* @throws FetchFailException * @throws FetchFailException
*/ */
@Cacheable("hlsPlaylistContent") @Cacheable("hlsPlaylistContent")
public String fetchTextContent(String url) throws IOException, InterruptedException, FetchFailException { public String fetchTextContent(String url) throws FetchFailException {
HttpRequest request = HttpRequest.newBuilder() HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url)) .uri(URI.create(url))
.build(); .build();
try {
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) { if (response.statusCode() != 200) {
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), throw new FetchFailException(
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
response); response);
} }
return response.body(); return response.body();
} catch (IOException | InterruptedException e) {
throw new FetchFailException("Failed to fetch content from " + url, e);
}
} }
/** /**
@ -48,8 +51,7 @@ public class FetchService {
* @throws FetchFailException * @throws FetchFailException
*/ */
@Cacheable("playlistSegmentContent") @Cacheable("playlistSegmentContent")
public SimpleResponse<byte[]> fetchBinaryContent(String url) public SimpleResponse<byte[]> fetchBinaryContent(String url) throws FetchFailException {
throws IOException, InterruptedException, FetchFailException {
return fetchBinaryContent(url, null); return fetchBinaryContent(url, null);
} }
@ -60,7 +62,7 @@ public class FetchService {
*/ */
@Cacheable("playlistSegmentContent") @Cacheable("playlistSegmentContent")
public SimpleResponse<byte[]> fetchBinaryContent(String url, String rangeHeader) public SimpleResponse<byte[]> fetchBinaryContent(String url, String rangeHeader)
throws IOException, InterruptedException, FetchFailException { throws FetchFailException {
HttpRequest.Builder builder = HttpRequest.newBuilder() HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(url)); .uri(URI.create(url));
@ -68,14 +70,20 @@ public class FetchService {
builder.header("Range", rangeHeader); builder.header("Range", rangeHeader);
} }
try {
HttpResponse<byte[]> response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray()); HttpResponse<byte[]> response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray());
if (response.statusCode() >= 400) { if (response.statusCode() >= 400) {
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), throw new FetchFailException(
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
response); response);
} }
return new SimpleResponse<byte[]>(response.body(), response.headers()); return new SimpleResponse<byte[]>(response.body(), response.headers());
} catch (IOException | InterruptedException e) {
throw new FetchFailException("Failed to fetch content from " + url, e);
}
} }
/** /**

View File

@ -11,6 +11,7 @@ import io.lindstrom.m3u8.model.MediaPlaylist;
import io.lindstrom.m3u8.model.MultivariantPlaylist; import io.lindstrom.m3u8.model.MultivariantPlaylist;
import io.lindstrom.m3u8.parser.MediaPlaylistParser; import io.lindstrom.m3u8.parser.MediaPlaylistParser;
import io.lindstrom.m3u8.parser.MultivariantPlaylistParser; import io.lindstrom.m3u8.parser.MultivariantPlaylistParser;
import io.lindstrom.m3u8.parser.PlaylistParserException;
@Service @Service
public class PlaylistParseService { public class PlaylistParseService {
@ -29,16 +30,16 @@ public class PlaylistParseService {
throws PlaylistParseException, FetchFailException { throws PlaylistParseException, FetchFailException {
try { try {
return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
} catch (IOException | InterruptedException e) { } catch (PlaylistParserException e) {
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); throw new PlaylistParseException("Unable to parse playlist", e);
} }
} }
public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException { public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException {
try { try {
return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
} catch (IOException | InterruptedException e) { } catch (PlaylistParserException e) {
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); throw new PlaylistParseException("Unable to parse playlist", e);
} }
} }

View File

@ -23,17 +23,20 @@ public class PlaylistProxyService {
private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class); private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class);
private final PlaylistParseService playlistParseService; private final PlaylistParseService playlistParseService;
private final URLForwardService urlForwardService; private final URLForwardService urlForwardService;
private final URIResolveService uriResolveService;
public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService) { public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService,
URIResolveService uriResolveService) {
this.playlistParseService = playlistParseService; this.playlistParseService = playlistParseService;
this.urlForwardService = urlForwardService; this.urlForwardService = urlForwardService;
this.uriResolveService = uriResolveService;
} }
public String proxyPlaylist(String hlsUrl, String proxyUrl) public String proxyPlaylist(String hlsUrl, String proxyUrl)
throws FetchFailException { throws FetchFailException {
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1); String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1); String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
String url = base + suffix; String url = uriResolveService.resolve(suffix, base);
try { try {
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url); MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
@ -88,7 +91,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy variant: {}", variant); logger.error("cannot proxy variant: {}", variant);
return variant; return variant;
} }
String variantUri = base + variant.uri(); String variantUri = uriResolveService.resolve(variant.uri(), base);
String proxiedUri = urlForwardService.createForwarded(variantUri); String proxiedUri = urlForwardService.createForwarded(variantUri);
return Variant.builder() return Variant.builder()
.from(variant) .from(variant)
@ -103,7 +106,7 @@ public class PlaylistProxyService {
if (rendition.uri().isEmpty()) { if (rendition.uri().isEmpty()) {
return rendition; return rendition;
} }
String renditionUri = base + rendition.uri().get(); String renditionUri = uriResolveService.resolve(rendition.uri().get(), base);
String proxiedUri = urlForwardService.createForwarded(renditionUri); String proxiedUri = urlForwardService.createForwarded(renditionUri);
return AlternativeRendition.builder() return AlternativeRendition.builder()
.from(rendition) .from(rendition)
@ -120,7 +123,7 @@ public class PlaylistProxyService {
return segment; return segment;
} }
String segmentUri = base + segment.uri(); String segmentUri = uriResolveService.resolve(segment.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentUri); String proxiedUri = urlForwardService.createForwarded(segmentUri);
MediaSegment.Builder builder = MediaSegment.builder() MediaSegment.Builder builder = MediaSegment.builder()
@ -140,7 +143,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy segment: {}", segment); logger.error("cannot proxy segment: {}", segment);
return segment; return segment;
} }
String segmentUri = base + segment.uri(); String segmentUri = uriResolveService.resolve(segment.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentUri); String proxiedUri = urlForwardService.createForwarded(segmentUri);
return PartialSegment.builder() return PartialSegment.builder()
.from(segment) .from(segment)
@ -156,7 +159,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy segment: {}", segmentMap); logger.error("cannot proxy segment: {}", segmentMap);
return segmentMap; return segmentMap;
} }
String segmentMapUri = base + segmentMap.uri(); String segmentMapUri = uriResolveService.resolve(segmentMap.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentMapUri); String proxiedUri = urlForwardService.createForwarded(segmentMapUri);
return SegmentMap.builder() return SegmentMap.builder()
.from(segmentMap) .from(segmentMap)

View File

@ -0,0 +1,67 @@
package com.backend.hls.proxy.service;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.backend.hls.proxy.util.PipeUtil;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@Service
public class PreprocessService {
private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class);
public byte[] preprocess(byte[] data) {
try {
String format = findFormat(data);
logger.info("format is {}", format);
return randomEffectsHLS(data, format, "/usr/bin/ffmpeg");
} catch (IOException e) {
e.printStackTrace();
return data;
}
}
public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(data)) {
String[] effects = {
"hue=s=10", // Color shift
"edgedetect=mode=colormix", // Edge detection
"boxblur=10:1", // Heavy blur
"noise=alls=20:allf=t", // Film grain noise
"colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage
"rotate=0.1*c", // Slight rotation
"scale=iw/2:ih/2" // Pixelate
};
Random random = new Random();
String randomEffect = effects[random.nextInt(effects.length)];
logger.info("applied effect {}", randomEffect);
String[] ffmpegArgs = {
"-vf", randomEffect,
"-f", inputFormat,
};
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
}
}
private String findFormat(byte[] data) throws IOException {
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
throw new IOException("No video stream found");
}
return result.format.format_name;
}
}

View File

@ -0,0 +1,13 @@
package com.backend.hls.proxy.service;
import org.springframework.stereotype.Service;
@Service
public class URIResolveService {
public String resolve(String url, String base) {
if (url.startsWith("http://") || url.startsWith("https://")) {
return url;
}
return base + url;
}
}

View File

@ -0,0 +1,110 @@
package com.backend.hls.proxy.util;
import java.io.IOException;
import org.apache.commons.lang3.math.Fraction;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* Jackson TypeAdapter for Apache Commons Math Fraction Object
*
* @author bramp
*/
public class FractionAdapter {
private final Fraction zeroByZero;
private final Fraction divideByZero;
public FractionAdapter() {
this(Fraction.ZERO, Fraction.ZERO);
}
public FractionAdapter(Fraction zeroByZero, Fraction divideByZero) {
this.zeroByZero = zeroByZero;
this.divideByZero = divideByZero;
}
/**
* Jackson Deserializer for Fraction
*/
public class FractionDeserializer extends JsonDeserializer<Fraction> {
@Override
public Fraction deserialize(JsonParser parser, DeserializationContext context) throws IOException {
JsonToken token = parser.currentToken();
if (token == JsonToken.VALUE_NULL) {
return null;
}
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
return Fraction.getFraction(parser.getDoubleValue());
}
if (token == JsonToken.VALUE_STRING) {
String fraction = parser.getText().trim();
if (zeroByZero != null && "0/0".equals(fraction)) {
return zeroByZero;
}
if (divideByZero != null && fraction.endsWith("/0")) {
return divideByZero;
}
return Fraction.getFraction(fraction);
}
throw context.wrongTokenException(parser, Fraction.class, token,
"Expected NUMBER or STRING token for Fraction");
}
}
/**
* Jackson Serializer for Fraction
*/
public static class FractionSerializer extends JsonSerializer<Fraction> {
@Override
public void serialize(Fraction value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value == null) {
gen.writeNull();
} else {
gen.writeString(value.toProperString());
}
}
}
/**
* Convenience method to create and register the adapter with a module
*/
public void registerModule(SimpleModule module) {
module.addDeserializer(Fraction.class, new FractionDeserializer());
module.addSerializer(Fraction.class, new FractionSerializer());
}
/**
* Static utility method to create a pre-configured module
*/
public static SimpleModule createModule() {
return createModule(Fraction.ZERO, Fraction.ZERO);
}
/**
* Static utility method to create a pre-configured module with custom values
*/
public static SimpleModule createModule(Fraction zeroByZero, Fraction divideByZero) {
SimpleModule module = new SimpleModule();
FractionAdapter adapter = new FractionAdapter(zeroByZero, divideByZero);
adapter.registerModule(module);
return module;
}
}

View File

@ -0,0 +1,201 @@
package com.backend.hls.proxy.util;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
/**
* Utility class for piping data to FFmpeg/FFprobe
*/
public class PipeUtil {
private static final Logger logger = LoggerFactory.getLogger(PipeUtil.class);
private static final ObjectMapper objectMapper = JsonMapper
.builder()
.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.addModule(FractionAdapter
.createModule())
.build();
private static final int BUFFER_SIZE = 8192;
/**
* Execute FFprobe with pipe input
*/
public static FFmpegProbeResult probeWithPipe(String ffprobe, InputStream inputStream)
throws IOException {
ProcessBuilder pb = new ProcessBuilder(
ffprobe,
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
"pipe:0");
pb.redirectErrorStream(false);
Process process = pb.start();
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<String> errorReader = executor.submit(() -> {
try (BufferedReader stderr = new BufferedReader(
new InputStreamReader(process.getErrorStream()))) {
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
} catch (IOException e) {
return "";
}
});
try {
// Write input to stdin
Future<?> writer = executor.submit(() -> {
try (OutputStream stdin = process.getOutputStream()) {
inputStream.transferTo(stdin);
stdin.flush();
} catch (IOException e) {
logger.error("FFMpeg warning: {}", e);
}
});
// Read JSON output from stdout
Future<String> reader = executor.submit(() -> {
try (BufferedReader stdout = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
return stdout.lines().reduce("", (a, b) -> a + b);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
writer.get(30, TimeUnit.SECONDS);
boolean finished = process.waitFor(30, TimeUnit.SECONDS);
if (!finished || process.exitValue() != 0) {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFProbe error: {}", error);
throw new IOException("FFprobe failed");
}
String json = reader.get(5, TimeUnit.SECONDS);
logger.info("Probed JSON: {}", json);
return objectMapper.readValue(json, FFmpegProbeResult.class);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
process.destroyForcibly();
throw new IOException("Error probing stream", e);
} finally {
executor.shutdownNow();
}
}
/**
* Execute FFmpeg with pipe input and output
*/
public static byte[] executeWithPipe(String ffmpegPath, InputStream inputStream,
String inputFormat, String... ffmpegArgs)
throws IOException {
ProcessBuilder pb = new ProcessBuilder();
pb.command().add(ffmpegPath);
pb.command().add("-f");
pb.command().add(inputFormat);
pb.command().add("-i");
pb.command().add("pipe:0");
for (String arg : ffmpegArgs) {
pb.command().add(arg);
}
pb.command().add("pipe:1");
pb.redirectErrorStream(false);
Process process = pb.start();
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<String> errorReader = executor.submit(() -> {
try (BufferedReader stderr = new BufferedReader(
new InputStreamReader(process.getErrorStream()))) {
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
} catch (IOException e) {
return "";
}
});
try {
Future<?> writer = executor.submit(() -> {
try (OutputStream stdin = process.getOutputStream()) {
byte[] buffer = new byte[BUFFER_SIZE];
int read;
while ((read = inputStream.read(buffer)) != -1) {
stdin.write(buffer, 0, read);
}
stdin.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Future<byte[]> reader = executor.submit(() -> {
try (InputStream stdout = process.getInputStream()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[BUFFER_SIZE];
int read;
while ((read = stdout.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
writer.get(60, TimeUnit.SECONDS);
boolean finished = process.waitFor(120, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
throw new IOException("FFmpeg timeout");
}
if (process.exitValue() != 0) {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFMpeg error: {}", error);
throw new IOException("FFmpeg failed: " + error);
}
return reader.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
try {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFMpeg error: {}", error);
} catch (InterruptedException | ExecutionException | TimeoutException e1) {
e1.printStackTrace();
}
process.destroyForcibly();
throw new IOException("Error executing FFmpeg", e);
} finally {
executor.shutdownNow();
}
}
}