diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/bye/request/ByeRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/bye/request/ByeRequestProcessor.java new file mode 100644 index 0000000..9ab675f --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/bye/request/ByeRequestProcessor.java @@ -0,0 +1,59 @@ +package cn.skcks.docking.gb28181.core.sip.message.processor.bye.request; + +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.core.sip.service.SipService; +import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder; +import gov.nist.javax.sip.message.SIPRequest; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.SipProvider; +import javax.sip.message.Request; +import java.util.EventObject; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class ByeRequestProcessor implements MessageProcessor { + private final SipListener sipListener; + + private final SipSubscribe subscribe; + + private final SipService sipService; + + @PostConstruct + @Override + public void init() { + sipListener.addRequestProcessor(Request.BYE, this); + } + + @Override + public void process(EventObject eventObject) { + RequestEvent requestEvent = (RequestEvent) eventObject; + SIPRequest request = (SIPRequest) requestEvent.getRequest(); + String callId = request.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + log.info("key {}", key); + String ip = request.getLocalAddress().getHostAddress(); + String transport = request.getTopmostViaHeader().getTransport(); + SipProvider provider= sipService.getProvider(transport, ip); + Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key)) + .ifPresentOrElse( + publisher -> publisher.submit(request), + () -> { + try { + provider.sendResponse(InviteResponseBuilder.builder().build().createTryingInviteResponse(request)); + } catch (SipException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index c7cf3de..fd6d671 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -7,6 +7,7 @@ import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.common.redis.RedisUtil; import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe; import cn.skcks.docking.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; @@ -14,7 +15,6 @@ import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; -import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; @@ -27,6 +27,9 @@ import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder; import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; +import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder; +import cn.skcks.docking.gb28181.sip.utils.SipUtil; +import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -148,7 +151,7 @@ public class PlayService { SipProvider provider = sipService.getProvider(transport, senderIp); CallIdHeader callId = provider.getNewCallId(); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); - String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -156,7 +159,7 @@ public class PlayService { @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; - log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {}", Request.INVITE, subscribeKey); subscription.request(1); } @@ -165,15 +168,15 @@ public class PlayService { int statusCode = item.getStatusCode(); log.debug("{} 收到订阅消息 {}", subscribeKey, item); if (statusCode == Response.TRYING) { - log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 尝试连接流媒体服务", Request.INVITE, subscribeKey); subscription.request(1); } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { - log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { - log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", Request.INVITE, subscribeKey); RedisUtil.KeyOps.delete(key); result.setResult(JsonResponse.error("连接流媒体服务失败")); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); @@ -191,6 +194,9 @@ public class PlayService { subscribe.getSipResponseSubscribe().delPublisher(subscribeKey); } }; + byeSubscribe(callId.getCallId(),3600,()->{ + RedisUtil.KeyOps.delete(key); + }); subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); result.onTimeout(() -> { @@ -246,7 +252,7 @@ public class PlayService { CallIdHeader callId = provider.getNewCallId(); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); - String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -254,7 +260,7 @@ public class PlayService { @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; - log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {}", Request.INVITE, subscribeKey); subscription.request(1); } @@ -263,16 +269,16 @@ public class PlayService { int statusCode = item.getStatusCode(); log.debug("{} 收到订阅消息 {}", subscribeKey, item); if (statusCode == Response.TRYING) { - log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 尝试连接流媒体服务", Request.INVITE, subscribeKey); subscription.request(1); } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { - log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { - log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", Request.INVITE, subscribeKey); RedisUtil.KeyOps.delete(key); result.setResult(JsonResponse.error("连接流媒体服务失败")); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); @@ -290,6 +296,9 @@ public class PlayService { subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); } }; + byeSubscribe(callId.getCallId(),DateUtil.between(startTime,endTime,DateUnit.SECOND),()->{ + RedisUtil.KeyOps.delete(key); + }); subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); result.onTimeout(() -> { @@ -299,6 +308,40 @@ public class PlayService { return result; } + public void byeSubscribe(String callId, long seconds, Runnable cb){ + GenericTimeoutSubscribe sipRequestSubscribe = subscribe.getSipRequestSubscribe(); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.BYE, callId); + sipRequestSubscribe.addPublisher(subscribeKey,seconds + 30,TimeUnit.SECONDS); + Flow.Subscriber subscriber = new Flow.Subscriber<>(){ + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + } + + @Override + @SneakyThrows + public void onNext(SIPRequest item) { + subscribe.getRecordInfoSubscribe().delPublisher(GenericSubscribe.Helper.getKey(Request.INVITE, callId)); + String transport = item.getTopmostViaHeader().getTransport(); + String hostAddress = item.getLocalAddress().getHostAddress(); + Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(item, SipUtil.nanoId()); + sipService.getProvider(transport,hostAddress).sendResponse(byeResponse); + cb.run(); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); + } + }; + sipRequestSubscribe.addSubscribe(subscribeKey,subscriber); + } + @SneakyThrows public JsonResponse recordStop(String deviceId, String channelId, Date startTime, Date endTime) { DockingDevice device = deviceService.getDevice(deviceId);