Compare commits

...

38 Commits

Author SHA1 Message Date
48cbc45b34 调整 2024-04-11 18:35:22 +08:00
52e1f769be 修正 2024-03-19 09:57:42 +08:00
bb118293fd 调整 2024-03-19 08:36:43 +08:00
b955abc852 历史视频分段下载及合并 2024-03-18 17:04:26 +08:00
c51d353e21 历史视频分段下载及合并 2024-03-18 16:53:05 +08:00
0ef41d3d92 限制实时视频最大播放时长 2024-03-18 14:51:59 +08:00
ea824175ea 限制实时视频最大播放时长 2024-03-18 14:41:54 +08:00
2de6d080cf 限制实时视频最大播放时长 2024-03-18 14:40:44 +08:00
d936e1a2b6 调整 2024-03-17 03:25:31 +08:00
d01d76304b 调整 2024-03-17 00:32:33 +08:00
d0803c6b62 调整 2024-03-16 22:44:15 +08:00
9a9db33e6f 启动/关闭时 修改 zlm 配置 2024-03-16 16:43:22 +08:00
6966432e76 视频下载 ffmpeg 添加 "-t" 参数 2024-03-14 14:58:56 +08:00
8a14fa19cd hookService 使用线程池执行 handler 2024-03-14 13:53:54 +08:00
723ad67df9 DeviceProxyService 调整 2024-03-14 11:18:23 +08:00
d64bba2c9c DeviceProxyService 调整 2024-03-14 11:07:23 +08:00
39af43f7aa DeviceProxyService 调整 2024-03-14 10:53:57 +08:00
28dfe97a5a DeviceProxyService 调整 2024-03-14 10:45:15 +08:00
50b7bb4f00 DeviceProxyService scheduledExecutorService 调整 2024-03-14 10:33:04 +08:00
2374f7896a DeviceProxyService scheduledExecutorService 调整 2024-03-14 09:55:43 +08:00
7b230144d3 zlm rtp 推流失败 日志 2024-03-14 09:31:16 +08:00
7b350e899b zlmStreamRegistHookEvent 调整 2024-03-14 09:15:49 +08:00
0fd058e1c5 zlm rtp 推流失败 日志 2024-03-14 09:14:19 +08:00
d9376eebf7 提高 推流重试次数 2024-03-14 01:21:52 +08:00
aa327ace86 提高 推流重试次数 2024-03-14 01:15:25 +08:00
9914186862 zlm 配置 2024-03-13 21:58:14 +08:00
b61056e8b8 zlm 配置 2024-03-13 21:44:54 +08:00
1ac99217f5 针对 视频下载 zlm 流注册/销 额外处理 2024-03-13 21:39:31 +08:00
8f0b0be8b3 调整 getProxyUrl 2024-03-13 14:07:56 +08:00
ff6a5484e3 调整 on_stream_none_reader 仅对 rtmp 做处理 2024-03-13 10:13:37 +08:00
8fefeb2187 添加 zlm on_publish hook 钩子用于 rtmp -> rtp 2024-03-12 21:55:24 +08:00
ab5a5d6666 调整 2024-03-12 10:58:36 +08:00
0f1e772168 调整 2024-03-12 10:57:24 +08:00
fb02f559d4 屏蔽 zlm 推流日志 2024-03-12 10:12:30 +08:00
d8cab292de 批量注册 改为一批 200个 每次间隔 500ms
避免速度过快把 wvp 卡死
2024-03-11 15:31:49 +08:00
4b55f7fc54 特定时间范围的历史视频查询请求 预拉取视频响应
改为异步等待完成再返回 如果1分钟内未能完成再 返回 空录像
2024-03-07 10:47:18 +08:00
b42b8be747 本地测试配置 视频下载 添加 ffmpeg 读取速率参数 2024-03-06 13:21:15 +08:00
0fc5de87c8 添加 rtp 推流配置 2024-03-06 13:19:12 +08:00
18 changed files with 570 additions and 170 deletions

View File

@ -28,7 +28,7 @@
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>annotation</artifactId>
<version>${project.version}</version>
<version>${gb28181.docking.version}</version>
<scope>compile</scope>
</dependency>

View File

@ -1,12 +1,17 @@
package cn.skcks.docking.gb28181.mocking.api.zlm;
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO;
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamNoneReaderDTO;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.RequestBody;
@ -22,17 +27,27 @@ import org.springframework.web.bind.annotation.RestController;
public class ZlmHookApi {
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
private final ZlmPublishHookService zlmPublishHookService;
@PostJson("/on_stream_changed")
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
log.debug("on_stream_changed {}", dto);
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtsp")){
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtmp")){
zlmStreamChangeHookService.processEvent(dto.getApp(),dto.getStream(), dto.getRegist());
}
}
@PostJson("/on_stream_none_reader")
public void onStreamNoneReader(@RequestBody ZlmStreamNoneReaderDTO dto){
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
if(StringUtils.equalsIgnoreCase(dto.getSchema(), "rtmp")){
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
}
}
@SneakyThrows
@PostJson("/on_publish")
public ZlmResponse<Void> onPublish(@RequestBody ZlmPublishDTO dto){
zlmPublishHookService.processEvent(dto);
return new ZlmResponse<>(ResponseStatus.Success, null, "");
}
}

View File

@ -24,6 +24,12 @@ public class DeviceProxyConfig {
* 是否只通过代理拉取指定时间范围内的视频查询请求
*/
private Boolean proxyVideoInTimeRange = true;
/**
* 实时视频单次允许最大播放时长
*/
private Duration realTimeVideoMaxPlayTime = Duration.ofMinutes(15);
/**
* 代理该时间段内的历史视频查询请求
*/
@ -40,6 +46,10 @@ public class DeviceProxyConfig {
public static class PreDownloadForRecordInfo {
private Boolean enable = true;
private Duration timeRange = Duration.ofMinutes(5);
/**
* 分片时长, 当请求时间超过该时长时将分片下载
*/
private Duration timeSplit = Duration.ofSeconds(30);
private String cachePath = "./record";
}
}

View File

@ -22,6 +22,7 @@ public class FfmpegConfig {
private String input = "-re -i";
private String output = "-vcodec h264 -acodec aac -f rtp_mpegts";
private String logLevel = "error";
private Boolean useRtpToDownload = false;
}

View File

@ -28,7 +28,7 @@ public class MockingExecutor{
public ThreadPoolTaskExecutor sipTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CPU_NUM * 2);
executor.setMaxPoolSize(100);
executor.setMaxPoolSize(1000);
executor.setQueueCapacity(10000);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);

View File

@ -4,9 +4,11 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordListDTO;
@ -20,6 +22,7 @@ import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.sip.header.CallIdHeader;
@ -29,6 +32,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@Slf4j
@RequiredArgsConstructor
@ -38,24 +45,27 @@ public class RecordInfoRequestProcessor {
private final DeviceService deviceService;
private final DeviceProxyConfig deviceProxyConfig;
private final VideoCacheManager videoCacheManager;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
public void process(SIPRequest request, byte[] content) {
String senderIp = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
RecordInfoRequestDTO recordInfoRequestDTO = XmlUtils.parse(content, RecordInfoRequestDTO.class);
String id = recordInfoRequestDTO.getDeviceId();
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
}
}, () -> {
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
}
}, () -> {
log.error("未能找到 deviceId: {} 的相关信息", id);
@ -64,6 +74,24 @@ public class RecordInfoRequestProcessor {
});
}
private void processRecordInfoRespWithVideoCacheTask(SIPRequest request, String senderIp, String transport, MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO){
CompletableFuture<Runnable> future = CompletableFuture.supplyAsync(()-> {
try {
preDownloadVideoTask(device.getDeviceCode(), recordInfoRequestDTO).get();
return ()-> sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} catch (InterruptedException | ExecutionException e) {
log.error("preDownloadVideoTask error",e);
return() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
}
}, executor);
future.completeOnTimeout(() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport),
1, TimeUnit.MINUTES);
future.thenApplyAsync(fn->{
fn.run();
return null;
});
}
private boolean preDownloadVideo(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
if(!deviceProxyConfig.getPreDownloadForRecordInfo().getEnable()){
return true;
@ -75,10 +103,16 @@ public class RecordInfoRequestProcessor {
return true;
}
return preDownloadVideoTask(deviceCode,recordInfoRequestDTO).isDone();
}
private CompletableFuture<JsonResponse<String>> preDownloadVideoTask(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
Date startTime = recordInfoRequestDTO.getStartTime();
Date endTime = recordInfoRequestDTO.getEndTime();
// 添加预下载任务
videoCacheManager.addTask(deviceCode,startTime,endTime);
return videoCacheManager.get(deviceCode,startTime,endTime).isDone();
return videoCacheManager.get(deviceCode,startTime,endTime);
}
private void sendRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) {
@ -114,7 +148,7 @@ public class RecordInfoRequestProcessor {
FromHeader fromHeader = request.getFromHeader();
ListUtil.partition(recordInfoItemDTOList,50).forEach(recordList->{
RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
final RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn());
recordInfoResponseDTO.setDeviceId(device.getGbChannelId());
recordInfoResponseDTO.setName(device.getName());
@ -125,10 +159,11 @@ public class RecordInfoRequestProcessor {
.build();
recordInfoResponseDTO.setRecordList(recordListDTO);
final String xml = XmlUtils.toXml(recordInfoResponseDTO);
sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId();
return SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
ip, port, 1, xml, fromHeader.getTag(), callIdHeader);
});
});
}
@ -147,11 +182,12 @@ public class RecordInfoRequestProcessor {
.build();
recordInfoResponseDTO.setRecordList(recordListDTO);
final String xml = XmlUtils.toXml(recordInfoResponseDTO);
FromHeader fromHeader = request.getFromHeader();
sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId();
return SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
ip, port, 1, xml, fromHeader.getTag(), callIdHeader);
});
}

View File

@ -3,16 +3,20 @@ package cn.skcks.docking.gb28181.mocking.core.sip.service;
import cn.hutool.cache.CacheUtil;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.net.url.UrlBuilder;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpGet;
@ -24,10 +28,15 @@ import org.springframework.stereotype.Service;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@ -38,6 +47,8 @@ public class VideoCacheManager {
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final FfmpegSupportService ffmpegSupportService;
private final TimedCache<String, CompletableFuture<JsonResponse<String>>> tasks =
CacheUtil.newTimedCache(TimeUnit.MINUTES.toMillis(30));
@ -85,13 +96,80 @@ public class VideoCacheManager {
@SneakyThrows
protected CompletableFuture<JsonResponse<String>> downloadVideo(String deviceCode, Date startTime, Date endTime) {
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile();
String fileName = fileName(deviceCode, startTime, endTime);
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName + ".mp4").toFile();
if(realFile.exists()){
log.info("文件 {} 已缓存, 直接返回", realFile.getAbsolutePath());
return CompletableFuture.completedFuture(JsonResponse.success(realFile.getAbsolutePath()));
}
return CompletableFuture.supplyAsync(()->{
long between = DateUtil.between(startTime, endTime, DateUnit.SECOND);
long splitTime = deviceProxyConfig.getPreDownloadForRecordInfo().getTimeSplit().getSeconds();
if(between > splitTime){
log.info("时间间隔超过 {} 秒, 将分片下载", splitTime);
DateTime splitStartTime = DateUtil.date(startTime);
DateTime splitEndTime = DateUtil.offsetSecond(startTime, (int) splitTime);
List<CompletableFuture<JsonResponse<String>>> completableFutures = new ArrayList<>();
while(splitEndTime.getTime() < endTime.getTime()){
String splitFileName = fileName(deviceCode, splitStartTime, splitEndTime);
File tmpFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),splitFileName + ".mp4.tmp").toFile();
if(tmpFile.exists()){
tmpFile.delete();
log.info("删除已存在但未完成下载的临时文件 => {}", tmpFile.getAbsolutePath());
}
// 添加分片任务
addTask(deviceCode, splitStartTime, splitEndTime);
completableFutures.add(get(deviceCode, splitStartTime, splitEndTime));
// 更新起止时间
splitStartTime = DateUtil.offsetSecond(splitStartTime, (int) splitTime);
splitEndTime = DateUtil.offsetSecond(splitEndTime, (int) splitTime);
if(splitEndTime.getTime() >= endTime.getTime()){
splitEndTime = DateUtil.date(endTime);
addTask(deviceCode, splitStartTime, splitEndTime);
completableFutures.add(get(deviceCode, splitStartTime, splitEndTime));
break;
}
}
CompletableFuture.allOf(completableFutures.toArray(CompletableFuture[]::new));
String concatFileName = fileName + ".mp4.concat";
File concatFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(), concatFileName).toFile();
if(concatFile.exists()){
concatFile.delete();
log.info("删除已存在但未完成合并的临时合并配置文件 => {}", concatFile.getAbsolutePath());
}
try {
concatFile.createNewFile();
try(FileWriter fileWriter = new FileWriter(concatFile)){
for (CompletableFuture<JsonResponse<String>> result : completableFutures) {
String splitFilePath = result.get().getData();
String config = String.format("file '%s'\n", splitFilePath);
fileWriter.write(config);
log.debug("{}", config);
}
}
log.info("生成临时合并配置文件 {}", concatFile.getAbsolutePath());
log.info("开始合并视频 => {}", realFile.getAbsolutePath());
DefaultExecuteResultHandler executeResultHandler = new DefaultExecuteResultHandler();
ffmpegSupportService.ffmpegConcatExecutor(concatFile.getAbsolutePath(), realFile.getAbsolutePath(), executeResultHandler);
executeResultHandler.waitFor();
if(realFile.exists()){
log.info("视频合并成功 => {}", realFile.getAbsolutePath());
return JsonResponse.success(realFile.getAbsolutePath());
}
} catch (Exception e) {
log.error("合并分片视频异常 => {}", e.getMessage());
return JsonResponse.error(e.getMessage());
} finally {
System.gc();
log.info("删除临时合并配置文件 {} => {}", concatFile.getAbsolutePath(), concatFile.delete());
}
}
final String url = UrlBuilder.of(deviceProxyConfig.getUrl())
.addPath("video")
.addQuery("device_id", deviceCode)
@ -127,6 +205,7 @@ public class VideoCacheManager {
return JsonResponse.success(realFile.getAbsolutePath());
} catch (Exception e) {
log.error("视频下载失败 => {}", e.getMessage());
System.gc();
file.delete();
return JsonResponse.error(e.getMessage());
}

View File

@ -23,7 +23,11 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.*;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
@ -32,6 +36,7 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
import cn.skcks.docking.gb28181.sdp.GB28181Description;
@ -41,6 +46,7 @@ import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PreDestroy;
import lombok.*;
@ -49,15 +55,19 @@ import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.sdp.MediaDescription;
import javax.sdp.SdpException;
import javax.sip.SipProvider;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -66,6 +76,7 @@ import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@Service
@ -96,9 +107,14 @@ public class DeviceProxyService {
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
private final ZlmPublishHookService zlmPublishHookService;
private final FfmpegConfig ffmpegConfig;
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(128);
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final java.util.concurrent.Executor executor;
public interface TaskProcessor {
void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
@ -109,50 +125,80 @@ public class DeviceProxyService {
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3000))
.build();
// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
try {
retryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp(DEFAULT_ZLM_APP);
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e) {
log.error("zlm rtp 推流失败",e);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
}
// });
AtomicReference<String> failMag = new AtomicReference<>("");
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> {
if(resp != null){
failMag.set(resp.toString());
}
return resp.getLocalPort() == null || resp.getLocalPort() <= 0;
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(10 * 1000))
.build();
zlmPublishHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
executor.execute(()->{
try {
StartSendRtp startSendRtp = new StartSendRtp();
StartSendRtpResp sendRtpResp = retryer.call(() -> {
startSendRtp.setApp(DEFAULT_ZLM_APP);
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(!tcp);
// log.debug("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
// log.debug("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
// });
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
log.info("sendRtp 推流成功 {} {}, req => {}, resp => {}", device.getDeviceCode(),device.getGbChannelId(), startSendRtp, sendRtpResp);
} catch (Exception e) {
log.error("zlm rtp 推流失败, {}, {} {} {}, {}", failMag.get(), device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
}
});
});
zlmStreamRegistHookEvent(DEFAULT_ZLM_APP, callId, ssrc);
zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
sendBye(request,device,key);
});
}
private void zlmStreamRegistHookEvent(String app, String callId, String ssrc){
zlmStreamChangeHookService.getUnregistHandler(app).put(callId,()->{
executor.execute(()->{
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(app);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", app, callId, ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
}, 10, TimeUnit.SECONDS);
// 如果 10秒内 重新注册, 取消停止RTP推流
zlmStreamChangeHookService.getRegistHandler(app).put(callId,()->{
schedule.cancel(true);
zlmStreamRegistHookEvent(app, callId, ssrc);
});
// 如果 注销 10.5 秒内 没有再注册, 就彻底取消相关事件的订阅
scheduledExecutorService.schedule(()->{
zlmStreamChangeHookService.getRegistHandler(app).remove(callId);
},10500, TimeUnit.MILLISECONDS);
});
});
}
private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
Optional.ofNullable(tasks.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess();
@ -166,97 +212,124 @@ public class DeviceProxyService {
public TaskProcessor playbackTask(){
return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
scheduledExecutorService.schedule(() -> {
trying(request);
sendOkResponse.run();
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("{}", e.getMessage());
}
@Override
public void onNext(SIPRequest item) {
subscribe.getAckSubscribe().delPublisher(ackKey);
}
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(SIPRequest item) {
subscribe.getAckSubscribe().delPublisher(ackKey);
}
@Override
public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
try {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request,device,"");
throw new RuntimeException(e);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
try {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request, device, "");
log.error("{}", e.getMessage());
}
});
}, 1, TimeUnit.SECONDS);
}
});
trying(request);
sendOkResponse.run();
};
}
public TaskProcessor downloadTask(){
return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
scheduledExecutorService.schedule(() -> {
trying(request);
sendOkResponse.run();
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("{}", e.getMessage());
}
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
private SIPRequest ackRequest;
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
subscribe.getAckSubscribe().delPublisher(ackKey);
}
@Override
public void onNext(SIPRequest item) {
ackRequest = item;
subscribe.getAckSubscribe().delPublisher(ackKey);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
scheduledExecutorService.submit(()->{
try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
@Override
public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try {
if(ackRequest != null){
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
if (!ffmpegConfig.getRtp().getUseRtpToDownload()) {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
executor.execute(()->{
try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor);
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor);
} else {
String rtpUrl = getRtpUrl(request);
Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor);
}
executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request, device, "");
throw new RuntimeException(e);
}
} catch (Exception e) {
sendBye(request, device, "");
log.error("{}", e.getMessage());
}
});
}, 1, TimeUnit.SECONDS);
}
});
trying(request);
sendOkResponse.run();
};
}
private static String getRtpUrl(SIPRequest request) throws ParseException, SdpException {
String contentString = new String(request.getRawContent());
GB28181DescriptionParser gb28181DescriptionParser = new GB28181DescriptionParser(contentString);
GB28181Description sdp = gb28181DescriptionParser.parse();
String rtpIp = sdp.getConnection().getAddress();
MediaDescriptionImpl media = (MediaDescriptionImpl) sdp.getMediaDescriptions(true).get(0);
return "rtp://" + rtpIp + ":" + media.getMedia().getMediaPort();
}
private String getZlmRtmpUrl(String app, String streamId){
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId;
}
@ -401,7 +474,7 @@ public class DeviceProxyService {
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
String toUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
try {
ZlmResponse<AddFFmpegSourceResp> sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
@ -436,7 +509,7 @@ public class DeviceProxyService {
return startSendRtpResp;
});
} catch (Exception e){
log.error("zlm rtp 推流失败",e);
log.error("zlm rtp 推流失败, {} {} {}, {}", device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
sendBye(request, device, "");
}
});
@ -447,6 +520,7 @@ public class DeviceProxyService {
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", ZLM_FFMPEG_PROXY_APP, callId, ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
});
@ -506,6 +580,21 @@ public class DeviceProxyService {
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
log.warn("到达最长播放时间 {}, 强制关闭实时视频播放 {} {}", proxyConfig.getRealTimeVideoMaxPlayTime(), device.getGbChannelId(), callId);
sendBye(request, device, "");
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
RedisUtil.KeyOps.delete(cacheKey);
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", DEFAULT_ZLM_APP, callId, ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
}, proxyConfig.getRealTimeVideoMaxPlayTime().toMillis(), TimeUnit.MILLISECONDS);
Retryer<StartSendRtpResp> rtpRetryer = rtpRetryer();
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
try {
@ -523,18 +612,25 @@ public class DeviceProxyService {
return startSendRtpResp;
});
} catch (Exception e){
log.error("zlm rtp 推流失败",e);
log.error("zlm rtp 推流失败, {} {} {}, {}", device.getDeviceCode(),device.getGbChannelId(), callId, e.getMessage());
sendBye(request, device, "");
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
RedisUtil.KeyOps.delete(cacheKey);
schedule.cancel(true);
}
});
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
schedule.cancel(true);
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
log.info("结束 zlm rtp 推流, app {}, stream {}, ssrc {}", DEFAULT_ZLM_APP, callId, ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
RedisUtil.KeyOps.delete(cacheKey);
});
Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,request,device);
@ -543,7 +639,6 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
} catch (Exception e) {
log.error("zlm 代理拉流失败",e);
sendBye(request, device, "");
}
@ -555,9 +650,15 @@ public class DeviceProxyService {
CompletableFuture<JsonResponse<String>> task = videoCacheManager.get(device.getDeviceCode(), startTime, endTime);
if(task != null){
if(task.isDone()){
String file = task.get().getData();
log.info("本地视频已缓存, 将从本地缓存推流, 缓存文件 => {}", file);
return file;
String filePath = task.get().getData();
File file = new File(filePath);
if(StringUtils.isNotBlank(filePath) && file.exists()){
log.info("本地视频已缓存, 将从本地缓存推流, 缓存文件 => {}", filePath);
return filePath;
} else {
log.info("缓存已失效, 将直接使用 http代理 推流 并 重新缓存, 缓存文件 => {}", filePath);
videoCacheManager.addTask(device.getDeviceCode(), startTime, endTime);
}
}
}
}
@ -619,9 +720,11 @@ public class DeviceProxyService {
optionalZlmStreamChangeHookHandler.ifPresent(handler -> {
log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId);
handler.handler();
});
optionalZlmStreamNoneReaderHandler.ifPresent(handler -> {
log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId);
handler.handler();
});
sendBye(request,device,key);
}

View File

@ -44,10 +44,10 @@ public class FfmpegSupportService {
log.info("视频下载参数 {}", inputParam);
String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output);
String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", "-t", unit.toSeconds(time), rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);
return ffmpegExecutor(inputParam, outputParam, time + 60, unit, resultHandler);
}
@SneakyThrows
@ -55,6 +55,7 @@ public class FfmpegSupportService {
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), logLevelParam, inputParam, outputParam);
log.info("ffmpeg 命令 => {}", command);
CommandLine commandLine = CommandLine.parse(command);
Executor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time));
@ -62,4 +63,18 @@ public class FfmpegSupportService {
executor.execute(commandLine, resultHandler);
return executor;
}
@SneakyThrows
public Executor ffmpegConcatExecutor(String concatConfigFile, String outputFile, ExecuteResultHandler executeResultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String inputParam = String.format("-y -f concat -safe 0 -i \"%s\"", concatConfigFile);
String outputParam = String.format("-c copy \"%s\"", outputFile);
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), logLevelParam, inputParam, outputParam);
log.info("ffmpeg 视频合并 命令 => {}", command);
CommandLine commandLine = CommandLine.parse(command);
Executor executor = new DefaultExecutor();
executor.execute(commandLine, executeResultHandler);
return executor;
}
}

View File

@ -52,11 +52,13 @@ public class RegisterService {
List<MockingDevice> enabledDevice = deviceService.getAllEnabledDevice();
List<CompletableFuture<JsonResponse<Void>>[]> completableFutures = ListUtil.split(enabledDevice, 10).stream().map(items -> {
CompletableFuture<JsonResponse<Void>>[] array = enabledDevice.stream().map(this::register).toArray(CompletableFuture[]::new);
List<CompletableFuture<JsonResponse<Void>>[]> completableFutures = new ArrayList<>();
for (List<MockingDevice> mockingDevices : ListUtil.split(enabledDevice, 200)) {
CompletableFuture<JsonResponse<Void>>[] array = mockingDevices.stream().map(this::register).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(array);
return array;
}).toList();
Thread.sleep(500);
completableFutures.add(array);
}
List<CompletableFuture<JsonResponse<Void>>> reduce = completableFutures.stream().map(item -> Arrays.stream(item).toList())
.reduce(new ArrayList<>(), (prev, cur) -> {

View File

@ -8,6 +8,7 @@ import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@ -17,7 +18,8 @@ import java.util.List;
@Slf4j
@RequiredArgsConstructor
@Component
public class ZlmInitService {
public class ZlmInitService implements SmartLifecycle {
private boolean running;
private final ZlmMediaService zlmMediaService;
private final ZlmHookConfig zlmHookConfig;
@PostConstruct
@ -28,6 +30,30 @@ public class ZlmInitService {
HookConfig hook = config.getHook();
hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed");
hook.setOnStreamNoneReader(zlmHookConfig.getHook() + "/on_stream_none_reader");
hook.setOnPublish(zlmHookConfig.getHook() + "/on_publish");
config.getRtmp().setHandshakeSecond(15);
config.getRtmp().setKeepAliveSecond(10);
config.getRtpProxy().setTimeoutSec(30);
zlmMediaService.setServerConfig(config);
}
@Override
public void start() {
running = true;
}
@Override
public void stop() {
ZlmResponse<List<ServerConfig>> serverConfig = zlmMediaService.getServerConfig();
List<ServerConfig> data = serverConfig.getData();
ServerConfig config = data.get(0);
HookConfig hook = config.getHook();
hook.setOnPublish("");
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@ -0,0 +1,60 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@Slf4j
@Data
@Service
@RequiredArgsConstructor
public class ZlmPublishHookService {
private final ZlmHookConfig zlmHookConfig;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
public interface ZlmPublishHookHandler {
void handler();
}
@Getter(AccessLevel.PRIVATE)
private ConcurrentMap<String, ConcurrentMap<String, ZlmPublishHookHandler>> handler = new ConcurrentHashMap<>();
public ConcurrentMap<String, ZlmPublishHookHandler> getHandler(String app) {
this.handler.putIfAbsent(app, new ConcurrentHashMap<>());
return this.handler.get(app);
}
public void processEvent(ZlmPublishDTO dto) {
String app = dto.getApp();
String streamId = dto.getStream();
String ip = dto.getIp();
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> {
executor.execute(()->{
handler.handler();
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) {
log.error("{}", e.getMessage());
}
});
});
}
}

View File

@ -1,16 +1,19 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@Slf4j
@Data
@ -18,6 +21,10 @@ import java.util.concurrent.ConcurrentMap;
@RequiredArgsConstructor
public class ZlmStreamChangeHookService {
private final ZlmHookConfig zlmHookConfig;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
public interface ZlmStreamChangeHookHandler{
void handler();
}
@ -43,22 +50,24 @@ public class ZlmStreamChangeHookService {
if(regist){
ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = getRegistHandler(app);
Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
handler.handler();
executor.execute(()->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException ignored) {
}
handler.handler();
});
});
} else {
ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = getUnregistHandler(app);
Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
handler.handler();
executor.execute(()->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException ignored) {
}
handler.handler();
});
});
}
}

View File

@ -1,16 +1,19 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@Slf4j
@Data
@ -19,6 +22,9 @@ import java.util.concurrent.ConcurrentMap;
public class ZlmStreamNoneReaderHookService {
private final ZlmHookConfig zlmHookConfig;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
public interface ZlmStreamNoneReaderHookHandler {
void handler();
}
@ -37,12 +43,14 @@ public class ZlmStreamNoneReaderHookService {
ConcurrentMap<String, ZlmStreamNoneReaderHookHandler> handlers = getHandler(app);
Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> {
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
handler.handler();
executor.execute(()->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException ignored) {
}
handler.handler();
});
});
}
}

View File

@ -0,0 +1,30 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ZlmPublishDTO {
@JsonProperty("mediaServerId")
private String mediaServerId;
@JsonProperty("app")
private String app;
@JsonProperty("id")
private String id;
@JsonProperty("ip")
private String ip;
@JsonProperty("params")
private String params;
@JsonProperty("port")
private int port;
@JsonProperty("schema")
private String schema;
@JsonProperty("stream")
private String stream;
@JsonProperty("vhost")
private String vhost;
}

View File

@ -106,11 +106,11 @@ ffmpeg-support:
#input: -thread_queue_size 128 -re -i rtsp://admin:XXXXXX@10.10.11.171/Streaming/Channels/1/
#input: -hwaccel cuda -re -i rtsp://10.10.11.200/camera/171
input: -re -i
output: -c:v copy -an -f flv
output: -threads 1 -c:v copy -an -f flv
# output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtsp #flv #rtp_mpegts
#output: -c:v libx264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv #
# output: -c:v h264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp
download: -thread_queue_size 128 -i
download: -readrate 4 -i
download-speed: 0
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
# download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS"

View File

@ -79,6 +79,7 @@ proxy:
url: http://192.168.2.3:18183
#url: http://10.10.10.20:18183
#url: http://192.168.1.241:28181
real-time-video-max-play-time: 15m
ffmpeg-support:
task:
# 最大同时推流任务数, <= 0 时不做限制

31
pom.xml
View File

@ -57,7 +57,7 @@
<!-- <docker.registry.password>XXX</docker.registry.password>-->
<docker.maven.plugin.version>1.4.13</docker.maven.plugin.version>
<gb28181.docking.version>0.1.0-SNAPSHOT</gb28181.docking.version>
<gb28181.docking.version>0.1.0</gb28181.docking.version>
</properties>
<profiles>
@ -84,19 +84,24 @@
<repositories>
<repository>
<id>gb28181-docking-platform-mvn-repo</id>
<!--<url>http://192.168.1.8:20080/zxb/gb28181-docking-platform-mvn-repo/-/raw/master/</url>-->
<url>
http://git.skcks.cn/Shikong/gb28181-docking-platform-mvn-repo/raw/branch/master/
</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
<id>sk-maven</id>
<name>sk-maven</name>
<url>http://10.10.10.200:18081/repository/maven-public/</url>
</repository>
<!--<repository>-->
<!-- <id>gb28181-docking-platform-mvn-repo</id>-->
<!-- &lt;!&ndash;<url>http://192.168.1.8:20080/zxb/gb28181-docking-platform-mvn-repo/-/raw/master/</url>&ndash;&gt;-->
<!-- <url>-->
<!-- http://git.skcks.cn/Shikong/gb28181-docking-platform-mvn-repo/raw/branch/master/-->
<!-- </url>-->
<!-- <releases>-->
<!-- <enabled>true</enabled>-->
<!-- </releases>-->
<!-- <snapshots>-->
<!-- <enabled>true</enabled>-->
<!-- <updatePolicy>always</updatePolicy>-->
<!-- </snapshots>-->
<!--</repository>-->
</repositories>
<dependencyManagement>