调整视频回放/下载 新增 prefetch 参数 指定是否提前启动预拉取

This commit is contained in:
shikong 2024-04-29 00:37:01 +08:00
parent d1829901bf
commit 4799d83014
3 changed files with 36 additions and 23 deletions

View File

@ -68,6 +68,7 @@ import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.Duration;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -222,7 +223,7 @@ public class Gb28181DownloadService {
}); });
} }
download(deviceCode, startTime, endTime, useDownload).whenComplete((videoInfo, e) -> { download(deviceCode, startTime, endTime, useDownload, true).whenComplete((videoInfo, e) -> {
writeFileHeader(response, deviceCode, startTime, endTime, fileHeader); writeFileHeader(response, deviceCode, startTime, endTime, fileHeader);
log.info("videoInfo {}", videoInfo); log.info("videoInfo {}", videoInfo);
if (e != null) { if (e != null) {
@ -274,11 +275,11 @@ public class Gb28181DownloadService {
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求 // 间隔一定时间(200ms) 给设备足够的时间结束前次请求
scheduledExecutorService.schedule(()->{ scheduledExecutorService.schedule(()->{
download(deviceCode, startTime, endTime).whenComplete((videoInfo, e)->{ download(deviceCode, startTime, endTime, false).whenComplete((videoInfo, e)->{
log.info("获取媒体信息 {}", videoInfo); log.info("获取媒体信息 {}", videoInfo);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey); String existCallId = RedisUtil.StringOps.get(cacheKey);
// 到达时间后主动结束, 防止某些设备不会主动结束 // 到达时间后 延迟 10秒 主动结束, 防止某些设备不会主动结束
scheduledExecutorService.schedule(()->{ scheduledExecutorService.schedule(()->{
log.info("到达结束时间 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId()); log.info("到达结束时间 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
String deviceIp = docking.getIp(); String deviceIp = docking.getIp();
@ -291,7 +292,7 @@ public class Gb28181DownloadService {
zlmMediaService.closeRtpServer(CloseRtpServer.builder() zlmMediaService.closeRtpServer(CloseRtpServer.builder()
.streamId(videoInfo.streamId) .streamId(videoInfo.streamId)
.build()); .build());
}, time, TimeUnit.MILLISECONDS); }, time + Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
String url = videoWsUrl(videoInfo.getUrl()); String url = videoWsUrl(videoInfo.getUrl());
url = StringUtils.replaceOnce(url, ".live.flv", ".live.mp4"); url = StringUtils.replaceOnce(url, ".live.flv", ".live.mp4");
@ -368,7 +369,7 @@ public class Gb28181DownloadService {
DateTime start = DateUtil.date(); DateTime start = DateUtil.date();
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{ try{
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{ download(deviceCode, startTime,endTime, true).whenComplete((videoInfo, e)->{
streamHeader(asyncResponse); streamHeader(asyncResponse);
if(e != null){ if(e != null){
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
@ -454,13 +455,13 @@ public class Gb28181DownloadService {
} }
String ssrc = ssrcService.getPlaySsrc(); String ssrc = ssrcService.getPlaySsrc();
GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode); GB28181Description gb28181Description = GB28181SDPBuilder.Receiver.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode);
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result)); sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result, false));
return result; return result;
} }
@SneakyThrows @SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result) { public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result, Boolean prefetch) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey); String existCallId = RedisUtil.StringOps.get(cacheKey);
@ -479,7 +480,7 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId(); CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey); subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS); Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS, prefetch);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId()); RedisUtil.StringOps.set(cacheKey, callId.getCallId());
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
@ -487,12 +488,12 @@ public class Gb28181DownloadService {
} }
} }
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime) { public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean prefetch) {
return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload()); return download(deviceCode,startTime,endTime, proxySipConfig.isUsePlaybackToDownload(), prefetch);
} }
@SneakyThrows @SneakyThrows
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean useDownload) { public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch) {
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) { if (deviceByDeviceCode.isEmpty()) {
String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode); String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
@ -500,12 +501,12 @@ public class Gb28181DownloadService {
throw new JsonException(reason); throw new JsonException(reason);
} else { } else {
WvpProxyDevice device = deviceByDeviceCode.get(); WvpProxyDevice device = deviceByDeviceCode.get();
return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload); return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime, useDownload, prefetch);
} }
} }
@SneakyThrows @SneakyThrows
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload){ public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime, Boolean useDownload, Boolean prefetch){
CompletableFuture<VideoInfo> result = new CompletableFuture<>(); CompletableFuture<VideoInfo> result = new CompletableFuture<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
@ -559,12 +560,12 @@ public class Gb28181DownloadService {
URIField uriField = new URIField(); URIField uriField = new URIField();
uriField.setURI(StringUtils.joinWith(":", channel, "0")); uriField.setURI(StringUtils.joinWith(":", channel, "0"));
gb28181Description.setURI(uriField); gb28181Description.setURI(uriField);
sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time)); sender.sendRequest(inviteRequest(docking, device, gb28181Description, action, ssrc, streamId, result, time, prefetch));
return result; return result;
} }
@SneakyThrows @SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time) { public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, GB28181SDPBuilder.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time, Boolean prefetch) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey); String existCallId = RedisUtil.StringOps.get(cacheKey);
@ -583,16 +584,18 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId(); CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey); subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS, prefetch);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId()); RedisUtil.StringOps.set(cacheKey, callId.getCallId());
// 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流 if(prefetch){
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device)); // 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device));
}
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
}; };
} }
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){ public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit, Boolean prefetch){
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1]; ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() { Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription; private Flow.Subscription subscription;
@ -614,6 +617,10 @@ public class Gb28181DownloadService {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("收到响应状态 {}", statusCode); log.info("收到响应状态 {}", statusCode);
String callId = item.getCallId().getCallId(); String callId = item.getCallId().getCallId();
if(!prefetch){
// 相应 200OK 后再返回, 用于对延迟不敏感的实时请求
result.complete(new VideoInfo(streamId, videoRtmpUrl(streamId), item.getCallId().getCallId(), device));
}
scheduledExecutorService.schedule(()->{ scheduledExecutorService.schedule(()->{
sender.sendRequest(((provider, ip, port) -> { sender.sendRequest(((provider, ip, port) -> {

View File

@ -75,7 +75,8 @@ proxy:
# - 192.168.0.195 # - 192.168.0.195
stream-mode: udp stream-mode: udp
use-playback-to-download: false use-playback-to-download: false
proxy-media-url: 'https://10.10.10.200:18181/media' # proxy-media-url: 'https://10.10.10.200:18181/media'
proxy-media-url: 'https://10.10.10.200:5444'
use-record-info-query-before-download: true use-record-info-query-before-download: true
retry-record-info-query-before-download-interval: 3 retry-record-info-query-before-download-interval: 3
retry-record-info-query-before-download-times: 20 retry-record-info-query-before-download-times: 20
@ -96,11 +97,16 @@ ffmpeg-support:
# output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp # output: -enc_time_base -1 -preset ultrafast -tune zerolatency -vcodec libx264 -an -movflags faststart -f flv # -rtsp_transport tcp
#output: -c:v libx264 -an -f flv # -rtsp_transport tcp #output: -c:v libx264 -an -f flv # -rtsp_transport tcp
output: -c:v copy -an -f flv output: -c:v copy -an -f flv
download: -rw_timeout 10000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i #download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
download: -rw_timeout 30000000 -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 1 -i
log-level: error
# download: -rtmp_live recorded -tcp_nodelay 1 -thread_queue_size 128 -i
debug: debug:
download: false download: false
input: false input: false
output: false output: false
tmp-dir: G:\Temp\record\download-proxy
use-tmp-file: true
# [可选] 日志配置, 一般不需要改 # [可选] 日志配置, 一般不需要改
logging: logging:
@ -110,4 +116,4 @@ report:
enabled: false enabled: false
url: http://127.0.0.1:8080/api/report url: http://127.0.0.1:8080/api/report
custom-headers: custom-headers:
agent: gb28181-proxy agent: gb28181-proxy

View File

@ -57,7 +57,7 @@
<!-- <docker.registry.password>XXX</docker.registry.password>--> <!-- <docker.registry.password>XXX</docker.registry.password>-->
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version> <docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
<gb28181.docking.version>0.1.0-SNAPSHOT</gb28181.docking.version> <gb28181.docking.version>0.1.0</gb28181.docking.version>
</properties> </properties>
<profiles> <profiles>