diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/MediaRtmpConfig.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/MediaRtmpConfig.java new file mode 100644 index 0000000..b45738f --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/MediaRtmpConfig.java @@ -0,0 +1,12 @@ +package cn.skcks.docking.gb28181.wvp.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "media") +public class MediaRtmpConfig { + private int rtmpPort = 1935; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 6b1817b..8ebad69 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -27,8 +27,10 @@ import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder; import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode; import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; +import cn.skcks.docking.gb28181.wvp.config.MediaRtmpConfig; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig; +import cn.skcks.docking.gb28181.wvp.executor.DefaultVideoExecutor; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; @@ -56,6 +58,7 @@ import jakarta.servlet.http.HttpServletResponse; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.DeferredResult; @@ -90,6 +93,10 @@ public class Gb28181DownloadService { private final VideoService videoService; private final WvpProxyConfig wvpProxyConfig; private final RecordInfoService recordInfoService; + private final MediaRtmpConfig mediaRtmpConfig; + + @Qualifier(DefaultVideoExecutor.EXECUTOR_BEAN_NAME) + private final Executor executor; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); @@ -129,7 +136,8 @@ public class Gb28181DownloadService { } private String videoUrl(String streamId) { - return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.mp4"); + String rtmpSchema = "rtmp://" + zlmMediaConfig.getIp() + ":" + mediaRtmpConfig.getRtmpPort(); + return StringUtils.joinWith("/", rtmpSchema, "rtp", streamId); } @SneakyThrows @@ -206,6 +214,7 @@ public class Gb28181DownloadService { download(deviceCode, startTime, endTime, useDownload).whenComplete((videoInfo, e) -> { writeFileHeader(response, deviceCode, startTime, endTime, fileHeader); + log.info("videoInfo {}", videoInfo); if (e != null) { writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); } else if (videoInfo == null) { @@ -350,7 +359,9 @@ public class Gb28181DownloadService { } else if(videoInfo == null){ writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败")); } else if(wvpProxyConfig.getUseFfmpeg()){ - videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId()); + scheduledExecutorService.submit(()->{ + videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId()); + }); } else { videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60); } @@ -502,6 +513,7 @@ public class Gb28181DownloadService { result.complete(null); return result; } + String ssrc = ssrcService.getPlaySsrc(); TimeField timeField = new TimeField(); timeField.setStartTime(start); @@ -559,6 +571,7 @@ public class Gb28181DownloadService { ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; + private boolean isStart = false; @Override public void onSubscribe(Flow.Subscription subscription) { @@ -574,10 +587,19 @@ public class Gb28181DownloadService { if (statusCode == Response.TRYING) { log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); + String callId = item.getCallId().getCallId(); + if(!isStart){ + isStart = true; + result.completeAsync(() -> new VideoInfo(streamId,videoUrl(streamId), callId, device), executor); + } } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("收到响应状态 {}", statusCode); String callId = item.getCallId().getCallId(); + if(!isStart){ + isStart = true; + result.completeAsync(() -> new VideoInfo(streamId,videoUrl(streamId), callId, device), executor); + } sender.sendRequest(((provider, ip, port) -> { String fromTag = item.getFromTag(); String toTag = item.getToTag(); @@ -586,7 +608,6 @@ public class Gb28181DownloadService { subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit)); return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); })); - result.complete(new VideoInfo(streamId,videoUrl(streamId), callId, device)); } else { log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index a2774b1..079c1f8 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -40,11 +40,12 @@ media: id: amrWMKmbKqoBjRQ9 # secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 + rtmp-port: 1936 proxy: wvp: - #url: http://127.0.0.1:18978 - url: http://192.168.3.12:18978 + url: http://127.0.0.1:18978 +# url: http://192.168.3.12:18978 user: admin passwd: admin use-ffmpeg: true @@ -64,13 +65,14 @@ proxy: realtime-video-duration: 15m gb28181: sip: - id: 44050100002000000005 + id: 44050100002000000003 +# id: 44050100002000000005 domain: 4405010000 password: 123456 port: 5063 ip: -# - 10.10.10.20 - - 192.168.0.195 + - 10.10.10.20 +# - 192.168.0.195 stream-mode: udp use-playback-to-download: false proxy-media-url: 'https://10.10.10.200:18181/media' @@ -90,8 +92,10 @@ ffmpeg-support: rtp: # input: -i http://10.10.10.200:5080/live/test.live.flv input: -re -i - output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp +# output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -vsync 2 -copyts -f flv # -rtsp_transport tcp # output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp + #output: -c:v libx264 -an -f flv # -rtsp_transport tcp + output: -c:v copy -an -f flv download: -thread_queue_size 128 -i debug: download: false