[Feature] Preprocessing and dev QoL changes (#1)
Reviewed-on: #1 Co-authored-by: bivashy <botyrbojey@gmail.com> Co-committed-by: bivashy <botyrbojey@gmail.com>
This commit is contained in:
35
.dockerignore
Normal file
35
.dockerignore
Normal file
@ -0,0 +1,35 @@
|
||||
HELP.md
|
||||
.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
|
||||
|
||||
.env
|
||||
35
Dockerfile.dev
Normal file
35
Dockerfile.dev
Normal file
@ -0,0 +1,35 @@
|
||||
FROM eclipse-temurin:21-jdk-jammy
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
curl \
|
||||
vim \
|
||||
git \
|
||||
ca-certificates \
|
||||
ffmpeg && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Create non-root user
|
||||
RUN groupadd --gid 1000 spring-app && \
|
||||
useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app
|
||||
|
||||
RUN mkdir -p /home/spring-app/.m2 && \
|
||||
chown -R spring-app:spring-app /home/spring-app/.m2
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY mvnw .
|
||||
COPY .mvn .mvn
|
||||
COPY pom.xml .
|
||||
COPY target target
|
||||
|
||||
RUN chmod +x mvnw && \
|
||||
chown -R spring-app:spring-app /app
|
||||
|
||||
USER spring-app:spring-app
|
||||
|
||||
COPY src ./src
|
||||
|
||||
EXPOSE 8080
|
||||
|
||||
ENTRYPOINT ["./mvnw", "spring-boot:run"]
|
||||
@ -33,8 +33,9 @@ RUN groupadd --gid 1000 spring-app \
|
||||
&& useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends ca-certificates && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
ffmpeg
|
||||
|
||||
USER spring-app:spring-app
|
||||
WORKDIR /opt/workspace
|
||||
|
||||
10
compose.yml
10
compose.yml
@ -5,13 +5,21 @@ services:
|
||||
- anyame-shared
|
||||
|
||||
hls-proxy:
|
||||
image: hls-proxy:latest
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile.dev
|
||||
env_file: .env
|
||||
ports:
|
||||
- 8082:8080
|
||||
networks:
|
||||
- anyame-shared
|
||||
- elk-network
|
||||
volumes:
|
||||
- .:/app
|
||||
- maven-repo:/home/spring-app/.m2/
|
||||
|
||||
volumes:
|
||||
maven-repo:
|
||||
|
||||
networks:
|
||||
anyame-shared:
|
||||
|
||||
11
pom.xml
11
pom.xml
@ -90,6 +90,17 @@
|
||||
<version>${spring-context-support.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.bramp.ffmpeg</groupId>
|
||||
<artifactId>ffmpeg</artifactId>
|
||||
<version>0.8.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-devtools</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
||||
@ -2,7 +2,9 @@ package com.backend.hls.proxy;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
|
||||
@EnableDiscoveryClient
|
||||
@SpringBootApplication
|
||||
public class HlsProxyApplication {
|
||||
|
||||
|
||||
@ -1,18 +1,37 @@
|
||||
package com.backend.hls.proxy.config;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.cache.caffeine.CaffeineCacheManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.backend.hls.proxy.model.SimpleResponse;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
|
||||
@Configuration
|
||||
@EnableCaching
|
||||
public class CacheConfig {
|
||||
|
||||
@Bean
|
||||
public CacheManager cacheManager() {
|
||||
return new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent");
|
||||
CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent");
|
||||
cacheManager.setCaffeine(Caffeine.newBuilder()
|
||||
.expireAfterAccess(1, TimeUnit.HOURS)
|
||||
.weigher((Object key, Object value) -> {
|
||||
if (value instanceof byte[] valueBytes) {
|
||||
return valueBytes.length;
|
||||
}
|
||||
if (value instanceof SimpleResponse<?> response && response.getBody() instanceof byte[] body) {
|
||||
return body.length;
|
||||
}
|
||||
return 0;
|
||||
})
|
||||
.maximumWeight(500 * 1024 * 1024)
|
||||
.recordStats());
|
||||
return cacheManager;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
package com.backend.hls.proxy.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import com.backend.hls.proxy.service.NoopPreprocessService;
|
||||
import com.backend.hls.proxy.service.PreprocessService;
|
||||
|
||||
@Configuration
|
||||
public class ProcessConfig {
|
||||
@Bean
|
||||
public PreprocessService preprocessService() {
|
||||
return new NoopPreprocessService();
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,30 +1,49 @@
|
||||
package com.backend.hls.proxy.controller;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
|
||||
|
||||
import com.backend.hls.proxy.dto.CreateProxyDTO;
|
||||
import com.backend.hls.proxy.exception.FetchFailException;
|
||||
import com.backend.hls.proxy.exception.PlaylistParseException;
|
||||
import com.backend.hls.proxy.service.PlaylistProxyService;
|
||||
import com.backend.hls.proxy.service.URLForwardService;
|
||||
|
||||
@RestController
|
||||
public class ProxyController {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProxyController.class);
|
||||
private final PlaylistProxyService playlistProxyService;
|
||||
private final URLForwardService forwardService;
|
||||
|
||||
public ProxyController(PlaylistProxyService playlistProxyService) {
|
||||
public ProxyController(PlaylistProxyService playlistProxyService, URLForwardService forwardService) {
|
||||
this.playlistProxyService = playlistProxyService;
|
||||
this.forwardService = forwardService;
|
||||
}
|
||||
|
||||
@GetMapping("/proxy")
|
||||
public ResponseEntity<?> proxy(@RequestParam("url") String url) throws FetchFailException, PlaylistParseException {
|
||||
public ResponseEntity<?> proxyPlaylist(@RequestParam("url") String url)
|
||||
throws FetchFailException, PlaylistParseException {
|
||||
logger.info("Proxying playlist: {}", url);
|
||||
String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString();
|
||||
String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8));
|
||||
logger.info("Full URL: {}, base URL: {}", fullUrl, baseUrl);
|
||||
String result = playlistProxyService.proxyPlaylist(url, baseUrl);
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
|
||||
@PostMapping("/proxy")
|
||||
public ResponseEntity<?> createProxy(@RequestBody CreateProxyDTO url) {
|
||||
String location = forwardService.createForwarded(url.getUrl());
|
||||
return ResponseEntity.created(URI.create(location)).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -4,6 +4,10 @@ import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.cache.annotation.Caching;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
@ -19,18 +23,23 @@ import com.backend.hls.proxy.model.RangeRequest;
|
||||
import com.backend.hls.proxy.model.SimpleResponse;
|
||||
import com.backend.hls.proxy.repository.LinkRepository;
|
||||
import com.backend.hls.proxy.service.FetchService;
|
||||
import com.backend.hls.proxy.service.PreprocessService;
|
||||
|
||||
@RestController
|
||||
public class ProxyServeController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProxyServeController.class);
|
||||
|
||||
private final LinkRepository linkRepository;
|
||||
private final ProxyController proxyController;
|
||||
private final FetchService fetchService;
|
||||
private final PreprocessService preprocessService;
|
||||
|
||||
public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController,
|
||||
FetchService fetchService) {
|
||||
FetchService fetchService, PreprocessService preprocessService) {
|
||||
this.linkRepository = linkRepository;
|
||||
this.proxyController = proxyController;
|
||||
this.fetchService = fetchService;
|
||||
this.preprocessService = preprocessService;
|
||||
}
|
||||
|
||||
@GetMapping("/proxy/{id}")
|
||||
@ -40,13 +49,14 @@ public class ProxyServeController {
|
||||
if (id.contains("."))
|
||||
id = id.substring(0, id.lastIndexOf("."));
|
||||
Optional<Link> link = linkRepository.findById(id);
|
||||
logger.info("id {}, link is {}", id, link.map(l -> l.getUrl()).orElse("<null>"));
|
||||
if (link.isEmpty()) {
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
String url = link.get().getUrl();
|
||||
if (url.contains(".m3u8")) {
|
||||
try {
|
||||
return proxyController.proxy(url);
|
||||
return proxyController.proxyPlaylist(url);
|
||||
} catch (FetchFailException | PlaylistParseException e) {
|
||||
e.printStackTrace();
|
||||
return redirect(url);
|
||||
@ -71,7 +81,8 @@ public class ProxyServeController {
|
||||
response.getHeaders().map().forEach((key, values) -> {
|
||||
headers.addAll(key, values);
|
||||
});
|
||||
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.OK);
|
||||
|
||||
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK);
|
||||
}
|
||||
|
||||
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
|
||||
@ -84,7 +95,6 @@ public class ProxyServeController {
|
||||
|
||||
RangeRequest range = RangeRequest.parse(rangeHeader, contentLength);
|
||||
if (range == null) {
|
||||
// Invalid range, return 416 Range Not Satisfiable
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Content-Range", "bytes */" + contentLength);
|
||||
return new ResponseEntity<>(headers, HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE);
|
||||
@ -107,7 +117,8 @@ public class ProxyServeController {
|
||||
headers.add("Accept-Ranges", "bytes");
|
||||
headers.setContentLength(range.getLength());
|
||||
|
||||
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.PARTIAL_CONTENT);
|
||||
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers,
|
||||
HttpStatus.PARTIAL_CONTENT);
|
||||
}
|
||||
|
||||
private ResponseEntity<?> redirect(String target) {
|
||||
|
||||
19
src/main/java/com/backend/hls/proxy/dto/CreateProxyDTO.java
Normal file
19
src/main/java/com/backend/hls/proxy/dto/CreateProxyDTO.java
Normal file
@ -0,0 +1,19 @@
|
||||
package com.backend.hls.proxy.dto;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
public class CreateProxyDTO {
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
private final String url;
|
||||
|
||||
public CreateProxyDTO(String url) {
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
}
|
||||
@ -3,13 +3,17 @@ package com.backend.hls.proxy.exception;
|
||||
import java.net.http.HttpResponse;
|
||||
|
||||
public class FetchFailException extends Exception {
|
||||
private final HttpResponse<?> response;
|
||||
private HttpResponse<?> response;
|
||||
|
||||
public FetchFailException(String message, HttpResponse<?> response) {
|
||||
super(message);
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
public FetchFailException(String message, Exception e) {
|
||||
super(message, e);
|
||||
}
|
||||
|
||||
public HttpResponse<?> getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -27,19 +27,22 @@ public class FetchService {
|
||||
* @throws FetchFailException
|
||||
*/
|
||||
@Cacheable("hlsPlaylistContent")
|
||||
public String fetchTextContent(String url) throws IOException, InterruptedException, FetchFailException {
|
||||
public String fetchTextContent(String url) throws FetchFailException {
|
||||
HttpRequest request = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url))
|
||||
.build();
|
||||
|
||||
try {
|
||||
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
|
||||
|
||||
if (response.statusCode() != 200) {
|
||||
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(),
|
||||
throw new FetchFailException(
|
||||
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
|
||||
response);
|
||||
}
|
||||
|
||||
return response.body();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new FetchFailException("Failed to fetch content from " + url, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -48,8 +51,7 @@ public class FetchService {
|
||||
* @throws FetchFailException
|
||||
*/
|
||||
@Cacheable("playlistSegmentContent")
|
||||
public SimpleResponse<byte[]> fetchBinaryContent(String url)
|
||||
throws IOException, InterruptedException, FetchFailException {
|
||||
public SimpleResponse<byte[]> fetchBinaryContent(String url) throws FetchFailException {
|
||||
return fetchBinaryContent(url, null);
|
||||
}
|
||||
|
||||
@ -60,7 +62,7 @@ public class FetchService {
|
||||
*/
|
||||
@Cacheable("playlistSegmentContent")
|
||||
public SimpleResponse<byte[]> fetchBinaryContent(String url, String rangeHeader)
|
||||
throws IOException, InterruptedException, FetchFailException {
|
||||
throws FetchFailException {
|
||||
HttpRequest.Builder builder = HttpRequest.newBuilder()
|
||||
.uri(URI.create(url));
|
||||
|
||||
@ -68,14 +70,20 @@ public class FetchService {
|
||||
builder.header("Range", rangeHeader);
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
HttpResponse<byte[]> response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray());
|
||||
|
||||
if (response.statusCode() >= 400) {
|
||||
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(),
|
||||
throw new FetchFailException(
|
||||
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
|
||||
response);
|
||||
}
|
||||
|
||||
return new SimpleResponse<byte[]>(response.body(), response.headers());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new FetchFailException("Failed to fetch content from " + url, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,13 @@
|
||||
package com.backend.hls.proxy.service;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class NoopPreprocessService implements PreprocessService {
|
||||
|
||||
@Override
|
||||
public byte[] preprocess(byte[] data) {
|
||||
return data;
|
||||
}
|
||||
|
||||
}
|
||||
@ -11,6 +11,7 @@ import io.lindstrom.m3u8.model.MediaPlaylist;
|
||||
import io.lindstrom.m3u8.model.MultivariantPlaylist;
|
||||
import io.lindstrom.m3u8.parser.MediaPlaylistParser;
|
||||
import io.lindstrom.m3u8.parser.MultivariantPlaylistParser;
|
||||
import io.lindstrom.m3u8.parser.PlaylistParserException;
|
||||
|
||||
@Service
|
||||
public class PlaylistParseService {
|
||||
@ -29,16 +30,16 @@ public class PlaylistParseService {
|
||||
throws PlaylistParseException, FetchFailException {
|
||||
try {
|
||||
return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e);
|
||||
} catch (PlaylistParserException e) {
|
||||
throw new PlaylistParseException("Unable to parse playlist", e);
|
||||
}
|
||||
}
|
||||
|
||||
public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException {
|
||||
try {
|
||||
return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e);
|
||||
} catch (PlaylistParserException e) {
|
||||
throw new PlaylistParseException("Unable to parse playlist", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -23,17 +23,20 @@ public class PlaylistProxyService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class);
|
||||
private final PlaylistParseService playlistParseService;
|
||||
private final URLForwardService urlForwardService;
|
||||
private final URIResolveService uriResolveService;
|
||||
|
||||
public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService) {
|
||||
public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService,
|
||||
URIResolveService uriResolveService) {
|
||||
this.playlistParseService = playlistParseService;
|
||||
this.urlForwardService = urlForwardService;
|
||||
this.uriResolveService = uriResolveService;
|
||||
}
|
||||
|
||||
public String proxyPlaylist(String hlsUrl, String proxyUrl)
|
||||
throws FetchFailException {
|
||||
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
|
||||
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
|
||||
String url = base + suffix;
|
||||
String url = uriResolveService.resolve(suffix, base);
|
||||
try {
|
||||
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
|
||||
|
||||
@ -88,7 +91,7 @@ public class PlaylistProxyService {
|
||||
logger.error("cannot proxy variant: {}", variant);
|
||||
return variant;
|
||||
}
|
||||
String variantUri = base + variant.uri();
|
||||
String variantUri = uriResolveService.resolve(variant.uri(), base);
|
||||
String proxiedUri = urlForwardService.createForwarded(variantUri);
|
||||
return Variant.builder()
|
||||
.from(variant)
|
||||
@ -103,7 +106,7 @@ public class PlaylistProxyService {
|
||||
if (rendition.uri().isEmpty()) {
|
||||
return rendition;
|
||||
}
|
||||
String renditionUri = base + rendition.uri().get();
|
||||
String renditionUri = uriResolveService.resolve(rendition.uri().get(), base);
|
||||
String proxiedUri = urlForwardService.createForwarded(renditionUri);
|
||||
return AlternativeRendition.builder()
|
||||
.from(rendition)
|
||||
@ -120,7 +123,7 @@ public class PlaylistProxyService {
|
||||
return segment;
|
||||
}
|
||||
|
||||
String segmentUri = base + segment.uri();
|
||||
String segmentUri = uriResolveService.resolve(segment.uri(), base);
|
||||
String proxiedUri = urlForwardService.createForwarded(segmentUri);
|
||||
|
||||
MediaSegment.Builder builder = MediaSegment.builder()
|
||||
@ -140,7 +143,7 @@ public class PlaylistProxyService {
|
||||
logger.error("cannot proxy segment: {}", segment);
|
||||
return segment;
|
||||
}
|
||||
String segmentUri = base + segment.uri();
|
||||
String segmentUri = uriResolveService.resolve(segment.uri(), base);
|
||||
String proxiedUri = urlForwardService.createForwarded(segmentUri);
|
||||
return PartialSegment.builder()
|
||||
.from(segment)
|
||||
@ -156,7 +159,7 @@ public class PlaylistProxyService {
|
||||
logger.error("cannot proxy segment: {}", segmentMap);
|
||||
return segmentMap;
|
||||
}
|
||||
String segmentMapUri = base + segmentMap.uri();
|
||||
String segmentMapUri = uriResolveService.resolve(segmentMap.uri(), base);
|
||||
String proxiedUri = urlForwardService.createForwarded(segmentMapUri);
|
||||
return SegmentMap.builder()
|
||||
.from(segmentMap)
|
||||
|
||||
@ -0,0 +1,5 @@
|
||||
package com.backend.hls.proxy.service;
|
||||
|
||||
public interface PreprocessService {
|
||||
byte[] preprocess(byte[] data);
|
||||
}
|
||||
@ -0,0 +1,67 @@
|
||||
package com.backend.hls.proxy.service;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.backend.hls.proxy.util.PipeUtil;
|
||||
|
||||
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
|
||||
|
||||
@Service
|
||||
public class RandomEffectPreprocessService implements PreprocessService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class);
|
||||
|
||||
public byte[] preprocess(byte[] data) {
|
||||
try {
|
||||
String format = findFormat(data);
|
||||
logger.info("format is {}", format);
|
||||
return randomEffectsHLS(data, format, "/usr/bin/ffmpeg");
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return data;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
|
||||
try (InputStream inputStream = new ByteArrayInputStream(data)) {
|
||||
String[] effects = {
|
||||
"hue=s=10", // Color shift
|
||||
"edgedetect=mode=colormix", // Edge detection
|
||||
"boxblur=10:1", // Heavy blur
|
||||
"noise=alls=20:allf=t", // Film grain noise
|
||||
"colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage
|
||||
"rotate=0.1*c", // Slight rotation
|
||||
"scale=iw/2:ih/2" // Pixelate
|
||||
};
|
||||
|
||||
Random random = new Random();
|
||||
String randomEffect = effects[random.nextInt(effects.length)];
|
||||
logger.info("applied effect {}", randomEffect);
|
||||
|
||||
String[] ffmpegArgs = {
|
||||
"-vf", randomEffect,
|
||||
"-f", inputFormat,
|
||||
};
|
||||
|
||||
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
|
||||
}
|
||||
}
|
||||
|
||||
private String findFormat(byte[] data) throws IOException {
|
||||
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
|
||||
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
|
||||
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
|
||||
throw new IOException("No video stream found");
|
||||
}
|
||||
return result.format.format_name;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package com.backend.hls.proxy.service;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class URIResolveService {
|
||||
public String resolve(String url, String base) {
|
||||
if (url.startsWith("http://") || url.startsWith("https://")) {
|
||||
return url;
|
||||
}
|
||||
return base + url;
|
||||
}
|
||||
}
|
||||
44
src/main/java/com/backend/hls/proxy/service/cache/FileCacheManager.java
vendored
Normal file
44
src/main/java/com/backend/hls/proxy/service/cache/FileCacheManager.java
vendored
Normal file
@ -0,0 +1,44 @@
|
||||
package com.backend.hls.proxy.service.cache;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.springframework.cache.CacheManager;
|
||||
|
||||
public class FileCacheManager implements CacheManager {
|
||||
|
||||
private final ConcurrentMap<String, TempFileCache> caches = new ConcurrentHashMap<>();
|
||||
private final String baseDirectory;
|
||||
|
||||
public FileCacheManager(String baseDirectory) {
|
||||
this.baseDirectory = baseDirectory;
|
||||
|
||||
try {
|
||||
Files.createDirectories(Paths.get(baseDirectory));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to create cache directory", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.springframework.cache.Cache getCache(String name) {
|
||||
return caches.computeIfAbsent(name, cacheName -> {
|
||||
try {
|
||||
return new TempFileCache(cacheName,
|
||||
Paths.get(baseDirectory, cacheName));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to create cache: " + cacheName, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> getCacheNames() {
|
||||
return caches.keySet();
|
||||
}
|
||||
|
||||
}
|
||||
274
src/main/java/com/backend/hls/proxy/service/cache/TempFileCache.java
vendored
Normal file
274
src/main/java/com/backend/hls/proxy/service/cache/TempFileCache.java
vendored
Normal file
@ -0,0 +1,274 @@
|
||||
package com.backend.hls.proxy.service.cache;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.RemovalCause;
|
||||
import org.springframework.cache.support.SimpleValueWrapper;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class TempFileCache implements org.springframework.cache.Cache {
|
||||
|
||||
private final String name;
|
||||
private final Path cacheDirectory;
|
||||
private final Cache<Object, CacheMetadata> metadataCache;
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
private static class CacheMetadata {
|
||||
String filename;
|
||||
long size;
|
||||
long lastAccessed;
|
||||
long createdTime;
|
||||
Path filePath;
|
||||
|
||||
CacheMetadata(String filename, long size, Path filePath) {
|
||||
this.filename = filename;
|
||||
this.size = size;
|
||||
this.filePath = filePath;
|
||||
this.lastAccessed = System.currentTimeMillis();
|
||||
this.createdTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
void updateAccess() {
|
||||
this.lastAccessed = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
boolean isFileValid() {
|
||||
return Files.exists(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
public TempFileCache(String name, Path cacheDirectory) throws IOException {
|
||||
this.name = name;
|
||||
this.cacheDirectory = cacheDirectory;
|
||||
|
||||
this.metadataCache = Caffeine.newBuilder()
|
||||
.maximumSize(1000)
|
||||
.expireAfterAccess(1, TimeUnit.HOURS)
|
||||
.removalListener((Object key, CacheMetadata metadata, RemovalCause cause) -> {
|
||||
if (metadata != null && metadata.filePath != null) {
|
||||
try {
|
||||
Files.deleteIfExists(metadata.filePath);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
})
|
||||
.build();
|
||||
|
||||
Files.createDirectories(cacheDirectory);
|
||||
cleanUpOrphanedFiles();
|
||||
}
|
||||
|
||||
private void cleanUpOrphanedFiles() {
|
||||
try {
|
||||
Files.list(cacheDirectory)
|
||||
.filter(Files::isRegularFile)
|
||||
.forEach(file -> {
|
||||
boolean hasMetadata = metadataCache.asMap().values().stream()
|
||||
.anyMatch(meta -> meta.filePath.equals(file));
|
||||
if (!hasMetadata) {
|
||||
try {
|
||||
Files.delete(file);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getNativeCache() {
|
||||
return metadataCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public ValueWrapper get(Object key) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
CacheMetadata meta = metadataCache.getIfPresent(key);
|
||||
if (meta == null || !meta.isFileValid()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
byte[] data = Files.readAllBytes(meta.filePath);
|
||||
meta.updateAccess();
|
||||
metadataCache.put(key, meta);
|
||||
return new SimpleValueWrapper(data);
|
||||
} catch (IOException e) {
|
||||
metadataCache.invalidate(key);
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public <T> T get(Object key, @Nullable Class<T> type) {
|
||||
ValueWrapper wrapper = get(key);
|
||||
if (wrapper == null) {
|
||||
return null;
|
||||
}
|
||||
Object value = wrapper.get();
|
||||
if (type != null && !type.isInstance(value)) {
|
||||
throw new IllegalStateException("Cached value is not of required type");
|
||||
}
|
||||
return (T) value;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public <T> T get(Object key, Callable<T> valueLoader) {
|
||||
ValueWrapper wrapper = get(key);
|
||||
if (wrapper != null) {
|
||||
return (T) wrapper.get();
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
// Double-check after acquiring write lock
|
||||
wrapper = get(key);
|
||||
if (wrapper != null) {
|
||||
return (T) wrapper.get();
|
||||
}
|
||||
|
||||
T value = valueLoader.call();
|
||||
if (value instanceof byte[]) {
|
||||
put(key, value);
|
||||
}
|
||||
return value;
|
||||
} catch (Exception e) {
|
||||
throw new ValueRetrievalException(key, valueLoader, e);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Object key, @Nullable Object value) {
|
||||
if (value == null) {
|
||||
evict(key);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(value instanceof byte[])) {
|
||||
throw new IllegalArgumentException("TempFileCache only supports byte arrays");
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
byte[] data = (byte[]) value;
|
||||
|
||||
String filename = generateFilename(key, data);
|
||||
Path filePath = cacheDirectory.resolve(filename);
|
||||
|
||||
Files.write(filePath, data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
|
||||
|
||||
CacheMetadata newMeta = new CacheMetadata(filename, data.length, filePath);
|
||||
CacheMetadata oldMeta = metadataCache.asMap().put(key, newMeta);
|
||||
|
||||
if (oldMeta != null && oldMeta.filePath != null && !oldMeta.filePath.equals(filePath)) {
|
||||
Files.deleteIfExists(oldMeta.filePath);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to cache file", e);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private String generateFilename(Object key, byte[] data) {
|
||||
String hash = Integer.toHexString(key.hashCode());
|
||||
String sizeSuffix = "_" + data.length;
|
||||
String timestamp = "_" + System.currentTimeMillis();
|
||||
return hash + sizeSuffix + timestamp + ".cache";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(Object key) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
CacheMetadata meta = metadataCache.asMap().remove(key);
|
||||
if (meta != null && meta.filePath != null) {
|
||||
try {
|
||||
Files.deleteIfExists(meta.filePath);
|
||||
} catch (IOException e) {
|
||||
// Log warning
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
metadataCache.invalidateAll();
|
||||
|
||||
try {
|
||||
Files.list(cacheDirectory)
|
||||
.filter(Files::isRegularFile)
|
||||
.forEach(path -> {
|
||||
try {
|
||||
Files.delete(path);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public ValueWrapper putIfAbsent(Object key, @Nullable Object value) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
ValueWrapper existing = get(key);
|
||||
if (existing == null) {
|
||||
put(key, value);
|
||||
return null;
|
||||
}
|
||||
return existing;
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public com.github.benmanes.caffeine.cache.stats.CacheStats getMetadataStats() {
|
||||
return metadataCache.stats();
|
||||
}
|
||||
|
||||
public long getCacheSize() {
|
||||
return metadataCache.asMap().values().stream()
|
||||
.mapToLong(meta -> meta.size)
|
||||
.sum();
|
||||
}
|
||||
|
||||
public int getCacheCount() {
|
||||
return (int) metadataCache.estimatedSize();
|
||||
}
|
||||
|
||||
public void cleanupExpiredEntries() {
|
||||
metadataCache.cleanUp();
|
||||
}
|
||||
}
|
||||
110
src/main/java/com/backend/hls/proxy/util/FractionAdapter.java
Normal file
110
src/main/java/com/backend/hls/proxy/util/FractionAdapter.java
Normal file
@ -0,0 +1,110 @@
|
||||
package com.backend.hls.proxy.util;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang3.math.Fraction;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
|
||||
/**
|
||||
* Jackson TypeAdapter for Apache Commons Math Fraction Object
|
||||
*
|
||||
* @author bramp
|
||||
*/
|
||||
public class FractionAdapter {
|
||||
|
||||
private final Fraction zeroByZero;
|
||||
private final Fraction divideByZero;
|
||||
|
||||
public FractionAdapter() {
|
||||
this(Fraction.ZERO, Fraction.ZERO);
|
||||
}
|
||||
|
||||
public FractionAdapter(Fraction zeroByZero, Fraction divideByZero) {
|
||||
this.zeroByZero = zeroByZero;
|
||||
this.divideByZero = divideByZero;
|
||||
}
|
||||
|
||||
/**
|
||||
* Jackson Deserializer for Fraction
|
||||
*/
|
||||
public class FractionDeserializer extends JsonDeserializer<Fraction> {
|
||||
|
||||
@Override
|
||||
public Fraction deserialize(JsonParser parser, DeserializationContext context) throws IOException {
|
||||
JsonToken token = parser.currentToken();
|
||||
|
||||
if (token == JsonToken.VALUE_NULL) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
|
||||
return Fraction.getFraction(parser.getDoubleValue());
|
||||
}
|
||||
|
||||
if (token == JsonToken.VALUE_STRING) {
|
||||
String fraction = parser.getText().trim();
|
||||
|
||||
if (zeroByZero != null && "0/0".equals(fraction)) {
|
||||
return zeroByZero;
|
||||
}
|
||||
|
||||
if (divideByZero != null && fraction.endsWith("/0")) {
|
||||
return divideByZero;
|
||||
}
|
||||
|
||||
return Fraction.getFraction(fraction);
|
||||
}
|
||||
|
||||
throw context.wrongTokenException(parser, Fraction.class, token,
|
||||
"Expected NUMBER or STRING token for Fraction");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Jackson Serializer for Fraction
|
||||
*/
|
||||
public static class FractionSerializer extends JsonSerializer<Fraction> {
|
||||
|
||||
@Override
|
||||
public void serialize(Fraction value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
|
||||
if (value == null) {
|
||||
gen.writeNull();
|
||||
} else {
|
||||
gen.writeString(value.toProperString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to create and register the adapter with a module
|
||||
*/
|
||||
public void registerModule(SimpleModule module) {
|
||||
module.addDeserializer(Fraction.class, new FractionDeserializer());
|
||||
module.addSerializer(Fraction.class, new FractionSerializer());
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method to create a pre-configured module
|
||||
*/
|
||||
public static SimpleModule createModule() {
|
||||
return createModule(Fraction.ZERO, Fraction.ZERO);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method to create a pre-configured module with custom values
|
||||
*/
|
||||
public static SimpleModule createModule(Fraction zeroByZero, Fraction divideByZero) {
|
||||
SimpleModule module = new SimpleModule();
|
||||
FractionAdapter adapter = new FractionAdapter(zeroByZero, divideByZero);
|
||||
adapter.registerModule(module);
|
||||
return module;
|
||||
}
|
||||
}
|
||||
201
src/main/java/com/backend/hls/proxy/util/PipeUtil.java
Normal file
201
src/main/java/com/backend/hls/proxy/util/PipeUtil.java
Normal file
@ -0,0 +1,201 @@
|
||||
package com.backend.hls.proxy.util;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.MapperFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
|
||||
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
|
||||
|
||||
/**
|
||||
* Utility class for piping data to FFmpeg/FFprobe
|
||||
*/
|
||||
public class PipeUtil {
|
||||
private static final Logger logger = LoggerFactory.getLogger(PipeUtil.class);
|
||||
|
||||
private static final ObjectMapper objectMapper = JsonMapper
|
||||
.builder()
|
||||
.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
|
||||
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
|
||||
.addModule(FractionAdapter
|
||||
.createModule())
|
||||
.build();
|
||||
private static final int BUFFER_SIZE = 8192;
|
||||
|
||||
/**
|
||||
* Execute FFprobe with pipe input
|
||||
*/
|
||||
public static FFmpegProbeResult probeWithPipe(String ffprobe, InputStream inputStream)
|
||||
throws IOException {
|
||||
|
||||
ProcessBuilder pb = new ProcessBuilder(
|
||||
ffprobe,
|
||||
"-v", "quiet",
|
||||
"-print_format", "json",
|
||||
"-show_format",
|
||||
"-show_streams",
|
||||
"pipe:0");
|
||||
|
||||
pb.redirectErrorStream(false);
|
||||
Process process = pb.start();
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(4);
|
||||
|
||||
Future<String> errorReader = executor.submit(() -> {
|
||||
try (BufferedReader stderr = new BufferedReader(
|
||||
new InputStreamReader(process.getErrorStream()))) {
|
||||
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
|
||||
} catch (IOException e) {
|
||||
return "";
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
// Write input to stdin
|
||||
Future<?> writer = executor.submit(() -> {
|
||||
try (OutputStream stdin = process.getOutputStream()) {
|
||||
inputStream.transferTo(stdin);
|
||||
stdin.flush();
|
||||
} catch (IOException e) {
|
||||
logger.error("FFMpeg warning: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
// Read JSON output from stdout
|
||||
Future<String> reader = executor.submit(() -> {
|
||||
try (BufferedReader stdout = new BufferedReader(
|
||||
new InputStreamReader(process.getInputStream()))) {
|
||||
return stdout.lines().reduce("", (a, b) -> a + b);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
writer.get(30, TimeUnit.SECONDS);
|
||||
boolean finished = process.waitFor(30, TimeUnit.SECONDS);
|
||||
|
||||
if (!finished || process.exitValue() != 0) {
|
||||
String error = errorReader.get(1, TimeUnit.SECONDS);
|
||||
logger.error("FFProbe error: {}", error);
|
||||
throw new IOException("FFprobe failed");
|
||||
}
|
||||
|
||||
String json = reader.get(5, TimeUnit.SECONDS);
|
||||
logger.info("Probed JSON: {}", json);
|
||||
return objectMapper.readValue(json, FFmpegProbeResult.class);
|
||||
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
process.destroyForcibly();
|
||||
throw new IOException("Error probing stream", e);
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute FFmpeg with pipe input and output
|
||||
*/
|
||||
public static byte[] executeWithPipe(String ffmpegPath, InputStream inputStream,
|
||||
String inputFormat, String... ffmpegArgs)
|
||||
throws IOException {
|
||||
|
||||
ProcessBuilder pb = new ProcessBuilder();
|
||||
pb.command().add(ffmpegPath);
|
||||
pb.command().add("-f");
|
||||
pb.command().add(inputFormat);
|
||||
pb.command().add("-i");
|
||||
pb.command().add("pipe:0");
|
||||
|
||||
for (String arg : ffmpegArgs) {
|
||||
pb.command().add(arg);
|
||||
}
|
||||
|
||||
pb.command().add("pipe:1");
|
||||
pb.redirectErrorStream(false);
|
||||
|
||||
Process process = pb.start();
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(4);
|
||||
|
||||
Future<String> errorReader = executor.submit(() -> {
|
||||
try (BufferedReader stderr = new BufferedReader(
|
||||
new InputStreamReader(process.getErrorStream()))) {
|
||||
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
|
||||
} catch (IOException e) {
|
||||
return "";
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
Future<?> writer = executor.submit(() -> {
|
||||
try (OutputStream stdin = process.getOutputStream()) {
|
||||
byte[] buffer = new byte[BUFFER_SIZE];
|
||||
int read;
|
||||
while ((read = inputStream.read(buffer)) != -1) {
|
||||
stdin.write(buffer, 0, read);
|
||||
}
|
||||
stdin.flush();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
Future<byte[]> reader = executor.submit(() -> {
|
||||
try (InputStream stdout = process.getInputStream()) {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
byte[] buffer = new byte[BUFFER_SIZE];
|
||||
int read;
|
||||
while ((read = stdout.read(buffer)) != -1) {
|
||||
baos.write(buffer, 0, read);
|
||||
}
|
||||
return baos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
writer.get(60, TimeUnit.SECONDS);
|
||||
boolean finished = process.waitFor(120, TimeUnit.SECONDS);
|
||||
|
||||
if (!finished) {
|
||||
process.destroyForcibly();
|
||||
throw new IOException("FFmpeg timeout");
|
||||
}
|
||||
|
||||
if (process.exitValue() != 0) {
|
||||
String error = errorReader.get(1, TimeUnit.SECONDS);
|
||||
logger.error("FFMpeg error: {}", error);
|
||||
throw new IOException("FFmpeg failed: " + error);
|
||||
}
|
||||
|
||||
return reader.get(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
try {
|
||||
String error = errorReader.get(1, TimeUnit.SECONDS);
|
||||
logger.error("FFMpeg error: {}", error);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e1) {
|
||||
e1.printStackTrace();
|
||||
}
|
||||
process.destroyForcibly();
|
||||
throw new IOException("Error executing FFmpeg", e);
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user