diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 9245074..5520e30 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -83,6 +83,8 @@ public class Gb28181DownloadService { private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); + private final RealtimeManager realtimeManager; + @Getter private final RealTime realtime = new RealTime(); @@ -191,7 +193,8 @@ public class Gb28181DownloadService { result.setResult(JsonResponse.error("设备(通道)不存在")); return result; } - closeExistRequest(deviceCode, device, docking); + + closeExistRequest(deviceCode); requestMap.put(deviceCode, result); // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 @@ -254,73 +257,36 @@ public class Gb28181DownloadService { return; } log.info("获取媒体信息 {}", videoInfo); - String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + + // 原始链接转换为前端可用的链接 RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), videoInfo.getCallId()), JsonUtils.toJson(videoInfo)); String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ? StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()): videoInfo.getUrl(); + videoInfo.setUrl(url); - RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode), url); + realtimeManager.addPlaying(deviceCode, videoInfo); result.setResult(JsonResponse.success(url)); }); }, 200, TimeUnit.MILLISECONDS); return result; } - private void closeExistRequest(String deviceCode, WvpProxyDevice device, WvpProxyDocking docking) { + private void closeExistRequest(String deviceCode) { requestMap.computeIfPresent(deviceCode,(key,requestResult)->{ log.info("关闭已存在的视频请求 {}", deviceCode); if(!requestResult.hasResult()){ requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束")); - String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); - String existCallId = RedisUtil.StringOps.get(cacheKey); - if(StringUtils.isNotBlank(existCallId)){ - String deviceIp = docking.getIp(); - int devicePort = Integer.parseInt(docking.getPort()); - sender.sendRequest((provider,localIp,localPort)-> - SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); - RedisUtil.KeyOps.delete(cacheKey); - } + realtimeManager.removePlaying(deviceCode); } return null; }); } public void closeRealtimeVideoNow(String deviceCode){ - WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); - if(device == null){ - return; - } - WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); - if(docking == null){ - return; - } - - String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); - String existCallId = RedisUtil.StringOps.get(cacheKey); - RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode)); - String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), existCallId); - VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(infoKey), VideoInfo.class); - if(videoInfo != null){ - RedisUtil.KeyOps.delete(infoKey); - closeRealtimeVideoNow(docking,device,videoInfo,cacheKey,existCallId); - } + realtimeManager.removePlaying(deviceCode); } - public void closeRealtimeVideoNow(WvpProxyDocking docking, WvpProxyDevice device, Gb28181DownloadService.VideoInfo videoInfo, String cacheKey, String existCallId){ - log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId()); - RedisUtil.KeyOps.delete(CacheUtil.getKey(device.getDeviceCode(), MediaSdpHelper.Action.PLAY.getAction())); - String deviceIp = docking.getIp(); - int devicePort = Integer.parseInt(docking.getPort()); - if (StringUtils.isNotBlank(existCallId)) { - sender.sendRequest((provider, localIp, localPort) -> - SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); - } - RedisUtil.KeyOps.delete(cacheKey); - zlmMediaService.closeRtpServer(CloseRtpServer.builder() - .streamId(videoInfo.streamId) - .build()); - } @SneakyThrows public void videoStream(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) { diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/RealtimeManager.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/RealtimeManager.java new file mode 100644 index 0000000..dbc4dfc --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/RealtimeManager.java @@ -0,0 +1,109 @@ +package cn.skcks.docking.gb28181.wvp.service.gb28181; + +import cn.skcks.docking.gb28181.common.json.JsonUtils; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer; +import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.concurrent.*; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RealtimeManager { + public final static String REALTIME_KEY = "realtime_"; + private final ZlmMediaService zlmMediaService; + private final DeviceService deviceService; + private final DockingService dockingService; + private final SipSender sender; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentMap> playing = new ConcurrentHashMap<>(); + + private String getRealtimeKey(String deviceCode){ + return REALTIME_KEY + deviceCode; + } + + public void addPlaying(String deviceCode, Gb28181DownloadService.VideoInfo videoInfo) { + RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode), videoInfo.getUrl()); + + ScheduledFuture schedule = playing.get(deviceCode); + if(schedule != null){ + schedule.cancel(true); + } + + // 定时任务 + schedule = scheduledExecutorService.schedule(()->{ + playing.remove(deviceCode); + Gb28181DownloadService.VideoInfo _videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(getRealtimeKey(deviceCode)), Gb28181DownloadService.VideoInfo.class); + RedisUtil.KeyOps.delete(getRealtimeKey(deviceCode)); + close(deviceCode, _videoInfo); + }, 3, TimeUnit.MINUTES); + + // 缓存 + RedisUtil.StringOps.set(getRealtimeKey(deviceCode), JsonUtils.toJson(videoInfo)); + playing.put(deviceCode, schedule); + } + + public void removePlaying(String deviceCode) { + ScheduledFuture schedule = playing.get(deviceCode); + if(schedule != null){ + schedule.cancel(true); + playing.remove(deviceCode); + } + + Gb28181DownloadService.VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(getRealtimeKey(deviceCode)), Gb28181DownloadService.VideoInfo.class); + if(videoInfo == null) { + log.warn("未找到 设备(deviceCode => {}) 视频信息", deviceCode); + return; + } + + close(deviceCode, videoInfo); + } + + /** + * 关闭视频连接 + * @param deviceCode 设备编码 + * @param videoInfo 视频信息 + */ + private void close(String deviceCode, Gb28181DownloadService.VideoInfo videoInfo){ + RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode)); + + WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if(device == null){ + return; + } + WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); + if(docking == null){ + return; + } + + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + String existCallId = RedisUtil.StringOps.get(cacheKey); + + String deviceIp = docking.getIp(); + int devicePort = Integer.parseInt(docking.getPort()); + + // 判断缓存的 视频信息 与 缓存的 callId 是否相同 避免 下级点播出现信息不一致 + if(videoInfo.getCallId().equalsIgnoreCase(existCallId)){ + sender.sendRequest((provider, localIp, localPort) -> + SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); + } + + // 无论是否存在都调用一次 zlm 关闭流, 避免异常情况 + zlmMediaService.closeRtpServer(CloseRtpServer.builder() + .streamId(videoInfo.getStreamId()) + .build()); + } +}