diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java index 561c241..9003498 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java @@ -1,8 +1,10 @@ package cn.skcks.docking.gb28181.core.sip.listener; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; @@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentMap; @Component @Slf4j public class SipListenerImpl implements SipListener { - private final SipSubscribe sipSubscribe; + private final SipSubscribe subscribe; private final ConcurrentMap requestProcessor = new ConcurrentHashMap<>(); private final ConcurrentMap responseProcessor = new ConcurrentHashMap<>(); @@ -65,6 +67,11 @@ public class SipListenerImpl implements SipListener { // 增加其它无需回复的响应,如101、180等 } else { log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); + SIPResponse sipResponse = (SIPResponse) response; + String callId = sipResponse.getCallIdHeader().getCallId(); + Optional.ofNullable(subscribe.getSipResponseSubscribe() + .getPublisher(GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId))) + .ifPresent(publisher->publisher.submit(sipResponse)); if (responseEvent.getDialog() != null) { responseEvent.getDialog().delete(); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java index 2b77c41..030ee44 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java @@ -53,13 +53,13 @@ public class InviteResponseProcessor implements MessageProcessor { // trying不会回复 if (statusCode == Response.TRYING) { - subscribe.getInviteSubscribe().getPublisher(subscribeKey).submit(response); + subscribe.getSipResponseSubscribe().getPublisher(subscribeKey).submit(response); return; } // 成功响应 // 下发ack if (statusCode == Response.OK) { - Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey)) + Optional.ofNullable(subscribe.getSipResponseSubscribe().getPublisher(subscribeKey)) .ifPresentOrElse(publisher-> publisher.submit(response), ()-> log.warn("对应订阅 {} 已结束",callId.getCallId())); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index 9c0d9c6..dd1a90f 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -1,5 +1,7 @@ package cn.skcks.docking.gb28181.service.play; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.common.redis.RedisUtil; @@ -147,7 +149,7 @@ public class PlayService { CallIdHeader callId = provider.getNewCallId(); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); - subscribe.getInviteSubscribe().addPublisher(subscribeKey); + subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -186,13 +188,13 @@ public class PlayService { @Override public void onComplete() { - subscribe.getInviteSubscribe().delPublisher(subscribeKey); + subscribe.getSipResponseSubscribe().delPublisher(subscribeKey); } }; - subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); result.onTimeout(() -> { - subscribe.getInviteSubscribe().delPublisher(subscribeKey); + subscribe.getSipResponseSubscribe().delPublisher(subscribeKey); result.setResult(JsonResponse.error("点播超时")); }); return result; @@ -245,7 +247,7 @@ public class PlayService { Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); - subscribe.getInviteSubscribe().addPublisher(subscribeKey); + subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -266,6 +268,7 @@ public class PlayService { } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); + RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { @@ -287,10 +290,10 @@ public class PlayService { subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); } }; - subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); result.onTimeout(() -> { - subscribe.getInviteSubscribe().delPublisher(subscribeKey); + subscribe.getSipResponseSubscribe().delPublisher(subscribeKey); result.setResult(JsonResponse.error("点播超时")); }); return result;