Compare commits

4 Commits

23 changed files with 963 additions and 44 deletions

35
.dockerignore Normal file
View File

@ -0,0 +1,35 @@
HELP.md
.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
.env

35
Dockerfile.dev Normal file
View File

@ -0,0 +1,35 @@
FROM eclipse-temurin:21-jdk-jammy
RUN apt-get update && \
apt-get install -y --no-install-recommends \
curl \
vim \
git \
ca-certificates \
ffmpeg && \
rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN groupadd --gid 1000 spring-app && \
useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app
RUN mkdir -p /home/spring-app/.m2 && \
chown -R spring-app:spring-app /home/spring-app/.m2
WORKDIR /app
COPY mvnw .
COPY .mvn .mvn
COPY pom.xml .
COPY target target
RUN chmod +x mvnw && \
chown -R spring-app:spring-app /app
USER spring-app:spring-app
COPY src ./src
EXPOSE 8080
ENTRYPOINT ["./mvnw", "spring-boot:run"]

View File

@ -33,8 +33,9 @@ RUN groupadd --gid 1000 spring-app \
&& useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app && useradd --uid 1000 --gid spring-app --shell /bin/bash --create-home spring-app
RUN apt-get update && \ RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \ apt-get install -y --no-install-recommends \
rm -rf /var/lib/apt/lists/* ca-certificates \
ffmpeg
USER spring-app:spring-app USER spring-app:spring-app
WORKDIR /opt/workspace WORKDIR /opt/workspace

View File

@ -5,13 +5,21 @@ services:
- anyame-shared - anyame-shared
hls-proxy: hls-proxy:
image: hls-proxy:latest build:
context: .
dockerfile: Dockerfile.dev
env_file: .env env_file: .env
ports: ports:
- 8082:8080 - 8082:8080
networks: networks:
- anyame-shared - anyame-shared
- elk-network - elk-network
volumes:
- .:/app
- maven-repo:/home/spring-app/.m2/
volumes:
maven-repo:
networks: networks:
anyame-shared: anyame-shared:

11
pom.xml
View File

@ -90,6 +90,17 @@
<version>${spring-context-support.version}</version> <version>${spring-context-support.version}</version>
</dependency> </dependency>
<dependency>
<groupId>net.bramp.ffmpeg</groupId>
<artifactId>ffmpeg</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

View File

@ -2,7 +2,9 @@ package com.backend.hls.proxy;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDiscoveryClient
@SpringBootApplication @SpringBootApplication
public class HlsProxyApplication { public class HlsProxyApplication {

View File

@ -1,18 +1,37 @@
package com.backend.hls.proxy.config; package com.backend.hls.proxy.config;
import java.util.concurrent.TimeUnit;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching; import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager; import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import com.backend.hls.proxy.model.SimpleResponse;
import com.github.benmanes.caffeine.cache.Caffeine;
@Configuration @Configuration
@EnableCaching @EnableCaching
public class CacheConfig { public class CacheConfig {
@Bean @Bean
public CacheManager cacheManager() { public CacheManager cacheManager() {
return new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent"); CaffeineCacheManager cacheManager = new CaffeineCacheManager("hlsPlaylistContent", "playlistSegmentContent");
cacheManager.setCaffeine(Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.HOURS)
.weigher((Object key, Object value) -> {
if (value instanceof byte[] valueBytes) {
return valueBytes.length;
}
if (value instanceof SimpleResponse<?> response && response.getBody() instanceof byte[] body) {
return body.length;
}
return 0;
})
.maximumWeight(500 * 1024 * 1024)
.recordStats());
return cacheManager;
} }
} }

View File

@ -0,0 +1,16 @@
package com.backend.hls.proxy.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.backend.hls.proxy.service.NoopPreprocessService;
import com.backend.hls.proxy.service.PreprocessService;
@Configuration
public class ProcessConfig {
@Bean
public PreprocessService preprocessService() {
return new NoopPreprocessService();
}
}

View File

@ -1,30 +1,49 @@
package com.backend.hls.proxy.controller; package com.backend.hls.proxy.controller;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder; import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
import com.backend.hls.proxy.dto.CreateProxyDTO;
import com.backend.hls.proxy.exception.FetchFailException; import com.backend.hls.proxy.exception.FetchFailException;
import com.backend.hls.proxy.exception.PlaylistParseException; import com.backend.hls.proxy.exception.PlaylistParseException;
import com.backend.hls.proxy.service.PlaylistProxyService; import com.backend.hls.proxy.service.PlaylistProxyService;
import com.backend.hls.proxy.service.URLForwardService;
@RestController @RestController
public class ProxyController { public class ProxyController {
private static final Logger logger = LoggerFactory.getLogger(ProxyController.class);
private final PlaylistProxyService playlistProxyService; private final PlaylistProxyService playlistProxyService;
private final URLForwardService forwardService;
public ProxyController(PlaylistProxyService playlistProxyService) { public ProxyController(PlaylistProxyService playlistProxyService, URLForwardService forwardService) {
this.playlistProxyService = playlistProxyService; this.playlistProxyService = playlistProxyService;
this.forwardService = forwardService;
} }
@GetMapping("/proxy") @GetMapping("/proxy")
public ResponseEntity<?> proxy(@RequestParam("url") String url) throws FetchFailException, PlaylistParseException { public ResponseEntity<?> proxyPlaylist(@RequestParam("url") String url)
throws FetchFailException, PlaylistParseException {
logger.info("Proxying playlist: {}", url);
String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString(); String fullUrl = ServletUriComponentsBuilder.fromCurrentRequestUri().build().toUriString();
String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8)); String baseUrl = fullUrl.substring(0, fullUrl.indexOf("/", 8));
logger.info("Full URL: {}, base URL: {}", fullUrl, baseUrl);
String result = playlistProxyService.proxyPlaylist(url, baseUrl); String result = playlistProxyService.proxyPlaylist(url, baseUrl);
return ResponseEntity.ok(result); return ResponseEntity.ok(result);
} }
@PostMapping("/proxy")
public ResponseEntity<?> createProxy(@RequestBody CreateProxyDTO url) {
String location = forwardService.createForwarded(url.getUrl());
return ResponseEntity.created(URI.create(location)).build();
}
} }

View File

@ -4,6 +4,10 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -19,18 +23,23 @@ import com.backend.hls.proxy.model.RangeRequest;
import com.backend.hls.proxy.model.SimpleResponse; import com.backend.hls.proxy.model.SimpleResponse;
import com.backend.hls.proxy.repository.LinkRepository; import com.backend.hls.proxy.repository.LinkRepository;
import com.backend.hls.proxy.service.FetchService; import com.backend.hls.proxy.service.FetchService;
import com.backend.hls.proxy.service.PreprocessService;
@RestController @RestController
public class ProxyServeController { public class ProxyServeController {
private static final Logger logger = LoggerFactory.getLogger(ProxyServeController.class);
private final LinkRepository linkRepository; private final LinkRepository linkRepository;
private final ProxyController proxyController; private final ProxyController proxyController;
private final FetchService fetchService; private final FetchService fetchService;
private final PreprocessService preprocessService;
public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController, public ProxyServeController(LinkRepository linkRepository, ProxyController proxyController,
FetchService fetchService) { FetchService fetchService, PreprocessService preprocessService) {
this.linkRepository = linkRepository; this.linkRepository = linkRepository;
this.proxyController = proxyController; this.proxyController = proxyController;
this.fetchService = fetchService; this.fetchService = fetchService;
this.preprocessService = preprocessService;
} }
@GetMapping("/proxy/{id}") @GetMapping("/proxy/{id}")
@ -40,13 +49,14 @@ public class ProxyServeController {
if (id.contains(".")) if (id.contains("."))
id = id.substring(0, id.lastIndexOf(".")); id = id.substring(0, id.lastIndexOf("."));
Optional<Link> link = linkRepository.findById(id); Optional<Link> link = linkRepository.findById(id);
logger.info("id {}, link is {}", id, link.map(l -> l.getUrl()).orElse("<null>"));
if (link.isEmpty()) { if (link.isEmpty()) {
return ResponseEntity.notFound().build(); return ResponseEntity.notFound().build();
} }
String url = link.get().getUrl(); String url = link.get().getUrl();
if (url.contains(".m3u8")) { if (url.contains(".m3u8")) {
try { try {
return proxyController.proxy(url); return proxyController.proxyPlaylist(url);
} catch (FetchFailException | PlaylistParseException e) { } catch (FetchFailException | PlaylistParseException e) {
e.printStackTrace(); e.printStackTrace();
return redirect(url); return redirect(url);
@ -71,7 +81,8 @@ public class ProxyServeController {
response.getHeaders().map().forEach((key, values) -> { response.getHeaders().map().forEach((key, values) -> {
headers.addAll(key, values); headers.addAll(key, values);
}); });
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.OK);
return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers, HttpStatus.OK);
} }
private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader) private ResponseEntity<?> handleRangeRequest(String url, String rangeHeader)
@ -84,7 +95,6 @@ public class ProxyServeController {
RangeRequest range = RangeRequest.parse(rangeHeader, contentLength); RangeRequest range = RangeRequest.parse(rangeHeader, contentLength);
if (range == null) { if (range == null) {
// Invalid range, return 416 Range Not Satisfiable
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();
headers.add("Content-Range", "bytes */" + contentLength); headers.add("Content-Range", "bytes */" + contentLength);
return new ResponseEntity<>(headers, HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE); return new ResponseEntity<>(headers, HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE);
@ -107,7 +117,8 @@ public class ProxyServeController {
headers.add("Accept-Ranges", "bytes"); headers.add("Accept-Ranges", "bytes");
headers.setContentLength(range.getLength()); headers.setContentLength(range.getLength());
return new ResponseEntity<>(response.getBody(), headers, HttpStatus.PARTIAL_CONTENT); return new ResponseEntity<>(preprocessService.preprocess(response.getBody()), headers,
HttpStatus.PARTIAL_CONTENT);
} }
private ResponseEntity<?> redirect(String target) { private ResponseEntity<?> redirect(String target) {

View File

@ -0,0 +1,19 @@
package com.backend.hls.proxy.dto;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
public class CreateProxyDTO {
@NotNull
@NotEmpty
private final String url;
public CreateProxyDTO(String url) {
this.url = url;
}
public String getUrl() {
return url;
}
}

View File

@ -3,13 +3,17 @@ package com.backend.hls.proxy.exception;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
public class FetchFailException extends Exception { public class FetchFailException extends Exception {
private final HttpResponse<?> response; private HttpResponse<?> response;
public FetchFailException(String message, HttpResponse<?> response) { public FetchFailException(String message, HttpResponse<?> response) {
super(message); super(message);
this.response = response; this.response = response;
} }
public FetchFailException(String message, Exception e) {
super(message, e);
}
public HttpResponse<?> getResponse() { public HttpResponse<?> getResponse() {
return response; return response;
} }

View File

@ -27,19 +27,22 @@ public class FetchService {
* @throws FetchFailException * @throws FetchFailException
*/ */
@Cacheable("hlsPlaylistContent") @Cacheable("hlsPlaylistContent")
public String fetchTextContent(String url) throws IOException, InterruptedException, FetchFailException { public String fetchTextContent(String url) throws FetchFailException {
HttpRequest request = HttpRequest.newBuilder() HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(url)) .uri(URI.create(url))
.build(); .build();
try {
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) { if (response.statusCode() != 200) {
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), throw new FetchFailException(
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
response); response);
} }
return response.body(); return response.body();
} catch (IOException | InterruptedException e) {
throw new FetchFailException("Failed to fetch content from " + url, e);
}
} }
/** /**
@ -48,8 +51,7 @@ public class FetchService {
* @throws FetchFailException * @throws FetchFailException
*/ */
@Cacheable("playlistSegmentContent") @Cacheable("playlistSegmentContent")
public SimpleResponse<byte[]> fetchBinaryContent(String url) public SimpleResponse<byte[]> fetchBinaryContent(String url) throws FetchFailException {
throws IOException, InterruptedException, FetchFailException {
return fetchBinaryContent(url, null); return fetchBinaryContent(url, null);
} }
@ -60,7 +62,7 @@ public class FetchService {
*/ */
@Cacheable("playlistSegmentContent") @Cacheable("playlistSegmentContent")
public SimpleResponse<byte[]> fetchBinaryContent(String url, String rangeHeader) public SimpleResponse<byte[]> fetchBinaryContent(String url, String rangeHeader)
throws IOException, InterruptedException, FetchFailException { throws FetchFailException {
HttpRequest.Builder builder = HttpRequest.newBuilder() HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create(url)); .uri(URI.create(url));
@ -68,14 +70,20 @@ public class FetchService {
builder.header("Range", rangeHeader); builder.header("Range", rangeHeader);
} }
try {
HttpResponse<byte[]> response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray()); HttpResponse<byte[]> response = httpClient.send(builder.build(), HttpResponse.BodyHandlers.ofByteArray());
if (response.statusCode() >= 400) { if (response.statusCode() >= 400) {
throw new FetchFailException("Failed to fetch content from " + url + ", status: " + response.statusCode(), throw new FetchFailException(
"Failed to fetch content from " + url + ", status: " + response.statusCode(),
response); response);
} }
return new SimpleResponse<byte[]>(response.body(), response.headers()); return new SimpleResponse<byte[]>(response.body(), response.headers());
} catch (IOException | InterruptedException e) {
throw new FetchFailException("Failed to fetch content from " + url, e);
}
} }
/** /**

View File

@ -0,0 +1,13 @@
package com.backend.hls.proxy.service;
import org.springframework.stereotype.Service;
@Service
public class NoopPreprocessService implements PreprocessService {
@Override
public byte[] preprocess(byte[] data) {
return data;
}
}

View File

@ -11,6 +11,7 @@ import io.lindstrom.m3u8.model.MediaPlaylist;
import io.lindstrom.m3u8.model.MultivariantPlaylist; import io.lindstrom.m3u8.model.MultivariantPlaylist;
import io.lindstrom.m3u8.parser.MediaPlaylistParser; import io.lindstrom.m3u8.parser.MediaPlaylistParser;
import io.lindstrom.m3u8.parser.MultivariantPlaylistParser; import io.lindstrom.m3u8.parser.MultivariantPlaylistParser;
import io.lindstrom.m3u8.parser.PlaylistParserException;
@Service @Service
public class PlaylistParseService { public class PlaylistParseService {
@ -29,16 +30,16 @@ public class PlaylistParseService {
throws PlaylistParseException, FetchFailException { throws PlaylistParseException, FetchFailException {
try { try {
return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); return playlistParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
} catch (IOException | InterruptedException e) { } catch (PlaylistParserException e) {
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); throw new PlaylistParseException("Unable to parse playlist", e);
} }
} }
public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException { public MediaPlaylist readMediaPlaylist(String m3u8URL) throws PlaylistParseException, FetchFailException {
try { try {
return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL)); return mediaParser.readPlaylist(fetchService.fetchTextContent(m3u8URL));
} catch (IOException | InterruptedException e) { } catch (PlaylistParserException e) {
throw new PlaylistParseException("Failed to read playlist from " + m3u8URL, e); throw new PlaylistParseException("Unable to parse playlist", e);
} }
} }

View File

@ -23,17 +23,20 @@ public class PlaylistProxyService {
private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class); private static final Logger logger = LoggerFactory.getLogger(PlaylistProxyService.class);
private final PlaylistParseService playlistParseService; private final PlaylistParseService playlistParseService;
private final URLForwardService urlForwardService; private final URLForwardService urlForwardService;
private final URIResolveService uriResolveService;
public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService) { public PlaylistProxyService(PlaylistParseService playlistParseService, URLForwardService urlForwardService,
URIResolveService uriResolveService) {
this.playlistParseService = playlistParseService; this.playlistParseService = playlistParseService;
this.urlForwardService = urlForwardService; this.urlForwardService = urlForwardService;
this.uriResolveService = uriResolveService;
} }
public String proxyPlaylist(String hlsUrl, String proxyUrl) public String proxyPlaylist(String hlsUrl, String proxyUrl)
throws FetchFailException { throws FetchFailException {
String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1); String base = hlsUrl.substring(0, hlsUrl.lastIndexOf('/') + 1);
String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1); String suffix = hlsUrl.substring(hlsUrl.lastIndexOf('/') + 1);
String url = base + suffix; String url = uriResolveService.resolve(suffix, base);
try { try {
MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url); MultivariantPlaylist playlist = playlistParseService.readMultivariantPlaylist(url);
@ -88,7 +91,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy variant: {}", variant); logger.error("cannot proxy variant: {}", variant);
return variant; return variant;
} }
String variantUri = base + variant.uri(); String variantUri = uriResolveService.resolve(variant.uri(), base);
String proxiedUri = urlForwardService.createForwarded(variantUri); String proxiedUri = urlForwardService.createForwarded(variantUri);
return Variant.builder() return Variant.builder()
.from(variant) .from(variant)
@ -103,7 +106,7 @@ public class PlaylistProxyService {
if (rendition.uri().isEmpty()) { if (rendition.uri().isEmpty()) {
return rendition; return rendition;
} }
String renditionUri = base + rendition.uri().get(); String renditionUri = uriResolveService.resolve(rendition.uri().get(), base);
String proxiedUri = urlForwardService.createForwarded(renditionUri); String proxiedUri = urlForwardService.createForwarded(renditionUri);
return AlternativeRendition.builder() return AlternativeRendition.builder()
.from(rendition) .from(rendition)
@ -120,7 +123,7 @@ public class PlaylistProxyService {
return segment; return segment;
} }
String segmentUri = base + segment.uri(); String segmentUri = uriResolveService.resolve(segment.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentUri); String proxiedUri = urlForwardService.createForwarded(segmentUri);
MediaSegment.Builder builder = MediaSegment.builder() MediaSegment.Builder builder = MediaSegment.builder()
@ -140,7 +143,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy segment: {}", segment); logger.error("cannot proxy segment: {}", segment);
return segment; return segment;
} }
String segmentUri = base + segment.uri(); String segmentUri = uriResolveService.resolve(segment.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentUri); String proxiedUri = urlForwardService.createForwarded(segmentUri);
return PartialSegment.builder() return PartialSegment.builder()
.from(segment) .from(segment)
@ -156,7 +159,7 @@ public class PlaylistProxyService {
logger.error("cannot proxy segment: {}", segmentMap); logger.error("cannot proxy segment: {}", segmentMap);
return segmentMap; return segmentMap;
} }
String segmentMapUri = base + segmentMap.uri(); String segmentMapUri = uriResolveService.resolve(segmentMap.uri(), base);
String proxiedUri = urlForwardService.createForwarded(segmentMapUri); String proxiedUri = urlForwardService.createForwarded(segmentMapUri);
return SegmentMap.builder() return SegmentMap.builder()
.from(segmentMap) .from(segmentMap)

View File

@ -0,0 +1,5 @@
package com.backend.hls.proxy.service;
public interface PreprocessService {
byte[] preprocess(byte[] data);
}

View File

@ -0,0 +1,67 @@
package com.backend.hls.proxy.service;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.backend.hls.proxy.util.PipeUtil;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@Service
public class RandomEffectPreprocessService implements PreprocessService {
private static final Logger logger = LoggerFactory.getLogger(PreprocessService.class);
public byte[] preprocess(byte[] data) {
try {
String format = findFormat(data);
logger.info("format is {}", format);
return randomEffectsHLS(data, format, "/usr/bin/ffmpeg");
} catch (IOException e) {
e.printStackTrace();
return data;
}
}
public static byte[] randomEffectsHLS(byte[] data, String inputFormat, String ffmpegPath) throws IOException {
try (InputStream inputStream = new ByteArrayInputStream(data)) {
String[] effects = {
"hue=s=10", // Color shift
"edgedetect=mode=colormix", // Edge detection
"boxblur=10:1", // Heavy blur
"noise=alls=20:allf=t", // Film grain noise
"colorchannelmixer=.3:.4:.3:0:.3:.4:.3:0:.3:.4:.3", // Vintage
"rotate=0.1*c", // Slight rotation
"scale=iw/2:ih/2" // Pixelate
};
Random random = new Random();
String randomEffect = effects[random.nextInt(effects.length)];
logger.info("applied effect {}", randomEffect);
String[] ffmpegArgs = {
"-vf", randomEffect,
"-f", inputFormat,
};
return PipeUtil.executeWithPipe(ffmpegPath, inputStream, inputFormat, ffmpegArgs);
}
}
private String findFormat(byte[] data) throws IOException {
FFmpegProbeResult result = PipeUtil.probeWithPipe("/usr/bin/ffprobe", new ByteArrayInputStream(data));
logger.info("info: {}", result.streams.stream().map(stream -> stream.codec_type).collect(Collectors.toList()));
if (result.streams.stream().noneMatch(stream -> stream.codec_type.name().equals("VIDEO"))) {
throw new IOException("No video stream found");
}
return result.format.format_name;
}
}

View File

@ -0,0 +1,13 @@
package com.backend.hls.proxy.service;
import org.springframework.stereotype.Service;
@Service
public class URIResolveService {
public String resolve(String url, String base) {
if (url.startsWith("http://") || url.startsWith("https://")) {
return url;
}
return base + url;
}
}

View File

@ -0,0 +1,44 @@
package com.backend.hls.proxy.service.cache;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.cache.CacheManager;
public class FileCacheManager implements CacheManager {
private final ConcurrentMap<String, TempFileCache> caches = new ConcurrentHashMap<>();
private final String baseDirectory;
public FileCacheManager(String baseDirectory) {
this.baseDirectory = baseDirectory;
try {
Files.createDirectories(Paths.get(baseDirectory));
} catch (IOException e) {
throw new RuntimeException("Failed to create cache directory", e);
}
}
@Override
public org.springframework.cache.Cache getCache(String name) {
return caches.computeIfAbsent(name, cacheName -> {
try {
return new TempFileCache(cacheName,
Paths.get(baseDirectory, cacheName));
} catch (IOException e) {
throw new RuntimeException("Failed to create cache: " + cacheName, e);
}
});
}
@Override
public Collection<String> getCacheNames() {
return caches.keySet();
}
}

View File

@ -0,0 +1,274 @@
package com.backend.hls.proxy.service.cache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.springframework.cache.support.SimpleValueWrapper;
import org.springframework.lang.Nullable;
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TempFileCache implements org.springframework.cache.Cache {
private final String name;
private final Path cacheDirectory;
private final Cache<Object, CacheMetadata> metadataCache;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static class CacheMetadata {
String filename;
long size;
long lastAccessed;
long createdTime;
Path filePath;
CacheMetadata(String filename, long size, Path filePath) {
this.filename = filename;
this.size = size;
this.filePath = filePath;
this.lastAccessed = System.currentTimeMillis();
this.createdTime = System.currentTimeMillis();
}
void updateAccess() {
this.lastAccessed = System.currentTimeMillis();
}
boolean isFileValid() {
return Files.exists(filePath);
}
}
public TempFileCache(String name, Path cacheDirectory) throws IOException {
this.name = name;
this.cacheDirectory = cacheDirectory;
this.metadataCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterAccess(1, TimeUnit.HOURS)
.removalListener((Object key, CacheMetadata metadata, RemovalCause cause) -> {
if (metadata != null && metadata.filePath != null) {
try {
Files.deleteIfExists(metadata.filePath);
} catch (IOException e) {
}
}
})
.build();
Files.createDirectories(cacheDirectory);
cleanUpOrphanedFiles();
}
private void cleanUpOrphanedFiles() {
try {
Files.list(cacheDirectory)
.filter(Files::isRegularFile)
.forEach(file -> {
boolean hasMetadata = metadataCache.asMap().values().stream()
.anyMatch(meta -> meta.filePath.equals(file));
if (!hasMetadata) {
try {
Files.delete(file);
} catch (IOException e) {
}
}
});
} catch (IOException e) {
}
}
@Override
public String getName() {
return name;
}
@Override
public Object getNativeCache() {
return metadataCache;
}
@Override
@Nullable
public ValueWrapper get(Object key) {
lock.readLock().lock();
try {
CacheMetadata meta = metadataCache.getIfPresent(key);
if (meta == null || !meta.isFileValid()) {
return null;
}
try {
byte[] data = Files.readAllBytes(meta.filePath);
meta.updateAccess();
metadataCache.put(key, meta);
return new SimpleValueWrapper(data);
} catch (IOException e) {
metadataCache.invalidate(key);
return null;
}
} finally {
lock.readLock().unlock();
}
}
@Override
@Nullable
public <T> T get(Object key, @Nullable Class<T> type) {
ValueWrapper wrapper = get(key);
if (wrapper == null) {
return null;
}
Object value = wrapper.get();
if (type != null && !type.isInstance(value)) {
throw new IllegalStateException("Cached value is not of required type");
}
return (T) value;
}
@Override
@Nullable
public <T> T get(Object key, Callable<T> valueLoader) {
ValueWrapper wrapper = get(key);
if (wrapper != null) {
return (T) wrapper.get();
}
lock.writeLock().lock();
try {
// Double-check after acquiring write lock
wrapper = get(key);
if (wrapper != null) {
return (T) wrapper.get();
}
T value = valueLoader.call();
if (value instanceof byte[]) {
put(key, value);
}
return value;
} catch (Exception e) {
throw new ValueRetrievalException(key, valueLoader, e);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void put(Object key, @Nullable Object value) {
if (value == null) {
evict(key);
return;
}
if (!(value instanceof byte[])) {
throw new IllegalArgumentException("TempFileCache only supports byte arrays");
}
lock.writeLock().lock();
try {
byte[] data = (byte[]) value;
String filename = generateFilename(key, data);
Path filePath = cacheDirectory.resolve(filename);
Files.write(filePath, data, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
CacheMetadata newMeta = new CacheMetadata(filename, data.length, filePath);
CacheMetadata oldMeta = metadataCache.asMap().put(key, newMeta);
if (oldMeta != null && oldMeta.filePath != null && !oldMeta.filePath.equals(filePath)) {
Files.deleteIfExists(oldMeta.filePath);
}
} catch (IOException e) {
throw new RuntimeException("Failed to cache file", e);
} finally {
lock.writeLock().unlock();
}
}
private String generateFilename(Object key, byte[] data) {
String hash = Integer.toHexString(key.hashCode());
String sizeSuffix = "_" + data.length;
String timestamp = "_" + System.currentTimeMillis();
return hash + sizeSuffix + timestamp + ".cache";
}
@Override
public void evict(Object key) {
lock.writeLock().lock();
try {
CacheMetadata meta = metadataCache.asMap().remove(key);
if (meta != null && meta.filePath != null) {
try {
Files.deleteIfExists(meta.filePath);
} catch (IOException e) {
// Log warning
}
}
} finally {
lock.writeLock().unlock();
}
}
@Override
public void clear() {
lock.writeLock().lock();
try {
metadataCache.invalidateAll();
try {
Files.list(cacheDirectory)
.filter(Files::isRegularFile)
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
}
});
} catch (IOException e) {
}
} finally {
lock.writeLock().unlock();
}
}
@Override
@Nullable
public ValueWrapper putIfAbsent(Object key, @Nullable Object value) {
lock.writeLock().lock();
try {
ValueWrapper existing = get(key);
if (existing == null) {
put(key, value);
return null;
}
return existing;
} finally {
lock.writeLock().unlock();
}
}
public com.github.benmanes.caffeine.cache.stats.CacheStats getMetadataStats() {
return metadataCache.stats();
}
public long getCacheSize() {
return metadataCache.asMap().values().stream()
.mapToLong(meta -> meta.size)
.sum();
}
public int getCacheCount() {
return (int) metadataCache.estimatedSize();
}
public void cleanupExpiredEntries() {
metadataCache.cleanUp();
}
}

View File

@ -0,0 +1,110 @@
package com.backend.hls.proxy.util;
import java.io.IOException;
import org.apache.commons.lang3.math.Fraction;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
/**
* Jackson TypeAdapter for Apache Commons Math Fraction Object
*
* @author bramp
*/
public class FractionAdapter {
private final Fraction zeroByZero;
private final Fraction divideByZero;
public FractionAdapter() {
this(Fraction.ZERO, Fraction.ZERO);
}
public FractionAdapter(Fraction zeroByZero, Fraction divideByZero) {
this.zeroByZero = zeroByZero;
this.divideByZero = divideByZero;
}
/**
* Jackson Deserializer for Fraction
*/
public class FractionDeserializer extends JsonDeserializer<Fraction> {
@Override
public Fraction deserialize(JsonParser parser, DeserializationContext context) throws IOException {
JsonToken token = parser.currentToken();
if (token == JsonToken.VALUE_NULL) {
return null;
}
if (token == JsonToken.VALUE_NUMBER_INT || token == JsonToken.VALUE_NUMBER_FLOAT) {
return Fraction.getFraction(parser.getDoubleValue());
}
if (token == JsonToken.VALUE_STRING) {
String fraction = parser.getText().trim();
if (zeroByZero != null && "0/0".equals(fraction)) {
return zeroByZero;
}
if (divideByZero != null && fraction.endsWith("/0")) {
return divideByZero;
}
return Fraction.getFraction(fraction);
}
throw context.wrongTokenException(parser, Fraction.class, token,
"Expected NUMBER or STRING token for Fraction");
}
}
/**
* Jackson Serializer for Fraction
*/
public static class FractionSerializer extends JsonSerializer<Fraction> {
@Override
public void serialize(Fraction value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value == null) {
gen.writeNull();
} else {
gen.writeString(value.toProperString());
}
}
}
/**
* Convenience method to create and register the adapter with a module
*/
public void registerModule(SimpleModule module) {
module.addDeserializer(Fraction.class, new FractionDeserializer());
module.addSerializer(Fraction.class, new FractionSerializer());
}
/**
* Static utility method to create a pre-configured module
*/
public static SimpleModule createModule() {
return createModule(Fraction.ZERO, Fraction.ZERO);
}
/**
* Static utility method to create a pre-configured module with custom values
*/
public static SimpleModule createModule(Fraction zeroByZero, Fraction divideByZero) {
SimpleModule module = new SimpleModule();
FractionAdapter adapter = new FractionAdapter(zeroByZero, divideByZero);
adapter.registerModule(module);
return module;
}
}

View File

@ -0,0 +1,201 @@
package com.backend.hls.proxy.util;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import net.bramp.ffmpeg.probe.FFmpegProbeResult;
/**
* Utility class for piping data to FFmpeg/FFprobe
*/
public class PipeUtil {
private static final Logger logger = LoggerFactory.getLogger(PipeUtil.class);
private static final ObjectMapper objectMapper = JsonMapper
.builder()
.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.addModule(FractionAdapter
.createModule())
.build();
private static final int BUFFER_SIZE = 8192;
/**
* Execute FFprobe with pipe input
*/
public static FFmpegProbeResult probeWithPipe(String ffprobe, InputStream inputStream)
throws IOException {
ProcessBuilder pb = new ProcessBuilder(
ffprobe,
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
"pipe:0");
pb.redirectErrorStream(false);
Process process = pb.start();
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<String> errorReader = executor.submit(() -> {
try (BufferedReader stderr = new BufferedReader(
new InputStreamReader(process.getErrorStream()))) {
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
} catch (IOException e) {
return "";
}
});
try {
// Write input to stdin
Future<?> writer = executor.submit(() -> {
try (OutputStream stdin = process.getOutputStream()) {
inputStream.transferTo(stdin);
stdin.flush();
} catch (IOException e) {
logger.error("FFMpeg warning: {}", e);
}
});
// Read JSON output from stdout
Future<String> reader = executor.submit(() -> {
try (BufferedReader stdout = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
return stdout.lines().reduce("", (a, b) -> a + b);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
writer.get(30, TimeUnit.SECONDS);
boolean finished = process.waitFor(30, TimeUnit.SECONDS);
if (!finished || process.exitValue() != 0) {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFProbe error: {}", error);
throw new IOException("FFprobe failed");
}
String json = reader.get(5, TimeUnit.SECONDS);
logger.info("Probed JSON: {}", json);
return objectMapper.readValue(json, FFmpegProbeResult.class);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
process.destroyForcibly();
throw new IOException("Error probing stream", e);
} finally {
executor.shutdownNow();
}
}
/**
* Execute FFmpeg with pipe input and output
*/
public static byte[] executeWithPipe(String ffmpegPath, InputStream inputStream,
String inputFormat, String... ffmpegArgs)
throws IOException {
ProcessBuilder pb = new ProcessBuilder();
pb.command().add(ffmpegPath);
pb.command().add("-f");
pb.command().add(inputFormat);
pb.command().add("-i");
pb.command().add("pipe:0");
for (String arg : ffmpegArgs) {
pb.command().add(arg);
}
pb.command().add("pipe:1");
pb.redirectErrorStream(false);
Process process = pb.start();
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<String> errorReader = executor.submit(() -> {
try (BufferedReader stderr = new BufferedReader(
new InputStreamReader(process.getErrorStream()))) {
return stderr.lines().reduce("", (a, b) -> a + "\n" + b);
} catch (IOException e) {
return "";
}
});
try {
Future<?> writer = executor.submit(() -> {
try (OutputStream stdin = process.getOutputStream()) {
byte[] buffer = new byte[BUFFER_SIZE];
int read;
while ((read = inputStream.read(buffer)) != -1) {
stdin.write(buffer, 0, read);
}
stdin.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Future<byte[]> reader = executor.submit(() -> {
try (InputStream stdout = process.getInputStream()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[BUFFER_SIZE];
int read;
while ((read = stdout.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
writer.get(60, TimeUnit.SECONDS);
boolean finished = process.waitFor(120, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
throw new IOException("FFmpeg timeout");
}
if (process.exitValue() != 0) {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFMpeg error: {}", error);
throw new IOException("FFmpeg failed: " + error);
}
return reader.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
try {
String error = errorReader.get(1, TimeUnit.SECONDS);
logger.error("FFMpeg error: {}", error);
} catch (InterruptedException | ExecutionException | TimeoutException e1) {
e1.printStackTrace();
}
process.destroyForcibly();
throw new IOException("Error executing FFmpeg", e);
} finally {
executor.shutdownNow();
}
}
}