From a3a23db8dfb3b7bd567b4777fd2359ab4db903c0 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 21 Sep 2023 02:48:13 +0800 Subject: [PATCH] =?UTF-8?q?sip=20=E4=B8=8B=E8=BD=BD=E4=BF=A1=E4=BB=A4=20?= =?UTF-8?q?=E6=9E=84=E9=80=A0=20(=E6=9C=AA=E5=AE=8C)=20=E5=BE=85=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=20notify=3DMediaStatus,=20BYE=20=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/service/docking/DockingService.java | 9 ++ .../gb28181/Gb28181DownloadService.java | 103 +++++++++++++++++- .../wvp/sip/request/SipRequestBuilder.java | 37 +++++++ .../wvp/sip/subscribe/SipSubscribe.java | 4 + .../src/main/resources/application-local.yml | 2 +- 5 files changed, 151 insertions(+), 4 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java index 5044a25..ea7011f 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java @@ -35,6 +35,15 @@ public class DockingService { return getDeviceByDeviceCode(deviceCode).orElse(null) != null; } + public Optional getDeviceByGbDeviceId(String gbDeviceId){ + return wvpProxyDockingMapper.selectOne(s-> + s.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId, isEqualTo(gbDeviceId))); + } + + public Boolean hasDeviceByGbDeviceId(String deviceCode){ + return getDeviceByGbDeviceId(deviceCode).orElse(null) != null; + } + /** * 添加设备 * @param device 设备 diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index e3f532f..b099d9b 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -1,8 +1,17 @@ package cn.skcks.docking.gb28181.wvp.service.gb28181; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.json.JsonUtils; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer; @@ -12,8 +21,16 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sdp.MediaDescriptionImpl; +import gov.nist.javax.sdp.fields.AttributeField; import gov.nist.javax.sdp.fields.TimeField; +import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -24,10 +41,14 @@ import javax.sdp.Connection; import javax.sdp.SdpFactory; import javax.sdp.TimeDescription; import javax.sip.ListeningPoint; +import javax.sip.header.CallIdHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; import java.util.Date; import java.util.List; import java.util.Optional; import java.util.Vector; +import java.util.concurrent.*; @Slf4j @Service @@ -37,7 +58,11 @@ public class Gb28181DownloadService { private final ZlmMediaConfig zlmMediaConfig; private final SsrcService ssrcService; private final DeviceService deviceService; + private final DockingService dockingService; private final ProxySipConfig proxySipConfig; + private final SipSender sender; + private final SipSubscribe subscribe; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private String videoUrl(String streamId) { return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv"); @@ -74,19 +99,35 @@ public class Gb28181DownloadService { } @SneakyThrows - public void download(String gbDeviceId, String channel,Date startTime, Date endTime){ + public CompletableFuture download(String gbDeviceId, String channel, Date startTime, Date endTime){ + CompletableFuture result = new CompletableFuture<>(); + Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); + long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); + if(deviceByGbDeviceId.isEmpty()){ + log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId); + result.complete(""); + return result; + } Optional deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel); if (deviceByGbDeviceIdAndChannel.isEmpty()) { log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel); - return ; + result.complete(""); + return result; } + WvpProxyDevice device = deviceByGbDeviceIdAndChannel.get(); + WvpProxyDocking docking = deviceByGbDeviceId.get(); long start = startTime.toInstant().getEpochSecond(); long end = endTime.toInstant().getEpochSecond(); String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, String.valueOf(start), String.valueOf(end)); int streamMode = proxySipConfig.getTransport().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; String ip = zlmMediaConfig.getIp(); int port = openRtpServer(streamId, streamMode); + if(port <= 0){ + log.error("zlm 暂无可用端口"); + result.complete(""); + return result; + } String ssrc = ssrcService.getPlaySsrc(); TimeField timeField = new TimeField(); timeField.setStartTime(start); @@ -95,7 +136,63 @@ public class Gb28181DownloadService { GB28181Description gb28181Description = MediaSdpHelper.playback(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, StreamMode.of(ListeningPoint.UDP), startTime, endTime); gb28181Description.setSessionName(SdpFactory.getInstance().createSessionName(MediaSdpHelper.Action.DOWNLOAD.getAction())); gb28181Description.setTimeDescriptions(new Vector<>(){{add(timeDescription);}}); - log.debug("{}", gb28181Description); + MediaDescriptionImpl media = (MediaDescriptionImpl) gb28181Description.getMediaDescriptions(true).get(0); + media.setAttribute("downloadspeed", String.valueOf(4)); + sender.sendRequest(inviteRequest(docking, device, gb28181Description, MediaSdpHelper.Action.DOWNLOAD, ssrc, streamId, result, time)); + return result; } + public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture result, long time) { + return (provider, ip, port) -> { + CallIdHeader callId = provider.getNewCallId(); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); + subscribe.getInviteSubscribe().addPublisher(subscribeKey); + Flow.Subscriber subscriber = inviteSubscriber(subscribeKey, ssrc, streamId, result, time); + subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); + return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); + }; + } + + public Flow.Subscriber inviteSubscriber(String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time){ + return new Flow.Subscriber<>() { + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey); + subscription.request(1); + } + + @Override + public void onNext(SIPResponse item) { + int statusCode = item.getStatusCode(); + log.debug("{} 收到订阅消息 {}", subscribeKey, item); + if (statusCode == Response.TRYING) { + log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); + subscription.request(1); + } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); + result.complete(videoUrl(streamId)); + onComplete(); + } else { + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); + result.complete(""); + ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); + onComplete(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getInviteSubscribe().delPublisher(subscribeKey); + } + }; + } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java index f20a0c4..bbca4df 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import gov.nist.javax.sip.message.MessageFactoryImpl; @@ -16,11 +17,14 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; +import javax.sip.InvalidArgumentException; +import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.address.Address; import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.Request; +import java.text.ParseException; import java.util.Collections; import java.util.List; @@ -126,6 +130,39 @@ public class SipRequestBuilder implements ApplicationContextAware { return request; } + @SneakyThrows + public static Request createInviteRequest(String ip, int port, WvpProxyDocking device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader){ + Request request; + String target = StringUtils.joinWith(":", device.getIp(), device.getPort()); + SipURI requestLine = MessageHelper.createSipURI(channelId, target); + // via + List viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), viaTag ); + // from + SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(channelId, target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, null); + // Forwards + MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + // cSeq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.INVITE); + request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); + request.addHeader(SipUtil.createUserAgentHeader()); + + Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port)); + + request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + // Subject + SubjectHeader subjectHeader = getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0)); + request.addHeader(subjectHeader); + ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + request.setContent(content, contentTypeHeader); + return request; + } + public static long getCSeq() { String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId()); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java index 4b25c1c..409cf11 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java @@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; @@ -22,14 +23,17 @@ public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; private GenericSubscribe catalogSubscribe; + private GenericSubscribe inviteSubscribe; @PostConstruct private void init() { catalogSubscribe = new CatalogSubscribe(executor); + inviteSubscribe = new InviteSubscribe(executor); } @PreDestroy private void destroy() { catalogSubscribe.close(); + inviteSubscribe.close(); } } diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index a1524a2..c8ec726 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -1,5 +1,5 @@ server: - port: 18186 + port: 18183 project: version: @project.version@