Compare commits
38 Commits
Author | SHA1 | Date | |
---|---|---|---|
48cbc45b34 | |||
52e1f769be | |||
bb118293fd | |||
b955abc852 | |||
c51d353e21 | |||
0ef41d3d92 | |||
ea824175ea | |||
2de6d080cf | |||
d936e1a2b6 | |||
d01d76304b | |||
d0803c6b62 | |||
9a9db33e6f | |||
6966432e76 | |||
8a14fa19cd | |||
723ad67df9 | |||
d64bba2c9c | |||
39af43f7aa | |||
28dfe97a5a | |||
50b7bb4f00 | |||
2374f7896a | |||
7b230144d3 | |||
7b350e899b | |||
0fd058e1c5 | |||
d9376eebf7 | |||
aa327ace86 | |||
9914186862 | |||
b61056e8b8 | |||
1ac99217f5 | |||
8f0b0be8b3 | |||
ff6a5484e3 | |||
8fefeb2187 | |||
ab5a5d6666 | |||
0f1e772168 | |||
fb02f559d4 | |||
d8cab292de | |||
4b55f7fc54 | |||
b42b8be747 | |||
0fc5de87c8 |
@ -28,7 +28,7 @@
|
||||
<dependency>
|
||||
<groupId>cn.skcks.docking.gb28181</groupId>
|
||||
<artifactId>annotation</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<version>${gb28181.docking.version}</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
|
@ -1,12 +1,17 @@
|
||||
package cn.skcks.docking.gb28181.mocking.api.zlm;
|
||||
|
||||
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
|
||||
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
|
||||
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamNoneReaderDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
@ -22,17 +27,27 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
public class ZlmHookApi {
|
||||
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
|
||||
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
||||
private final ZlmPublishHookService zlmPublishHookService;
|
||||
|
||||
@PostJson("/on_stream_changed")
|
||||
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
|
||||
log.debug("on_stream_changed {}", dto);
|
||||
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtsp")){
|
||||
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtmp")){
|
||||
zlmStreamChangeHookService.processEvent(dto.getApp(),dto.getStream(), dto.getRegist());
|
||||
}
|
||||
}
|
||||
|
||||
@PostJson("/on_stream_none_reader")
|
||||
public void onStreamNoneReader(@RequestBody ZlmStreamNoneReaderDTO dto){
|
||||
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
|
||||
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtmp")){
|
||||
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
|
||||
}
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@PostJson("/on_publish")
|
||||
public ZlmResponse<Void> onPublish(@RequestBody ZlmPublishDTO dto){
|
||||
zlmPublishHookService.processEvent(dto);
|
||||
return new ZlmResponse<>(ResponseStatus.Success, null, "");
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,12 @@ public class DeviceProxyConfig {
|
||||
* 是否只通过代理拉取指定时间范围内的视频查询请求
|
||||
*/
|
||||
private Boolean proxyVideoInTimeRange = true;
|
||||
|
||||
/**
|
||||
* 实时视频单次允许最大播放时长
|
||||
*/
|
||||
private Duration realTimeVideoMaxPlayTime = Duration.ofMinutes(15);
|
||||
|
||||
/**
|
||||
* 代理该时间段内的历史视频查询请求
|
||||
*/
|
||||
@ -40,6 +46,10 @@ public class DeviceProxyConfig {
|
||||
public static class PreDownloadForRecordInfo {
|
||||
private Boolean enable = true;
|
||||
private Duration timeRange = Duration.ofMinutes(5);
|
||||
/**
|
||||
* 分片时长, 当请求时间超过该时长时,将分片下载
|
||||
*/
|
||||
private Duration timeSplit = Duration.ofSeconds(30);
|
||||
private String cachePath = "./record";
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ public class FfmpegConfig {
|
||||
private String input = "-re -i";
|
||||
private String output = "-vcodec h264 -acodec aac -f rtp_mpegts";
|
||||
private String logLevel = "error";
|
||||
private Boolean useRtpToDownload = false;
|
||||
}
|
||||
|
||||
|
||||
|
@ -28,7 +28,7 @@ public class MockingExecutor{
|
||||
public ThreadPoolTaskExecutor sipTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(CPU_NUM * 2);
|
||||
executor.setMaxPoolSize(100);
|
||||
executor.setMaxPoolSize(1000);
|
||||
executor.setQueueCapacity(10000);
|
||||
executor.setKeepAliveSeconds(30);
|
||||
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
|
||||
|
@ -4,9 +4,11 @@ import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoItemDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoResponseDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordListDTO;
|
||||
@ -20,6 +22,7 @@ import gov.nist.javax.sip.message.SIPRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.header.CallIdHeader;
|
||||
@ -29,6 +32,10 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@ -38,24 +45,27 @@ public class RecordInfoRequestProcessor {
|
||||
private final DeviceService deviceService;
|
||||
private final DeviceProxyConfig deviceProxyConfig;
|
||||
private final VideoCacheManager videoCacheManager;
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
public void process(SIPRequest request, byte[] content) {
|
||||
String senderIp = request.getLocalAddress().getHostAddress();
|
||||
String transport = request.getTopmostViaHeader().getTransport();
|
||||
RecordInfoRequestDTO recordInfoRequestDTO = XmlUtils.parse(content, RecordInfoRequestDTO.class);
|
||||
String id = recordInfoRequestDTO.getDeviceId();
|
||||
|
||||
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
|
||||
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
|
||||
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
} else {
|
||||
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
|
||||
}
|
||||
}, () -> {
|
||||
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
|
||||
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
|
||||
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
} else {
|
||||
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
|
||||
}
|
||||
}, () -> {
|
||||
log.error("未能找到 deviceId: {} 的相关信息", id);
|
||||
@ -64,6 +74,24 @@ public class RecordInfoRequestProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
private void processRecordInfoRespWithVideoCacheTask(SIPRequest request, String senderIp, String transport, MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO){
|
||||
CompletableFuture<Runnable> future = CompletableFuture.supplyAsync(()-> {
|
||||
try {
|
||||
preDownloadVideoTask(device.getDeviceCode(), recordInfoRequestDTO).get();
|
||||
return ()-> sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("preDownloadVideoTask error",e);
|
||||
return() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
|
||||
}
|
||||
}, executor);
|
||||
future.completeOnTimeout(() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport),
|
||||
1, TimeUnit.MINUTES);
|
||||
future.thenApplyAsync(fn->{
|
||||
fn.run();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private boolean preDownloadVideo(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
|
||||
if(!deviceProxyConfig.getPreDownloadForRecordInfo().getEnable()){
|
||||
return true;
|
||||
@ -75,10 +103,16 @@ public class RecordInfoRequestProcessor {
|
||||
return true;
|
||||
}
|
||||
|
||||
return preDownloadVideoTask(deviceCode,recordInfoRequestDTO).isDone();
|
||||
}
|
||||
|
||||
private CompletableFuture<JsonResponse<String>> preDownloadVideoTask(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
|
||||
Date startTime = recordInfoRequestDTO.getStartTime();
|
||||
Date endTime = recordInfoRequestDTO.getEndTime();
|
||||
|
||||
// 添加预下载任务
|
||||
videoCacheManager.addTask(deviceCode,startTime,endTime);
|
||||
|
||||
return videoCacheManager.get(deviceCode,startTime,endTime).isDone();
|
||||
return videoCacheManager.get(deviceCode,startTime,endTime);
|
||||
}
|
||||
|
||||
private void sendRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) {
|
||||
@ -114,7 +148,7 @@ public class RecordInfoRequestProcessor {
|
||||
|
||||
FromHeader fromHeader = request.getFromHeader();
|
||||
ListUtil.partition(recordInfoItemDTOList,50).forEach(recordList->{
|
||||
RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
|
||||
final RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
|
||||
recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn());
|
||||
recordInfoResponseDTO.setDeviceId(device.getGbChannelId());
|
||||
recordInfoResponseDTO.setName(device.getName());
|
||||
@ -125,10 +159,11 @@ public class RecordInfoRequestProcessor {
|
||||
.build();
|
||||
recordInfoResponseDTO.setRecordList(recordListDTO);
|
||||
|
||||
final String xml = XmlUtils.toXml(recordInfoResponseDTO);
|
||||
sender.sendRequest((provider, ip, port) -> {
|
||||
CallIdHeader callIdHeader = provider.getNewCallId();
|
||||
return SipRequestBuilder.createMessageRequest(device,
|
||||
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
|
||||
ip, port, 1, xml, fromHeader.getTag(), callIdHeader);
|
||||
});
|
||||
});
|
||||
}
|
||||
@ -147,11 +182,12 @@ public class RecordInfoRequestProcessor {
|
||||
.build();
|
||||
recordInfoResponseDTO.setRecordList(recordListDTO);
|
||||
|
||||
final String xml = XmlUtils.toXml(recordInfoResponseDTO);
|
||||
FromHeader fromHeader = request.getFromHeader();
|
||||
sender.sendRequest((provider, ip, port) -> {
|
||||
CallIdHeader callIdHeader = provider.getNewCallId();
|
||||
return SipRequestBuilder.createMessageRequest(device,
|
||||
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
|
||||
ip, port, 1, xml, fromHeader.getTag(), callIdHeader);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -3,16 +3,20 @@ package cn.skcks.docking.gb28181.mocking.core.sip.service;
|
||||
import cn.hutool.cache.CacheUtil;
|
||||
import cn.hutool.cache.impl.TimedCache;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.hutool.core.net.url.UrlBuilder;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.exec.DefaultExecuteResultHandler;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hc.client5.http.classic.HttpClient;
|
||||
import org.apache.hc.client5.http.classic.methods.HttpGet;
|
||||
@ -24,10 +28,15 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -38,6 +47,8 @@ public class VideoCacheManager {
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
private final FfmpegSupportService ffmpegSupportService;
|
||||
|
||||
private final TimedCache<String, CompletableFuture<JsonResponse<String>>> tasks =
|
||||
CacheUtil.newTimedCache(TimeUnit.MINUTES.toMillis(30));
|
||||
|
||||
@ -85,13 +96,80 @@ public class VideoCacheManager {
|
||||
|
||||
@SneakyThrows
|
||||
protected CompletableFuture<JsonResponse<String>> downloadVideo(String deviceCode, Date startTime, Date endTime) {
|
||||
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile();
|
||||
String fileName = fileName(deviceCode, startTime, endTime);
|
||||
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName + ".mp4").toFile();
|
||||
if(realFile.exists()){
|
||||
log.info("文件 {} 已缓存, 直接返回", realFile.getAbsolutePath());
|
||||
return CompletableFuture.completedFuture(JsonResponse.success(realFile.getAbsolutePath()));
|
||||
}
|
||||
|
||||
return CompletableFuture.supplyAsync(()->{
|
||||
long between = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
||||
long splitTime = deviceProxyConfig.getPreDownloadForRecordInfo().getTimeSplit().getSeconds();
|
||||
if(between > splitTime){
|
||||
log.info("时间间隔超过 {} 秒, 将分片下载", splitTime);
|
||||
DateTime splitStartTime = DateUtil.date(startTime);
|
||||
DateTime splitEndTime = DateUtil.offsetSecond(startTime, (int) splitTime);
|
||||
List<CompletableFuture<JsonResponse<String>>> completableFutures = new ArrayList<>();
|
||||
|
||||
while(splitEndTime.getTime() < endTime.getTime()){
|
||||
String splitFileName = fileName(deviceCode, splitStartTime, splitEndTime);
|
||||
File tmpFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),splitFileName + ".mp4.tmp").toFile();
|
||||
if(tmpFile.exists()){
|
||||
tmpFile.delete();
|
||||
log.info("删除已存在但未完成下载的临时文件 => {}", tmpFile.getAbsolutePath());
|
||||
}
|
||||
// 添加分片任务
|
||||
addTask(deviceCode, splitStartTime, splitEndTime);
|
||||
completableFutures.add(get(deviceCode, splitStartTime, splitEndTime));
|
||||
// 更新起止时间
|
||||
splitStartTime = DateUtil.offsetSecond(splitStartTime, (int) splitTime);
|
||||
splitEndTime = DateUtil.offsetSecond(splitEndTime, (int) splitTime);
|
||||
if(splitEndTime.getTime() >= endTime.getTime()){
|
||||
splitEndTime = DateUtil.date(endTime);
|
||||
addTask(deviceCode, splitStartTime, splitEndTime);
|
||||
completableFutures.add(get(deviceCode, splitStartTime, splitEndTime));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
CompletableFuture.allOf(completableFutures.toArray(CompletableFuture[]::new));
|
||||
String concatFileName = fileName + ".mp4.concat";
|
||||
File concatFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(), concatFileName).toFile();
|
||||
if(concatFile.exists()){
|
||||
concatFile.delete();
|
||||
log.info("删除已存在但未完成合并的临时合并配置文件 => {}", concatFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
try {
|
||||
concatFile.createNewFile();
|
||||
try(FileWriter fileWriter = new FileWriter(concatFile)){
|
||||
for (CompletableFuture<JsonResponse<String>> result : completableFutures) {
|
||||
String splitFilePath = result.get().getData();
|
||||
String config = String.format("file '%s'\n", splitFilePath);
|
||||
fileWriter.write(config);
|
||||
log.debug("{}", config);
|
||||
}
|
||||
}
|
||||
log.info("生成临时合并配置文件 {}", concatFile.getAbsolutePath());
|
||||
|
||||
log.info("开始合并视频 => {}", realFile.getAbsolutePath());
|
||||
DefaultExecuteResultHandler executeResultHandler = new DefaultExecuteResultHandler();
|
||||
ffmpegSupportService.ffmpegConcatExecutor(concatFile.getAbsolutePath(), realFile.getAbsolutePath(), executeResultHandler);
|
||||
executeResultHandler.waitFor();
|
||||
|
||||
if(realFile.exists()){
|
||||
log.info("视频合并成功 => {}", realFile.getAbsolutePath());
|
||||
return JsonResponse.success(realFile.getAbsolutePath());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("合并分片视频异常 => {}", e.getMessage());
|
||||
return JsonResponse.error(e.getMessage());
|
||||
} finally {
|
||||
System.gc();
|
||||
log.info("删除临时合并配置文件 {} => {}", concatFile.getAbsolutePath(), concatFile.delete());
|
||||
}
|
||||
}
|
||||
final String url = UrlBuilder.of(deviceProxyConfig.getUrl())
|
||||
.addPath("video")
|
||||
.addQuery("device_id", deviceCode)
|
||||
@ -127,6 +205,7 @@ public class VideoCacheManager {
|
||||
return JsonResponse.success(realFile.getAbsolutePath());
|
||||
} catch (Exception e) {
|
||||
log.error("视频下载失败 => {}", e.getMessage());
|
||||
System.gc();
|
||||
file.delete();
|
||||
return JsonResponse.error(e.getMessage());
|
||||
}
|
||||
|
@ -23,7 +23,11 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
|
||||
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
|
||||
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
|
||||
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.*;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
||||
@ -32,6 +36,7 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager;
|
||||
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
||||
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
||||
import cn.skcks.docking.gb28181.sdp.GB28181Description;
|
||||
@ -41,6 +46,7 @@ import com.github.rholder.retry.Retryer;
|
||||
import com.github.rholder.retry.RetryerBuilder;
|
||||
import com.github.rholder.retry.StopStrategies;
|
||||
import com.github.rholder.retry.WaitStrategies;
|
||||
import gov.nist.javax.sdp.MediaDescriptionImpl;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.*;
|
||||
@ -49,15 +55,19 @@ import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteResultHandler;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.sdp.MediaDescription;
|
||||
import javax.sdp.SdpException;
|
||||
import javax.sip.SipProvider;
|
||||
import javax.sip.address.SipURI;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.message.Request;
|
||||
import javax.sip.message.Response;
|
||||
import java.io.File;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
@ -66,6 +76,7 @@ import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -96,9 +107,14 @@ public class DeviceProxyService {
|
||||
|
||||
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
||||
|
||||
private final ZlmPublishHookService zlmPublishHookService;
|
||||
|
||||
private final FfmpegConfig ffmpegConfig;
|
||||
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(128);
|
||||
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final java.util.concurrent.Executor executor;
|
||||
|
||||
public interface TaskProcessor {
|
||||
void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
|
||||
@ -109,50 +125,80 @@ public class DeviceProxyService {
|
||||
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
||||
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
||||
// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
|
||||
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
|
||||
.retryIfException()
|
||||
.retryIfRuntimeException()
|
||||
// 重试间隔
|
||||
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
|
||||
// 重试次数
|
||||
.withStopStrategy(StopStrategies.stopAfterAttempt(3000))
|
||||
.build();
|
||||
// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
try {
|
||||
retryer.call(()->{
|
||||
StartSendRtp startSendRtp = new StartSendRtp();
|
||||
startSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||
startSendRtp.setStream(callId);
|
||||
startSendRtp.setSsrc(ssrc);
|
||||
startSendRtp.setDstUrl(toAddr);
|
||||
startSendRtp.setDstPort(toPort);
|
||||
startSendRtp.setUdp(!tcp);
|
||||
log.info("startSendRtp {}",startSendRtp);
|
||||
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
|
||||
log.info("startSendRtpResp {}",startSendRtpResp);
|
||||
return startSendRtpResp;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("zlm rtp 推流失败",e);
|
||||
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
||||
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
// });
|
||||
AtomicReference<String> failMag = new AtomicReference<>("");
|
||||
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
|
||||
.retryIfResult(resp -> {
|
||||
if(resp != null){
|
||||
failMag.set(resp.toString());
|
||||
}
|
||||
return resp.getLocalPort() == null || resp.getLocalPort() <= 0;
|
||||
})
|
||||
.retryIfException()
|
||||
.retryIfRuntimeException()
|
||||
// 重试间隔
|
||||
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
|
||||
// 重试次数
|
||||
.withStopStrategy(StopStrategies.stopAfterAttempt(10 * 1000))
|
||||
.build();
|
||||
zlmPublishHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
executor.execute(()->{
|
||||
try {
|
||||
StartSendRtp startSendRtp = new StartSendRtp();
|
||||
StartSendRtpResp sendRtpResp = retryer.call(() -> {
|
||||
startSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||
startSendRtp.setStream(callId);
|
||||
startSendRtp.setSsrc(ssrc);
|
||||
startSendRtp.setDstUrl(toAddr);
|
||||
startSendRtp.setDstPort(toPort);
|
||||
startSendRtp.setUdp(!tcp);
|
||||
// log.debug("startSendRtp {}",startSendRtp);
|
||||
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
|
||||
// log.debug("startSendRtpResp {}",startSendRtpResp);
|
||||
return startSendRtpResp;
|
||||
});
|
||||
|
||||
// });
|
||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||
stopSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||
stopSendRtp.setStream(callId);
|
||||
stopSendRtp.setSsrc(ssrc);
|
||||
log.info("sendRtp 推流成功 {} {}, req => {}, resp => {}", device.getDeviceCode(),device.getGbChannelId(), startSendRtp, sendRtpResp);
|
||||
} catch (Exception e) {
|
||||
log.error("zlm rtp 推流失败, {}, {} {} {}, {}", failMag.get(), device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
|
||||
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
||||
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
zlmStreamRegistHookEvent(DEFAULT_ZLM_APP, callId, ssrc);
|
||||
zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
sendBye(request,device,key);
|
||||
});
|
||||
}
|
||||
|
||||
private void zlmStreamRegistHookEvent(String app, String callId, String ssrc){
|
||||
zlmStreamChangeHookService.getUnregistHandler(app).put(callId,()->{
|
||||
executor.execute(()->{
|
||||
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
|
||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||
stopSendRtp.setApp(app);
|
||||
stopSendRtp.setStream(callId);
|
||||
stopSendRtp.setSsrc(ssrc);
|
||||
|
||||
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", app, callId, ssrc);
|
||||
zlmMediaService.stopSendRtp(stopSendRtp);
|
||||
}, 10, TimeUnit.SECONDS);
|
||||
|
||||
// 如果 流 在 10秒内 重新注册, 则 取消停止RTP推流
|
||||
zlmStreamChangeHookService.getRegistHandler(app).put(callId,()->{
|
||||
schedule.cancel(true);
|
||||
zlmStreamRegistHookEvent(app, callId, ssrc);
|
||||
});
|
||||
|
||||
// 如果 注销 后 10.5 秒内 没有再注册, 就彻底取消相关事件的订阅
|
||||
scheduledExecutorService.schedule(()->{
|
||||
zlmStreamChangeHookService.getRegistHandler(app).remove(callId);
|
||||
},10500, TimeUnit.MILLISECONDS);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
|
||||
Optional.ofNullable(tasks.get(callId)).ifPresent(task->{
|
||||
task.getWatchdog().destroyProcess();
|
||||
@ -166,97 +212,124 @@ public class DeviceProxyService {
|
||||
|
||||
public TaskProcessor playbackTask(){
|
||||
return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
|
||||
scheduledExecutorService.schedule(() -> {
|
||||
trying(request);
|
||||
sendOkResponse.run();
|
||||
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{}", e.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(SIPRequest item) {
|
||||
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||
}
|
||||
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
@Override
|
||||
public void onNext(SIPRequest item) {
|
||||
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
||||
try {
|
||||
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||
callbackTask.put(device.getDeviceCode(), executor);
|
||||
executeResultHandler.waitFor();
|
||||
} catch (Exception e) {
|
||||
sendBye(request,device,"");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
||||
try {
|
||||
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||
callbackTask.put(device.getDeviceCode(), executor);
|
||||
executeResultHandler.waitFor();
|
||||
} catch (Exception e) {
|
||||
sendBye(request, device, "");
|
||||
log.error("{}", e.getMessage());
|
||||
}
|
||||
});
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
});
|
||||
trying(request);
|
||||
sendOkResponse.run();
|
||||
};
|
||||
}
|
||||
|
||||
public TaskProcessor downloadTask(){
|
||||
return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
|
||||
scheduledExecutorService.schedule(() -> {
|
||||
trying(request);
|
||||
sendOkResponse.run();
|
||||
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{}", e.getMessage());
|
||||
}
|
||||
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||
private SIPRequest ackRequest;
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(SIPRequest item) {
|
||||
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||
}
|
||||
@Override
|
||||
public void onNext(SIPRequest item) {
|
||||
ackRequest = item;
|
||||
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
||||
try {
|
||||
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||
scheduledExecutorService.submit(()->{
|
||||
try {
|
||||
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onComplete() {
|
||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
||||
try {
|
||||
if(ackRequest != null){
|
||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||
if (!ffmpegConfig.getRtp().getUseRtpToDownload()) {
|
||||
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||
executor.execute(()->{
|
||||
try {
|
||||
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||
downloadTask.put(device.getDeviceCode(), executor);
|
||||
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time, executeResultHandler);
|
||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||
downloadTask.put(device.getDeviceCode(), executor);
|
||||
} else {
|
||||
String rtpUrl = getRtpUrl(request);
|
||||
Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time, executeResultHandler);
|
||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||
downloadTask.put(device.getDeviceCode(), executor);
|
||||
}
|
||||
executeResultHandler.waitFor();
|
||||
} catch (Exception e) {
|
||||
sendBye(request, device, "");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
sendBye(request, device, "");
|
||||
log.error("{}", e.getMessage());
|
||||
}
|
||||
});
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
});
|
||||
trying(request);
|
||||
sendOkResponse.run();
|
||||
};
|
||||
}
|
||||
|
||||
private static String getRtpUrl(SIPRequest request) throws ParseException, SdpException {
|
||||
String contentString = new String(request.getRawContent());
|
||||
GB28181DescriptionParser gb28181DescriptionParser = new GB28181DescriptionParser(contentString);
|
||||
GB28181Description sdp = gb28181DescriptionParser.parse();
|
||||
String rtpIp = sdp.getConnection().getAddress();
|
||||
MediaDescriptionImpl media = (MediaDescriptionImpl) sdp.getMediaDescriptions(true).get(0);
|
||||
return "rtp://" + rtpIp + ":" + media.getMedia().getMediaPort();
|
||||
}
|
||||
|
||||
private String getZlmRtmpUrl(String app, String streamId){
|
||||
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId;
|
||||
}
|
||||
@ -401,7 +474,7 @@ public class DeviceProxyService {
|
||||
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
||||
.build();
|
||||
|
||||
String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
|
||||
String toUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
|
||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||
try {
|
||||
ZlmResponse<AddFFmpegSourceResp> sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
|
||||
@ -436,7 +509,7 @@ public class DeviceProxyService {
|
||||
return startSendRtpResp;
|
||||
});
|
||||
} catch (Exception e){
|
||||
log.error("zlm rtp 推流失败",e);
|
||||
log.error("zlm rtp 推流失败, {} {} {}, {}", device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
|
||||
sendBye(request, device, "");
|
||||
}
|
||||
});
|
||||
@ -447,6 +520,7 @@ public class DeviceProxyService {
|
||||
stopSendRtp.setStream(callId);
|
||||
stopSendRtp.setSsrc(ssrc);
|
||||
|
||||
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", ZLM_FFMPEG_PROXY_APP, callId, ssrc);
|
||||
zlmMediaService.stopSendRtp(stopSendRtp);
|
||||
});
|
||||
|
||||
@ -506,6 +580,21 @@ public class DeviceProxyService {
|
||||
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
||||
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
||||
|
||||
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
|
||||
log.warn("到达最长播放时间 {}, 强制关闭实时视频播放 {} {}", proxyConfig.getRealTimeVideoMaxPlayTime(), device.getGbChannelId(), callId);
|
||||
sendBye(request, device, "");
|
||||
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
|
||||
RedisUtil.KeyOps.delete(cacheKey);
|
||||
|
||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||
stopSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||
stopSendRtp.setStream(callId);
|
||||
stopSendRtp.setSsrc(ssrc);
|
||||
|
||||
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", DEFAULT_ZLM_APP, callId, ssrc);
|
||||
zlmMediaService.stopSendRtp(stopSendRtp);
|
||||
}, proxyConfig.getRealTimeVideoMaxPlayTime().toMillis(), TimeUnit.MILLISECONDS);
|
||||
|
||||
Retryer<StartSendRtpResp> rtpRetryer = rtpRetryer();
|
||||
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||
try {
|
||||
@ -523,18 +612,25 @@ public class DeviceProxyService {
|
||||
return startSendRtpResp;
|
||||
});
|
||||
} catch (Exception e){
|
||||
log.error("zlm rtp 推流失败",e);
|
||||
log.error("zlm rtp 推流失败, {} {} {}, {}", device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
|
||||
sendBye(request, device, "");
|
||||
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
|
||||
RedisUtil.KeyOps.delete(cacheKey);
|
||||
schedule.cancel(true);
|
||||
}
|
||||
});
|
||||
|
||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
|
||||
schedule.cancel(true);
|
||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||
stopSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||
stopSendRtp.setStream(callId);
|
||||
stopSendRtp.setSsrc(ssrc);
|
||||
|
||||
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", DEFAULT_ZLM_APP, callId, ssrc);
|
||||
zlmMediaService.stopSendRtp(stopSendRtp);
|
||||
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
|
||||
RedisUtil.KeyOps.delete(cacheKey);
|
||||
});
|
||||
|
||||
Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,request,device);
|
||||
@ -543,7 +639,6 @@ public class DeviceProxyService {
|
||||
subscribe.getByeSubscribe().addPublisher(key);
|
||||
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
|
||||
} catch (Exception e) {
|
||||
|
||||
log.error("zlm 代理拉流失败",e);
|
||||
sendBye(request, device, "");
|
||||
}
|
||||
@ -555,9 +650,15 @@ public class DeviceProxyService {
|
||||
CompletableFuture<JsonResponse<String>> task = videoCacheManager.get(device.getDeviceCode(), startTime, endTime);
|
||||
if(task != null){
|
||||
if(task.isDone()){
|
||||
String file = task.get().getData();
|
||||
log.info("本地视频已缓存, 将从本地缓存推流, 缓存文件 => {}", file);
|
||||
return file;
|
||||
String filePath = task.get().getData();
|
||||
File file = new File(filePath);
|
||||
if(StringUtils.isNotBlank(filePath) && file.exists()){
|
||||
log.info("本地视频已缓存, 将从本地缓存推流, 缓存文件 => {}", filePath);
|
||||
return filePath;
|
||||
} else {
|
||||
log.info("缓存已失效, 将直接使用 http代理 推流 并 重新缓存, 缓存文件 => {}", filePath);
|
||||
videoCacheManager.addTask(device.getDeviceCode(), startTime, endTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -619,9 +720,11 @@ public class DeviceProxyService {
|
||||
|
||||
optionalZlmStreamChangeHookHandler.ifPresent(handler -> {
|
||||
log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId);
|
||||
handler.handler();
|
||||
});
|
||||
optionalZlmStreamNoneReaderHandler.ifPresent(handler -> {
|
||||
log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId);
|
||||
handler.handler();
|
||||
});
|
||||
sendBye(request,device,key);
|
||||
}
|
||||
|
@ -44,10 +44,10 @@ public class FfmpegSupportService {
|
||||
|
||||
log.info("视频下载参数 {}", inputParam);
|
||||
|
||||
String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output);
|
||||
String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", "-t", unit.toSeconds(time), rtp.getOutput(), output);
|
||||
log.info("视频输出参数 {}", outputParam);
|
||||
|
||||
return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);
|
||||
return ffmpegExecutor(inputParam, outputParam, time + 60, unit, resultHandler);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@ -55,6 +55,7 @@ public class FfmpegSupportService {
|
||||
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
|
||||
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
|
||||
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), logLevelParam, inputParam, outputParam);
|
||||
log.info("ffmpeg 命令 => {}", command);
|
||||
CommandLine commandLine = CommandLine.parse(command);
|
||||
Executor executor = new DefaultExecutor();
|
||||
ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time));
|
||||
@ -62,4 +63,18 @@ public class FfmpegSupportService {
|
||||
executor.execute(commandLine, resultHandler);
|
||||
return executor;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public Executor ffmpegConcatExecutor(String concatConfigFile, String outputFile, ExecuteResultHandler executeResultHandler){
|
||||
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
|
||||
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
|
||||
String inputParam = String.format("-y -f concat -safe 0 -i \"%s\"", concatConfigFile);
|
||||
String outputParam = String.format("-c copy \"%s\"", outputFile);
|
||||
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), logLevelParam, inputParam, outputParam);
|
||||
log.info("ffmpeg 视频合并 命令 => {}", command);
|
||||
CommandLine commandLine = CommandLine.parse(command);
|
||||
Executor executor = new DefaultExecutor();
|
||||
executor.execute(commandLine, executeResultHandler);
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
|
@ -52,11 +52,13 @@ public class RegisterService {
|
||||
|
||||
List<MockingDevice> enabledDevice = deviceService.getAllEnabledDevice();
|
||||
|
||||
List<CompletableFuture<JsonResponse<Void>>[]> completableFutures = ListUtil.split(enabledDevice, 10).stream().map(items -> {
|
||||
CompletableFuture<JsonResponse<Void>>[] array = enabledDevice.stream().map(this::register).toArray(CompletableFuture[]::new);
|
||||
List<CompletableFuture<JsonResponse<Void>>[]> completableFutures = new ArrayList<>();
|
||||
for (List<MockingDevice> mockingDevices : ListUtil.split(enabledDevice, 200)) {
|
||||
CompletableFuture<JsonResponse<Void>>[] array = mockingDevices.stream().map(this::register).toArray(CompletableFuture[]::new);
|
||||
CompletableFuture.allOf(array);
|
||||
return array;
|
||||
}).toList();
|
||||
Thread.sleep(500);
|
||||
completableFutures.add(array);
|
||||
}
|
||||
|
||||
List<CompletableFuture<JsonResponse<Void>>> reduce = completableFutures.stream().map(item -> Arrays.stream(item).toList())
|
||||
.reduce(new ArrayList<>(), (prev, cur) -> {
|
||||
|
@ -8,6 +8,7 @@ import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -17,7 +18,8 @@ import java.util.List;
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Component
|
||||
public class ZlmInitService {
|
||||
public class ZlmInitService implements SmartLifecycle {
|
||||
private boolean running;
|
||||
private final ZlmMediaService zlmMediaService;
|
||||
private final ZlmHookConfig zlmHookConfig;
|
||||
@PostConstruct
|
||||
@ -28,6 +30,30 @@ public class ZlmInitService {
|
||||
HookConfig hook = config.getHook();
|
||||
hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed");
|
||||
hook.setOnStreamNoneReader(zlmHookConfig.getHook() + "/on_stream_none_reader");
|
||||
hook.setOnPublish(zlmHookConfig.getHook() + "/on_publish");
|
||||
config.getRtmp().setHandshakeSecond(15);
|
||||
config.getRtmp().setKeepAliveSecond(10);
|
||||
config.getRtpProxy().setTimeoutSec(30);
|
||||
zlmMediaService.setServerConfig(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
running = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
ZlmResponse<List<ServerConfig>> serverConfig = zlmMediaService.getServerConfig();
|
||||
List<ServerConfig> data = serverConfig.getData();
|
||||
ServerConfig config = data.get(0);
|
||||
HookConfig hook = config.getHook();
|
||||
hook.setOnPublish("");
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
|
||||
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ZlmPublishHookService {
|
||||
private final ZlmHookConfig zlmHookConfig;
|
||||
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
public interface ZlmPublishHookHandler {
|
||||
void handler();
|
||||
}
|
||||
|
||||
@Getter(AccessLevel.PRIVATE)
|
||||
private ConcurrentMap<String, ConcurrentMap<String, ZlmPublishHookHandler>> handler = new ConcurrentHashMap<>();
|
||||
|
||||
public ConcurrentMap<String, ZlmPublishHookHandler> getHandler(String app) {
|
||||
this.handler.putIfAbsent(app, new ConcurrentHashMap<>());
|
||||
return this.handler.get(app);
|
||||
}
|
||||
|
||||
|
||||
public void processEvent(ZlmPublishDTO dto) {
|
||||
String app = dto.getApp();
|
||||
String streamId = dto.getStream();
|
||||
String ip = dto.getIp();
|
||||
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
|
||||
|
||||
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
|
||||
Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> {
|
||||
executor.execute(()->{
|
||||
handler.handler();
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{}", e.getMessage());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
@ -1,16 +1,19 @@
|
||||
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
|
||||
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@ -18,6 +21,10 @@ import java.util.concurrent.ConcurrentMap;
|
||||
@RequiredArgsConstructor
|
||||
public class ZlmStreamChangeHookService {
|
||||
private final ZlmHookConfig zlmHookConfig;
|
||||
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
public interface ZlmStreamChangeHookHandler{
|
||||
void handler();
|
||||
}
|
||||
@ -43,22 +50,24 @@ public class ZlmStreamChangeHookService {
|
||||
if(regist){
|
||||
ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = getRegistHandler(app);
|
||||
Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
handler.handler();
|
||||
executor.execute(()->{
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
handler.handler();
|
||||
});
|
||||
});
|
||||
} else {
|
||||
ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = getUnregistHandler(app);
|
||||
Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
handler.handler();
|
||||
executor.execute(()->{
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
handler.handler();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -1,16 +1,19 @@
|
||||
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
|
||||
|
||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
@ -19,6 +22,9 @@ import java.util.concurrent.ConcurrentMap;
|
||||
public class ZlmStreamNoneReaderHookService {
|
||||
private final ZlmHookConfig zlmHookConfig;
|
||||
|
||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||
private final Executor executor;
|
||||
|
||||
public interface ZlmStreamNoneReaderHookHandler {
|
||||
void handler();
|
||||
}
|
||||
@ -37,12 +43,14 @@ public class ZlmStreamNoneReaderHookService {
|
||||
|
||||
ConcurrentMap<String, ZlmStreamNoneReaderHookHandler> handlers = getHandler(app);
|
||||
Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> {
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
handler.handler();
|
||||
executor.execute(()->{
|
||||
try {
|
||||
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||
} catch (InterruptedException ignored) {
|
||||
|
||||
}
|
||||
handler.handler();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,30 @@
|
||||
package cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ZlmPublishDTO {
|
||||
@JsonProperty("mediaServerId")
|
||||
private String mediaServerId;
|
||||
@JsonProperty("app")
|
||||
private String app;
|
||||
@JsonProperty("id")
|
||||
private String id;
|
||||
@JsonProperty("ip")
|
||||
private String ip;
|
||||
@JsonProperty("params")
|
||||
private String params;
|
||||
@JsonProperty("port")
|
||||
private int port;
|
||||
@JsonProperty("schema")
|
||||
private String schema;
|
||||
@JsonProperty("stream")
|
||||
private String stream;
|
||||
@JsonProperty("vhost")
|
||||
private String vhost;
|
||||
}
|
@ -106,11 +106,11 @@ ffmpeg-support:
|
||||
#input: -thread_queue_size 128 -re -i rtsp://admin:XXXXXX@10.10.11.171/Streaming/Channels/1/
|
||||
#input: -hwaccel cuda -re -i rtsp://10.10.11.200/camera/171
|
||||
input: -re -i
|
||||
output: -c:v copy -an -f flv
|
||||
output: -threads 1 -c:v copy -an -f flv
|
||||
# output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtsp #flv #rtp_mpegts
|
||||
#output: -c:v libx264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv #
|
||||
# output: -c:v h264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp
|
||||
download: -thread_queue_size 128 -i
|
||||
download: -readrate 4 -i
|
||||
download-speed: 0
|
||||
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
|
||||
# download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS"
|
||||
|
@ -79,6 +79,7 @@ proxy:
|
||||
url: http://192.168.2.3:18183
|
||||
#url: http://10.10.10.20:18183
|
||||
#url: http://192.168.1.241:28181
|
||||
real-time-video-max-play-time: 15m
|
||||
ffmpeg-support:
|
||||
task:
|
||||
# 最大同时推流任务数, <= 0 时不做限制
|
||||
|
31
pom.xml
31
pom.xml
@ -57,7 +57,7 @@
|
||||
<!-- <docker.registry.password>XXX</docker.registry.password>-->
|
||||
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
|
||||
|
||||
<gb28181.docking.version>0.1.0-SNAPSHOT</gb28181.docking.version>
|
||||
<gb28181.docking.version>0.1.0</gb28181.docking.version>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
@ -84,19 +84,24 @@
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>gb28181-docking-platform-mvn-repo</id>
|
||||
<!--<url>http://192.168.1.8:20080/zxb/gb28181-docking-platform-mvn-repo/-/raw/master/</url>-->
|
||||
<url>
|
||||
http://git.skcks.cn/Shikong/gb28181-docking-platform-mvn-repo/raw/branch/master/
|
||||
</url>
|
||||
<releases>
|
||||
<enabled>true</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
<updatePolicy>always</updatePolicy>
|
||||
</snapshots>
|
||||
<id>sk-maven</id>
|
||||
<name>sk-maven</name>
|
||||
<url>http://10.10.10.200:18081/repository/maven-public/</url>
|
||||
</repository>
|
||||
<!--<repository>-->
|
||||
<!-- <id>gb28181-docking-platform-mvn-repo</id>-->
|
||||
<!-- <!–<url>http://192.168.1.8:20080/zxb/gb28181-docking-platform-mvn-repo/-/raw/master/</url>–>-->
|
||||
<!-- <url>-->
|
||||
<!-- http://git.skcks.cn/Shikong/gb28181-docking-platform-mvn-repo/raw/branch/master/-->
|
||||
<!-- </url>-->
|
||||
<!-- <releases>-->
|
||||
<!-- <enabled>true</enabled>-->
|
||||
<!-- </releases>-->
|
||||
<!-- <snapshots>-->
|
||||
<!-- <enabled>true</enabled>-->
|
||||
<!-- <updatePolicy>always</updatePolicy>-->
|
||||
<!-- </snapshots>-->
|
||||
<!--</repository>-->
|
||||
</repositories>
|
||||
|
||||
<dependencyManagement>
|
||||
|
Loading…
Reference in New Issue
Block a user