From 4799d8301471ef8444a8f6f4923f4f788184de61 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 29 Apr 2024 00:37:01 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=A7=86=E9=A2=91=E5=9B=9E?= =?UTF-8?q?=E6=94=BE/=E4=B8=8B=E8=BD=BD=20=E6=96=B0=E5=A2=9E=20prefetch=20?= =?UTF-8?q?=E5=8F=82=E6=95=B0=20=E6=8C=87=E5=AE=9A=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E6=8F=90=E5=89=8D=E5=90=AF=E5=8A=A8=E9=A2=84=E6=8B=89=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/Gb28181DownloadService.java | 45 +++++++++++-------- .../src/main/resources/application-local.yml | 12 +++-- pom.xml | 2 +- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index f334bd8..41910de 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -68,6 +68,7 @@ import javax.sip.message.Request; import javax.sip.message.Response; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; +import java.time.Duration; import java.time.ZoneId; import java.util.Date; import java.util.List; @@ -222,7 +223,7 @@ public class Gb28181DownloadService { }); } - download(deviceCode, startTime, endTime, useDownload).whenComplete((videoInfo, e) -> { + download(deviceCode, startTime, endTime, useDownload, true).whenComplete((videoInfo, e) -> { writeFileHeader(response, deviceCode, startTime, endTime, fileHeader); log.info("videoInfo {}", videoInfo); if (e != null) { @@ -274,11 +275,11 @@ public class Gb28181DownloadService { // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 scheduledExecutorService.schedule(()->{ - download(deviceCode, startTime, endTime).whenComplete((videoInfo, e)->{ + download(deviceCode, startTime, endTime, false).whenComplete((videoInfo, e)->{ log.info("获取媒体信息 {}", videoInfo); String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String existCallId = RedisUtil.StringOps.get(cacheKey); - // 到达时间后主动结束, 防止某些设备不会主动结束 + // 到达时间后 延迟 10秒 主动结束, 防止某些设备不会主动结束 scheduledExecutorService.schedule(()->{ log.info("到达结束时间 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId()); String deviceIp = docking.getIp(); @@ -291,7 +292,7 @@ public class Gb28181DownloadService { zlmMediaService.closeRtpServer(CloseRtpServer.builder() .streamId(videoInfo.streamId) .build()); - }, time, TimeUnit.MILLISECONDS); + }, time + Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS); String url = videoWsUrl(videoInfo.getUrl()); url = StringUtils.replaceOnce(url, ".live.flv", ".live.mp4"); @@ -368,7 +369,7 @@ public class Gb28181DownloadService { DateTime start = DateUtil.date(); HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); try{ - download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{ + download(deviceCode, startTime,endTime, true).whenComplete((videoInfo, e)->{ streamHeader(asyncResponse); if(e != null){ writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); @@ -454,13 +455,13 @@ public class Gb28181DownloadService { } String ssrc = ssrcService.getPlaySsrc(); GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode); - sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result)); + sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result, false)); return result; } @SneakyThrows - public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture result) { + public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture result, Boolean prefetch) { String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String existCallId = RedisUtil.StringOps.get(cacheKey); @@ -479,7 +480,7 @@ public class Gb28181DownloadService { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS, prefetch); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); RedisUtil.StringOps.set(cacheKey, callId.getCallId()); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); @@ -487,12 +488,12 @@ public class Gb28181DownloadService { } } - public CompletableFuture download(String deviceCode, Date startTime, Date endTime) { - return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload()); + public CompletableFuture download(String deviceCode, Date startTime, Date endTime, Boolean prefetch) { + return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload(), prefetch); } @SneakyThrows - public CompletableFuture download(String deviceCode, Date startTime, Date endTime, Boolean useDownload) { + public CompletableFuture download(String deviceCode, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch) { Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); if (deviceByDeviceCode.isEmpty()) { String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode); @@ -500,12 +501,12 @@ public class Gb28181DownloadService { throw new JsonException(reason); } else { WvpProxyDevice device = deviceByDeviceCode.get(); - return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload); + return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload, prefetch); } } @SneakyThrows - public CompletableFuture download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload){ + public CompletableFuture download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch){ CompletableFuture result = new CompletableFuture<>(); Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); @@ -559,12 +560,12 @@ public class Gb28181DownloadService { URIField uriField = new URIField(); uriField.setURI(StringUtils.joinWith(":", channel, "0")); gb28181Description.setURI(uriField); - sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time)); + sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time, prefetch)); return result; } @SneakyThrows - public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture result, long time) { + public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture result, long time, Boolean prefetch) { String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String existCallId = RedisUtil.StringOps.get(cacheKey); @@ -583,16 +584,18 @@ public class Gb28181DownloadService { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS, prefetch); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); RedisUtil.StringOps.set(cacheKey, callId.getCallId()); - // 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流 - result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device)); + if(prefetch){ + // 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流 + result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device)); + } return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); }; } - public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ + public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit, Boolean prefetch){ ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -614,6 +617,10 @@ public class Gb28181DownloadService { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("收到响应状态 {}", statusCode); String callId = item.getCallId().getCallId(); + if(!prefetch){ + // 待 相应 200OK 后再返回, 用于对延迟不敏感的实时请求 + result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), item.getCallId().getCallId(), device)); + } scheduledExecutorService.schedule(()->{ sender.sendRequest(((provider, ip, port) -> { diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index 90b99be..74f815a 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -75,7 +75,8 @@ proxy: # - 192.168.0.195 stream-mode: udp use-playback-to-download: false - proxy-media-url: 'https://10.10.10.200:18181/media' +# proxy-media-url: 'https://10.10.10.200:18181/media' + proxy-media-url: 'https://10.10.10.200:5444' use-record-info-query-before-download: true retry-record-info-query-before-download-interval: 3 retry-record-info-query-before-download-times: 20 @@ -96,11 +97,16 @@ ffmpeg-support: # output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp #output: -c:v libx264 -an -f flv # -rtsp_transport tcp output: -c:v copy -an -f flv - download: -rw_timeout 10000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i + #download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i + download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i + log-level: error + # download: -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i debug: download: false input: false output: false + tmp-dir: G:\Temp\record\download-proxy + use-tmp-file: true # [可选] 日志配置, 一般不需要改 logging: @@ -110,4 +116,4 @@ report: enabled: false url: http://127.0.0.1:8080/api/report custom-headers: - agent: gb28181-proxy \ No newline at end of file + agent: gb28181-proxy diff --git a/pom.xml b/pom.xml index 1563918..8ecaaab 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 1.4.13 - 0.1.0-SNAPSHOT + 0.1.0