同一设备限制 点播数 重复点播则关闭已有点播

This commit is contained in:
shikong 2023-10-16 17:22:02 +08:00
parent 37cfd012e8
commit 71b75038e2

View File

@ -8,6 +8,8 @@ import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
@ -274,18 +276,34 @@ public class Gb28181DownloadService {
return result;
}
@SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
// 限制单个设备/通道 只能 点播 一路
if(StringUtils.isNotBlank(existCallId)){
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
sender.sendRequest((provider,localIp,localPort)->
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
RedisUtil.KeyOps.delete(cacheKey);
log.info("关闭已存在的 点播请求 {} {}", cacheKey,existCallId);
Thread.sleep(200);
}
return (provider, ip, port) -> {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
};
}
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -313,7 +331,7 @@ public class Gb28181DownloadService {
String toTag = item.getToTag();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,streamId, time, unit));
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,cacheKey, streamId, time, unit));
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
}));
result.complete(new VideoInfo(videoUrl(streamId), callId, device));
@ -341,7 +359,7 @@ public class Gb28181DownloadService {
return subscriber;
}
public Flow.Subscriber<SIPRequest> byeSubscriber(String key,String streamId, long time, TimeUnit unit){
public Flow.Subscriber<SIPRequest> byeSubscriber(String key,String cacheKey,String streamId, long time, TimeUnit unit){
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
@Override
@ -353,6 +371,7 @@ public class Gb28181DownloadService {
@SneakyThrows
@Override
public void onNext(SIPRequest request) {
RedisUtil.KeyOps.delete(cacheKey);
String transport = request.getTopmostViaHeader().getTransport();
String ip = request.getLocalAddress().getHostAddress();
sender.getProvider(transport,ip)