Compare commits
No commits in common. "master" and "feature/record_info" have entirely different histories.
master
...
feature/re
@ -4,7 +4,7 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
|
||||
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
|
||||
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
|
||||
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.device.control.DeviceControlService;
|
||||
@ -57,7 +57,8 @@ public class Gb28181Controller {
|
||||
}
|
||||
|
||||
@PostJson("/recordInfo")
|
||||
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> recordInfo(RecordInfoDTO dto){
|
||||
return recordInfoService.requestRecordInfo(dto);
|
||||
public JsonResponse<RecordInfoRequestDTO> recordInfo(RecordInfoDTO dto){
|
||||
recordInfoService.requestRecordInfo(dto);
|
||||
return JsonResponse.success(null);
|
||||
}
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ public class VideoController {
|
||||
if(proxyConfig.getEnable()){
|
||||
wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime());
|
||||
} else {
|
||||
gb28181DownloadService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime(), req.getFileHeader(), req.getUseDownload());
|
||||
gb28181DownloadService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime(), req.getFileHeader());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,9 +33,6 @@ public class VideoReq {
|
||||
@Schema(description = "http 头是否需要文件名 (没有文件名时浏览器会试图直接播放,会导致短时间内重复访问同一设备,导致失败)")
|
||||
private Boolean fileHeader = true;
|
||||
|
||||
@Schema(description = "使用哪种方式拉取历史视频 (true 为 使用 Download 方式拉取 4倍速流, false 为 使用 Playback 原始速率拉取 视频回放)")
|
||||
private Boolean useDownload = true;
|
||||
|
||||
public void setDevice_id(String deviceCode){
|
||||
this.deviceCode = deviceCode;
|
||||
}
|
||||
@ -61,8 +58,4 @@ public class VideoReq {
|
||||
public void setEnd_time(Date endTime){
|
||||
this.endTime = endTime;
|
||||
}
|
||||
|
||||
public void setUse_download(Boolean useDownload){
|
||||
this.useDownload = useDownload;
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,4 @@ public class FfmpegConfig {
|
||||
private Boolean input = false;
|
||||
private Boolean output = false;
|
||||
}
|
||||
|
||||
private Boolean useTmpFile = true;
|
||||
private String tmpDir = System.getProperty("java.io.tmpdir");
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ public class Gb28181DeviceVideoApiConfig {
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class Offset {
|
||||
private Duration forward = Duration.of(0, ChronoUnit.SECONDS);
|
||||
private Duration forward = Duration.of(30, ChronoUnit.SECONDS);
|
||||
private Duration back= Duration.of(0, ChronoUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "media")
|
||||
public class MediaRtmpConfig {
|
||||
private int rtmpPort = 1935;
|
||||
}
|
@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.wvp.config;
|
||||
|
||||
import cn.skcks.docking.gb28181.config.sip.SipConfig;
|
||||
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@ -10,7 +11,6 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.ListeningPoint;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true)
|
||||
@ -44,14 +44,6 @@ public class ProxySipConfig {
|
||||
*/
|
||||
private String proxyMediaUrl = "";
|
||||
|
||||
/**
|
||||
* 调用 视频下载之前 是否使用 recordInfo 查询
|
||||
*/
|
||||
private boolean useRecordInfoQueryBeforeDownload = true;
|
||||
private int retryRecordInfoQueryBeforeDownloadTimes = 20;
|
||||
private long retryRecordInfoQueryBeforeDownloadInterval = 3;
|
||||
private TimeUnit retryRecordInfoQueryBeforeDownloadIntervalUnit = TimeUnit.SECONDS;
|
||||
|
||||
@Bean
|
||||
public SipConfig sipConfig(){
|
||||
SipConfig sipConfig = new SipConfig();
|
||||
|
@ -1,17 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "report")
|
||||
public class ReportConfig {
|
||||
private Boolean enabled = false;
|
||||
private String url;
|
||||
private Map<String, String> customHeaders = new HashMap<>();
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.dto.report;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
@Schema(description = "上报信息")
|
||||
public class ReportReq {
|
||||
@Schema(description = "上报消息id")
|
||||
private String id;
|
||||
@Schema(description = "设备编码")
|
||||
private String deviceCode;
|
||||
@Schema(description = "设备id/通道id")
|
||||
private String deviceId;
|
||||
@Schema(description = "点播时长")
|
||||
private String durationTime;
|
||||
@Schema(description = "点播时长")
|
||||
private TimeRange timeRange;
|
||||
|
||||
@Schema(description = "日志记录时间")
|
||||
private String logTime;
|
||||
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public static class TimeRange {
|
||||
@Schema(description = "开始时间")
|
||||
private String startTime;
|
||||
|
||||
@Schema(description = "结束时间")
|
||||
private String endTime;
|
||||
}
|
||||
|
||||
@Schema(description = "文件大小, 未知大小为 -1")
|
||||
private String fileSize;
|
||||
}
|
@ -17,40 +17,30 @@ import java.util.concurrent.TimeUnit;
|
||||
public class FfmpegSupportService {
|
||||
private final FfmpegConfig ffmpegConfig;
|
||||
|
||||
@SneakyThrows
|
||||
public Executor downloadToStream(String input, String out, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
|
||||
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
|
||||
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
|
||||
String inputParam = debug.getDownload() ? rtp.getDownload() : StringUtils.joinWith(" ", "-y", rtp.getDownload(), input);
|
||||
log.info("视频输入参数 {}", inputParam);
|
||||
|
||||
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", "-t", unit.toSeconds(time), rtp.getOutput(), out);
|
||||
log.info("视频输出参数 {}", outputParam);
|
||||
|
||||
return ffmpegExecutor(inputParam, outputParam, unit.toSeconds(time) + 60, TimeUnit.SECONDS, streamHandler, executeResultHandler);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public Executor downloadToStream(String input, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
|
||||
return downloadToStream(input, "-", time, unit, streamHandler, executeResultHandler);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public Executor playbackToStream(String input, String out, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler){
|
||||
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
|
||||
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
|
||||
String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", "-y", rtp.getInput(), input);
|
||||
String inputParam = debug.getDownload() ? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input);
|
||||
log.info("视频输入参数 {}", inputParam);
|
||||
|
||||
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), out);
|
||||
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), "-");
|
||||
log.info("视频输出参数 {}", outputParam);
|
||||
|
||||
return ffmpegExecutor(inputParam, outputParam, unit.toSeconds(time) + 60, TimeUnit.SECONDS, streamHandler, executeResultHandler);
|
||||
return ffmpegExecutor(inputParam, outputParam, time, unit, streamHandler, executeResultHandler);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public Executor playbackToStream(String input, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
|
||||
return playbackToStream(input, "-", time, unit, streamHandler, executeResultHandler);
|
||||
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
|
||||
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
|
||||
String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", rtp.getInput(), input);
|
||||
log.info("视频输入参数 {}", inputParam);
|
||||
|
||||
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), "-");
|
||||
log.info("视频输出参数 {}", outputParam);
|
||||
|
||||
return ffmpegExecutor(inputParam, outputParam, time, unit, streamHandler, executeResultHandler);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
|
@ -1,6 +1,9 @@
|
||||
package cn.skcks.docking.gb28181.wvp.service.gb28181;
|
||||
|
||||
import cn.hutool.core.date.*;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonException;
|
||||
@ -22,28 +25,18 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
|
||||
import cn.skcks.docking.gb28181.sdp.GB28181Description;
|
||||
import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
|
||||
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
|
||||
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
|
||||
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
|
||||
import cn.skcks.docking.gb28181.wvp.config.MediaRtmpConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.executor.DefaultVideoExecutor;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
|
||||
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.record.RecordInfoService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
|
||||
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
|
||||
import com.github.rholder.retry.Retryer;
|
||||
import com.github.rholder.retry.RetryerBuilder;
|
||||
import com.github.rholder.retry.StopStrategies;
|
||||
import com.github.rholder.retry.WaitStrategies;
|
||||
import gov.nist.javax.sdp.MediaDescriptionImpl;
|
||||
import gov.nist.javax.sdp.fields.TimeField;
|
||||
import gov.nist.javax.sdp.fields.URIField;
|
||||
@ -55,7 +48,6 @@ import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
@ -68,10 +60,8 @@ import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.MessageFormat;
|
||||
import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.*;
|
||||
@ -90,14 +80,7 @@ public class Gb28181DownloadService {
|
||||
private final SipSubscribe subscribe;
|
||||
private final VideoService videoService;
|
||||
private final WvpProxyConfig wvpProxyConfig;
|
||||
private final RecordInfoService recordInfoService;
|
||||
private final MediaRtmpConfig mediaRtmpConfig;
|
||||
|
||||
@Qualifier(DefaultVideoExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(64);
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final RealtimeManager realtimeManager;
|
||||
@ -134,19 +117,8 @@ public class Gb28181DownloadService {
|
||||
MessageFormat.format("attachment; filename=\"{0}.mp4\"",fileName));
|
||||
}
|
||||
|
||||
private String videoRtmpUrl(String streamId) {
|
||||
String rtmpSchema = "rtmp://" + zlmMediaConfig.getIp() + ":" + mediaRtmpConfig.getRtmpPort();
|
||||
return StringUtils.joinWith("/", rtmpSchema, "rtp", streamId);
|
||||
}
|
||||
|
||||
private String videoWsUrl(String rtmpUrl){
|
||||
String rtmpSchema = "rtmp://" + zlmMediaConfig.getIp() + ":" + mediaRtmpConfig.getRtmpPort();
|
||||
if(StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl())){
|
||||
return StringUtils.replace(rtmpUrl + ".live.flv", rtmpSchema, proxySipConfig.getProxyMediaUrl());
|
||||
} else {
|
||||
String wsSchema = StringUtils.replace(zlmMediaConfig.getUrl(), "http://", "ws://");
|
||||
return StringUtils.replace(rtmpUrl + ".live.flv", rtmpSchema, wsSchema);
|
||||
}
|
||||
private String videoUrl(String streamId) {
|
||||
return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.mp4");
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@ -169,7 +141,7 @@ public class Gb28181DownloadService {
|
||||
openRtpServer.setStreamId(streamId);
|
||||
openRtpServer.setTcpMode(streamMode);
|
||||
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer);
|
||||
log.info("openRtpServerResp => {} => {}", streamId, openRtpServerResp);
|
||||
log.info("openRtpServerResp => {}", openRtpServerResp);
|
||||
if (!openRtpServerResp.getCode().equals(ResponseStatus.Success)) {
|
||||
log.error("{}", openRtpServerResp.getCode().getMsg());
|
||||
return -1;
|
||||
@ -178,76 +150,28 @@ public class Gb28181DownloadService {
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@SuppressWarnings({"UnstableApiUsage", "unchecked"})
|
||||
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader, Boolean useDownload) {
|
||||
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader) {
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(DateUtil.between(startTime, DateUtil.offsetSecond(endTime, 60), DateUnit.MS));
|
||||
asyncContext.setTimeout(0);
|
||||
asyncContext.start(()->{
|
||||
DateTime start = DateUtil.date();
|
||||
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
|
||||
try{
|
||||
if(proxySipConfig.isUseRecordInfoQueryBeforeDownload()){
|
||||
String name = MessageFormat.format("{0} {1}-{2}", deviceCode, startTime, endTime);
|
||||
Retryer<JsonResponse<List<RecordInfoItemVO>>> retryer = RetryerBuilder.<JsonResponse<List<RecordInfoItemVO>>>newBuilder()
|
||||
// 异常就重试
|
||||
.retryIfException()
|
||||
.retryIfRuntimeException()
|
||||
// 重试间隔
|
||||
.withWaitStrategy(WaitStrategies.fixedWait(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadInterval(), proxySipConfig.getRetryRecordInfoQueryBeforeDownloadIntervalUnit()))
|
||||
// 重试次数
|
||||
.withStopStrategy(StopStrategies.stopAfterAttempt(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadTimes()))
|
||||
.retryIfResult((result) -> {
|
||||
log.info("{}", result);
|
||||
return result == null ||
|
||||
result.getCode() != Response.OK ||
|
||||
result.getData() == null ||
|
||||
result.getData().isEmpty();
|
||||
})
|
||||
.withRetryListener(RetryUtil.defaultRetryListener(name)).build();
|
||||
|
||||
retryer.call(()->{
|
||||
CompletableFuture<JsonResponse<List<RecordInfoItemVO>>> future = new CompletableFuture<>();
|
||||
future.completeOnTimeout(JsonResponse.error("录像查询超时"),1,TimeUnit.MINUTES);
|
||||
// 发起设备录像查询
|
||||
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestedRecordInfo =
|
||||
recordInfoService.requestRecordInfo(new RecordInfoDTO(deviceCode, startTime, endTime, "", 0, "all"));
|
||||
|
||||
requestedRecordInfo.setResultHandler(result -> {
|
||||
future.complete((JsonResponse<List<RecordInfoItemVO>>) result);
|
||||
});
|
||||
requestedRecordInfo.onError((throwable)->{
|
||||
future.complete(JsonResponse.error(throwable.getMessage()));
|
||||
});
|
||||
|
||||
return future.get();
|
||||
});
|
||||
}
|
||||
|
||||
download(deviceCode, startTime, endTime, useDownload, true).whenComplete((videoInfo, e) -> {
|
||||
writeFileHeader(response, deviceCode, startTime, endTime, fileHeader);
|
||||
log.info("videoInfo {}", videoInfo);
|
||||
if (e != null) {
|
||||
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{
|
||||
writeFileHeader(response,deviceCode,startTime,endTime,fileHeader);
|
||||
if(e != null){
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
|
||||
} else if (videoInfo == null) {
|
||||
} else if(videoInfo == null){
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
|
||||
} else if (wvpProxyConfig.getUseFfmpeg()) {
|
||||
log.info("开始 ffmpeg 录制, deviceCode {}, startTime {}, endTime {}", deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
videoService.ffmpegRecord(request, asyncResponse, videoInfo.getUrl(), startTime, endTime, DateUtil.between(startTime, endTime, DateUnit.SECOND), videoInfo.getDevice(), videoInfo.getCallId());
|
||||
DateTime end = DateUtil.date();
|
||||
asyncContext.complete();
|
||||
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start, end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
} else if(wvpProxyConfig.getUseFfmpeg()){
|
||||
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
|
||||
} else {
|
||||
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 15);
|
||||
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
|
||||
}
|
||||
asyncContext.complete();
|
||||
});
|
||||
} catch(Exception e) {
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
|
||||
} finally {
|
||||
if(!wvpProxyConfig.getUseFfmpeg()){
|
||||
DateTime end = DateUtil.date();
|
||||
asyncContext.complete();
|
||||
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start,end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
}
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -275,11 +199,11 @@ public class Gb28181DownloadService {
|
||||
|
||||
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
|
||||
scheduledExecutorService.schedule(()->{
|
||||
download(deviceCode, startTime, endTime, false).whenComplete((videoInfo, e)->{
|
||||
download(deviceCode, startTime, endTime).whenComplete((videoInfo, e)->{
|
||||
log.info("获取媒体信息 {}", videoInfo);
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
// 到达时间后 延迟 10秒 主动结束, 防止某些设备不会主动结束
|
||||
// 到达时间后主动结束, 防止某些设备不会主动结束
|
||||
scheduledExecutorService.schedule(()->{
|
||||
log.info("到达结束时间 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
|
||||
String deviceIp = docking.getIp();
|
||||
@ -292,10 +216,11 @@ public class Gb28181DownloadService {
|
||||
zlmMediaService.closeRtpServer(CloseRtpServer.builder()
|
||||
.streamId(videoInfo.streamId)
|
||||
.build());
|
||||
}, time + Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
|
||||
}, time, TimeUnit.MILLISECONDS);
|
||||
|
||||
String url = videoWsUrl(videoInfo.getUrl());
|
||||
url = StringUtils.replaceOnce(url, ".live.flv", ".live.mp4");
|
||||
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
|
||||
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
|
||||
videoInfo.getUrl();
|
||||
result.setResult(JsonResponse.success(url));
|
||||
});
|
||||
}, 200, TimeUnit.MILLISECONDS);
|
||||
@ -335,7 +260,9 @@ public class Gb28181DownloadService {
|
||||
|
||||
// 原始链接转换为前端可用的链接
|
||||
RedisUtil.StringOps.set(CacheUtil.getKey(GB28181SDPBuilder.Action.PLAY.getAction(), videoInfo.getCallId()), JsonUtils.toJson(videoInfo));
|
||||
String url = videoWsUrl(videoInfo.getUrl());
|
||||
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
|
||||
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
|
||||
videoInfo.getUrl();
|
||||
videoInfo.setUrl(url);
|
||||
|
||||
realtimeManager.addPlaying(deviceCode, videoInfo);
|
||||
@ -366,35 +293,24 @@ public class Gb28181DownloadService {
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
asyncContext.start(()->{
|
||||
DateTime start = DateUtil.date();
|
||||
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
|
||||
try{
|
||||
download(deviceCode, startTime,endTime, true).whenComplete((videoInfo, e)->{
|
||||
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{
|
||||
streamHeader(asyncResponse);
|
||||
if(e != null){
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
|
||||
} else if(videoInfo == null){
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
|
||||
} else if(wvpProxyConfig.getUseFfmpeg()){
|
||||
log.info("开始 ffmpeg 录制, deviceCode {}, startTime {}, endTime {}", deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
videoService.ffmpegRecord(request, asyncResponse, videoInfo.getUrl(), startTime, endTime, DateUtil.between(startTime, endTime, DateUnit.SECOND), videoInfo.getDevice(), videoInfo.getCallId());
|
||||
DateTime end = DateUtil.date();
|
||||
asyncContext.complete();
|
||||
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start, end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
|
||||
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
|
||||
} else {
|
||||
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 15);
|
||||
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
|
||||
}
|
||||
asyncContext.complete();
|
||||
});
|
||||
} catch(Exception e) {
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
|
||||
|
||||
} finally {
|
||||
if(!wvpProxyConfig.getUseFfmpeg()){
|
||||
DateTime end = DateUtil.date();
|
||||
asyncContext.complete();
|
||||
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start,end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
|
||||
}
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -455,13 +371,13 @@ public class Gb28181DownloadService {
|
||||
}
|
||||
String ssrc = ssrcService.getPlaySsrc();
|
||||
GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode);
|
||||
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result, false));
|
||||
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result, Boolean prefetch) {
|
||||
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result) {
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
|
||||
@ -480,7 +396,7 @@ public class Gb28181DownloadService {
|
||||
CallIdHeader callId = provider.getNewCallId();
|
||||
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
|
||||
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
|
||||
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS, prefetch);
|
||||
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS);
|
||||
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
|
||||
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
|
||||
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
|
||||
@ -488,12 +404,8 @@ public class Gb28181DownloadService {
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean prefetch) {
|
||||
return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload(), prefetch);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch) {
|
||||
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime) {
|
||||
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
|
||||
if (deviceByDeviceCode.isEmpty()) {
|
||||
String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
|
||||
@ -501,12 +413,12 @@ public class Gb28181DownloadService {
|
||||
throw new JsonException(reason);
|
||||
} else {
|
||||
WvpProxyDevice device = deviceByDeviceCode.get();
|
||||
return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload, prefetch);
|
||||
return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime);
|
||||
}
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch){
|
||||
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime){
|
||||
CompletableFuture<VideoInfo> result = new CompletableFuture<>();
|
||||
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
|
||||
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
||||
@ -544,7 +456,7 @@ public class Gb28181DownloadService {
|
||||
timeField.setStopTime(end);
|
||||
TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(timeField);
|
||||
GB28181SDPBuilder.Action action = GB28181SDPBuilder.Action.DOWNLOAD;
|
||||
if(useDownload == null ? proxySipConfig.isUsePlaybackToDownload(): !useDownload){
|
||||
if(proxySipConfig.isUsePlaybackToDownload()){
|
||||
action = GB28181SDPBuilder.Action.PLAY_BACK;
|
||||
}
|
||||
GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.build(action, gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode, timeDescription);
|
||||
@ -554,18 +466,18 @@ public class Gb28181DownloadService {
|
||||
if(proxySipConfig.getStreamMode() != MediaStreamMode.UDP){
|
||||
media.getMedia().setProtocol("RTP/AVP/TCP");
|
||||
}
|
||||
if(useDownload == null ? !proxySipConfig.isUsePlaybackToDownload(): useDownload){
|
||||
if(!proxySipConfig.isUsePlaybackToDownload()){
|
||||
media.setAttribute("downloadspeed", String.valueOf(4));
|
||||
}
|
||||
URIField uriField = new URIField();
|
||||
uriField.setURI(StringUtils.joinWith(":", channel, "0"));
|
||||
gb28181Description.setURI(uriField);
|
||||
sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time, prefetch));
|
||||
sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time));
|
||||
return result;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time, Boolean prefetch) {
|
||||
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time) {
|
||||
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
|
||||
String existCallId = RedisUtil.StringOps.get(cacheKey);
|
||||
|
||||
@ -584,21 +496,18 @@ public class Gb28181DownloadService {
|
||||
CallIdHeader callId = provider.getNewCallId();
|
||||
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
|
||||
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
|
||||
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS, prefetch);
|
||||
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
|
||||
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
|
||||
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
|
||||
if(prefetch){
|
||||
// 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流
|
||||
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device));
|
||||
}
|
||||
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
|
||||
};
|
||||
}
|
||||
|
||||
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit, Boolean prefetch){
|
||||
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){
|
||||
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
||||
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
|
||||
private Flow.Subscription subscription;
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
this.subscription = subscription;
|
||||
@ -617,21 +526,15 @@ public class Gb28181DownloadService {
|
||||
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
|
||||
log.info("收到响应状态 {}", statusCode);
|
||||
String callId = item.getCallId().getCallId();
|
||||
if(!prefetch){
|
||||
// 待 相应 200OK 后再返回, 用于对延迟不敏感的实时请求
|
||||
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), item.getCallId().getCallId(), device));
|
||||
}
|
||||
|
||||
scheduledExecutorService.schedule(()->{
|
||||
sender.sendRequest(((provider, ip, port) -> {
|
||||
String fromTag = item.getFromTag();
|
||||
String toTag = item.getToTag();
|
||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||
subscribe.getByeSubscribe().addPublisher(key);
|
||||
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
|
||||
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
|
||||
}));
|
||||
},200,TimeUnit.MILLISECONDS);
|
||||
sender.sendRequest(((provider, ip, port) -> {
|
||||
String fromTag = item.getFromTag();
|
||||
String toTag = item.getToTag();
|
||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||
subscribe.getByeSubscribe().addPublisher(key);
|
||||
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
|
||||
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
|
||||
}));
|
||||
result.complete(new VideoInfo(streamId,videoUrl(streamId), callId, device));
|
||||
} else {
|
||||
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
|
||||
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
|
||||
|
@ -1,12 +1,8 @@
|
||||
package cn.skcks.docking.gb28181.wvp.service.record;
|
||||
|
||||
import cn.skcks.docking.gb28181.common.json.JsonException;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
|
||||
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
|
||||
@ -15,17 +11,16 @@ import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.subscribe.RecordSubscribe;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -33,11 +28,12 @@ import java.util.Optional;
|
||||
public class RecordInfoService {
|
||||
private final SipSender sipSender;
|
||||
private final SipSubscribe sipSubscribe;
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final DockingService dockingService;
|
||||
private final DeviceService deviceService;
|
||||
|
||||
@SneakyThrows
|
||||
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(RecordInfoDTO dto){
|
||||
public void requestRecordInfo(RecordInfoDTO dto){
|
||||
String deviceCode = dto.getDeviceCode();
|
||||
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
|
||||
if (deviceByDeviceCode.isEmpty()) {
|
||||
@ -46,23 +42,20 @@ public class RecordInfoService {
|
||||
throw new JsonException(reason);
|
||||
} else {
|
||||
WvpProxyDevice device = deviceByDeviceCode.get();
|
||||
return requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
|
||||
requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
|
||||
}
|
||||
}
|
||||
|
||||
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
|
||||
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>();
|
||||
public void requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
|
||||
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
|
||||
if(deviceByGbDeviceId.isEmpty()){
|
||||
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
|
||||
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 设备编码 为 {0} 的设备", gbDeviceId)));
|
||||
return result;
|
||||
return;
|
||||
}
|
||||
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
|
||||
if (deviceByGbDeviceIdAndChannel.isEmpty()) {
|
||||
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
|
||||
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 编码 {0}, 通道 {1} 的设备", gbDeviceId, channel)));
|
||||
return result;
|
||||
return;
|
||||
}
|
||||
WvpProxyDocking device = deviceByGbDeviceId.get();
|
||||
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
|
||||
@ -76,12 +69,7 @@ public class RecordInfoService {
|
||||
.filePath(dto.getFilePath())
|
||||
.indistinctQuery(0)
|
||||
.build();
|
||||
|
||||
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channel, sn);
|
||||
sipSubscribe.getMessageSubscribe().addPublisher(key);
|
||||
sipSubscribe.getMessageSubscribe().addSubscribe(key, new RecordSubscribe(sipSubscribe, key, result, gbDeviceId));
|
||||
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(recordInfoRequestDTO), SipUtil.generateViaTag(),
|
||||
SipUtil.generateFromTag(), provider.getNewCallId()));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -3,15 +3,11 @@ package cn.skcks.docking.gb28181.wvp.service.record.dto;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.skcks.docking.gb28181.constant.GB28181Constant;
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.format.annotation.DateTimeFormat;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public class RecordInfoDTO {
|
||||
/**
|
||||
|
@ -1,20 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.service.report;
|
||||
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.media.feign.IgnoreSSLFeignClientConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.dto.report.ReportReq;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestHeader;
|
||||
|
||||
@FeignClient(
|
||||
name = "ReportServiceProxy",
|
||||
url = "${report.url}",
|
||||
configuration = {IgnoreSSLFeignClientConfig.class}
|
||||
)
|
||||
public interface ReportApi {
|
||||
@PostMapping
|
||||
JsonResponse<?> report(@RequestHeader MultiValueMap<String, String> headers, @RequestBody ReportReq body);
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.service.report;
|
||||
|
||||
import cn.hutool.core.date.BetweenFormatter;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.io.unit.DataSizeUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.skcks.docking.gb28181.wvp.config.ReportConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.dto.report.ReportReq;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ReportService {
|
||||
private final ReportApi reportApi;
|
||||
private final ReportConfig reportConfig;
|
||||
|
||||
public void report(HttpServletRequest request, WvpProxyDevice device, Date startTime, Date endTime, long fileSize) {
|
||||
if(!reportConfig.getEnabled()){
|
||||
return;
|
||||
}
|
||||
|
||||
ReportReq reportReq = new ReportReq(IdUtil.fastUUID(),
|
||||
device.getDeviceCode(),
|
||||
device.getGbDeviceChannelId(),
|
||||
DateUtil.formatBetween(startTime, endTime, BetweenFormatter.Level.SECOND),
|
||||
new ReportReq.TimeRange(DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime)),
|
||||
DateUtil.now(),
|
||||
DataSizeUtil.format(fileSize));
|
||||
LinkedMultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
|
||||
reportConfig.getCustomHeaders().forEach(headers::add);
|
||||
request.getHeaderNames().asIterator().forEachRemaining(headerKey -> {
|
||||
String header = request.getHeader(headerKey);
|
||||
headers.add(headerKey, header);
|
||||
});
|
||||
headers.add("X-Client-Ip", request.getRemoteAddr());
|
||||
log.info("上报调用信息 {}", reportReq);
|
||||
|
||||
try{
|
||||
reportApi.report(headers, reportReq);
|
||||
} catch (Exception e){
|
||||
log.error("上报调用信息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
@ -3,16 +3,13 @@ package cn.skcks.docking.gb28181.wvp.service.video;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.wvp.config.FfmpegConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
|
||||
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
|
||||
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.ffmpeg.FfmpegSupportService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.report.ReportService;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
|
||||
import jakarta.servlet.AsyncContext;
|
||||
@ -34,8 +31,9 @@ import org.bytedeco.javacv.FFmpegFrameRecorder;
|
||||
import org.bytedeco.javacv.FrameGrabber;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Date;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@ -52,10 +50,6 @@ public class VideoService {
|
||||
private final ProxySipConfig proxySipConfig;
|
||||
private final DockingService dockingService;
|
||||
private final SipSender sender;
|
||||
private final FfmpegConfig ffmpegConfig;
|
||||
private final ReportService reportService;
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
/**
|
||||
* 写入 flv 响应头信息
|
||||
* @param response HttpServletResponse 响应
|
||||
@ -234,63 +228,31 @@ public class VideoService {
|
||||
* @param time 录制时长 (单位: 秒)
|
||||
*/
|
||||
@SneakyThrows
|
||||
public void ffmpegRecord(HttpServletRequest request, ServletResponse response, String url, Date startTime, Date endTime, long time, WvpProxyDevice device, String callId){
|
||||
String tmpDir = ffmpegConfig.getTmpDir();
|
||||
String fileName = callId + ".mp4";
|
||||
File file = new File(tmpDir, fileName);
|
||||
Executor executor;
|
||||
public void ffmpegRecord(ServletResponse response, String url, long time, WvpProxyDevice device,String callId){
|
||||
ServletOutputStream outputStream = response.getOutputStream();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream);
|
||||
DefaultExecuteResultHandler executeResultHandler = mediaStatus(device,callId);
|
||||
if(ffmpegConfig.getUseTmpFile()) {
|
||||
OutputStream outputStream = new PipedOutputStream();
|
||||
String filePath = file.getAbsolutePath();
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, System.err);
|
||||
if(proxySipConfig.isUsePlaybackToDownload()){
|
||||
executor = ffmpegSupportService.playbackToStream(url, filePath, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
} else {
|
||||
executor = ffmpegSupportService.downloadToStream(url, filePath, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
}
|
||||
DateTime startTime = DateUtil.date();
|
||||
Executor executor;
|
||||
if(proxySipConfig.isUsePlaybackToDownload()){
|
||||
executor = ffmpegSupportService.playbackToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
} else {
|
||||
OutputStream outputStream = response.getOutputStream();
|
||||
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, System.err);
|
||||
if(proxySipConfig.isUsePlaybackToDownload()){
|
||||
executor = ffmpegSupportService.playbackToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
} else {
|
||||
executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
}
|
||||
executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
|
||||
}
|
||||
|
||||
DateTime start = DateUtil.date();
|
||||
log.info("开始录制 {}", url);
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
|
||||
log.info("到达结束时间, 结束录制 {}", url);
|
||||
executor.getWatchdog().destroyProcess();
|
||||
log.info("结束录制 {}", url);
|
||||
}, time + 60, TimeUnit.SECONDS);
|
||||
}, time, TimeUnit.SECONDS);
|
||||
executeResultHandler.waitFor();
|
||||
schedule.cancel(true);
|
||||
DateTime end = DateUtil.date();
|
||||
log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(start,end, DateUnit.SECOND));
|
||||
|
||||
if(ffmpegConfig.getUseTmpFile()) {
|
||||
ServletOutputStream servletOutputStream = response.getOutputStream();
|
||||
try{
|
||||
log.info("临时文件 {}(大小 {})", file.getAbsolutePath(), file.length());
|
||||
IoUtil.copy(new FileInputStream(file), servletOutputStream);
|
||||
response.flushBuffer();
|
||||
reportService.report(request, device, startTime, endTime, file.length());
|
||||
} catch (Exception e){
|
||||
reportService.report(request, device, startTime, endTime, -1);
|
||||
log.error("写入 http 响应异常: {}", e.getMessage());
|
||||
} finally {
|
||||
System.gc();
|
||||
boolean delete = file.delete();
|
||||
log.info("删除临时文件 {} => {}", file, delete);
|
||||
}
|
||||
}
|
||||
DateTime endTime = DateUtil.date();
|
||||
log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(startTime,endTime, DateUnit.SECOND));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 录制视频 并写入 异步响应
|
||||
* @param response AsyncContext.getResponse 异步响应
|
||||
|
@ -28,7 +28,6 @@ import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient;
|
||||
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.report.ReportService;
|
||||
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
|
||||
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
|
||||
import com.github.rholder.retry.*;
|
||||
@ -66,8 +65,6 @@ public class WvpService {
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ConcurrentMap<String, ScheduledFuture<?>> playing = new ConcurrentHashMap<>();
|
||||
|
||||
private final ReportService reportService;
|
||||
|
||||
public void header(HttpServletResponse response) {
|
||||
response.setContentType("video/mp4");
|
||||
response.setHeader("Accept-Ranges", "none");
|
||||
@ -116,7 +113,6 @@ public class WvpService {
|
||||
String reason = MessageFormat.format("调用 wvp api 查询设备({0})历史失败, 异常: {1}", deviceCode, e.getMessage());
|
||||
writeErrorToResponse(asyncResponse, JsonResponse.error(reason));
|
||||
} finally {
|
||||
reportService.report(request,wvpProxyDevice, startTime, endTime,-1);
|
||||
log.info("asyncContext 结束");
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.M
|
||||
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
|
||||
import cn.skcks.docking.gb28181.wvp.sip.message.message.notify.MediaStatusRequestDTO;
|
||||
@ -82,13 +81,6 @@ public class MessageRequestProcessor implements MessageProcessor {
|
||||
if(StringUtils.equalsAnyIgnoreCase(messageDto.getCmdType(), CmdType.KEEPALIVE)){
|
||||
response = ok;
|
||||
// 更新设备在线状态
|
||||
} else if(messageDto.getCmdType().equalsIgnoreCase(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO)) {
|
||||
response = ok;
|
||||
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
|
||||
String key = GenericSubscribe.Helper.getKey(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
|
||||
Optional.ofNullable(subscribe.getMessageSubscribe().getPublisher(key))
|
||||
.ifPresentOrElse(publisher -> publisher.submit(request),
|
||||
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
|
||||
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
|
||||
response = ok;
|
||||
CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET);
|
||||
|
@ -1,81 +0,0 @@
|
||||
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
|
||||
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
|
||||
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
|
||||
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
|
||||
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class RecordSubscribe implements Flow.Subscriber<SIPRequest>{
|
||||
private final SipSubscribe subscribe;
|
||||
private final String key;
|
||||
private final DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result;
|
||||
private final String deviceId;
|
||||
|
||||
private final List<RecordInfoItemDTO> list = new ArrayList<>();
|
||||
private final AtomicLong atomicSum = new AtomicLong(0);
|
||||
private final AtomicLong atomicNum = new AtomicLong(0);
|
||||
private Flow.Subscription subscription;
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
this.subscription = subscription;
|
||||
log.debug("建立订阅 => {}", key);
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(SIPRequest item) {
|
||||
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
|
||||
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
|
||||
atomicNum.addAndGet(data.getRecordList().getNum());
|
||||
list.addAll(data.getRecordList().getRecordList());
|
||||
long num = atomicNum.get();
|
||||
long sum = atomicSum.get();
|
||||
if(num > sum){
|
||||
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
|
||||
} else {
|
||||
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
|
||||
}
|
||||
|
||||
if (num >= sum) {
|
||||
// 针对某些不按规范的设备
|
||||
// 如果已获取数量 >= 约定的总数
|
||||
// 就执行定时任务, 若 500ms 内未收到新的数据视为已结束
|
||||
subscribe.getMessageSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
|
||||
log.debug("订阅结束 => {}", key);
|
||||
subscribe.getMessageSubscribe().delPublisher(key);
|
||||
}
|
||||
|
||||
private List<RecordInfoItemDTO> sortedRecordList(List<RecordInfoItemDTO> list){
|
||||
return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
@ -2,9 +2,7 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe;
|
||||
|
||||
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import gov.nist.javax.sip.message.SIPResponse;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
@ -16,8 +14,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@ -26,18 +22,15 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||
public class SipSubscribe {
|
||||
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
|
||||
private GenericSubscribe<SIPRequest> catalogSubscribe;
|
||||
private GenericSubscribe<SIPResponse> inviteSubscribe;
|
||||
private GenericSubscribe<SIPRequest> byeSubscribe;
|
||||
private GenericTimeoutSubscribe<SIPRequest> messageSubscribe;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
catalogSubscribe = new CatalogSubscribe(executor);
|
||||
inviteSubscribe = new InviteSubscribe(executor);
|
||||
byeSubscribe = new ByeSubscribe(executor);
|
||||
messageSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -45,6 +38,5 @@ public class SipSubscribe {
|
||||
catalogSubscribe.close();
|
||||
inviteSubscribe.close();
|
||||
byeSubscribe.close();
|
||||
messageSubscribe.close();
|
||||
}
|
||||
}
|
||||
|
@ -9,8 +9,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
@EnableFeignClients(basePackages = {
|
||||
"cn.skcks.docking.gb28181.media",
|
||||
"cn.skcks.docking.gb28181.wvp.proxy",
|
||||
"cn.skcks.docking.gb28181.wvp.service.report"
|
||||
"cn.skcks.docking.gb28181.wvp.proxy"
|
||||
})
|
||||
@SpringBootApplication
|
||||
@ComponentScan(basePackages = {
|
||||
|
@ -40,80 +40,39 @@ media:
|
||||
id: amrWMKmbKqoBjRQ9
|
||||
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
|
||||
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
|
||||
rtmp-port: 1936
|
||||
|
||||
proxy:
|
||||
wvp:
|
||||
url: http://127.0.0.1:18978
|
||||
# url: http://192.168.3.12:18978
|
||||
user: admin
|
||||
passwd: admin
|
||||
use-ffmpeg: true
|
||||
# 是否使用 wvp 的 api(wvp 的 并发有问题,仅保留用于兼容), 否则使用sip 信令直接操作设备
|
||||
enable: false
|
||||
# 是否使用 ffmpeg 编/解码, 否则使用内置 javacv
|
||||
parents:
|
||||
- 44050100002000000002
|
||||
- 44050100002000000003
|
||||
- 44050100001180000001
|
||||
- 44050100001320000001
|
||||
- 44050100001110000010
|
||||
# 用于生成 代理 wvp 的 视频流 ws-flv 地址
|
||||
#proxy-media-url: 'wss://192.168.1.241:9022/mf-config/media'
|
||||
proxy-media-url: 'ws://10.10.10.200:5080'
|
||||
# 实时视频单次点播持续时间 (默认: 15分钟)
|
||||
realtime-video-duration: 15m
|
||||
gb28181:
|
||||
sip:
|
||||
id: 44050100002000000003
|
||||
# id: 44050100002000000005
|
||||
domain: 4405010000
|
||||
password: 123456
|
||||
port: 5063
|
||||
ip:
|
||||
- 10.10.10.20
|
||||
# - 192.168.0.195
|
||||
stream-mode: udp
|
||||
use-playback-to-download: false
|
||||
# proxy-media-url: 'https://10.10.10.200:18181/media'
|
||||
proxy-media-url: 'https://10.10.10.200:5444'
|
||||
use-record-info-query-before-download: true
|
||||
retry-record-info-query-before-download-interval: 3
|
||||
retry-record-info-query-before-download-times: 20
|
||||
retry-record-info-query-before-download-interval-unit: seconds
|
||||
# - 192.168.1.241
|
||||
device-api:
|
||||
offset:
|
||||
forward: 0s
|
||||
back: 0s
|
||||
|
||||
use-playback-to-download: true
|
||||
proxy-media-url: 'https://10.10.10.200:18181/media'
|
||||
# - 192.168.1.241
|
||||
ffmpeg-support:
|
||||
ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe
|
||||
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
|
||||
rtp:
|
||||
# input: -i http://10.10.10.200:5080/live/test.live.flv
|
||||
input: -re -i
|
||||
# output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -vsync 2 -copyts -f flv # -rtsp_transport tcp
|
||||
# output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp
|
||||
#output: -c:v libx264 -an -f flv # -rtsp_transport tcp
|
||||
output: -c:v copy -an -f flv
|
||||
#download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
|
||||
download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
|
||||
log-level: error
|
||||
# download: -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i
|
||||
output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
|
||||
download: -thread_queue_size 128 -i
|
||||
debug:
|
||||
download: false
|
||||
input: false
|
||||
output: false
|
||||
tmp-dir: G:\Temp\record\download-proxy
|
||||
use-tmp-file: true
|
||||
|
||||
# [可选] 日志配置, 一般不需要改
|
||||
logging:
|
||||
config: classpath:logback.xml
|
||||
|
||||
report:
|
||||
enabled: false
|
||||
url: http://127.0.0.1:8080/api/report
|
||||
custom-headers:
|
||||
agent: gb28181-proxy
|
||||
|
@ -24,8 +24,8 @@ spring:
|
||||
username: root
|
||||
password: 123456a
|
||||
url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform_dev?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
|
||||
profiles:
|
||||
active: local
|
||||
# profiles:
|
||||
# active: local
|
||||
cloud:
|
||||
openfeign:
|
||||
httpclient:
|
||||
@ -78,7 +78,7 @@ proxy:
|
||||
proxy-media-url: 'https://192.168.1.241:9022/mf-config/media'
|
||||
device-api:
|
||||
offset:
|
||||
forward: 0s
|
||||
forward: 30s
|
||||
back: 0s
|
||||
|
||||
ffmpeg-support:
|
||||
@ -88,8 +88,7 @@ ffmpeg-support:
|
||||
rtp:
|
||||
input: -re -i
|
||||
#output: -preset ultrafast -tune zerolatency -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
|
||||
# output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
|
||||
output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f mp4 # -rtsp_transport tcp
|
||||
output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
|
||||
download: -thread_queue_size 128 -fflags +genpts -enc_time_base -1 -i
|
||||
debug:
|
||||
download: false
|
||||
@ -99,10 +98,3 @@ ffmpeg-support:
|
||||
# [可选] 日志配置, 一般不需要改
|
||||
logging:
|
||||
config: classpath:logback.xml
|
||||
|
||||
|
||||
report:
|
||||
enabled: false
|
||||
url: http://127.0.0.1:8080/api/report
|
||||
custom-headers:
|
||||
agent: gb28181-proxy
|
||||
|
2
pom.xml
2
pom.xml
@ -57,7 +57,7 @@
|
||||
<!-- <docker.registry.password>XXX</docker.registry.password>-->
|
||||
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
|
||||
|
||||
<gb28181.docking.version>0.1.0</gb28181.docking.version>
|
||||
<gb28181.docking.version>0.1.0-SNAPSHOT</gb28181.docking.version>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
|
97
test/main.py
97
test/main.py
@ -1,97 +0,0 @@
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import os
|
||||
import os.path as path
|
||||
import sys
|
||||
import datetime
|
||||
import urllib
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
work_path = sys.path[0]
|
||||
# 设备列表
|
||||
devices_file = "devices.txt"
|
||||
devices_file_path = path.join(work_path, devices_file)
|
||||
# 下载目录
|
||||
tmp_dir = "download"
|
||||
tmp_path = path.join(work_path, tmp_dir)
|
||||
|
||||
server = "http://127.0.0.1:18183/video"
|
||||
|
||||
|
||||
# server = "http://httpbin.org/anything/video"
|
||||
|
||||
def check_or_mk_tmp_dir():
|
||||
if not path.exists(tmp_path):
|
||||
os.mkdir(tmp_path)
|
||||
|
||||
|
||||
def check_or_create_devices_file():
|
||||
if not path.exists(devices_file_path):
|
||||
with open(devices_file_path, mode="w", encoding="utf8") as f:
|
||||
f.write("# 设备编码 一行一个\n")
|
||||
|
||||
|
||||
def read_devices_file():
|
||||
check_or_create_devices_file()
|
||||
devices = []
|
||||
with open(devices_file_path, mode="r", encoding="utf8") as f:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
break
|
||||
line = line.strip()
|
||||
if line.startswith("#") or len(line) == 0:
|
||||
continue
|
||||
else:
|
||||
devices.append(line)
|
||||
|
||||
print(f"读取设备数量: {len(devices)}")
|
||||
return devices
|
||||
|
||||
|
||||
def tasks(device: str, start_time: str, end_time: str):
|
||||
params = {
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"device_id": device,
|
||||
"useDownload": True,
|
||||
}
|
||||
url_params = urllib.parse.urlencode(params)
|
||||
url = urllib.parse.urljoin(server, f"?{url_params}")
|
||||
start = datetime.datetime.now()
|
||||
start_str = start.strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||
print(f"{start_str} 开始下载: {url}")
|
||||
file_path = f"{tmp_path}/{device}_{start_time}_{end_time}.mp4"
|
||||
urllib.request.urlretrieve(url, file_path)
|
||||
end = datetime.datetime.now()
|
||||
print(
|
||||
f"{device} {start_time}-{end_time}: 下载用时: {(end - start).total_seconds()} 秒")
|
||||
stats = os.stat(file_path)
|
||||
print(f"文件 {file_path} 大小: {stats.st_size}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
check_or_mk_tmp_dir()
|
||||
check_or_create_devices_file()
|
||||
|
||||
print(work_path)
|
||||
# workers = os.cpu_count()
|
||||
workers = 32
|
||||
|
||||
print(f"最大并发数: {workers}")
|
||||
|
||||
with ThreadPoolExecutor(max_workers=workers) as worker:
|
||||
devices = read_devices_file()
|
||||
|
||||
# day = datetime.datetime.today()
|
||||
day = datetime.date(year=2024, month=3, day=11)
|
||||
# 开始时间
|
||||
start = datetime.time(hour=8, minute=11, second=15)
|
||||
# 结束时间
|
||||
end = datetime.time(hour=8, minute=11, second=30)
|
||||
|
||||
start_time = datetime.datetime.combine(day, start).strftime(
|
||||
"%Y%m%d%H%M%S")
|
||||
end_time = datetime.datetime.combine(day, end).strftime("%Y%m%d%H%M%S")
|
||||
for device in devices:
|
||||
worker.submit(tasks, device, start_time, end_time)
|
@ -1,66 +0,0 @@
|
||||
import re
|
||||
import subprocess
|
||||
import os
|
||||
import os.path as path
|
||||
import sys
|
||||
|
||||
work_path = sys.path[0]
|
||||
|
||||
ffmpeg_path = "ffmpeg"
|
||||
ffprobe_path = "ffprobe"
|
||||
|
||||
record_dir = "record"
|
||||
record_path = path.join(work_path, record_dir)
|
||||
|
||||
if not path.exists(record_path):
|
||||
os.mkdir(record_path)
|
||||
|
||||
merge_file = path.join(work_path, "test.mp4")
|
||||
cmd = f"{ffmpeg_path} -y -loglevel error -f concat -safe 0 -i %s -c copy {merge_file}"
|
||||
|
||||
|
||||
def natural_sort_key(s):
|
||||
"""
|
||||
按文件名中的自然数排序
|
||||
"""
|
||||
# 将字符串按照数字和非数字部分分割,返回分割后的子串列表
|
||||
sub_strings = re.split(r'(\d+)', s)
|
||||
# 如果当前子串由数字组成,则将它转换为整数;否则将其替换成空字符串
|
||||
sub_strings = [int(c) if c.isdigit() else '' for c in sub_strings]
|
||||
# 返回子串列表
|
||||
return sub_strings
|
||||
|
||||
file_list = []
|
||||
for item in os.listdir(record_path):
|
||||
p = path.join(record_path, item)
|
||||
if not path.isfile(p) or (not p.endswith(".mp4") and not p.endswith(".rec")):
|
||||
continue
|
||||
else:
|
||||
file_list.append(p)
|
||||
|
||||
sorted_file_list = sorted(file_list, key=natural_sort_key)
|
||||
|
||||
tmp_merge_file = path.join(work_path, "merge.tmp")
|
||||
with open(tmp_merge_file, mode="w", encoding="utf8") as f:
|
||||
for record in sorted_file_list:
|
||||
f.write(f"file '{record}'\n")
|
||||
|
||||
proc = subprocess.Popen(
|
||||
cmd % tmp_merge_file,
|
||||
stdin=None,
|
||||
stdout=None,
|
||||
shell=True
|
||||
)
|
||||
|
||||
proc.communicate()
|
||||
os.remove(tmp_merge_file)
|
||||
|
||||
meta_cmd = f"{ffprobe_path} -v error -i {merge_file} -print_format json -show_format -show_streams -pretty > {merge_file}.meta.json"
|
||||
|
||||
proc = subprocess.Popen(
|
||||
meta_cmd,
|
||||
stdin=None,
|
||||
stdout=None,
|
||||
shell=True
|
||||
)
|
||||
proc.communicate()
|
@ -1,84 +0,0 @@
|
||||
import os
|
||||
import os.path as path
|
||||
import select
|
||||
import sys
|
||||
import urllib.parse
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import subprocess
|
||||
import platform
|
||||
|
||||
work_path = sys.path[0]
|
||||
|
||||
# 下载目录
|
||||
tmp_dir = "download"
|
||||
tmp_path = path.join(work_path, tmp_dir)
|
||||
|
||||
zlm_server = "192.168.3.12"
|
||||
zlm_rtmp_port = 1936
|
||||
zlm_auth_params = {
|
||||
"sign": "41db35390ddad33f83944f44b8b75ded"
|
||||
}
|
||||
|
||||
ffmpeg_path = "ffmpeg"
|
||||
ffmpeg_read_rate = 1
|
||||
|
||||
ffplay_path = "ffplay"
|
||||
enable_preview = True
|
||||
|
||||
workers = os.cpu_count()
|
||||
|
||||
|
||||
def get_rtmp_url(app: str, stream_id: str):
|
||||
params = urllib.parse.urlencode(zlm_auth_params)
|
||||
return "rtmp://{}:{}/{}/{}?{}".format(zlm_server, zlm_rtmp_port, app,
|
||||
stream_id, params)
|
||||
|
||||
|
||||
def check_or_mk_tmp_dir():
|
||||
if not path.exists(tmp_path):
|
||||
os.mkdir(tmp_path)
|
||||
|
||||
|
||||
def push_stream(file: str):
|
||||
stream_id = path.basename(file)
|
||||
target = get_rtmp_url("ffmpeg", stream_id)
|
||||
print("开始 ffmpeg 推流 {} => {}", file, target)
|
||||
cmd = "{} -loglevel error -stream_loop -1 -readrate {} -i {} -t 60 -c copy -f flv {}".format(
|
||||
ffmpeg_path, ffmpeg_read_rate, file, target)
|
||||
print(cmd)
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=None,
|
||||
stdout=None,
|
||||
shell=True
|
||||
)
|
||||
|
||||
if enable_preview and len(
|
||||
ffplay_path) > 0 and platform.system() == "Windows":
|
||||
subprocess.Popen(
|
||||
"ffplay {} -x 400 -y 300 -autoexit".format(target),
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
shell=True
|
||||
)
|
||||
|
||||
proc.communicate()
|
||||
print("ffmpeg 结束 {} =/= {}", file, target)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) > 1:
|
||||
try:
|
||||
workers = int(sys.argv[1])
|
||||
except:
|
||||
print("参数解析错误, 将使用默认线程数 {} ".format(workers))
|
||||
|
||||
print("最大并发数: {}".format(workers))
|
||||
with ThreadPoolExecutor(max_workers=workers) as worker:
|
||||
for item in os.listdir(tmp_path):
|
||||
p = path.join(tmp_path, item)
|
||||
if not path.isfile(p) or not p.endswith(".mp4"):
|
||||
continue
|
||||
else:
|
||||
worker.submit(push_stream, p)
|
@ -1,44 +0,0 @@
|
||||
import os.path as path
|
||||
import sys
|
||||
import shutil
|
||||
|
||||
work_path = sys.path[0]
|
||||
# 设备列表
|
||||
devices_file = "devices.txt"
|
||||
devices_file_path = path.join(work_path, devices_file)
|
||||
# 源文件
|
||||
source_video_file = "20240311081115_20240311081130.mp4"
|
||||
|
||||
|
||||
def check_or_create_devices_file():
|
||||
if not path.exists(devices_file_path):
|
||||
with open(devices_file_path, mode="w", encoding="utf8") as f:
|
||||
f.write("# 设备编码 一行一个\n")
|
||||
|
||||
|
||||
def read_devices_file():
|
||||
check_or_create_devices_file()
|
||||
devices = []
|
||||
with open(devices_file_path, mode="r", encoding="utf8") as f:
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
break
|
||||
line = line.strip()
|
||||
if line.startswith("#") or len(line) == 0:
|
||||
continue
|
||||
else:
|
||||
devices.append(line)
|
||||
|
||||
print("读取设备数量: {}".format(len(devices)))
|
||||
return devices
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
check_or_create_devices_file()
|
||||
devices = read_devices_file()
|
||||
|
||||
src = path.join(work_path, source_video_file)
|
||||
for device in devices:
|
||||
dst = path.join(work_path, "{}_{}".format(device, source_video_file))
|
||||
shutil.copyfile(src, dst)
|
Loading…
Reference in New Issue
Block a user