diff --git a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java index 50ac764..a6cdf6e 100644 --- a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java +++ b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java @@ -2,7 +2,9 @@ package cn.skcks.docking.gb28181.mocking.api.zlm; import cn.skcks.docking.gb28181.annotation.web.methods.PostJson; import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO; +import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamNoneReaderDTO; import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService; +import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,9 +20,15 @@ import org.springframework.web.bind.annotation.RestController; @RequiredArgsConstructor public class ZlmHookApi { private final ZlmStreamChangeHookService zlmStreamChangeHookService; + private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService; @PostJson("/on_stream_changed") public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){ zlmStreamChangeHookService.processEvent(dto.getApp(),dto.getStream(), dto.getRegist()); } + + @PostJson("/on_stream_none_reader") + public void onStreamNoneReader(@RequestBody ZlmStreamNoneReaderDTO dto){ + zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream()); + } } diff --git a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/dto/ZlmStreamNoneReaderDTO.java b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/dto/ZlmStreamNoneReaderDTO.java new file mode 100644 index 0000000..feff736 --- /dev/null +++ b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/dto/ZlmStreamNoneReaderDTO.java @@ -0,0 +1,20 @@ +package cn.skcks.docking.gb28181.mocking.api.zlm.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@Data +public class ZlmStreamNoneReaderDTO { + @JsonProperty("mediaServerId") + private String mediaServerId; + @JsonProperty("app") + private String app; + @JsonProperty("schema") + private String schema; + @JsonProperty("stream") + private String stream; + @JsonProperty("vhost") + private String vhost; +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index de90327..9cb2f16 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -32,6 +32,7 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService; import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService; +import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService; import cn.skcks.docking.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.sdp.parser.GB28181DescriptionParser; import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder; @@ -91,6 +92,8 @@ public class DeviceProxyService { private final String DEFAULT_ZLM_APP = "live"; private final String ZLM_FFMPEG_PROXY_APP = "ffmpeg_proxy"; + private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService; + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public interface TaskProcessor { @@ -133,7 +136,10 @@ public class DeviceProxyService { throw new RuntimeException(e); } }); - zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ +// zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ +// sendBye(request,device,key); +// }); + zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{ sendBye(request,device,key); }); return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + DEFAULT_ZLM_APP +"/" + callId; @@ -492,19 +498,28 @@ public class DeviceProxyService { private void mediaStatus(){ int num = taskNum.decrementAndGet(); log.info("当前任务数 {}", num); - // 等待zlm推流结束, 如果 ffmpeg 结束 30秒 未能推流完成就主动结束 + // 等待zlm推流结束, 如果 ffmpeg 结束 3分钟内 未能推流完成就主动结束 scheduledExecutorService.schedule(()->{ CallIdHeader requestCallId = request.getCallId(); String callId = requestCallId.getCallId(); callbackTask.remove(callId); Optional optionalZlmStreamChangeHookHandler = Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)); + Optional optionalZlmStreamNoneReaderHandler = + Optional.ofNullable(zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).remove(callId)); // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 - if(optionalZlmStreamChangeHookHandler.isEmpty()){ + if(optionalZlmStreamChangeHookHandler.isEmpty() && optionalZlmStreamNoneReaderHandler.isEmpty()){ return; } + + optionalZlmStreamChangeHookHandler.ifPresent(handler -> { + log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId); + }); + optionalZlmStreamNoneReaderHandler.ifPresent(handler -> { + log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId); + }); sendBye(request,device,key); - },30,TimeUnit.SECONDS); + },3,TimeUnit.MINUTES); } public boolean hasResult() { diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java index 4ca0df7..1e6afdf 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java @@ -34,8 +34,14 @@ public class FfmpegSupportService { public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit, ExecuteResultHandler resultHandler){ FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Debug debug = ffmpegConfig.getDebug(); - String downloadSpeed = StringUtils.joinWith(" ","-filter:v", MessageFormat.format("\"setpts=1/{0}*PTS\"",rtp.getDownloadSpeed())); - String inputParam = debug.getDownload()? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input, downloadSpeed); + String inputParam; + if(rtp.getDownloadSpeed() > 0){ + String downloadSpeed = StringUtils.joinWith(" ","-filter:v", MessageFormat.format("\"setpts=1/{0}*PTS\"",rtp.getDownloadSpeed())); + inputParam = debug.getDownload()? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input, downloadSpeed); + } else { + inputParam = debug.getDownload()? rtp.getDownload(): StringUtils.joinWith(" ", rtp.getDownload(), input); + } + log.info("视频下载参数 {}", inputParam); String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output); diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/ZlmInitService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/ZlmInitService.java index 399204d..85bc234 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/ZlmInitService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/ZlmInitService.java @@ -27,6 +27,7 @@ public class ZlmInitService { ServerConfig config = data.get(0); HookConfig hook = config.getHook(); hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed"); + hook.setOnStreamNoneReader(zlmHookConfig.getHook() + "/on_stream_none_reader"); zlmMediaService.setServerConfig(config); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java index 52ffbd2..cf9ef1b 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java @@ -38,7 +38,7 @@ public class ZlmStreamChangeHookService { } public void processEvent(String app,String streamId, Boolean regist){ - log.debug("app {}, streamId {}, regist {}", app,streamId, regist); + log.debug("流改变事件: app {}, streamId {}, regist {}", app,streamId, regist); if(regist){ ConcurrentMap registHandler = getRegistHandler(app); diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamNoneReaderHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamNoneReaderHookService.java new file mode 100644 index 0000000..9468bdb --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamNoneReaderHookService.java @@ -0,0 +1,48 @@ +package cn.skcks.docking.gb28181.mocking.service.zlm.hook; + +import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; +import lombok.AccessLevel; +import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Slf4j +@Data +@Service +@RequiredArgsConstructor +public class ZlmStreamNoneReaderHookService { + private final ZlmHookConfig zlmHookConfig; + + public interface ZlmStreamNoneReaderHookHandler { + void handler(); + } + + @Getter(AccessLevel.PRIVATE) + private ConcurrentMap> handler = new ConcurrentHashMap<>(); + + public ConcurrentMap getHandler(String app) { + this.handler.putIfAbsent(app, new ConcurrentHashMap<>()); + return this.handler.get(app); + } + + + public void processEvent(String app, String streamId) { + log.debug("流无人观看事件: app {}, streamId {}", app, streamId); + + ConcurrentMap handlers = getHandler(app); + Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> { + try { + Thread.sleep(zlmHookConfig.getDelay().toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handler.handler(); + }); + } +} diff --git a/gb28181-mocking-starter/src/main/resources/application.yml b/gb28181-mocking-starter/src/main/resources/application.yml index 26b87bf..bb423b9 100644 --- a/gb28181-mocking-starter/src/main/resources/application.yml +++ b/gb28181-mocking-starter/src/main/resources/application.yml @@ -76,8 +76,8 @@ proxy: device: # 代理的视频接口地址, 用于获取历史视频 # 参数 device_id, begin_time, end_time - #url: http://192.168.2.3:18183 - url: http://10.10.10.20:18183 + url: http://192.168.2.3:18183 + #url: http://10.10.10.20:18183 #url: http://192.168.1.241:28181 ffmpeg-support: task: @@ -105,10 +105,11 @@ ffmpeg-support: #output: -filter:v "setpts=4.0*PTS" -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f flv # 下载 正常速率 #output: -r 30 -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f flv - output: -r 30 -vcodec libx264 -acodec aac -filter:v "setpts=1*PTS" -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f flv + output: -c:v copy -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f flv + #output: -r 30 -vcodec libx264 -acodec aac -filter:v "setpts=1*PTS" -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f flv #download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS" download: -i - download-speed: 4.0 + download-speed: 0 debug: download: false input: false