diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java index a77027d..6258397 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java @@ -45,7 +45,7 @@ public class DeviceService { public Optional getDeviceByGbDeviceIdAndChannel(String gbDeviceId,String channel){ return deviceMapper.selectOne(s-> s.where(WvpProxyDeviceDynamicSqlSupport.gbDeviceId,isEqualTo(gbDeviceId)) - .and(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId,isEqualTo(channel))); + .and(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId,isEqualTo(channel)).limit(1)); } @Transactional diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index af217fa..0cc0df2 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -2,10 +2,6 @@ package cn.skcks.docking.gb28181.wvp.service.gb28181; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; -import cn.skcks.docking.gb28181.common.json.JsonResponse; -import cn.skcks.docking.gb28181.common.json.JsonUtils; -import cn.skcks.docking.gb28181.common.redis.RedisUtil; -import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode; @@ -28,7 +24,6 @@ import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import gov.nist.javax.sdp.MediaDescriptionImpl; -import gov.nist.javax.sdp.fields.AttributeField; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; import gov.nist.javax.sip.message.SIPResponse; @@ -149,16 +144,16 @@ public class Gb28181DownloadService { public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture result, long time) { return (provider, ip, port) -> { CallIdHeader callId = provider.getNewCallId(); - String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, action.getAction(), callId.getCallId()); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(subscribeKey, ssrc, streamId, result, time); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); }; } - public Flow.Subscriber inviteSubscriber(String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time){ + public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time){ return new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -178,8 +173,14 @@ public class Gb28181DownloadService { subscription.request(1); } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); + log.info("收到响应状态 {}", statusCode); + sender.sendRequest(((provider, ip, port) -> { + String fromTag = item.getFromTag(); + String toTag = item.getToTag(); + String callId = item.getCallId().getCallId(); + return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); + })); result.complete(videoUrl(streamId)); - onComplete(); } else { log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); result.complete(""); @@ -190,7 +191,7 @@ public class Gb28181DownloadService { @Override public void onError(Throwable throwable) { - + onComplete(); } @Override diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java index 7891487..47d84af 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java @@ -3,7 +3,6 @@ package cn.skcks.docking.gb28181.wvp.sip.listener; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; -import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; @@ -22,7 +21,6 @@ import java.util.concurrent.ConcurrentMap; @Component @Slf4j public class SipListenerImpl implements SipListener { - private final SipSubscribe sipSubscribe; private final ConcurrentMap requestProcessor = new ConcurrentHashMap<>(); private final ConcurrentMap responseProcessor = new ConcurrentHashMap<>(); @@ -56,6 +54,7 @@ public class SipListenerImpl implements SipListener { String method = cseqHeader.getMethod(); log.debug("{} {}", method, response); + // Success if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { log.debug("传入响应 method => {}", method); @@ -64,6 +63,9 @@ public class SipListenerImpl implements SipListener { }); } else if ((status >= Response.TRYING) && (status < Response.OK)) { // 增加其它无需回复的响应,如101、180等 + Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> { + processor.process(responseEvent); + }); } else { log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); if (responseEvent.getDialog() != null) { diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java new file mode 100644 index 0000000..3fec558 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java @@ -0,0 +1,51 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.bye.request.request; + +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.util.EventObject; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class ByeRequestProcessor implements MessageProcessor { + private final SipListener sipListener; + + private final SipSubscribe subscribe; + + private final SipSender sender; + + @PostConstruct + @Override + public void init() { + sipListener.addRequestProcessor(Request.BYE, this); + } + + @Override + public void process(EventObject eventObject) { + RequestEvent requestEvent = (RequestEvent) eventObject; + SIPRequest request = (SIPRequest) requestEvent.getRequest(); + String callId = request.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + String ip = request.getLocalAddress().getHostAddress(); + String transport = request.getTopmostViaHeader().getTransport(); + Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key)) + .ifPresentOrElse( + publisher -> publisher.submit(request), + () -> sender.sendResponse(ip, transport, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK")))); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/invite/response/InviteResponseProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/invite/response/InviteResponseProcessor.java new file mode 100644 index 0000000..cddc97c --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/invite/response/InviteResponseProcessor.java @@ -0,0 +1,41 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.invite.response; + +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sip.message.SIPResponse; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sip.ResponseEvent; +import javax.sip.message.Request; +import java.util.EventObject; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class InviteResponseProcessor implements MessageProcessor { + private final SipListener sipListener; + private final SipSubscribe subscribe; + + @PostConstruct + @Override + public void init() { + sipListener.addResponseProcessor(Request.INVITE, this); + } + + @Override + public void process(EventObject eventObject) { + ResponseEvent requestEvent = (ResponseEvent) eventObject; + SIPResponse response = (SIPResponse) requestEvent.getResponse(); + String callId = response.getCallId().getCallId(); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId); + log.info("收到 INVITE 响应, key {}", subscribeKey); + Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey)) + .ifPresent(publisher->publisher.submit(response)); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java index c975a16..2956ff5 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; import javax.sip.header.CallIdHeader; +import javax.sip.message.Request; import javax.sip.message.Response; import java.util.EventObject; import java.util.Optional; @@ -40,7 +41,7 @@ public class MessageRequestProcessor implements MessageProcessor { @PostConstruct @Override public void init() { - sipListener.addRequestProcessor(Method.MESSAGE,this); + sipListener.addRequestProcessor(Request.MESSAGE,this); } @Override diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java index 4500d1a..ffc25fb 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java @@ -1,31 +1,15 @@ package cn.skcks.docking.gb28181.wvp.sip.message.register.request; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.DateUtil; -import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.config.sip.SipConfig; import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo; -import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; -import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.sip.GbSipDate; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; -import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; -import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; -import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; -import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper; -import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; -import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; -import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO; -import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; -import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; -import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; -import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Authorization; import gov.nist.javax.sip.header.SIPDateHeader; @@ -36,20 +20,17 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; -import javax.sip.ListeningPoint; import javax.sip.RequestEvent; -import javax.sip.SipProvider; import javax.sip.address.Address; import javax.sip.header.ExpiresHeader; import javax.sip.header.FromHeader; import javax.sip.header.ViaHeader; import javax.sip.message.Request; import javax.sip.message.Response; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Calendar; +import java.util.EventObject; +import java.util.Locale; @Slf4j @RequiredArgsConstructor diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java index bbca4df..7b47c79 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -5,7 +5,6 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; -import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import gov.nist.javax.sip.message.MessageFactoryImpl; @@ -17,17 +16,16 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; -import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.Request; -import java.text.ParseException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +@SuppressWarnings("Duplicates") @DependsOn("proxySipConfig") @Component public class SipRequestBuilder implements ApplicationContextAware { @@ -158,11 +156,85 @@ public class SipRequestBuilder implements ApplicationContextAware { // Subject SubjectHeader subjectHeader = getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0)); request.addHeader(subjectHeader); + request.addHeader(SipUtil.createUserAgentHeader()); + ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); request.setContent(content, contentTypeHeader); return request; } + @SneakyThrows + public static Request createAckRequest(int status,String ip, int port, WvpProxyDocking device, String channelId, String fromTag, String toTag, String callId) { + Request request; + // 请求行 + String target = StringUtils.joinWith(":", device.getIp(), device.getPort()); + SipURI requestLine = MessageHelper.createSipURI(channelId, target); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag()); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(channelId, target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.ACK); + CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId); + request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.ACK, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port)); + request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + request.addHeader(SipUtil.createUserAgentHeader()); + + return request; + } + + @SneakyThrows + public static Request createByeRequest(String ip, int port, WvpProxyDocking device, String channelId, String fromTag, String toTag, String callId) { + Request request; + // 请求行 + String target = StringUtils.joinWith(":", device.getIp(), device.getPort()); + SipURI requestLine = MessageHelper.createSipURI(channelId, target); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag()); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(channelId, target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.BYE); + CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId); + request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port)); + request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + request.addHeader(SipUtil.createUserAgentHeader()); + + return request; + } + public static long getCSeq() { String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId()); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/ByeSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/ByeSubscribe.java new file mode 100644 index 0000000..967dcbe --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/ByeSubscribe.java @@ -0,0 +1,39 @@ +package cn.skcks.docking.gb28181.wvp.sip.subscribe; + +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@RequiredArgsConstructor +public class ByeSubscribe implements GenericSubscribe { + private final Executor executor; + + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + Helper.delPublisher(publishers, key); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java index 409cf11..439c3dd 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java @@ -24,16 +24,19 @@ public class SipSubscribe { private final Executor executor; private GenericSubscribe catalogSubscribe; private GenericSubscribe inviteSubscribe; + private GenericSubscribe byeSubscribe; @PostConstruct private void init() { catalogSubscribe = new CatalogSubscribe(executor); inviteSubscribe = new InviteSubscribe(executor); + byeSubscribe = new ByeSubscribe(executor); } @PreDestroy private void destroy() { catalogSubscribe.close(); inviteSubscribe.close(); + byeSubscribe.close(); } }