diff --git a/Dockerfile.prod b/Dockerfile.prod index f4eb52d..9a45fda 100644 --- a/Dockerfile.prod +++ b/Dockerfile.prod @@ -33,7 +33,9 @@ RUN groupadd --gid 1000 spring-app \ && useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app RUN apt-get update && \ - apt-get install -y --no-install-recommends ca-certificates && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + ffmpeg && \ rm -rf /var/lib/apt/lists/* USER spring-app:spring-app diff --git a/pom.xml b/pom.xml index e358423..6fa890c 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,12 @@ ${spring-context-support.version} + + net.bramp.ffmpeg + ffmpeg + 0.8.0 + + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/com/backend/hls/proxy/HlsProxyApplication.java b/src/main/java/com/backend/hls/proxy/HlsProxyApplication.java index 2bd0016..fe23948 100644 --- a/src/main/java/com/backend/hls/proxy/HlsProxyApplication.java +++ b/src/main/java/com/backend/hls/proxy/HlsProxyApplication.java @@ -2,12 +2,14 @@ package com.backend.hls.proxy; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +@EnableDiscoveryClient @SpringBootApplication public class HlsProxyApplication { - public static void main(String[] args) { - SpringApplication.run(HlsProxyApplication.class, args); - } + public static void main(String[] args) { + SpringApplication.run(HlsProxyApplication.class, args); + } } diff --git a/src/main/java/com/backend/hls/proxy/controller/ProxyController.java b/src/main/java/com/backend/hls/proxy/controller/ProxyController.java index 1e0998b..d671ba5 100644 --- a/src/main/java/com/backend/hls/proxy/controller/ProxyController.java +++ b/src/main/java/com/backend/hls/proxy/controller/ProxyController.java @@ -1,30 +1,49 @@ package com.backend.hls.proxy.controller; +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.support.ServletUriComponentsBuilder; +import com.backend.hls.proxy.dto.CreateProxyDTO; import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.PlaylistParseException; import com.backend.hls.proxy.service.PlaylistProxyService; +import com.backend.hls.proxy.service.URLForwardService; @RestController public class ProxyController { - + private static final Logger logger = LoggerFactory.getLogger(ProxyController.class); private final PlaylistProxyService playlistProxyService; + private final URLForwardService forwardService; - public ProxyController(PlaylistProxyService playlistProxyService) { + public ProxyController(PlaylistProxyService playlistProxyService, URLForwardService forwardService) { this.playlistProxyService = playlistProxyService; + this.forwardService = forwardService; } @GetMapping("/proxy") - public ResponseEntity proxy(@RequestParam("url") String url) throws FetchFailException, PlaylistParseException { + public ResponseEntity proxyPlaylist(@RequestParam("url") String url) + throws FetchFailException, PlaylistParseException { + logger.info("Proxying playlist: {}", url); String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString(); String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8)); + logger.info("Full URL: {}, base URL: {}", fullUrl, baseUrl); String result = playlistProxyService.proxyPlaylist(url, baseUrl); return ResponseEntity.ok(result); } + @PostMapping("/proxy") + public ResponseEntity createProxy(@RequestBody CreateProxyDTO url) { + String location = forwardService.createForwarded(url.getUrl()); + return ResponseEntity.created(URI.create(location)).build(); + } + } diff --git a/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java b/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java index 0c42443..31aba99 100644 --- a/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java +++ b/src/main/java/com/backend/hls/proxy/controller/ProxyServeController.java @@ -4,6 +4,8 @@ import java.io.IOException; import java.net.URI; import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -19,18 +21,23 @@ import com.backend.hls.proxy.model.RangeRequest; import com.backend.hls.proxy.model.SimpleResponse; import com.backend.hls.proxy.repository.LinkRepository; import com.backend.hls.proxy.service.FetchService; +import com.backend.hls.proxy.service.PreprocessService; @RestController public class ProxyServeController { + private static final Logger logger = LoggerFactory.getLogger(ProxyServeController.class); + private final LinkRepository linkRepository; private final ProxyController proxyController; private final FetchService fetchService; + private final PreprocessService preprocessService; public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController, - FetchService fetchService) { + FetchService fetchService, PreprocessService preprocessService) { this.linkRepository = linkRepository; this.proxyController = proxyController; this.fetchService = fetchService; + this.preprocessService = preprocessService; } @GetMapping("/proxy/{id}") @@ -40,13 +47,14 @@ public class ProxyServeController { if (id.contains(".")) id = id.substring(0, id.lastIndexOf(".")); Optional link = linkRepository.findById(id); + logger.info("id {}, link is {}", id, link.map(l -> l.getUrl()).orElse("")); if (link.isEmpty()) { return ResponseEntity.notFound().build(); } String url = link.get().getUrl(); if (url.contains(".m3u8")) { try { - return proxyController.proxy(url); + return proxyController.proxyPlaylist(url); } catch (FetchFailException | PlaylistParseException e) { e.printStackTrace(); return redirect(url); @@ -71,7 +79,8 @@ public class ProxyServeController { response.getHeaders().map().forEach((key, values) -> { headers.addAll(key, values); }); - return new ResponseEntity<>(response.getBody(), headers, HttpStatus.OK); + + return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK); } private ResponseEntity handleRangeRequest(String url, String rangeHeader) @@ -107,7 +116,8 @@ public class ProxyServeController { headers.add("Accept-Ranges", "bytes"); headers.setContentLength(range.getLength()); - return new ResponseEntity<>(response.getBody(), headers, HttpStatus.PARTIAL_CONTENT); + return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, + HttpStatus.PARTIAL_CONTENT); } private ResponseEntity redirect(String target) { diff --git a/src/main/java/com/backend/hls/proxy/dto/CreateProxyDTO.java b/src/main/java/com/backend/hls/proxy/dto/CreateProxyDTO.java new file mode 100644 index 0000000..2ad39c5 --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/dto/CreateProxyDTO.java @@ -0,0 +1,19 @@ +package com.backend.hls.proxy.dto; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +public class CreateProxyDTO { + @NotNull + @NotEmpty + private final String url; + + public CreateProxyDTO(String url) { + this.url = url; + } + + public String getUrl() { + return url; + } + +} diff --git a/src/main/java/com/backend/hls/proxy/exception/FetchFailException.java b/src/main/java/com/backend/hls/proxy/exception/FetchFailException.java index 0da20ce..fc8344b 100644 --- a/src/main/java/com/backend/hls/proxy/exception/FetchFailException.java +++ b/src/main/java/com/backend/hls/proxy/exception/FetchFailException.java @@ -3,13 +3,17 @@ package com.backend.hls.proxy.exception; import java.net.http.HttpResponse; public class FetchFailException extends Exception { - private final HttpResponse response; + private HttpResponse response; public FetchFailException(String message, HttpResponse response) { super(message); this.response = response; } + public FetchFailException(String message, Exception e) { + super(message, e); + } + public HttpResponse getResponse() { return response; } diff --git a/src/main/java/com/backend/hls/proxy/service/FetchService.java b/src/main/java/com/backend/hls/proxy/service/FetchService.java index f9637cc..ae1ae5f 100644 --- a/src/main/java/com/backend/hls/proxy/service/FetchService.java +++ b/src/main/java/com/backend/hls/proxy/service/FetchService.java @@ -27,19 +27,22 @@ public class FetchService { * @throws FetchFailException */ @Cacheable("hlsPlaylistContent") - public String fetchTextContent(String url) throws IOException, InterruptedException, FetchFailException { + public String fetchTextContent(String url) throws FetchFailException { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(url)) .build(); - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - - if (response.statusCode() != 200) { - throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), - response); + try { + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() != 200) { + throw new FetchFailException( + "Failed to fetch content from " + url + ", status: " + response.statusCode(), + response); + } + return response.body(); + } catch (IOException | InterruptedException e) { + throw new FetchFailException("Failed to fetch content from " + url, e); } - - return response.body(); } /** @@ -48,8 +51,7 @@ public class FetchService { * @throws FetchFailException */ @Cacheable("playlistSegmentContent") - public SimpleResponse fetchBinaryContent(String url) - throws IOException, InterruptedException, FetchFailException { + public SimpleResponse fetchBinaryContent(String url) throws FetchFailException { return fetchBinaryContent(url, null); } @@ -60,7 +62,7 @@ public class FetchService { */ @Cacheable("playlistSegmentContent") public SimpleResponse fetchBinaryContent(String url, String rangeHeader) - throws IOException, InterruptedException, FetchFailException { + throws FetchFailException { HttpRequest.Builder builder = HttpRequest.newBuilder() .uri(URI.create(url)); @@ -68,14 +70,20 @@ public class FetchService { builder.header("Range", rangeHeader); } - HttpResponse response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray()); + try { - if (response.statusCode() >= 400) { - throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), - response); + HttpResponse response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray()); + + if (response.statusCode() >= 400) { + throw new FetchFailException( + "Failed to fetch content from " + url + ", status: " + response.statusCode(), + response); + } + + return new SimpleResponse(response.body(), response.headers()); + } catch (IOException | InterruptedException e) { + throw new FetchFailException("Failed to fetch content from " + url, e); } - - return new SimpleResponse(response.body(), response.headers()); } /** diff --git a/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java b/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java index 79ddfb7..8525221 100644 --- a/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java +++ b/src/main/java/com/backend/hls/proxy/service/PlaylistParseService.java @@ -11,6 +11,7 @@ import io.lindstrom.m3u8.model.MediaPlaylist; import io.lindstrom.m3u8.model.MultivariantPlaylist; import io.lindstrom.m3u8.parser.MediaPlaylistParser; import io.lindstrom.m3u8.parser.MultivariantPlaylistParser; +import io.lindstrom.m3u8.parser.PlaylistParserException; @Service public class PlaylistParseService { @@ -29,16 +30,16 @@ public class PlaylistParseService { throws PlaylistParseException, FetchFailException { try { return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); - } catch (IOException | InterruptedException e) { - throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); + } catch (PlaylistParserException e) { + throw new PlaylistParseException("Unable to parse playlist", e); } } public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException { try { return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); - } catch (IOException | InterruptedException e) { - throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); + } catch (PlaylistParserException e) { + throw new PlaylistParseException("Unable to parse playlist", e); } } diff --git a/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java b/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java index 5a350c9..c9cf376 100644 --- a/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java +++ b/src/main/java/com/backend/hls/proxy/service/PlaylistProxyService.java @@ -23,17 +23,20 @@ public class PlaylistProxyService { private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class); private final PlaylistParseService playlistParseService; private final URLForwardService urlForwardService; + private final URIResolveService uriResolveService; - public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService) { + public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService, + URIResolveService uriResolveService) { this.playlistParseService = playlistParseService; this.urlForwardService = urlForwardService; + this.uriResolveService = uriResolveService; } public String proxyPlaylist(String hlsUrl, String proxyUrl) throws FetchFailException { String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1); String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1); - String url = base + suffix; + String url = uriResolveService.resolve(suffix, base); try { MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url); @@ -88,7 +91,7 @@ public class PlaylistProxyService { logger.error("cannot proxy variant: {}", variant); return variant; } - String variantUri = base + variant.uri(); + String variantUri = uriResolveService.resolve(variant.uri(), base); String proxiedUri = urlForwardService.createForwarded(variantUri); return Variant.builder() .from(variant) @@ -103,7 +106,7 @@ public class PlaylistProxyService { if (rendition.uri().isEmpty()) { return rendition; } - String renditionUri = base + rendition.uri().get(); + String renditionUri = uriResolveService.resolve(rendition.uri().get(), base); String proxiedUri = urlForwardService.createForwarded(renditionUri); return AlternativeRendition.builder() .from(rendition) @@ -120,7 +123,7 @@ public class PlaylistProxyService { return segment; } - String segmentUri = base + segment.uri(); + String segmentUri = uriResolveService.resolve(segment.uri(), base); String proxiedUri = urlForwardService.createForwarded(segmentUri); MediaSegment.Builder builder = MediaSegment.builder() @@ -140,7 +143,7 @@ public class PlaylistProxyService { logger.error("cannot proxy segment: {}", segment); return segment; } - String segmentUri = base + segment.uri(); + String segmentUri = uriResolveService.resolve(segment.uri(), base); String proxiedUri = urlForwardService.createForwarded(segmentUri); return PartialSegment.builder() .from(segment) @@ -156,7 +159,7 @@ public class PlaylistProxyService { logger.error("cannot proxy segment: {}", segmentMap); return segmentMap; } - String segmentMapUri = base + segmentMap.uri(); + String segmentMapUri = uriResolveService.resolve(segmentMap.uri(), base); String proxiedUri = urlForwardService.createForwarded(segmentMapUri); return SegmentMap.builder() .from(segmentMap) diff --git a/src/main/java/com/backend/hls/proxy/service/PreprocessService.java b/src/main/java/com/backend/hls/proxy/service/PreprocessService.java new file mode 100644 index 0000000..98ae48c --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/PreprocessService.java @@ -0,0 +1,67 @@ +package com.backend.hls.proxy.service; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.backend.hls.proxy.util.PipeUtil; + +import net.bramp.ffmpeg.probe.FFmpegProbeResult; + +@Service +public class PreprocessService { + private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class); + + public byte[] preprocess(byte[] data) { + try { + String format = findFormat(data); + logger.info("format is {}", format); + return randomEffectsHLS(data, format, "/usr/bin/ffmpeg"); + } catch (IOException e) { + e.printStackTrace(); + return data; + } + + } + + public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException { + try (InputStream inputStream = new ByteArrayInputStream(data)) { + String[] effects = { + "hue=s=10", // Color shift + "edgedetect=mode=colormix", // Edge detection + "boxblur=10:1", // Heavy blur + "noise=alls=20:allf=t", // Film grain noise + "colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage + "rotate=0.1*c", // Slight rotation + "scale=iw/2:ih/2" // Pixelate + }; + + Random random = new Random(); + String randomEffect = effects[random.nextInt(effects.length)]; + logger.info("applied effect {}", randomEffect); + + String[] ffmpegArgs = { + "-vf", randomEffect, + "-f", inputFormat, + }; + + return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs); + } + } + + private String findFormat(byte[] data) throws IOException { + FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data)); + logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList())); + if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) { + throw new IOException("No video stream found"); + } + return result.format.format_name; + } + +} diff --git a/src/main/java/com/backend/hls/proxy/service/URIResolveService.java b/src/main/java/com/backend/hls/proxy/service/URIResolveService.java new file mode 100644 index 0000000..93ec830 --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/service/URIResolveService.java @@ -0,0 +1,13 @@ +package com.backend.hls.proxy.service; + +import org.springframework.stereotype.Service; + +@Service +public class URIResolveService { + public String resolve(String url, String base) { + if (url.startsWith("http://") || url.startsWith("https://")) { + return url; + } + return base + url; + } +} diff --git a/src/main/java/com/backend/hls/proxy/util/FractionAdapter.java b/src/main/java/com/backend/hls/proxy/util/FractionAdapter.java new file mode 100644 index 0000000..0e5c70d --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/util/FractionAdapter.java @@ -0,0 +1,110 @@ +package com.backend.hls.proxy.util; + +import java.io.IOException; + +import org.apache.commons.lang3.math.Fraction; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; + +/** + * Jackson TypeAdapter for Apache Commons Math Fraction Object + * + * @author bramp + */ +public class FractionAdapter { + + private final Fraction zeroByZero; + private final Fraction divideByZero; + + public FractionAdapter() { + this(Fraction.ZERO, Fraction.ZERO); + } + + public FractionAdapter(Fraction zeroByZero, Fraction divideByZero) { + this.zeroByZero = zeroByZero; + this.divideByZero = divideByZero; + } + + /** + * Jackson Deserializer for Fraction + */ + public class FractionDeserializer extends JsonDeserializer { + + @Override + public Fraction deserialize(JsonParser parser, DeserializationContext context) throws IOException { + JsonToken token = parser.currentToken(); + + if (token == JsonToken.VALUE_NULL) { + return null; + } + + if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) { + return Fraction.getFraction(parser.getDoubleValue()); + } + + if (token == JsonToken.VALUE_STRING) { + String fraction = parser.getText().trim(); + + if (zeroByZero != null && "0/0".equals(fraction)) { + return zeroByZero; + } + + if (divideByZero != null && fraction.endsWith("/0")) { + return divideByZero; + } + + return Fraction.getFraction(fraction); + } + + throw context.wrongTokenException(parser, Fraction.class, token, + "Expected NUMBER or STRING token for Fraction"); + } + } + + /** + * Jackson Serializer for Fraction + */ + public static class FractionSerializer extends JsonSerializer { + + @Override + public void serialize(Fraction value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + if (value == null) { + gen.writeNull(); + } else { + gen.writeString(value.toProperString()); + } + } + } + + /** + * Convenience method to create and register the adapter with a module + */ + public void registerModule(SimpleModule module) { + module.addDeserializer(Fraction.class, new FractionDeserializer()); + module.addSerializer(Fraction.class, new FractionSerializer()); + } + + /** + * Static utility method to create a pre-configured module + */ + public static SimpleModule createModule() { + return createModule(Fraction.ZERO, Fraction.ZERO); + } + + /** + * Static utility method to create a pre-configured module with custom values + */ + public static SimpleModule createModule(Fraction zeroByZero, Fraction divideByZero) { + SimpleModule module = new SimpleModule(); + FractionAdapter adapter = new FractionAdapter(zeroByZero, divideByZero); + adapter.registerModule(module); + return module; + } +} diff --git a/src/main/java/com/backend/hls/proxy/util/PipeUtil.java b/src/main/java/com/backend/hls/proxy/util/PipeUtil.java new file mode 100644 index 0000000..4339192 --- /dev/null +++ b/src/main/java/com/backend/hls/proxy/util/PipeUtil.java @@ -0,0 +1,201 @@ +package com.backend.hls.proxy.util; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; + +import net.bramp.ffmpeg.probe.FFmpegProbeResult; + +/** + * Utility class for piping data to FFmpeg/FFprobe + */ +public class PipeUtil { + private static final Logger logger = LoggerFactory.getLogger(PipeUtil.class); + + private static final ObjectMapper objectMapper = JsonMapper + .builder() + .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .addModule(FractionAdapter + .createModule()) + .build(); + private static final int BUFFER_SIZE = 8192; + + /** + * Execute FFprobe with pipe input + */ + public static FFmpegProbeResult probeWithPipe(String ffprobe, InputStream inputStream) + throws IOException { + + ProcessBuilder pb = new ProcessBuilder( + ffprobe, + "-v", "quiet", + "-print_format", "json", + "-show_format", + "-show_streams", + "pipe:0"); + + pb.redirectErrorStream(false); + Process process = pb.start(); + + ExecutorService executor = Executors.newFixedThreadPool(4); + + Future errorReader = executor.submit(() -> { + try (BufferedReader stderr = new BufferedReader( + new InputStreamReader(process.getErrorStream()))) { + return stderr.lines().reduce("", (a, b) -> a + "\n" + b); + } catch (IOException e) { + return ""; + } + }); + + try { + // Write input to stdin + Future writer = executor.submit(() -> { + try (OutputStream stdin = process.getOutputStream()) { + inputStream.transferTo(stdin); + stdin.flush(); + } catch (IOException e) { + logger.error("FFMpeg warning: {}", e); + } + }); + + // Read JSON output from stdout + Future reader = executor.submit(() -> { + try (BufferedReader stdout = new BufferedReader( + new InputStreamReader(process.getInputStream()))) { + return stdout.lines().reduce("", (a, b) -> a + b); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + writer.get(30, TimeUnit.SECONDS); + boolean finished = process.waitFor(30, TimeUnit.SECONDS); + + if (!finished || process.exitValue() != 0) { + String error = errorReader.get(1, TimeUnit.SECONDS); + logger.error("FFProbe error: {}", error); + throw new IOException("FFprobe failed"); + } + + String json = reader.get(5, TimeUnit.SECONDS); + logger.info("Probed JSON: {}", json); + return objectMapper.readValue(json, FFmpegProbeResult.class); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + process.destroyForcibly(); + throw new IOException("Error probing stream", e); + } finally { + executor.shutdownNow(); + } + } + + /** + * Execute FFmpeg with pipe input and output + */ + public static byte[] executeWithPipe(String ffmpegPath, InputStream inputStream, + String inputFormat, String... ffmpegArgs) + throws IOException { + + ProcessBuilder pb = new ProcessBuilder(); + pb.command().add(ffmpegPath); + pb.command().add("-f"); + pb.command().add(inputFormat); + pb.command().add("-i"); + pb.command().add("pipe:0"); + + for (String arg : ffmpegArgs) { + pb.command().add(arg); + } + + pb.command().add("pipe:1"); + pb.redirectErrorStream(false); + + Process process = pb.start(); + + ExecutorService executor = Executors.newFixedThreadPool(4); + + Future errorReader = executor.submit(() -> { + try (BufferedReader stderr = new BufferedReader( + new InputStreamReader(process.getErrorStream()))) { + return stderr.lines().reduce("", (a, b) -> a + "\n" + b); + } catch (IOException e) { + return ""; + } + }); + + try { + Future writer = executor.submit(() -> { + try (OutputStream stdin = process.getOutputStream()) { + byte[] buffer = new byte[BUFFER_SIZE]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + stdin.write(buffer, 0, read); + } + stdin.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + Future reader = executor.submit(() -> { + try (InputStream stdout = process.getInputStream()) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[BUFFER_SIZE]; + int read; + while ((read = stdout.read(buffer)) != -1) { + baos.write(buffer, 0, read); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + writer.get(60, TimeUnit.SECONDS); + boolean finished = process.waitFor(120, TimeUnit.SECONDS); + + if (!finished) { + process.destroyForcibly(); + throw new IOException("FFmpeg timeout"); + } + + if (process.exitValue() != 0) { + String error = errorReader.get(1, TimeUnit.SECONDS); + logger.error("FFMpeg error: {}", error); + throw new IOException("FFmpeg failed: " + error); + } + + return reader.get(5, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + try { + String error = errorReader.get(1, TimeUnit.SECONDS); + logger.error("FFMpeg error: {}", error); + } catch (InterruptedException | ExecutionException | TimeoutException e1) { + e1.printStackTrace(); + } + process.destroyForcibly(); + throw new IOException("Error executing FFmpeg", e); + } finally { + executor.shutdownNow(); + } + } +}