diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 459bbb0..b029e9a 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -42,8 +42,7 @@ import gov.nist.javax.sip.message.SIPResponse; import jakarta.servlet.AsyncContext; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; +import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.http.MediaType; @@ -80,6 +79,15 @@ public class Gb28181DownloadService { private final WvpProxyConfig wvpProxyConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + @NoArgsConstructor + @AllArgsConstructor + @Data + public static class VideoInfo { + private String url; + private String callId; + private WvpProxyDevice device; + } + public void header(HttpServletResponse response) { response.setContentType("video/mp4"); response.setHeader("Accept-Ranges", "none"); @@ -131,16 +139,16 @@ public class Gb28181DownloadService { asyncContext.start(()->{ HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); try{ - download(deviceCode, startTime,endTime).whenComplete((url, e)->{ + download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{ writeFileHeader(response,deviceCode,startTime,endTime,fileHeader); if(e != null){ writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); - } else if(StringUtils.isBlank(url)){ + } else if(videoInfo == null){ writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败")); } else if(wvpProxyConfig.getUseFfmpeg()){ - videoService.ffmpegRecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60); + videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId()); } else { - videoService.javaCVrecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60); + videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60); } asyncContext.complete(); }); @@ -163,7 +171,7 @@ public class Gb28181DownloadService { } @SneakyThrows - public CompletableFuture download(String deviceCode, Date startTime, Date endTime) { + public CompletableFuture download(String deviceCode, Date startTime, Date endTime) { Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); if (deviceByDeviceCode.isEmpty()) { String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode); @@ -176,19 +184,19 @@ public class Gb28181DownloadService { } @SneakyThrows - public CompletableFuture download(String gbDeviceId, String channel, Date startTime, Date endTime){ - CompletableFuture result = new CompletableFuture<>(); + public CompletableFuture download(String gbDeviceId, String channel, Date startTime, Date endTime){ + CompletableFuture result = new CompletableFuture<>(); Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); if(deviceByGbDeviceId.isEmpty()){ log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId); - result.complete(""); + result.complete(null); return result; } Optional deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel); if (deviceByGbDeviceIdAndChannel.isEmpty()) { log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel); - result.complete(""); + result.complete(null); return result; } @@ -204,7 +212,7 @@ public class Gb28181DownloadService { int port = openRtpServer(streamId, streamMode); if(port <= 0){ log.error("zlm 暂无可用端口"); - result.complete(""); + result.complete(null); return result; } String ssrc = ssrcService.getPlaySsrc(); @@ -224,7 +232,7 @@ public class Gb28181DownloadService { return result; } - public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture result, long time) { + public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture result, long time) { return (provider, ip, port) -> { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); @@ -235,7 +243,7 @@ public class Gb28181DownloadService { }; } - public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ + public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -257,20 +265,20 @@ public class Gb28181DownloadService { } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("收到响应状态 {}", statusCode); + String callId = item.getCallId().getCallId(); sender.sendRequest(((provider, ip, port) -> { String fromTag = item.getFromTag(); String toTag = item.getToTag(); - String callId = item.getCallId().getCallId(); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,streamId, time, unit)); return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); })); - result.complete(videoUrl(streamId)); + result.complete(new VideoInfo(videoUrl(streamId), callId, device)); } else { log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); - result.complete(""); + result.complete(null); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); onComplete(); } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java index b98a16f..4931ecb 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java @@ -3,14 +3,23 @@ package cn.skcks.docking.gb28181.wvp.service.video; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.ffmpeg.FfmpegSupportService; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import jakarta.servlet.AsyncContext; import jakarta.servlet.ServletOutputStream; import jakarta.servlet.ServletResponse; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; +import lombok.AccessLevel; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.exec.*; @@ -22,7 +31,12 @@ import org.bytedeco.javacv.FFmpegFrameRecorder; import org.bytedeco.javacv.FrameGrabber; import org.springframework.stereotype.Service; -import java.io.*; +import javax.sip.SipProvider; +import javax.sip.message.Request; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -35,6 +49,9 @@ import java.util.concurrent.atomic.AtomicBoolean; public class VideoService { private final FfmpegSupportService ffmpegSupportService; private final WvpProxyConfig wvpProxyConfig; + private final ProxySipConfig proxySipConfig; + private final DockingService dockingService; + private final SipSender sender; /** * 写入 flv 响应头信息 * @param response HttpServletResponse 响应 @@ -83,6 +100,62 @@ public class VideoService { } } + @RequiredArgsConstructor + public class FfmpegExecuteResultHandler extends DefaultExecuteResultHandler implements ExecuteResultHandler { + private final static long SLEEP_TIME_MS = 50; + @Setter(AccessLevel.PRIVATE) + private boolean hasResult = false; + private final WvpProxyDevice device; + private final String callId; + private final SipSender sender; + + private void mediaStatus(){ + String deviceId = device.getGbDeviceId(); + Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(deviceId); + if(deviceByGbDeviceId.isEmpty()){ + return; + } + WvpProxyDocking wvpProxyDocking = deviceByGbDeviceId.get(); + String ip = wvpProxyDocking.getIp(); + int port = Integer.parseInt(wvpProxyDocking.getPort()); + String transport = proxySipConfig.getTransport(); + SipProvider provider = sender.getProvider(transport, ip); + Request byeRequest = SipRequestBuilder.createByeRequest(ip, port, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, callId); + try{ + provider.sendRequest(byeRequest); + }catch (Exception e){ + log.error("bye 请求发送失败 {}",e.getMessage()); + } + } + + public boolean hasResult() { + return hasResult; + } + + @SneakyThrows + public void waitFor() { + while (!hasResult()) { + Thread.sleep(SLEEP_TIME_MS); + } + } + + @Override + public void onProcessComplete(int exitValue) { + hasResult = true; + mediaStatus(); + } + + @Override + public void onProcessFailed(ExecuteException e) { + hasResult = true; + mediaStatus(); + } + } + + public FfmpegExecuteResultHandler mediaStatus(WvpProxyDevice device, String key){ + return new FfmpegExecuteResultHandler(device,key,sender); + } + /** * 录制视频 并写入 异步响应 * @param response AsyncContext.getResponse 异步响应 @@ -152,6 +225,33 @@ public class VideoService { } } + /** + * 录制视频 并写入 异步响应 + * @param response AsyncContext.getResponse 异步响应 + * @param url 要录制的视频地址 + * @param time 录制时长 (单位: 秒) + */ + @SneakyThrows + public void ffmpegRecord(ServletResponse response, String url, long time, WvpProxyDevice device,String callId){ + ServletOutputStream outputStream = response.getOutputStream(); + ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); + PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream); + DefaultExecuteResultHandler executeResultHandler = mediaStatus(device,callId); + DateTime startTime = DateUtil.date(); + Executor executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler); + log.info("开始录制 {}", url); + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + log.info("到达结束时间, 结束录制 {}", url); + executor.getWatchdog().destroyProcess(); + log.info("结束录制 {}", url); + }, time, TimeUnit.SECONDS); + executeResultHandler.waitFor(); + schedule.cancel(true); + DateTime endTime = DateUtil.date(); + log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(startTime,endTime, DateUnit.SECOND)); + } + /** * 录制视频 并写入 异步响应 * @param response AsyncContext.getResponse 异步响应 diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java index 7b47c79..57662cc 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -235,6 +235,42 @@ public class SipRequestBuilder implements ApplicationContextAware { return request; } + @SneakyThrows + public static Request createByeRequest(String ip, int port, String targetId, String fromTag, String toTag, String callId) { + Request request; + // 请求行 + String target = StringUtils.joinWith(":", ip, port); + SipURI requestLine = MessageHelper.createSipURI(targetId, target); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag()); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(targetId, target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.BYE); + CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId); + request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port)); + request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + request.addHeader(SipUtil.createUserAgentHeader()); + + return request; + } + public static long getCSeq() { String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId());