调整实时点播实现
This commit is contained in:
parent
00a5ab9dde
commit
091776c30e
@ -83,6 +83,8 @@ public class Gb28181DownloadService {
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final RealtimeManager realtimeManager;
|
||||
|
||||
@Getter
|
||||
private final RealTime realtime = new RealTime();
|
||||
|
||||
@ -191,7 +193,8 @@ public class Gb28181DownloadService {
|
||||
result.setResult(JsonResponse.error("设备(通道)不存在"));
|
||||
return result;
|
||||
}
|
||||
closeExistRequest(deviceCode, device, docking);
|
||||
|
||||
closeExistRequest(deviceCode);
|
||||
requestMap.put(deviceCode, result);
|
||||
|
||||
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
|
||||
@ -254,73 +257,36 @@ public class Gb28181DownloadService {
|
||||
return;
|
||||
}
|
||||
log.info("获取媒体信息 {}", videoInfo);
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
|
||||
// 原始链接转换为前端可用的链接
|
||||
RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), videoInfo.getCallId()), JsonUtils.toJson(videoInfo));
|
||||
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
|
||||
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
|
||||
videoInfo.getUrl();
|
||||
videoInfo.setUrl(url);
|
||||
|
||||
RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode), url);
|
||||
realtimeManager.addPlaying(deviceCode, videoInfo);
|
||||
result.setResult(JsonResponse.success(url));
|
||||
});
|
||||
}, 200, TimeUnit.MILLISECONDS);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void closeExistRequest(String deviceCode, WvpProxyDevice device, WvpProxyDocking docking) {
|
||||
private void closeExistRequest(String deviceCode) {
|
||||
requestMap.computeIfPresent(deviceCode,(key,requestResult)->{
|
||||
log.info("关闭已存在的视频请求 {}", deviceCode);
|
||||
if(!requestResult.hasResult()){
|
||||
requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束"));
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
if(StringUtils.isNotBlank(existCallId)){
|
||||
String deviceIp = docking.getIp();
|
||||
int devicePort = Integer.parseInt(docking.getPort());
|
||||
sender.sendRequest((provider,localIp,localPort)->
|
||||
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
|
||||
RedisUtil.KeyOps.delete(cacheKey);
|
||||
}
|
||||
realtimeManager.removePlaying(deviceCode);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void closeRealtimeVideoNow(String deviceCode){
|
||||
WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
|
||||
if(device == null){
|
||||
return;
|
||||
}
|
||||
WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null);
|
||||
if(docking == null){
|
||||
return;
|
||||
}
|
||||
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode));
|
||||
String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), existCallId);
|
||||
VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(infoKey), VideoInfo.class);
|
||||
if(videoInfo != null){
|
||||
RedisUtil.KeyOps.delete(infoKey);
|
||||
closeRealtimeVideoNow(docking,device,videoInfo,cacheKey,existCallId);
|
||||
}
|
||||
realtimeManager.removePlaying(deviceCode);
|
||||
}
|
||||
|
||||
public void closeRealtimeVideoNow(WvpProxyDocking docking, WvpProxyDevice device, Gb28181DownloadService.VideoInfo videoInfo, String cacheKey, String existCallId){
|
||||
log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
|
||||
RedisUtil.KeyOps.delete(CacheUtil.getKey(device.getDeviceCode(), MediaSdpHelper.Action.PLAY.getAction()));
|
||||
String deviceIp = docking.getIp();
|
||||
int devicePort = Integer.parseInt(docking.getPort());
|
||||
if (StringUtils.isNotBlank(existCallId)) {
|
||||
sender.sendRequest((provider, localIp, localPort) ->
|
||||
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
|
||||
}
|
||||
RedisUtil.KeyOps.delete(cacheKey);
|
||||
zlmMediaService.closeRtpServer(CloseRtpServer.builder()
|
||||
.streamId(videoInfo.streamId)
|
||||
.build());
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public void videoStream(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) {
|
||||
|
@ -0,0 +1,109 @@
|
||||
package cn.skcks.docking.gb28181.wvp.service.gb28181;
|
||||
|
||||
import cn.skcks.docking.gb28181.common.json.JsonUtils;
|
||||
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer;
|
||||
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
|
||||
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class RealtimeManager {
|
||||
public final static String REALTIME_KEY = "realtime_";
|
||||
private final ZlmMediaService zlmMediaService;
|
||||
private final DeviceService deviceService;
|
||||
private final DockingService dockingService;
|
||||
private final SipSender sender;
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ConcurrentMap<String, ScheduledFuture<?>> playing = new ConcurrentHashMap<>();
|
||||
|
||||
private String getRealtimeKey(String deviceCode){
|
||||
return REALTIME_KEY + deviceCode;
|
||||
}
|
||||
|
||||
public void addPlaying(String deviceCode, Gb28181DownloadService.VideoInfo videoInfo) {
|
||||
RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode), videoInfo.getUrl());
|
||||
|
||||
ScheduledFuture<?> schedule = playing.get(deviceCode);
|
||||
if(schedule != null){
|
||||
schedule.cancel(true);
|
||||
}
|
||||
|
||||
// 定时任务
|
||||
schedule = scheduledExecutorService.schedule(()->{
|
||||
playing.remove(deviceCode);
|
||||
Gb28181DownloadService.VideoInfo _videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(getRealtimeKey(deviceCode)), Gb28181DownloadService.VideoInfo.class);
|
||||
RedisUtil.KeyOps.delete(getRealtimeKey(deviceCode));
|
||||
close(deviceCode, _videoInfo);
|
||||
}, 3, TimeUnit.MINUTES);
|
||||
|
||||
// 缓存
|
||||
RedisUtil.StringOps.set(getRealtimeKey(deviceCode), JsonUtils.toJson(videoInfo));
|
||||
playing.put(deviceCode, schedule);
|
||||
}
|
||||
|
||||
public void removePlaying(String deviceCode) {
|
||||
ScheduledFuture<?> schedule = playing.get(deviceCode);
|
||||
if(schedule != null){
|
||||
schedule.cancel(true);
|
||||
playing.remove(deviceCode);
|
||||
}
|
||||
|
||||
Gb28181DownloadService.VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(getRealtimeKey(deviceCode)), Gb28181DownloadService.VideoInfo.class);
|
||||
if(videoInfo == null) {
|
||||
log.warn("未找到 设备(deviceCode => {}) 视频信息", deviceCode);
|
||||
return;
|
||||
}
|
||||
|
||||
close(deviceCode, videoInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭视频连接
|
||||
* @param deviceCode 设备编码
|
||||
* @param videoInfo 视频信息
|
||||
*/
|
||||
private void close(String deviceCode, Gb28181DownloadService.VideoInfo videoInfo){
|
||||
RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceCode));
|
||||
|
||||
WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
|
||||
if(device == null){
|
||||
return;
|
||||
}
|
||||
WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null);
|
||||
if(docking == null){
|
||||
return;
|
||||
}
|
||||
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
|
||||
String deviceIp = docking.getIp();
|
||||
int devicePort = Integer.parseInt(docking.getPort());
|
||||
|
||||
// 判断缓存的 视频信息 与 缓存的 callId 是否相同 避免 下级点播出现信息不一致
|
||||
if(videoInfo.getCallId().equalsIgnoreCase(existCallId)){
|
||||
sender.sendRequest((provider, localIp, localPort) ->
|
||||
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
|
||||
}
|
||||
|
||||
// 无论是否存在都调用一次 zlm 关闭流, 避免异常情况
|
||||
zlmMediaService.closeRtpServer(CloseRtpServer.builder()
|
||||
.streamId(videoInfo.getStreamId())
|
||||
.build());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user