Standartize connectionKey, use long for playlistId
This commit is contained in:
@ -6,6 +6,7 @@ import org.springframework.security.core.annotation.AuthenticationPrincipal;
|
|||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import com.bivashy.backend.composer.auth.CustomUserDetails;
|
import com.bivashy.backend.composer.auth.CustomUserDetails;
|
||||||
|
import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
|
||||||
import com.bivashy.backend.composer.dto.importing.TrackProgressDTO;
|
import com.bivashy.backend.composer.dto.importing.TrackProgressDTO;
|
||||||
import com.bivashy.backend.composer.service.importing.RedisMessageSubscriber;
|
import com.bivashy.backend.composer.service.importing.RedisMessageSubscriber;
|
||||||
import com.bivashy.backend.composer.service.importing.RedisProgressService;
|
import com.bivashy.backend.composer.service.importing.RedisProgressService;
|
||||||
@ -33,7 +34,7 @@ public class ProgressSSEController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/importing/test/{playlistId}")
|
@GetMapping("/importing/test/{playlistId}")
|
||||||
public void test(@PathVariable String playlistId, @AuthenticationPrincipal CustomUserDetails user) {
|
public void test(@PathVariable long playlistId, @AuthenticationPrincipal CustomUserDetails user) {
|
||||||
var userId = user.getId();
|
var userId = user.getId();
|
||||||
redisProgressService.saveProgress(new TrackProgressDTO(
|
redisProgressService.saveProgress(new TrackProgressDTO(
|
||||||
playlistId,
|
playlistId,
|
||||||
@ -43,7 +44,7 @@ public class ProgressSSEController {
|
|||||||
|
|
||||||
@GetMapping(value = "/importing/stream/{playlistId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
@GetMapping(value = "/importing/stream/{playlistId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||||
public Flux<ServerSentEvent<String>> streamProgress(
|
public Flux<ServerSentEvent<String>> streamProgress(
|
||||||
@PathVariable String playlistId,
|
@PathVariable long playlistId,
|
||||||
@AuthenticationPrincipal CustomUserDetails user,
|
@AuthenticationPrincipal CustomUserDetails user,
|
||||||
HttpServletResponse response) {
|
HttpServletResponse response) {
|
||||||
var userId = user.getId();
|
var userId = user.getId();
|
||||||
@ -52,7 +53,7 @@ public class ProgressSSEController {
|
|||||||
response.setHeader("Connection", "keep-alive");
|
response.setHeader("Connection", "keep-alive");
|
||||||
response.setCharacterEncoding("UTF-8");
|
response.setCharacterEncoding("UTF-8");
|
||||||
|
|
||||||
String connectionKey = getConnectionKey(playlistId, userId);
|
String connectionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
|
|
||||||
Sinks.Many<String> sink = sinks.computeIfAbsent(connectionKey, k -> {
|
Sinks.Many<String> sink = sinks.computeIfAbsent(connectionKey, k -> {
|
||||||
Sinks.Many<String> newSink = Sinks.many().replay().latest();
|
Sinks.Many<String> newSink = Sinks.many().replay().latest();
|
||||||
@ -98,7 +99,7 @@ public class ProgressSSEController {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanupConnection(String playlistId, long userId,
|
private void cleanupConnection(Long playlistId, long userId,
|
||||||
Sinks.Many<String> sink, String connectionKey) {
|
Sinks.Many<String> sink, String connectionKey) {
|
||||||
try {
|
try {
|
||||||
redisProgressService.removeActiveConnection(playlistId, userId);
|
redisProgressService.removeActiveConnection(playlistId, userId);
|
||||||
@ -109,8 +110,4 @@ public class ProgressSSEController {
|
|||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getConnectionKey(String playlistId, long userId) {
|
|
||||||
return String.format("%s:%s", Long.toString(userId), playlistId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,11 +1,19 @@
|
|||||||
package com.bivashy.backend.composer.dto.importing;
|
package com.bivashy.backend.composer.dto.importing;
|
||||||
|
|
||||||
public class ImportTrackKey {
|
public class ImportTrackKey {
|
||||||
public static String progressKey(String playlistId, long userId) {
|
public static String progressKey(long playlistId, long userId) {
|
||||||
return String.format("progress:%s:%s", Long.toString(userId), playlistId);
|
return String.format("progress:%d:%d", userId, playlistId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String trackKey(String playlistId, String trackId, long userId) {
|
public static String trackKey(long playlistId, String trackId, long userId) {
|
||||||
return String.format("track:%s:%s:%s", Long.toString(userId), playlistId, trackId);
|
return String.format("track:%d:%d:%s", userId, playlistId, trackId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String redisChannelKey(long playlistId, long userId) {
|
||||||
|
return String.format("progress_updates:%d:%d", userId, playlistId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String subscriptionKey(long playlistId, long userId) {
|
||||||
|
return String.format("%d:%d", playlistId, userId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
package com.bivashy.backend.composer.dto.importing;
|
package com.bivashy.backend.composer.dto.importing;
|
||||||
|
|
||||||
public class TrackProgressDTO {
|
public class TrackProgressDTO {
|
||||||
private String playlistId;
|
private long playlistId;
|
||||||
private String trackId;
|
private String trackId;
|
||||||
private String trackTitle;
|
private String trackTitle;
|
||||||
private String format;
|
private String format;
|
||||||
@ -14,14 +14,14 @@ public class TrackProgressDTO {
|
|||||||
public TrackProgressDTO() {
|
public TrackProgressDTO() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TrackProgressDTO(String playlistId, String trackId, long userId) {
|
public TrackProgressDTO(long playlistId, String trackId, long userId) {
|
||||||
this.playlistId = playlistId;
|
this.playlistId = playlistId;
|
||||||
this.trackId = trackId;
|
this.trackId = trackId;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
this.timestamp = System.currentTimeMillis();
|
this.timestamp = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TrackProgressDTO(String playlistId,
|
public TrackProgressDTO(long playlistId,
|
||||||
String trackId,
|
String trackId,
|
||||||
String trackTitle,
|
String trackTitle,
|
||||||
String format,
|
String format,
|
||||||
@ -41,11 +41,11 @@ public class TrackProgressDTO {
|
|||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPlaylistId() {
|
public long getPlaylistId() {
|
||||||
return playlistId;
|
return playlistId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPlaylistId(String playlistId) {
|
public void setPlaylistId(long playlistId) {
|
||||||
this.playlistId = playlistId;
|
this.playlistId = playlistId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -6,6 +6,8 @@ import org.springframework.data.redis.listener.ChannelTopic;
|
|||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.bivashy.backend.composer.dto.importing.ImportTrackKey;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
@ -20,9 +22,9 @@ public class RedisMessageSubscriber {
|
|||||||
this.container = container;
|
this.container = container;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void subscribeToPlaylist(String playlistId, long userId, Consumer<String> messageHandler) {
|
public void subscribeToPlaylist(long playlistId, long userId, Consumer<String> messageHandler) {
|
||||||
String channel = String.format("progress_updates:%s:%s", userId, playlistId);
|
String channel = ImportTrackKey.redisChannelKey(playlistId, userId);
|
||||||
String subscriptionKey = getSubscriptionKey(playlistId, userId);
|
String subscriptionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
|
|
||||||
if (!subscriptions.containsKey(subscriptionKey)) {
|
if (!subscriptions.containsKey(subscriptionKey)) {
|
||||||
container.addMessageListener(new MessageListener() {
|
container.addMessageListener(new MessageListener() {
|
||||||
@ -39,12 +41,9 @@ public class RedisMessageSubscriber {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unsubscribeFromPlaylist(String playlistId, long userId) {
|
public void unsubscribeFromPlaylist(long playlistId, long userId) {
|
||||||
String subscriptionKey = getSubscriptionKey(playlistId, userId);
|
String subscriptionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
subscriptions.remove(subscriptionKey);
|
subscriptions.remove(subscriptionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getSubscriptionKey(String playlistId, long userId) {
|
|
||||||
return String.format("%s:%s", Long.toString(userId), playlistId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -44,7 +44,7 @@ public class RedisProgressService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<TrackProgressDTO> getPlaylistProgress(String playlistId, long userId) {
|
public List<TrackProgressDTO> getPlaylistProgress(long playlistId, long userId) {
|
||||||
try {
|
try {
|
||||||
String key = ImportTrackKey.progressKey(playlistId, userId);
|
String key = ImportTrackKey.progressKey(playlistId, userId);
|
||||||
Map<Object, Object> progressMap = redisTemplate.opsForHash().entries(key);
|
Map<Object, Object> progressMap = redisTemplate.opsForHash().entries(key);
|
||||||
@ -65,7 +65,7 @@ public class RedisProgressService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TrackProgressDTO getTrackProgress(String playlistId, String trackId, long userId) {
|
public TrackProgressDTO getTrackProgress(long playlistId, String trackId, long userId) {
|
||||||
try {
|
try {
|
||||||
String key = ImportTrackKey.trackKey(playlistId, trackId, userId);
|
String key = ImportTrackKey.trackKey(playlistId, trackId, userId);
|
||||||
String progressJson = redisTemplate.opsForValue().get(key);
|
String progressJson = redisTemplate.opsForValue().get(key);
|
||||||
@ -81,10 +81,7 @@ public class RedisProgressService {
|
|||||||
|
|
||||||
private void publishProgressUpdate(TrackProgressDTO progress) {
|
private void publishProgressUpdate(TrackProgressDTO progress) {
|
||||||
try {
|
try {
|
||||||
String channel = String.format("progress_updates:%s:%s",
|
String channel = ImportTrackKey.redisChannelKey(progress.getPlaylistId(), progress.getUserId());
|
||||||
progress.getUserId(),
|
|
||||||
progress.getPlaylistId());
|
|
||||||
|
|
||||||
String message = objectMapper.writeValueAsString(progress);
|
String message = objectMapper.writeValueAsString(progress);
|
||||||
redisTemplate.convertAndSend(channel, message);
|
redisTemplate.convertAndSend(channel, message);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -92,18 +89,18 @@ public class RedisProgressService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addActiveConnection(String playlistId, long userId) {
|
public void addActiveConnection(long playlistId, long userId) {
|
||||||
String connectionKey = String.format("%s:%s", Long.toString(userId), playlistId);
|
String connectionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
activeConnections.computeIfAbsent(connectionKey, k -> ConcurrentHashMap.newKeySet()).add(connectionKey);
|
activeConnections.computeIfAbsent(connectionKey, k -> ConcurrentHashMap.newKeySet()).add(connectionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeActiveConnection(String playlistId, long userId) {
|
public void removeActiveConnection(long playlistId, long userId) {
|
||||||
String connectionKey = String.format("%s:%s", Long.toString(userId), playlistId);
|
String connectionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
activeConnections.remove(connectionKey);
|
activeConnections.remove(connectionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasActiveConnections(String playlistId, long userId) {
|
public boolean hasActiveConnections(long playlistId, long userId) {
|
||||||
String connectionKey = String.format("%s:%s", Long.toString(userId), playlistId);
|
String connectionKey = ImportTrackKey.subscriptionKey(playlistId, userId);
|
||||||
return activeConnections.containsKey(connectionKey);
|
return activeConnections.containsKey(connectionKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user