实时点播信息缓存到 redis

This commit is contained in:
shikong 2023-12-15 09:42:20 +08:00
parent 7528fdbb4b
commit 92dee06429
2 changed files with 29 additions and 19 deletions

View File

@ -8,6 +8,7 @@ import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
@ -41,7 +42,6 @@ import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PreDestroy;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -82,8 +82,6 @@ public class Gb28181DownloadService {
private final WvpProxyConfig wvpProxyConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> realtimeVideo = new ConcurrentHashMap<>();
private final ConcurrentMap<String, VideoInfo> realtimeVideoInfoMap = new ConcurrentHashMap<>();
@Getter
private final RealTime realtime = new RealTime();
@ -241,7 +239,8 @@ public class Gb28181DownloadService {
result.setResult(JsonResponse.error("设备(通道)不存在"));
return result;
}
String existUrl = realtimeVideo.get(deviceCode);
String existUrl = RedisUtil.StringOps.get(CacheUtil.getKey(deviceCode, MediaSdpHelper.Action.PLAY.getAction()));
if(Optional.ofNullable(existUrl).isPresent()){
result.setResult(JsonResponse.success(existUrl));
return result;
@ -249,14 +248,18 @@ public class Gb28181DownloadService {
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
scheduledExecutorService.schedule(()->{
realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{
if(videoInfo == null){
result.setResult(JsonResponse.error("媒体信息获取失败"));
return;
}
log.info("获取媒体信息 {}", videoInfo);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
realtimeVideoInfoMap.put(cacheKey, videoInfo);
RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(),videoInfo.getCallId()), JsonUtils.toJson(videoInfo));
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
videoInfo.getUrl();
realtimeVideo.put(deviceCode, url);
RedisUtil.StringOps.set(CacheUtil.getKey(deviceCode, MediaSdpHelper.Action.PLAY.getAction()), url);
result.setResult(JsonResponse.success(url));
});
}, 200, TimeUnit.MILLISECONDS);
@ -294,15 +297,17 @@ public class Gb28181DownloadService {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
realtimeVideoInfoMap.computeIfPresent(cacheKey, (key, videoInfo) -> {
String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), existCallId);
VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(infoKey), VideoInfo.class);
if(videoInfo != null){
RedisUtil.KeyOps.delete(infoKey);
closeRealtimeVideoNow(docking,device,videoInfo,cacheKey,existCallId);
return null;
});
}
}
public void closeRealtimeVideoNow(WvpProxyDocking docking, WvpProxyDevice device, Gb28181DownloadService.VideoInfo videoInfo, String cacheKey, String existCallId){
log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
realtimeVideo.remove(device.getDeviceCode());
RedisUtil.KeyOps.delete(CacheUtil.getKey(device.getDeviceCode(), MediaSdpHelper.Action.PLAY.getAction()));
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
if (StringUtils.isNotBlank(existCallId)) {
@ -606,6 +611,12 @@ public class Gb28181DownloadService {
RedisUtil.KeyOps.delete(cacheKey);
String transport = request.getTopmostViaHeader().getTransport();
String ip = request.getLocalAddress().getHostAddress();
if(time <= 0) {
String callId = request.getCallId().getCallId();
String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), callId);
RedisUtil.KeyOps.delete(infoKey);
}
sender.getProvider(transport,ip)
.sendResponse(SipResponseBuilder.response(request, Response.OK, "OK"));
onComplete();
@ -632,11 +643,4 @@ public class Gb28181DownloadService {
}
return subscriber;
}
@PreDestroy
private void destroy(){
realtimeVideo.forEach((key,item)->{
closeRealtimeVideoNow(key);
});
}
}

View File

@ -1,5 +1,8 @@
package cn.skcks.docking.gb28181.wvp.sip.message.bye.request.request;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
@ -46,7 +49,10 @@ public class ByeRequestProcessor implements MessageProcessor {
Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key))
.ifPresentOrElse(
publisher -> publisher.submit(request),
() -> sender.sendResponse(ip, transport, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK"))));
() -> {
RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), callId));
sender.sendResponse(ip, transport, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK")));
});
}
}