diff --git a/pom.xml b/pom.xml
index ae9b992..7e96b67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
3.2.3
2.8.5
2024.08.29
- 2.0.6
+ 2.0.7
51
@@ -121,6 +121,11 @@
springdoc-openapi-starter-webmvc-ui
${springdoc-openapi.version}
+
+ org.springdoc
+ springdoc-openapi-starter-webflux-ui
+ ${springdoc-openapi.version}
+
com.github.kokorin.jaffree
jaffree
diff --git a/src/main/java/com/bivashy/backend/composer/controller/importing/ProgressSSEController.java b/src/main/java/com/bivashy/backend/composer/controller/importing/ProgressSSEController.java
index dfdec00..b6d4b92 100644
--- a/src/main/java/com/bivashy/backend/composer/controller/importing/ProgressSSEController.java
+++ b/src/main/java/com/bivashy/backend/composer/controller/importing/ProgressSSEController.java
@@ -4,8 +4,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
-import org.springframework.http.codec.ServerSentEvent;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -16,7 +17,6 @@ import com.bivashy.backend.composer.dto.importing.BaseTrackProgress;
import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
import com.bivashy.backend.composer.service.importing.RedisMessageSubscriber;
import com.bivashy.backend.composer.service.importing.RedisProgressService;
-import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.http.HttpServletResponse;
import reactor.core.publisher.Flux;
@@ -24,10 +24,11 @@ import reactor.core.publisher.Sinks;
@RestController
public class ProgressSSEController {
+ private static final Logger logger = LoggerFactory.getLogger(ProgressSSEController.class);
private final RedisProgressService redisProgressService;
private final RedisMessageSubscriber redisSubscriber;
- private final Map> sinks = new ConcurrentHashMap<>();
+ private final Map> sinks = new ConcurrentHashMap<>();
public ProgressSSEController(RedisProgressService redisProgressService,
RedisMessageSubscriber redisSubscriber) {
@@ -36,7 +37,7 @@ public class ProgressSSEController {
}
@GetMapping(value = "/importing/stream/{playlistId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux> streamProgress(
+ public Flux streamProgress(
@PathVariable long playlistId,
@AuthenticationPrincipal CustomUserDetails user,
HttpServletResponse response) {
@@ -48,8 +49,8 @@ public class ProgressSSEController {
String connectionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
- Sinks.Many sink = sinks.computeIfAbsent(connectionKey, k -> {
- Sinks.Many newSink = Sinks.many().replay().latest();
+ Sinks.Many sink = sinks.computeIfAbsent(connectionKey, k -> {
+ Sinks.Many newSink = Sinks.many().replay().latest();
redisSubscriber.subscribeToPlaylist(playlistId, userId, message -> {
newSink.tryEmitNext(message);
@@ -61,19 +62,14 @@ public class ProgressSSEController {
redisProgressService.addActiveConnection(playlistId, userId);
return sink.asFlux()
- .map(data -> ServerSentEvent.builder()
- .data(data)
- .event("progress-update")
- .build())
.doFirst(() -> {
try {
List existingProgresses = redisProgressService.getPlaylistProgress(
playlistId,
userId);
- ObjectMapper mapper = new ObjectMapper();
for (BaseTrackProgress progress : existingProgresses) {
- sink.tryEmitNext(mapper.writeValueAsString(progress));
+ sink.tryEmitNext(progress);
}
} catch (Exception e) {
e.printStackTrace();
@@ -92,7 +88,7 @@ public class ProgressSSEController {
}
private void cleanupConnection(Long playlistId, long userId,
- Sinks.Many sink, String connectionKey) {
+ Sinks.Many sink, String connectionKey) {
try {
redisProgressService.removeActiveConnection(playlistId, userId);
redisSubscriber.unsubscribeFromPlaylist(playlistId, userId);
diff --git a/src/main/java/com/bivashy/backend/composer/dto/importing/BaseTrackProgress.java b/src/main/java/com/bivashy/backend/composer/dto/importing/BaseTrackProgress.java
index da33855..e33b59e 100644
--- a/src/main/java/com/bivashy/backend/composer/dto/importing/BaseTrackProgress.java
+++ b/src/main/java/com/bivashy/backend/composer/dto/importing/BaseTrackProgress.java
@@ -10,15 +10,15 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public abstract class BaseTrackProgress {
protected long playlistId;
- protected long trackId;
+ protected long trackSourceId;
protected long userId;
protected long timestamp;
private String type;
- public BaseTrackProgress(long playlistId, long trackId, long userId) {
+ public BaseTrackProgress(long playlistId, long trackSourceId, long userId) {
this.playlistId = playlistId;
- this.trackId = trackId;
+ this.trackSourceId = trackSourceId;
this.userId = userId;
this.timestamp = System.currentTimeMillis();
}
@@ -39,8 +39,8 @@ public abstract class BaseTrackProgress {
return playlistId;
}
- public long getTrackId() {
- return trackId;
+ public long getTrackSourceId() {
+ return trackSourceId;
}
protected void setType(ProgressEntryType type) {
diff --git a/src/main/java/com/bivashy/backend/composer/dto/importing/ImportTrackKey.java b/src/main/java/com/bivashy/backend/composer/dto/importing/ImportTrackKey.java
index 6e4e4f2..6ac8ac8 100644
--- a/src/main/java/com/bivashy/backend/composer/dto/importing/ImportTrackKey.java
+++ b/src/main/java/com/bivashy/backend/composer/dto/importing/ImportTrackKey.java
@@ -5,8 +5,8 @@ public class ImportTrackKey {
return String.format("progress:%d:%d", userId, playlistId);
}
- public static String trackKey(long playlistId, long trackId, long userId) {
- return String.format("track:%d:%d:%d", userId, playlistId, trackId);
+ public static String trackKey(long playlistId, long trackSourceId, long userId) {
+ return String.format("track:%d:%d:%d", userId, playlistId, trackSourceId);
}
public static String redisChannelKey(long playlistId, long userId) {
diff --git a/src/main/java/com/bivashy/backend/composer/dto/importing/PlaylistProgress.java b/src/main/java/com/bivashy/backend/composer/dto/importing/PlaylistProgress.java
index 73b09a7..7d90293 100644
--- a/src/main/java/com/bivashy/backend/composer/dto/importing/PlaylistProgress.java
+++ b/src/main/java/com/bivashy/backend/composer/dto/importing/PlaylistProgress.java
@@ -3,12 +3,18 @@ package com.bivashy.backend.composer.dto.importing;
public class PlaylistProgress extends BaseTrackProgress {
private String ytdlnStdout;
private int overallProgress;
- private String status;
+ private int trackCount;
+ private ProgressStatus status;
- public PlaylistProgress(long playlistId, long trackId, long userId) {
- super(playlistId, trackId, userId);
+ PlaylistProgress() {
+ super(0, 0, 0);
+ }
+
+ public PlaylistProgress(long playlistId, long trackSourceId, long userId, int trackCount) {
+ super(playlistId, trackSourceId, userId);
this.setType(ProgressEntryType.PLAYLIST);
- this.status = "LOADING";
+ this.status = ProgressStatus.LOADING;
+ this.trackCount = trackCount;
}
public String getYtdlnStdout() {
@@ -27,11 +33,16 @@ public class PlaylistProgress extends BaseTrackProgress {
this.overallProgress = overallProgress;
}
- public String getStatus() {
+ public ProgressStatus getStatus() {
return status;
}
- public void setStatus(String status) {
+ public void setStatus(ProgressStatus status) {
this.status = status;
}
+
+ public int getTrackCount() {
+ return trackCount;
+ }
+
}
diff --git a/src/main/java/com/bivashy/backend/composer/dto/importing/ProgressStatus.java b/src/main/java/com/bivashy/backend/composer/dto/importing/ProgressStatus.java
new file mode 100644
index 0000000..68a79c1
--- /dev/null
+++ b/src/main/java/com/bivashy/backend/composer/dto/importing/ProgressStatus.java
@@ -0,0 +1,5 @@
+package com.bivashy.backend.composer.dto.importing;
+
+public enum ProgressStatus {
+ LOADING, FINISHED
+}
diff --git a/src/main/java/com/bivashy/backend/composer/dto/importing/SingleTrackProgress.java b/src/main/java/com/bivashy/backend/composer/dto/importing/SingleTrackProgress.java
index cd2c957..a98f018 100644
--- a/src/main/java/com/bivashy/backend/composer/dto/importing/SingleTrackProgress.java
+++ b/src/main/java/com/bivashy/backend/composer/dto/importing/SingleTrackProgress.java
@@ -4,8 +4,12 @@ public class SingleTrackProgress extends BaseTrackProgress {
private String title;
private String format;
- public SingleTrackProgress(long playlistId, long trackId, long userId, String title, String format) {
- super(playlistId, trackId, userId);
+ SingleTrackProgress() {
+ super(0, 0, 0);
+ }
+
+ public SingleTrackProgress(long playlistId, long trackSourceId, long userId, String title, String format) {
+ super(playlistId, trackSourceId, userId);
this.setType(ProgressEntryType.TRACK);
this.title = title;
this.format = format;
diff --git a/src/main/java/com/bivashy/backend/composer/service/TrackService.java b/src/main/java/com/bivashy/backend/composer/service/TrackService.java
index 5aa48f2..3ab2fdd 100644
--- a/src/main/java/com/bivashy/backend/composer/service/TrackService.java
+++ b/src/main/java/com/bivashy/backend/composer/service/TrackService.java
@@ -104,7 +104,8 @@ public class TrackService {
if (params.includeProgressHistory()) {
redisProgressService
- .saveProgress(new SingleTrackProgress(playlistId, track.getId(), user.getId(), title, fileFormat));
+ .saveProgress(
+ new SingleTrackProgress(playlistId, trackSource.getId(), user.getId(), title, fileFormat));
}
return new TrackResponse(
@@ -120,7 +121,7 @@ public class TrackService {
@Transactional
public List refreshYoutubePlaylist(CustomUserDetails user, long playlistId, long sourceId)
throws ImportTrackException {
- return youtubeTrackService.refreshYoutubePlaylist(playlistId, sourceId);
+ return youtubeTrackService.refreshYoutubePlaylist(user, playlistId, sourceId);
}
@Transactional
@@ -163,7 +164,8 @@ public class TrackService {
TrackSource trackSource = trackSourceService.createYoutubeTrackSource(SourceType.PLAYLIST,
request.youtubeUrl());
- return youtubeTrackService.refreshYoutubePlaylist(playlistId, trackSource, videoInfos, request.youtubeUrl());
+ return youtubeTrackService.refreshYoutubePlaylist(user.getId(), playlistId, trackSource, videoInfos,
+ request.youtubeUrl());
}
public List getPlaylistTracks(CustomUserDetails user, Long playlistId) {
diff --git a/src/main/java/com/bivashy/backend/composer/service/YoutubeTrackService.java b/src/main/java/com/bivashy/backend/composer/service/YoutubeTrackService.java
index ded3db1..a6ae2e4 100644
--- a/src/main/java/com/bivashy/backend/composer/service/YoutubeTrackService.java
+++ b/src/main/java/com/bivashy/backend/composer/service/YoutubeTrackService.java
@@ -15,6 +15,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
+import com.bivashy.backend.composer.auth.CustomUserDetails;
+import com.bivashy.backend.composer.dto.importing.PlaylistProgress;
+import com.bivashy.backend.composer.dto.importing.ProgressStatus;
import com.bivashy.backend.composer.dto.track.TrackResponse;
import com.bivashy.backend.composer.dto.track.service.AddLocalTrackParams;
import com.bivashy.backend.composer.dto.track.service.AddLocalTrackParamsBuilder;
@@ -25,6 +28,7 @@ import com.bivashy.backend.composer.model.TrackSource;
import com.bivashy.backend.composer.model.TrackSourceMetadata;
import com.bivashy.backend.composer.repository.TrackRepository;
import com.bivashy.backend.composer.service.MetadataParseService.Metadata;
+import com.bivashy.backend.composer.service.importing.RedisProgressService;
import com.bivashy.backend.composer.util.SimpleBlob.PathBlob;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jfposton.ytdlp.YtDlp;
@@ -46,16 +50,19 @@ public class YoutubeTrackService {
private final TrackMetadataService trackMetadataService;
private final TrackPlaylistService trackPlaylistService;
private final TrackSourceService trackSourceService;
+ private final RedisProgressService redisProgressService;
public YoutubeTrackService(AudioS3StorageService s3StorageService, MetadataParseService metadataParseService,
TrackRepository trackRepository, TrackMetadataService trackMetadataService,
- TrackPlaylistService trackPlaylistService, TrackSourceService trackSourceService) {
+ TrackPlaylistService trackPlaylistService, TrackSourceService trackSourceService,
+ RedisProgressService redisProgressService) {
this.s3StorageService = s3StorageService;
this.metadataParseService = metadataParseService;
this.trackRepository = trackRepository;
this.trackMetadataService = trackMetadataService;
this.trackPlaylistService = trackPlaylistService;
this.trackSourceService = trackSourceService;
+ this.redisProgressService = redisProgressService;
}
public AddLocalTrackParams downloadYoutubeTrack(Path temporaryFolder, VideoInfo videoInfo, String youtubeUrl)
@@ -85,7 +92,8 @@ public class YoutubeTrackService {
throw new ImportTrackException("cannot download any youtube track");
}
- public List refreshYoutubePlaylist(long playlistId, long sourceId) throws ImportTrackException {
+ public List refreshYoutubePlaylist(CustomUserDetails user, long playlistId, long sourceId)
+ throws ImportTrackException {
Optional trackSourceMetadataOpt = trackSourceService.findWithMetadata(sourceId);
if (trackSourceMetadataOpt.isEmpty())
throw new ImportTrackException("cannot find track source with metadata with id " + sourceId);
@@ -98,10 +106,11 @@ public class YoutubeTrackService {
} catch (YtDlpException e) {
throw new ImportTrackException("cannot `yt-dlp --dump-json` from " + youtubeUrl, e);
}
- return refreshYoutubePlaylist(playlistId, trackSourceMetadata.getSource(), videoInfos, youtubeUrl);
+ return refreshYoutubePlaylist(user.getId(), playlistId, trackSourceMetadata.getSource(), videoInfos,
+ youtubeUrl);
}
- public List refreshYoutubePlaylist(long playlistId, TrackSource trackSource,
+ public List refreshYoutubePlaylist(long userId, long playlistId, TrackSource trackSource,
List videoInfos,
String youtubeUrl) throws ImportTrackException {
List result = new ArrayList<>();
@@ -126,10 +135,29 @@ public class YoutubeTrackService {
ytDlpRequest.setOption("audio-quality", 0);
ytDlpRequest.setOption("audio-format", "best");
ytDlpRequest.setOption("no-overwrites");
- var response = YtDlp.execute(ytDlpRequest);
- logger.info("yt dlp response {}", response);
- // TODO: write to RedisProgressService
+ PlaylistProgress playlistProgress = new PlaylistProgress(playlistId, trackSource.getId(), userId,
+ videoInfos.size());
+ redisProgressService.saveProgress(playlistProgress);
+
+ var response = YtDlp.execute(ytDlpRequest, (downloadProgress, ignored) -> {
+ redisProgressService.updateTrackProgressField(playlistId, trackSource.getId(), userId,
+ progress -> {
+ progress.setOverallProgress((int) downloadProgress);
+ });
+
+ }, stdoutLine -> {
+ redisProgressService.updateTrackProgressField(playlistId, trackSource.getId(), userId,
+ progress -> {
+ progress.setYtdlnStdout(String.join("\n", progress.getYtdlnStdout(), stdoutLine));
+ });
+ }, null);
+ redisProgressService.updateTrackProgressField(playlistId, trackSource.getId(), userId,
+ progress -> {
+ progress.setOverallProgress(100);
+ progress.setStatus(ProgressStatus.FINISHED);
+ });
+ logger.info("yt dlp response {}", response);
try (Stream pathStream = Files.walk(temporaryFolder)) {
List downloadedFiles = Files.walk(temporaryFolder).toList();
diff --git a/src/main/java/com/bivashy/backend/composer/service/importing/RedisMessageSubscriber.java b/src/main/java/com/bivashy/backend/composer/service/importing/RedisMessageSubscriber.java
index bfaeab8..e96d64e 100644
--- a/src/main/java/com/bivashy/backend/composer/service/importing/RedisMessageSubscriber.java
+++ b/src/main/java/com/bivashy/backend/composer/service/importing/RedisMessageSubscriber.java
@@ -1,28 +1,35 @@
package com.bivashy.backend.composer.service.importing;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
+import com.bivashy.backend.composer.dto.importing.BaseTrackProgress;
import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
@Component
public class RedisMessageSubscriber {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final Logger logger = LoggerFactory.getLogger(Logger.class);
private final RedisMessageListenerContainer container;
- private final Map> subscriptions = new ConcurrentHashMap<>();
+ private final Map> subscriptions = new ConcurrentHashMap<>();
public RedisMessageSubscriber(RedisMessageListenerContainer container) {
this.container = container;
}
- public void subscribeToPlaylist(long playlistId, long userId, Consumer messageHandler) {
+ public void subscribeToPlaylist(long playlistId, long userId, Consumer messageHandler) {
String channel = ImportTrackKey.redisChannelKey(playlistId, userId);
String subscriptionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
@@ -32,7 +39,13 @@ public class RedisMessageSubscriber {
public void onMessage(Message message, byte[] pattern) {
String receivedMessage = new String(message.getBody());
if (subscriptions.containsKey(subscriptionKey)) {
- messageHandler.accept(receivedMessage);
+ try {
+ BaseTrackProgress progress = OBJECT_MAPPER.readValue(receivedMessage,
+ BaseTrackProgress.class);
+ messageHandler.accept(progress);
+ } catch (JsonProcessingException e) {
+ logger.error("cannot deserialize message into BaseTrackProgress.class", e);
+ }
}
}
}, new ChannelTopic(channel));
diff --git a/src/main/java/com/bivashy/backend/composer/service/importing/RedisProgressService.java b/src/main/java/com/bivashy/backend/composer/service/importing/RedisProgressService.java
index 1d9f4ad..8bc9851 100644
--- a/src/main/java/com/bivashy/backend/composer/service/importing/RedisProgressService.java
+++ b/src/main/java/com/bivashy/backend/composer/service/importing/RedisProgressService.java
@@ -1,14 +1,22 @@
package com.bivashy.backend.composer.service.importing;
-import com.bivashy.backend.composer.dto.importing.BaseTrackProgress;
-import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import com.bivashy.backend.composer.dto.importing.BaseTrackProgress;
+import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class RedisProgressService {
@@ -27,11 +35,11 @@ public class RedisProgressService {
String key = ImportTrackKey.progressKey(progress.getPlaylistId(), progress.getUserId());
String trackKey = ImportTrackKey.trackKey(
progress.getPlaylistId(),
- progress.getTrackId(),
+ progress.getTrackSourceId(),
progress.getUserId());
String progressJson = objectMapper.writeValueAsString(progress);
- redisTemplate.opsForHash().put(key, Long.toString(progress.getTrackId()), progressJson);
+ redisTemplate.opsForHash().put(key, Long.toString(progress.getTrackSourceId()), progressJson);
redisTemplate.opsForValue().set(trackKey, progressJson);
@@ -44,6 +52,38 @@ public class RedisProgressService {
}
}
+ public void updateTrackProgressField(long playlistId, long trackSourceId, long userId,
+ Consumer updater) {
+ try {
+ String trackKey = ImportTrackKey.trackKey(playlistId, trackSourceId, userId);
+ String hashKey = ImportTrackKey.progressKey(playlistId, userId);
+
+ String existingJson = redisTemplate.opsForValue().get(trackKey);
+ if (existingJson == null) {
+ throw new RuntimeException("Track progress not found");
+ }
+
+ JavaType progressType = objectMapper.getTypeFactory()
+ .constructType(BaseTrackProgress.class);
+
+ T progress = objectMapper.readValue(existingJson, progressType);
+
+ updater.accept(progress);
+
+ String updatedJson = objectMapper.writeValueAsString(progress);
+ redisTemplate.opsForHash().put(hashKey, Long.toString(trackSourceId), updatedJson);
+ redisTemplate.opsForValue().set(trackKey, updatedJson);
+
+ redisTemplate.expire(hashKey, 24, TimeUnit.HOURS);
+ redisTemplate.expire(trackKey, 24, TimeUnit.HOURS);
+
+ publishProgressUpdate(progress);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to update track progress", e);
+ }
+ }
+
public List getPlaylistProgress(long playlistId, long userId) {
try {
String key = ImportTrackKey.progressKey(playlistId, userId);
@@ -65,9 +105,9 @@ public class RedisProgressService {
}
}
- public BaseTrackProgress getTrackProgress(long playlistId, long trackId, long userId) {
+ public BaseTrackProgress getTrackProgress(long playlistId, long trackSourceId, long userId) {
try {
- String key = ImportTrackKey.trackKey(playlistId, trackId, userId);
+ String key = ImportTrackKey.trackKey(playlistId, trackSourceId, userId);
String progressJson = redisTemplate.opsForValue().get(key);
if (progressJson != null) {