Compare commits

..

No commits in common. "master" and "feature/record_info" have entirely different histories.

28 changed files with 109 additions and 870 deletions

View File

@ -4,7 +4,7 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.wvp.service.device.control.DeviceControlService;
@ -57,7 +57,8 @@ public class Gb28181Controller {
}
@PostJson("/recordInfo")
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> recordInfo(RecordInfoDTO dto){
return recordInfoService.requestRecordInfo(dto);
public JsonResponse<RecordInfoRequestDTO> recordInfo(RecordInfoDTO dto){
recordInfoService.requestRecordInfo(dto);
return JsonResponse.success(null);
}
}

View File

@ -59,7 +59,7 @@ public class VideoController {
if(proxyConfig.getEnable()){
wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime());
} else {
gb28181DownloadService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime(), req.getFileHeader(), req.getUseDownload());
gb28181DownloadService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime(), req.getFileHeader());
}
}

View File

@ -33,9 +33,6 @@ public class VideoReq {
@Schema(description = "http 头是否需要文件名 (没有文件名时浏览器会试图直接播放,会导致短时间内重复访问同一设备,导致失败)")
private Boolean fileHeader = true;
@Schema(description = "使用哪种方式拉取历史视频 (true 为 使用 Download 方式拉取 4倍速流, false 为 使用 Playback 原始速率拉取 视频回放)")
private Boolean useDownload = true;
public void setDevice_id(String deviceCode){
this.deviceCode = deviceCode;
}
@ -61,8 +58,4 @@ public class VideoReq {
public void setEnd_time(Date endTime){
this.endTime = endTime;
}
public void setUse_download(Boolean useDownload){
this.useDownload = useDownload;
}
}

View File

@ -30,7 +30,4 @@ public class FfmpegConfig {
private Boolean input = false;
private Boolean output = false;
}
private Boolean useTmpFile = true;
private String tmpDir = System.getProperty("java.io.tmpdir");
}

View File

@ -21,7 +21,7 @@ public class Gb28181DeviceVideoApiConfig {
@NoArgsConstructor
@AllArgsConstructor
public static class Offset {
private Duration forward = Duration.of(0, ChronoUnit.SECONDS);
private Duration forward = Duration.of(30, ChronoUnit.SECONDS);
private Duration back= Duration.of(0, ChronoUnit.SECONDS);
}
}

View File

@ -1,12 +0,0 @@
package cn.skcks.docking.gb28181.wvp.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "media")
public class MediaRtmpConfig {
private int rtmpPort = 1935;
}

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.wvp.config;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@ -10,7 +11,6 @@ import org.springframework.stereotype.Component;
import javax.sip.ListeningPoint;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
@ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true)
@ -44,14 +44,6 @@ public class ProxySipConfig {
*/
private String proxyMediaUrl = "";
/**
* 调用 视频下载之前 是否使用 recordInfo 查询
*/
private boolean useRecordInfoQueryBeforeDownload = true;
private int retryRecordInfoQueryBeforeDownloadTimes = 20;
private long retryRecordInfoQueryBeforeDownloadInterval = 3;
private TimeUnit retryRecordInfoQueryBeforeDownloadIntervalUnit = TimeUnit.SECONDS;
@Bean
public SipConfig sipConfig(){
SipConfig sipConfig = new SipConfig();

View File

@ -1,17 +0,0 @@
package cn.skcks.docking.gb28181.wvp.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Data
@Configuration
@ConfigurationProperties(prefix = "report")
public class ReportConfig {
private Boolean enabled = false;
private String url;
private Map<String, String> customHeaders = new HashMap<>();
}

View File

@ -1,40 +0,0 @@
package cn.skcks.docking.gb28181.wvp.dto.report;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Schema(description = "上报信息")
public class ReportReq {
@Schema(description = "上报消息id")
private String id;
@Schema(description = "设备编码")
private String deviceCode;
@Schema(description = "设备id/通道id")
private String deviceId;
@Schema(description = "点播时长")
private String durationTime;
@Schema(description = "点播时长")
private TimeRange timeRange;
@Schema(description = "日志记录时间")
private String logTime;
@AllArgsConstructor
@NoArgsConstructor
@Data
public static class TimeRange {
@Schema(description = "开始时间")
private String startTime;
@Schema(description = "结束时间")
private String endTime;
}
@Schema(description = "文件大小, 未知大小为 -1")
private String fileSize;
}

View File

@ -17,40 +17,30 @@ import java.util.concurrent.TimeUnit;
public class FfmpegSupportService {
private final FfmpegConfig ffmpegConfig;
@SneakyThrows
public Executor downloadToStream(String input, String out, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
String inputParam = debug.getDownload() ? rtp.getDownload() : StringUtils.joinWith(" ", "-y", rtp.getDownload(), input);
log.info("视频输入参数 {}", inputParam);
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", "-t", unit.toSeconds(time), rtp.getOutput(), out);
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, unit.toSeconds(time) + 60, TimeUnit.SECONDS, streamHandler, executeResultHandler);
}
@SneakyThrows
public Executor downloadToStream(String input, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
return downloadToStream(input, "-", time, unit, streamHandler, executeResultHandler);
}
@SneakyThrows
public Executor playbackToStream(String input, String out, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", "-y", rtp.getInput(), input);
String inputParam = debug.getDownload() ? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input);
log.info("视频输入参数 {}", inputParam);
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), out);
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), "-");
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, unit.toSeconds(time) + 60, TimeUnit.SECONDS, streamHandler, executeResultHandler);
return ffmpegExecutor(inputParam, outputParam, time, unit, streamHandler, executeResultHandler);
}
@SneakyThrows
public Executor playbackToStream(String input, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) {
return playbackToStream(input, "-", time, unit, streamHandler, executeResultHandler);
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", rtp.getInput(), input);
log.info("视频输入参数 {}", inputParam);
String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), "-");
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit, streamHandler, executeResultHandler);
}
@SneakyThrows

View File

@ -1,6 +1,9 @@
package cn.skcks.docking.gb28181.wvp.service.gb28181;
import cn.hutool.core.date.*;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.common.json.JsonException;
@ -22,28 +25,18 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import cn.skcks.docking.gb28181.wvp.config.MediaRtmpConfig;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.executor.DefaultVideoExecutor;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.record.RecordInfoService;
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
@ -55,7 +48,6 @@ import jakarta.servlet.http.HttpServletResponse;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
@ -68,10 +60,8 @@ import javax.sip.message.Request;
import javax.sip.message.Response;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.*;
@ -90,14 +80,7 @@ public class Gb28181DownloadService {
private final SipSubscribe subscribe;
private final VideoService videoService;
private final WvpProxyConfig wvpProxyConfig;
private final RecordInfoService recordInfoService;
private final MediaRtmpConfig mediaRtmpConfig;
@Qualifier(DefaultVideoExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(64);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
private final RealtimeManager realtimeManager;
@ -134,19 +117,8 @@ public class Gb28181DownloadService {
MessageFormat.format("attachment; filename=\"{0}.mp4\"",fileName));
}
private String videoRtmpUrl(String streamId) {
String rtmpSchema = "rtmp://" + zlmMediaConfig.getIp() + ":" + mediaRtmpConfig.getRtmpPort();
return StringUtils.joinWith("/", rtmpSchema, "rtp", streamId);
}
private String videoWsUrl(String rtmpUrl){
String rtmpSchema = "rtmp://" + zlmMediaConfig.getIp() + ":" + mediaRtmpConfig.getRtmpPort();
if(StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl())){
return StringUtils.replace(rtmpUrl + ".live.flv", rtmpSchema, proxySipConfig.getProxyMediaUrl());
} else {
String wsSchema = StringUtils.replace(zlmMediaConfig.getUrl(), "http://", "ws://");
return StringUtils.replace(rtmpUrl + ".live.flv", rtmpSchema, wsSchema);
}
private String videoUrl(String streamId) {
return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.mp4");
}
@SneakyThrows
@ -169,7 +141,7 @@ public class Gb28181DownloadService {
openRtpServer.setStreamId(streamId);
openRtpServer.setTcpMode(streamMode);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer);
log.info("openRtpServerResp => {} => {}", streamId, openRtpServerResp);
log.info("openRtpServerResp => {}", openRtpServerResp);
if (!openRtpServerResp.getCode().equals(ResponseStatus.Success)) {
log.error("{}", openRtpServerResp.getCode().getMsg());
return -1;
@ -178,76 +150,28 @@ public class Gb28181DownloadService {
}
@SneakyThrows
@SuppressWarnings({"UnstableApiUsage", "unchecked"})
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader, Boolean useDownload) {
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader) {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(DateUtil.between(startTime, DateUtil.offsetSecond(endTime, 60), DateUnit.MS));
asyncContext.setTimeout(0);
asyncContext.start(()->{
DateTime start = DateUtil.date();
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{
if(proxySipConfig.isUseRecordInfoQueryBeforeDownload()){
String name = MessageFormat.format("{0} {1}-{2}", deviceCode, startTime, endTime);
Retryer<JsonResponse<List<RecordInfoItemVO>>> retryer = RetryerBuilder.<JsonResponse<List<RecordInfoItemVO>>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadInterval(), proxySipConfig.getRetryRecordInfoQueryBeforeDownloadIntervalUnit()))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadTimes()))
.retryIfResult((result) -> {
log.info("{}", result);
return result == null ||
result.getCode() != Response.OK ||
result.getData() == null ||
result.getData().isEmpty();
})
.withRetryListener(RetryUtil.defaultRetryListener(name)).build();
retryer.call(()->{
CompletableFuture<JsonResponse<List<RecordInfoItemVO>>> future = new CompletableFuture<>();
future.completeOnTimeout(JsonResponse.error("录像查询超时"),1,TimeUnit.MINUTES);
// 发起设备录像查询
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestedRecordInfo =
recordInfoService.requestRecordInfo(new RecordInfoDTO(deviceCode, startTime, endTime, "", 0, "all"));
requestedRecordInfo.setResultHandler(result -> {
future.complete((JsonResponse<List<RecordInfoItemVO>>) result);
});
requestedRecordInfo.onError((throwable)->{
future.complete(JsonResponse.error(throwable.getMessage()));
});
return future.get();
});
}
download(deviceCode, startTime, endTime, useDownload, true).whenComplete((videoInfo, e) -> {
writeFileHeader(response, deviceCode, startTime, endTime, fileHeader);
log.info("videoInfo {}", videoInfo);
if (e != null) {
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{
writeFileHeader(response,deviceCode,startTime,endTime,fileHeader);
if(e != null){
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} else if (videoInfo == null) {
} else if(videoInfo == null){
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
} else if (wvpProxyConfig.getUseFfmpeg()) {
log.info("开始 ffmpeg 录制, deviceCode {}, startTime {}, endTime {}", deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
videoService.ffmpegRecord(request, asyncResponse, videoInfo.getUrl(), startTime, endTime, DateUtil.between(startTime, endTime, DateUnit.SECOND), videoInfo.getDevice(), videoInfo.getCallId());
DateTime end = DateUtil.date();
asyncContext.complete();
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start, end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
} else if(wvpProxyConfig.getUseFfmpeg()){
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
} else {
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 15);
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
}
asyncContext.complete();
});
} catch(Exception e) {
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} finally {
if(!wvpProxyConfig.getUseFfmpeg()){
DateTime end = DateUtil.date();
asyncContext.complete();
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start,end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
}
asyncContext.complete();
}
});
}
@ -275,11 +199,11 @@ public class Gb28181DownloadService {
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
scheduledExecutorService.schedule(()->{
download(deviceCode, startTime, endTime, false).whenComplete((videoInfo, e)->{
download(deviceCode, startTime, endTime).whenComplete((videoInfo, e)->{
log.info("获取媒体信息 {}", videoInfo);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
// 到达时间后 延迟 10秒 主动结束, 防止某些设备不会主动结束
// 到达时间后主动结束, 防止某些设备不会主动结束
scheduledExecutorService.schedule(()->{
log.info("到达结束时间 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
String deviceIp = docking.getIp();
@ -292,10 +216,11 @@ public class Gb28181DownloadService {
zlmMediaService.closeRtpServer(CloseRtpServer.builder()
.streamId(videoInfo.streamId)
.build());
}, time + Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
}, time, TimeUnit.MILLISECONDS);
String url = videoWsUrl(videoInfo.getUrl());
url = StringUtils.replaceOnce(url, ".live.flv", ".live.mp4");
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
videoInfo.getUrl();
result.setResult(JsonResponse.success(url));
});
}, 200, TimeUnit.MILLISECONDS);
@ -335,7 +260,9 @@ public class Gb28181DownloadService {
// 原始链接转换为前端可用的链接
RedisUtil.StringOps.set(CacheUtil.getKey(GB28181SDPBuilder.Action.PLAY.getAction(), videoInfo.getCallId()), JsonUtils.toJson(videoInfo));
String url = videoWsUrl(videoInfo.getUrl());
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
videoInfo.getUrl();
videoInfo.setUrl(url);
realtimeManager.addPlaying(deviceCode, videoInfo);
@ -366,35 +293,24 @@ public class Gb28181DownloadService {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContext.start(()->{
DateTime start = DateUtil.date();
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{
download(deviceCode, startTime,endTime, true).whenComplete((videoInfo, e)->{
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{
streamHeader(asyncResponse);
if(e != null){
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} else if(videoInfo == null){
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
} else if(wvpProxyConfig.getUseFfmpeg()){
log.info("开始 ffmpeg 录制, deviceCode {}, startTime {}, endTime {}", deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
videoService.ffmpegRecord(request, asyncResponse, videoInfo.getUrl(), startTime, endTime, DateUtil.between(startTime, endTime, DateUnit.SECOND), videoInfo.getDevice(), videoInfo.getCallId());
DateTime end = DateUtil.date();
asyncContext.complete();
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start, end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
} else {
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 15);
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
}
asyncContext.complete();
});
} catch(Exception e) {
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} finally {
if(!wvpProxyConfig.getUseFfmpeg()){
DateTime end = DateUtil.date();
asyncContext.complete();
log.info("下载总耗时: {}, deviceCode {}, startTime {}, endTime {}", DateUtil.between(start,end, DateUnit.SECOND), deviceCode, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
}
asyncContext.complete();
}
});
}
@ -455,13 +371,13 @@ public class Gb28181DownloadService {
}
String ssrc = ssrcService.getPlaySsrc();
GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode);
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result, false));
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result));
return result;
}
@SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result, Boolean prefetch) {
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
@ -480,7 +396,7 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS, prefetch);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
@ -488,12 +404,8 @@ public class Gb28181DownloadService {
}
}
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean prefetch) {
return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload(), prefetch);
}
@SneakyThrows
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch) {
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime) {
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) {
String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
@ -501,12 +413,12 @@ public class Gb28181DownloadService {
throw new JsonException(reason);
} else {
WvpProxyDevice device = deviceByDeviceCode.get();
return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload, prefetch);
return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime);
}
}
@SneakyThrows
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch){
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime){
CompletableFuture<VideoInfo> result = new CompletableFuture<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
@ -544,7 +456,7 @@ public class Gb28181DownloadService {
timeField.setStopTime(end);
TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(timeField);
GB28181SDPBuilder.Action action = GB28181SDPBuilder.Action.DOWNLOAD;
if(useDownload == null ? proxySipConfig.isUsePlaybackToDownload(): !useDownload){
if(proxySipConfig.isUsePlaybackToDownload()){
action = GB28181SDPBuilder.Action.PLAY_BACK;
}
GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.build(action, gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode, timeDescription);
@ -554,18 +466,18 @@ public class Gb28181DownloadService {
if(proxySipConfig.getStreamMode() != MediaStreamMode.UDP){
media.getMedia().setProtocol("RTP/AVP/TCP");
}
if(useDownload == null ? !proxySipConfig.isUsePlaybackToDownload(): useDownload){
if(!proxySipConfig.isUsePlaybackToDownload()){
media.setAttribute("downloadspeed", String.valueOf(4));
}
URIField uriField = new URIField();
uriField.setURI(StringUtils.joinWith(":", channel, "0"));
gb28181Description.setURI(uriField);
sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time, prefetch));
sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time));
return result;
}
@SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time, Boolean prefetch) {
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
@ -584,21 +496,18 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS, prefetch);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
if(prefetch){
// 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device));
}
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
};
}
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit, Boolean prefetch){
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
@ -617,21 +526,15 @@ public class Gb28181DownloadService {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("收到响应状态 {}", statusCode);
String callId = item.getCallId().getCallId();
if(!prefetch){
// 相应 200OK 后再返回, 用于对延迟不敏感的实时请求
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), item.getCallId().getCallId(), device));
}
scheduledExecutorService.schedule(()->{
sender.sendRequest(((provider, ip, port) -> {
String fromTag = item.getFromTag();
String toTag = item.getToTag();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
}));
},200,TimeUnit.MILLISECONDS);
sender.sendRequest(((provider, ip, port) -> {
String fromTag = item.getFromTag();
String toTag = item.getToTag();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
}));
result.complete(new VideoInfo(streamId,videoUrl(streamId), callId, device));
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));

View File

@ -1,12 +1,8 @@
package cn.skcks.docking.gb28181.wvp.service.record;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
@ -15,17 +11,16 @@ import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.RecordSubscribe;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@Service
@ -33,11 +28,12 @@ import java.util.Optional;
public class RecordInfoService {
private final SipSender sipSender;
private final SipSubscribe sipSubscribe;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final DockingService dockingService;
private final DeviceService deviceService;
@SneakyThrows
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(RecordInfoDTO dto){
public void requestRecordInfo(RecordInfoDTO dto){
String deviceCode = dto.getDeviceCode();
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) {
@ -46,23 +42,20 @@ public class RecordInfoService {
throw new JsonException(reason);
} else {
WvpProxyDevice device = deviceByDeviceCode.get();
return requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
}
}
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>();
public void requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
if(deviceByGbDeviceId.isEmpty()){
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 设备编码 为 {0} 的设备", gbDeviceId)));
return result;
return;
}
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
if (deviceByGbDeviceIdAndChannel.isEmpty()) {
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 编码 {0}, 通道 {1} 的设备", gbDeviceId, channel)));
return result;
return;
}
WvpProxyDocking device = deviceByGbDeviceId.get();
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
@ -76,12 +69,7 @@ public class RecordInfoService {
.filePath(dto.getFilePath())
.indistinctQuery(0)
.build();
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channel, sn);
sipSubscribe.getMessageSubscribe().addPublisher(key);
sipSubscribe.getMessageSubscribe().addSubscribe(key, new RecordSubscribe(sipSubscribe, key, result, gbDeviceId));
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(recordInfoRequestDTO), SipUtil.generateViaTag(),
SipUtil.generateFromTag(), provider.getNewCallId()));
return result;
}
}

View File

@ -3,15 +3,11 @@ package cn.skcks.docking.gb28181.wvp.service.record.dto;
import cn.hutool.core.date.DatePattern;
import cn.skcks.docking.gb28181.constant.GB28181Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class RecordInfoDTO {
/**

View File

@ -1,20 +0,0 @@
package cn.skcks.docking.gb28181.wvp.service.report;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.media.feign.IgnoreSSLFeignClientConfig;
import cn.skcks.docking.gb28181.wvp.dto.report.ReportReq;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
@FeignClient(
name = "ReportServiceProxy",
url = "${report.url}",
configuration = {IgnoreSSLFeignClientConfig.class}
)
public interface ReportApi {
@PostMapping
JsonResponse<?> report(@RequestHeader MultiValueMap<String, String> headers, @RequestBody ReportReq body);
}

View File

@ -1,52 +0,0 @@
package cn.skcks.docking.gb28181.wvp.service.report;
import cn.hutool.core.date.BetweenFormatter;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.unit.DataSizeUtil;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.wvp.config.ReportConfig;
import cn.skcks.docking.gb28181.wvp.dto.report.ReportReq;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import java.util.Date;
@Slf4j
@Service
@RequiredArgsConstructor
public class ReportService {
private final ReportApi reportApi;
private final ReportConfig reportConfig;
public void report(HttpServletRequest request, WvpProxyDevice device, Date startTime, Date endTime, long fileSize) {
if(!reportConfig.getEnabled()){
return;
}
ReportReq reportReq = new ReportReq(IdUtil.fastUUID(),
device.getDeviceCode(),
device.getGbDeviceChannelId(),
DateUtil.formatBetween(startTime, endTime, BetweenFormatter.Level.SECOND),
new ReportReq.TimeRange(DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime)),
DateUtil.now(),
DataSizeUtil.format(fileSize));
LinkedMultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
reportConfig.getCustomHeaders().forEach(headers::add);
request.getHeaderNames().asIterator().forEachRemaining(headerKey -> {
String header = request.getHeader(headerKey);
headers.add(headerKey, header);
});
headers.add("X-Client-Ip", request.getRemoteAddr());
log.info("上报调用信息 {}", reportReq);
try{
reportApi.report(headers, reportReq);
} catch (Exception e){
log.error("上报调用信息失败", e);
}
}
}

View File

@ -3,16 +3,13 @@ package cn.skcks.docking.gb28181.wvp.service.video;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.wvp.config.FfmpegConfig;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.ffmpeg.FfmpegSupportService;
import cn.skcks.docking.gb28181.wvp.service.report.ReportService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import jakarta.servlet.AsyncContext;
@ -34,8 +31,9 @@ import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.FrameGrabber;
import org.springframework.stereotype.Service;
import java.io.*;
import java.util.Date;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -52,10 +50,6 @@ public class VideoService {
private final ProxySipConfig proxySipConfig;
private final DockingService dockingService;
private final SipSender sender;
private final FfmpegConfig ffmpegConfig;
private final ReportService reportService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
/**
* 写入 flv 响应头信息
* @param response HttpServletResponse 响应
@ -234,63 +228,31 @@ public class VideoService {
* @param time 录制时长 (单位: )
*/
@SneakyThrows
public void ffmpegRecord(HttpServletRequest request, ServletResponse response, String url, Date startTime, Date endTime, long time, WvpProxyDevice device, String callId){
String tmpDir = ffmpegConfig.getTmpDir();
String fileName = callId + ".mp4";
File file = new File(tmpDir, fileName);
Executor executor;
public void ffmpegRecord(ServletResponse response, String url, long time, WvpProxyDevice device,String callId){
ServletOutputStream outputStream = response.getOutputStream();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream);
DefaultExecuteResultHandler executeResultHandler = mediaStatus(device,callId);
if(ffmpegConfig.getUseTmpFile()) {
OutputStream outputStream = new PipedOutputStream();
String filePath = file.getAbsolutePath();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, System.err);
if(proxySipConfig.isUsePlaybackToDownload()){
executor = ffmpegSupportService.playbackToStream(url, filePath, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
} else {
executor = ffmpegSupportService.downloadToStream(url, filePath, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
}
DateTime startTime = DateUtil.date();
Executor executor;
if(proxySipConfig.isUsePlaybackToDownload()){
executor = ffmpegSupportService.playbackToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
} else {
OutputStream outputStream = response.getOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, System.err);
if(proxySipConfig.isUsePlaybackToDownload()){
executor = ffmpegSupportService.playbackToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
} else {
executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
}
executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
}
DateTime start = DateUtil.date();
log.info("开始录制 {}", url);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
log.info("到达结束时间, 结束录制 {}", url);
executor.getWatchdog().destroyProcess();
log.info("结束录制 {}", url);
}, time + 60, TimeUnit.SECONDS);
}, time, TimeUnit.SECONDS);
executeResultHandler.waitFor();
schedule.cancel(true);
DateTime end = DateUtil.date();
log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(start,end, DateUnit.SECOND));
if(ffmpegConfig.getUseTmpFile()) {
ServletOutputStream servletOutputStream = response.getOutputStream();
try{
log.info("临时文件 {}(大小 {})", file.getAbsolutePath(), file.length());
IoUtil.copy(new FileInputStream(file), servletOutputStream);
response.flushBuffer();
reportService.report(request, device, startTime, endTime, file.length());
} catch (Exception e){
reportService.report(request, device, startTime, endTime, -1);
log.error("写入 http 响应异常: {}", e.getMessage());
} finally {
System.gc();
boolean delete = file.delete();
log.info("删除临时文件 {} => {}", file, delete);
}
}
DateTime endTime = DateUtil.date();
log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(startTime,endTime, DateUnit.SECOND));
}
/**
* 录制视频 并写入 异步响应
* @param response AsyncContext.getResponse 异步响应

View File

@ -28,7 +28,6 @@ import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
import cn.skcks.docking.gb28181.wvp.service.report.ReportService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
import com.github.rholder.retry.*;
@ -66,8 +65,6 @@ public class WvpService {
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, ScheduledFuture<?>> playing = new ConcurrentHashMap<>();
private final ReportService reportService;
public void header(HttpServletResponse response) {
response.setContentType("video/mp4");
response.setHeader("Accept-Ranges", "none");
@ -116,7 +113,6 @@ public class WvpService {
String reason = MessageFormat.format("调用 wvp api 查询设备({0})历史失败, 异常: {1}", deviceCode, e.getMessage());
writeErrorToResponse(asyncResponse, JsonResponse.error(reason));
} finally {
reportService.report(request,wvpProxyDevice, startTime, endTime,-1);
log.info("asyncContext 结束");
asyncContext.complete();
}

View File

@ -10,7 +10,6 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.M
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.notify.MediaStatusRequestDTO;
@ -82,13 +81,6 @@ public class MessageRequestProcessor implements MessageProcessor {
if(StringUtils.equalsAnyIgnoreCase(messageDto.getCmdType(), CmdType.KEEPALIVE)){
response = ok;
// 更新设备在线状态
} else if(messageDto.getCmdType().equalsIgnoreCase(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO)) {
response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = GenericSubscribe.Helper.getKey(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getMessageSubscribe().getPublisher(key))
.ifPresentOrElse(publisher -> publisher.submit(request),
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
response = ok;
CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET);

View File

@ -1,81 +0,0 @@
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class RecordSubscribe implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result;
private final String deviceId;
private final List<RecordInfoItemDTO> list = new ArrayList<>();
private final AtomicLong atomicSum = new AtomicLong(0);
private final AtomicLong atomicNum = new AtomicLong(0);
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getMessageSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getMessageSubscribe().delPublisher(key);
}
private List<RecordInfoItemDTO> sortedRecordList(List<RecordInfoItemDTO> list){
return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList());
}
}

View File

@ -2,9 +2,7 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
@ -16,8 +14,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@Data
@ -26,18 +22,15 @@ import java.util.concurrent.ScheduledExecutorService;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private GenericSubscribe<SIPRequest> catalogSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
private GenericSubscribe<SIPRequest> byeSubscribe;
private GenericTimeoutSubscribe<SIPRequest> messageSubscribe;
@PostConstruct
private void init() {
catalogSubscribe = new CatalogSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
byeSubscribe = new ByeSubscribe(executor);
messageSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService);
}
@PreDestroy
@ -45,6 +38,5 @@ public class SipSubscribe {
catalogSubscribe.close();
inviteSubscribe.close();
byeSubscribe.close();
messageSubscribe.close();
}
}

View File

@ -9,8 +9,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableFeignClients(basePackages = {
"cn.skcks.docking.gb28181.media",
"cn.skcks.docking.gb28181.wvp.proxy",
"cn.skcks.docking.gb28181.wvp.service.report"
"cn.skcks.docking.gb28181.wvp.proxy"
})
@SpringBootApplication
@ComponentScan(basePackages = {

View File

@ -40,80 +40,39 @@ media:
id: amrWMKmbKqoBjRQ9
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
rtmp-port: 1936
proxy:
wvp:
url: http://127.0.0.1:18978
# url: http://192.168.3.12:18978
user: admin
passwd: admin
use-ffmpeg: true
# 是否使用 wvp 的 api(wvp 的 并发有问题,仅保留用于兼容), 否则使用sip 信令直接操作设备
enable: false
# 是否使用 ffmpeg 编/解码, 否则使用内置 javacv
parents:
- 44050100002000000002
- 44050100002000000003
- 44050100001180000001
- 44050100001320000001
- 44050100001110000010
# 用于生成 代理 wvp 的 视频流 ws-flv 地址
#proxy-media-url: 'wss://192.168.1.241:9022/mf-config/media'
proxy-media-url: 'ws://10.10.10.200:5080'
# 实时视频单次点播持续时间 (默认: 15分钟)
realtime-video-duration: 15m
gb28181:
sip:
id: 44050100002000000003
# id: 44050100002000000005
domain: 4405010000
password: 123456
port: 5063
ip:
- 10.10.10.20
# - 192.168.0.195
stream-mode: udp
use-playback-to-download: false
# proxy-media-url: 'https://10.10.10.200:18181/media'
proxy-media-url: 'https://10.10.10.200:5444'
use-record-info-query-before-download: true
retry-record-info-query-before-download-interval: 3
retry-record-info-query-before-download-times: 20
retry-record-info-query-before-download-interval-unit: seconds
# - 192.168.1.241
device-api:
offset:
forward: 0s
back: 0s
use-playback-to-download: true
proxy-media-url: 'https://10.10.10.200:18181/media'
# - 192.168.1.241
ffmpeg-support:
ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
rtp:
# input: -i http://10.10.10.200:5080/live/test.live.flv
input: -re -i
# output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -vsync 2 -copyts -f flv # -rtsp_transport tcp
# output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp
#output: -c:v libx264 -an -f flv # -rtsp_transport tcp
output: -c:v copy -an -f flv
#download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
log-level: error
# download: -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i
output: -preset ultrafast -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
download: -thread_queue_size 128 -i
debug:
download: false
input: false
output: false
tmp-dir: G:\Temp\record\download-proxy
use-tmp-file: true
# [可选] 日志配置, 一般不需要改
logging:
config: classpath:logback.xml
report:
enabled: false
url: http://127.0.0.1:8080/api/report
custom-headers:
agent: gb28181-proxy

View File

@ -24,8 +24,8 @@ spring:
username: root
password: 123456a
url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform_dev?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
profiles:
active: local
# profiles:
# active: local
cloud:
openfeign:
httpclient:
@ -78,7 +78,7 @@ proxy:
proxy-media-url: 'https://192.168.1.241:9022/mf-config/media'
device-api:
offset:
forward: 0s
forward: 30s
back: 0s
ffmpeg-support:
@ -88,8 +88,7 @@ ffmpeg-support:
rtp:
input: -re -i
#output: -preset ultrafast -tune zerolatency -vcodec libx264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
# output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f mp4 # -rtsp_transport tcp
output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp
download: -thread_queue_size 128 -fflags +genpts -enc_time_base -1 -i
debug:
download: false
@ -99,10 +98,3 @@ ffmpeg-support:
# [可选] 日志配置, 一般不需要改
logging:
config: classpath:logback.xml
report:
enabled: false
url: http://127.0.0.1:8080/api/report
custom-headers:
agent: gb28181-proxy

View File

@ -57,7 +57,7 @@
<!-- <docker.registry.password>XXX</docker.registry.password>-->
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
<gb28181.docking.version>0.1.0</gb28181.docking.version>
<gb28181.docking.version>0.1.0-SNAPSHOT</gb28181.docking.version>
</properties>
<profiles>

View File

@ -1,97 +0,0 @@
from concurrent.futures import ThreadPoolExecutor
import os
import os.path as path
import sys
import datetime
import urllib
import urllib.parse
import urllib.request
work_path = sys.path[0]
# 设备列表
devices_file = "devices.txt"
devices_file_path = path.join(work_path, devices_file)
# 下载目录
tmp_dir = "download"
tmp_path = path.join(work_path, tmp_dir)
server = "http://127.0.0.1:18183/video"
# server = "http://httpbin.org/anything/video"
def check_or_mk_tmp_dir():
if not path.exists(tmp_path):
os.mkdir(tmp_path)
def check_or_create_devices_file():
if not path.exists(devices_file_path):
with open(devices_file_path, mode="w", encoding="utf8") as f:
f.write("# 设备编码 一行一个\n")
def read_devices_file():
check_or_create_devices_file()
devices = []
with open(devices_file_path, mode="r", encoding="utf8") as f:
while True:
line = f.readline()
if not line:
break
line = line.strip()
if line.startswith("#") or len(line) == 0:
continue
else:
devices.append(line)
print(f"读取设备数量: {len(devices)}")
return devices
def tasks(device: str, start_time: str, end_time: str):
params = {
"start_time": start_time,
"end_time": end_time,
"device_id": device,
"useDownload": True,
}
url_params = urllib.parse.urlencode(params)
url = urllib.parse.urljoin(server, f"?{url_params}")
start = datetime.datetime.now()
start_str = start.strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{start_str} 开始下载: {url}")
file_path = f"{tmp_path}/{device}_{start_time}_{end_time}.mp4"
urllib.request.urlretrieve(url, file_path)
end = datetime.datetime.now()
print(
f"{device} {start_time}-{end_time}: 下载用时: {(end - start).total_seconds()}")
stats = os.stat(file_path)
print(f"文件 {file_path} 大小: {stats.st_size}")
if __name__ == '__main__':
check_or_mk_tmp_dir()
check_or_create_devices_file()
print(work_path)
# workers = os.cpu_count()
workers = 32
print(f"最大并发数: {workers}")
with ThreadPoolExecutor(max_workers=workers) as worker:
devices = read_devices_file()
# day = datetime.datetime.today()
day = datetime.date(year=2024, month=3, day=11)
# 开始时间
start = datetime.time(hour=8, minute=11, second=15)
# 结束时间
end = datetime.time(hour=8, minute=11, second=30)
start_time = datetime.datetime.combine(day, start).strftime(
"%Y%m%d%H%M%S")
end_time = datetime.datetime.combine(day, end).strftime("%Y%m%d%H%M%S")
for device in devices:
worker.submit(tasks, device, start_time, end_time)

View File

@ -1,66 +0,0 @@
import re
import subprocess
import os
import os.path as path
import sys
work_path = sys.path[0]
ffmpeg_path = "ffmpeg"
ffprobe_path = "ffprobe"
record_dir = "record"
record_path = path.join(work_path, record_dir)
if not path.exists(record_path):
os.mkdir(record_path)
merge_file = path.join(work_path, "test.mp4")
cmd = f"{ffmpeg_path} -y -loglevel error -f concat -safe 0 -i %s -c copy {merge_file}"
def natural_sort_key(s):
"""
按文件名中的自然数排序
"""
# 将字符串按照数字和非数字部分分割,返回分割后的子串列表
sub_strings = re.split(r'(\d+)', s)
# 如果当前子串由数字组成,则将它转换为整数;否则将其替换成空字符串
sub_strings = [int(c) if c.isdigit() else '' for c in sub_strings]
# 返回子串列表
return sub_strings
file_list = []
for item in os.listdir(record_path):
p = path.join(record_path, item)
if not path.isfile(p) or (not p.endswith(".mp4") and not p.endswith(".rec")):
continue
else:
file_list.append(p)
sorted_file_list = sorted(file_list, key=natural_sort_key)
tmp_merge_file = path.join(work_path, "merge.tmp")
with open(tmp_merge_file, mode="w", encoding="utf8") as f:
for record in sorted_file_list:
f.write(f"file '{record}'\n")
proc = subprocess.Popen(
cmd % tmp_merge_file,
stdin=None,
stdout=None,
shell=True
)
proc.communicate()
os.remove(tmp_merge_file)
meta_cmd = f"{ffprobe_path} -v error -i {merge_file} -print_format json -show_format -show_streams -pretty > {merge_file}.meta.json"
proc = subprocess.Popen(
meta_cmd,
stdin=None,
stdout=None,
shell=True
)
proc.communicate()

View File

@ -1,84 +0,0 @@
import os
import os.path as path
import select
import sys
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
import subprocess
import platform
work_path = sys.path[0]
# 下载目录
tmp_dir = "download"
tmp_path = path.join(work_path, tmp_dir)
zlm_server = "192.168.3.12"
zlm_rtmp_port = 1936
zlm_auth_params = {
"sign": "41db35390ddad33f83944f44b8b75ded"
}
ffmpeg_path = "ffmpeg"
ffmpeg_read_rate = 1
ffplay_path = "ffplay"
enable_preview = True
workers = os.cpu_count()
def get_rtmp_url(app: str, stream_id: str):
params = urllib.parse.urlencode(zlm_auth_params)
return "rtmp://{}:{}/{}/{}?{}".format(zlm_server, zlm_rtmp_port, app,
stream_id, params)
def check_or_mk_tmp_dir():
if not path.exists(tmp_path):
os.mkdir(tmp_path)
def push_stream(file: str):
stream_id = path.basename(file)
target = get_rtmp_url("ffmpeg", stream_id)
print("开始 ffmpeg 推流 {} => {}", file, target)
cmd = "{} -loglevel error -stream_loop -1 -readrate {} -i {} -t 60 -c copy -f flv {}".format(
ffmpeg_path, ffmpeg_read_rate, file, target)
print(cmd)
proc = subprocess.Popen(
cmd,
stdin=None,
stdout=None,
shell=True
)
if enable_preview and len(
ffplay_path) > 0 and platform.system() == "Windows":
subprocess.Popen(
"ffplay {} -x 400 -y 300 -autoexit".format(target),
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True
)
proc.communicate()
print("ffmpeg 结束 {} =/= {}", file, target)
if __name__ == '__main__':
if len(sys.argv) > 1:
try:
workers = int(sys.argv[1])
except:
print("参数解析错误, 将使用默认线程数 {} ".format(workers))
print("最大并发数: {}".format(workers))
with ThreadPoolExecutor(max_workers=workers) as worker:
for item in os.listdir(tmp_path):
p = path.join(tmp_path, item)
if not path.isfile(p) or not p.endswith(".mp4"):
continue
else:
worker.submit(push_stream, p)

View File

@ -1,44 +0,0 @@
import os.path as path
import sys
import shutil
work_path = sys.path[0]
# 设备列表
devices_file = "devices.txt"
devices_file_path = path.join(work_path, devices_file)
# 源文件
source_video_file = "20240311081115_20240311081130.mp4"
def check_or_create_devices_file():
if not path.exists(devices_file_path):
with open(devices_file_path, mode="w", encoding="utf8") as f:
f.write("# 设备编码 一行一个\n")
def read_devices_file():
check_or_create_devices_file()
devices = []
with open(devices_file_path, mode="r", encoding="utf8") as f:
while True:
line = f.readline()
if not line:
break
line = line.strip()
if line.startswith("#") or len(line) == 0:
continue
else:
devices.append(line)
print("读取设备数量: {}".format(len(devices)))
return devices
if __name__ == '__main__':
check_or_create_devices_file()
devices = read_devices_file()
src = path.join(work_path, source_video_file)
for device in devices:
dst = path.join(work_path, "{}_{}".format(device, source_video_file))
shutil.copyfile(src, dst)