完成 video sip信令下载重构, 配置文件 调整

This commit is contained in:
shikong 2023-09-21 14:27:06 +08:00
parent b372fd8d8e
commit 0e586a75bb
7 changed files with 95 additions and 30 deletions

View File

@ -1,7 +1,5 @@
package cn.skcks.docking.gb28181.wvp.api.download; package cn.skcks.docking.gb28181.wvp.api.download;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService; import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
import cn.skcks.docking.gb28181.wvp.service.gb28181.Gb28181DownloadService; import cn.skcks.docking.gb28181.wvp.service.gb28181.Gb28181DownloadService;
@ -10,7 +8,6 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springdoc.core.annotations.ParameterObject;
import org.springdoc.core.models.GroupedOpenApi; import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
@ -28,11 +25,6 @@ public class DownloadController {
return SwaggerConfig.api("DownloadApi", "/download"); return SwaggerConfig.api("DownloadApi", "/download");
} }
@GetJson
public void download(@ParameterObject VideoReq req){
gb28181DownloadService.download(req.getDeviceCode(),req.getStartTime(),req.getEndTime());
}
@Operation(summary = "下载代理") @Operation(summary = "下载代理")
@RequestMapping(method = RequestMethod.HEAD, value = "/proxy") @RequestMapping(method = RequestMethod.HEAD, value = "/proxy")
public void downloadProxyHeader(HttpServletRequest request, HttpServletResponse response, @RequestParam String url) { public void downloadProxyHeader(HttpServletRequest request, HttpServletResponse response, @RequestParam String url) {

View File

@ -3,6 +3,8 @@ package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq; import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.service.gb28181.Gb28181DownloadService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService; import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
@ -30,6 +32,10 @@ public class VideoController {
private final VideoService videoService; private final VideoService videoService;
private final WvpService wvpService; private final WvpService wvpService;
private final WvpProxyConfig proxyConfig;
private final Gb28181DownloadService gb28181DownloadService;
@Bean @Bean
public GroupedOpenApi videoApi() { public GroupedOpenApi videoApi() {
return SwaggerConfig.api("VideoApi", "/video"); return SwaggerConfig.api("VideoApi", "/video");
@ -39,6 +45,10 @@ public class VideoController {
@GetMapping(produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) @GetMapping(produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
@ResponseBody @ResponseBody
public void video(HttpServletRequest request, HttpServletResponse response, @ParameterObject VideoReq req) { public void video(HttpServletRequest request, HttpServletResponse response, @ParameterObject VideoReq req) {
wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime()); if(proxyConfig.getEnable()){
wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime());
} else {
gb28181DownloadService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime());
}
} }
} }

View File

@ -36,6 +36,7 @@ public class FfmpegSupportService {
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel()); String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam); String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam);
CommandLine commandLine = CommandLine.parse(command); CommandLine commandLine = CommandLine.parse(command);
log.info("{}", commandLine);
Executor executor = new DefaultExecutor(); Executor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time)); ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time));
executor.setStreamHandler(streamHandler); executor.setStreamHandler(streamHandler);

View File

@ -1,7 +1,11 @@
package cn.skcks.docking.gb28181.wvp.service.gb28181; package cn.skcks.docking.gb28181.wvp.service.gb28181;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode;
@ -16,10 +20,12 @@ import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
@ -27,10 +33,14 @@ import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField; import gov.nist.javax.sdp.fields.URIField;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sdp.Connection; import javax.sdp.Connection;
@ -40,6 +50,8 @@ import javax.sip.ListeningPoint;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -58,12 +70,31 @@ public class Gb28181DownloadService {
private final ProxySipConfig proxySipConfig; private final ProxySipConfig proxySipConfig;
private final SipSender sender; private final SipSender sender;
private final SipSubscribe subscribe; private final SipSubscribe subscribe;
private final VideoService videoService;
private final WvpProxyConfig wvpProxyConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public void header(HttpServletResponse response, String fileName) {
response.setContentType("video/mp4");
response.setHeader("Accept-Ranges", "none");
response.setHeader("Connection", "close");
response.setHeader("Content-Disposition",
MessageFormat.format("attachment; filename=\"{0}.mp4\"",fileName));
}
private String videoUrl(String streamId) { private String videoUrl(String streamId) {
return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv"); return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv");
} }
@SneakyThrows
private void writeErrorToResponse(HttpServletResponse response, JsonResponse<?> json) {
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
IoUtil.writeUtf8(response.getOutputStream(), false, json);
}
private int openRtpServer(String streamId, int streamMode) { private int openRtpServer(String streamId, int streamMode) {
GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId); GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId);
if (rtpInfo.getExist()) { if (rtpInfo.getExist()) {
@ -84,13 +115,45 @@ public class Gb28181DownloadService {
return openRtpServerResp.getPort(); return openRtpServerResp.getPort();
} }
public void download(String deviceCode, Date startTime, Date endTime){ @SneakyThrows
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) {
AsyncContext asyncContext = request.startAsync();
asyncContext.start(()->{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{
header(response, StringUtils.joinWith("_",
deviceCode,
DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT),
DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT)));
download(deviceCode, startTime,endTime).whenComplete((url, e)->{
if(e != null){
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
}
if(wvpProxyConfig.getUseFfmpeg()){
videoService.ffmpegRecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
} else {
videoService.javaCVrecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
}
asyncContext.complete();
});
} catch(Exception e) {
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
asyncContext.complete();
}
});
}
@SneakyThrows
public CompletableFuture<String> download(String deviceCode, Date startTime, Date endTime) {
List<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); List<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) { if (deviceByDeviceCode.isEmpty()) {
log.info("未能找到 设备编码 为 {} 的设备",deviceCode); String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
log.error("{}",reason);
throw new JsonException(reason);
} else { } else {
WvpProxyDevice device = deviceByDeviceCode.get(0); WvpProxyDevice device = deviceByDeviceCode.get(0);
download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime); return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime);
} }
} }
@ -146,14 +209,14 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId(); CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey); subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time); Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
}; };
} }
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result, long time){ public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result){
return new Flow.Subscriber<>() { return new Flow.Subscriber<>() {
private Flow.Subscription subscription; private Flow.Subscription subscription;

View File

@ -161,20 +161,16 @@ public class VideoService {
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream); PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream);
DefaultExecuteResultHandler defaultExecuteResultHandler = new DefaultExecuteResultHandler(); DefaultExecuteResultHandler defaultExecuteResultHandler = new DefaultExecuteResultHandler();
Executor executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,defaultExecuteResultHandler); Executor executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,defaultExecuteResultHandler);
// executor.setStreamHandler(streamHandler);
log.info("开始录制 {}", url); log.info("开始录制 {}", url);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean record = new AtomicBoolean(true);
scheduledExecutorService.schedule(() -> { scheduledExecutorService.schedule(() -> {
log.info("到达结束时间, 结束录制 {}", url); log.info("到达结束时间, 结束录制 {}", url);
executor.getWatchdog().destroyProcess(); executor.getWatchdog().destroyProcess();
log.info("结束录制 {}", url); log.info("结束录制 {}", url);
// try {
// outputStream.close();
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
}, time, TimeUnit.SECONDS); }, time, TimeUnit.SECONDS);
defaultExecuteResultHandler.waitFor(); defaultExecuteResultHandler.waitFor();
if(errorStream.size() > 0){
log.error("{}", errorStream);
}
} }
} }

View File

@ -45,9 +45,9 @@ proxy:
wvp: wvp:
url: http://127.0.0.1:18978 url: http://127.0.0.1:18978
user: admin user: admin
passwd: admi passwd: admin
use-ffmpeg: false use-ffmpeg: true
enable: true enable: false
gb28181: gb28181:
sip: sip:
id: 44050100002000000003 id: 44050100002000000003
@ -60,15 +60,15 @@ proxy:
ffmpeg-support: ffmpeg-support:
task: task:
max: 4 max: 4
ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe ffmpeg: C:\ffmpeg\bin\ffmpeg.exe
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
rtp: rtp:
input: -r -i http://10.10.10.200:5080/live/test.live.flv # input: -i http://10.10.10.200:5080/live/test.live.flv
# input: -re -i input: -re -i
output: -vcodec copy -acodec copy -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp output: -vcodec copy -acodec copy -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
debug: debug:
download: false download: false
input: true input: false
output: false output: false
# [可选] 日志配置, 一般不需要改 # [可选] 日志配置, 一般不需要改

View File

@ -50,7 +50,9 @@ proxy:
user: admin user: admin
passwd: admin passwd: admin
use-wvp-assist: false use-wvp-assist: false
enable: true # 是否使用 wvp 的 api(wvp 的 并发有问题,仅保留用于兼容), 否则使用sip 信令直接操作设备
enable: false
# 是否使用 ffmpeg 编/解码, 否则使用内置 javacv
use-ffmpeg: false use-ffmpeg: false
gb28181: gb28181:
sip: sip:
@ -66,7 +68,8 @@ proxy:
ffmpeg-support: ffmpeg-support:
task: task:
max: 4 max: 4
ffmpeg: /usr/bin/ffmpeg/ffmpeg # ffmpeg: /usr/bin/ffmpeg/ffmpeg
ffmpeg: C:\ffmpeg\bin\ffmpeg.exe
ffprobe: /usr/bin/ffmpeg/ffprobe ffprobe: /usr/bin/ffmpeg/ffprobe
rtp: rtp:
input: -i input: -i