添加 zlm on_publish hook 钩子用于 rtmp -> rtp
This commit is contained in:
parent
ab5a5d6666
commit
8fefeb2187
@ -1,12 +1,17 @@
|
|||||||
package cn.skcks.docking.gb28181.mocking.api.zlm;
|
package cn.skcks.docking.gb28181.mocking.api.zlm;
|
||||||
|
|
||||||
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
|
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
|
||||||
|
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
|
||||||
|
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
|
||||||
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO;
|
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO;
|
||||||
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamNoneReaderDTO;
|
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamNoneReaderDTO;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
||||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
@ -22,6 +27,7 @@ import org.springframework.web.bind.annotation.RestController;
|
|||||||
public class ZlmHookApi {
|
public class ZlmHookApi {
|
||||||
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
|
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
|
||||||
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
||||||
|
private final ZlmPublishHookService zlmPublishHookService;
|
||||||
|
|
||||||
@PostJson("/on_stream_changed")
|
@PostJson("/on_stream_changed")
|
||||||
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
|
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
|
||||||
@ -35,4 +41,11 @@ public class ZlmHookApi {
|
|||||||
public void onStreamNoneReader(@RequestBody ZlmStreamNoneReaderDTO dto){
|
public void onStreamNoneReader(@RequestBody ZlmStreamNoneReaderDTO dto){
|
||||||
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
|
zlmStreamNoneReaderHookService.processEvent(dto.getApp(),dto.getStream());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
@PostJson("/on_publish")
|
||||||
|
public ZlmResponse<Void> onPublish(@RequestBody ZlmPublishDTO dto){
|
||||||
|
zlmPublishHookService.processEvent(dto);
|
||||||
|
return new ZlmResponse<>(ResponseStatus.Success, null, "");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
|||||||
import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager;
|
import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager;
|
||||||
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
|
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmPublishHookService;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
|
||||||
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamNoneReaderHookService;
|
||||||
import cn.skcks.docking.gb28181.sdp.GB28181Description;
|
import cn.skcks.docking.gb28181.sdp.GB28181Description;
|
||||||
@ -102,6 +103,8 @@ public class DeviceProxyService {
|
|||||||
|
|
||||||
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
private final ZlmStreamNoneReaderHookService zlmStreamNoneReaderHookService;
|
||||||
|
|
||||||
|
private final ZlmPublishHookService zlmPublishHookService;
|
||||||
|
|
||||||
private final FfmpegConfig ffmpegConfig;
|
private final FfmpegConfig ffmpegConfig;
|
||||||
|
|
||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
@ -124,28 +127,30 @@ public class DeviceProxyService {
|
|||||||
// 重试次数
|
// 重试次数
|
||||||
.withStopStrategy(StopStrategies.stopAfterAttempt(3000))
|
.withStopStrategy(StopStrategies.stopAfterAttempt(3000))
|
||||||
.build();
|
.build();
|
||||||
// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
zlmPublishHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
try {
|
scheduledExecutorService.submit(()->{
|
||||||
retryer.call(()->{
|
try {
|
||||||
StartSendRtp startSendRtp = new StartSendRtp();
|
retryer.call(()->{
|
||||||
startSendRtp.setApp(DEFAULT_ZLM_APP);
|
StartSendRtp startSendRtp = new StartSendRtp();
|
||||||
startSendRtp.setStream(callId);
|
startSendRtp.setApp(DEFAULT_ZLM_APP);
|
||||||
startSendRtp.setSsrc(ssrc);
|
startSendRtp.setStream(callId);
|
||||||
startSendRtp.setDstUrl(toAddr);
|
startSendRtp.setSsrc(ssrc);
|
||||||
startSendRtp.setDstPort(toPort);
|
startSendRtp.setDstUrl(toAddr);
|
||||||
startSendRtp.setUdp(!tcp);
|
startSendRtp.setDstPort(toPort);
|
||||||
|
startSendRtp.setUdp(!tcp);
|
||||||
// log.debug("startSendRtp {}",startSendRtp);
|
// log.debug("startSendRtp {}",startSendRtp);
|
||||||
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
|
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
|
||||||
// log.debug("startSendRtpResp {}",startSendRtpResp);
|
// log.debug("startSendRtpResp {}",startSendRtpResp);
|
||||||
return startSendRtpResp;
|
return startSendRtpResp;
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("zlm rtp 推流失败",e);
|
log.error("zlm rtp 推流失败",e);
|
||||||
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
||||||
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
// });
|
});
|
||||||
|
});
|
||||||
|
|
||||||
// });
|
// });
|
||||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
@ -197,9 +202,9 @@ public class DeviceProxyService {
|
|||||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
||||||
try {
|
try {
|
||||||
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||||
|
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||||
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||||
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
|
||||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||||
callbackTask.put(device.getDeviceCode(), executor);
|
callbackTask.put(device.getDeviceCode(), executor);
|
||||||
executeResultHandler.waitFor();
|
executeResultHandler.waitFor();
|
||||||
|
@ -28,6 +28,7 @@ public class ZlmInitService {
|
|||||||
HookConfig hook = config.getHook();
|
HookConfig hook = config.getHook();
|
||||||
hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed");
|
hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed");
|
||||||
hook.setOnStreamNoneReader(zlmHookConfig.getHook() + "/on_stream_none_reader");
|
hook.setOnStreamNoneReader(zlmHookConfig.getHook() + "/on_stream_none_reader");
|
||||||
|
hook.setOnPublish(zlmHookConfig.getHook() + "/on_publish");
|
||||||
zlmMediaService.setServerConfig(config);
|
zlmMediaService.setServerConfig(config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,52 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
|
||||||
|
|
||||||
|
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto.ZlmPublishDTO;
|
||||||
|
import lombok.AccessLevel;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Data
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ZlmPublishHookService {
|
||||||
|
private final ZlmHookConfig zlmHookConfig;
|
||||||
|
|
||||||
|
public interface ZlmPublishHookHandler {
|
||||||
|
void handler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Getter(AccessLevel.PRIVATE)
|
||||||
|
private ConcurrentMap<String, ConcurrentMap<String, ZlmPublishHookHandler>> handler = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public ConcurrentMap<String, ZlmPublishHookHandler> getHandler(String app) {
|
||||||
|
this.handler.putIfAbsent(app, new ConcurrentHashMap<>());
|
||||||
|
return this.handler.get(app);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void processEvent(ZlmPublishDTO dto) {
|
||||||
|
String app = dto.getApp();
|
||||||
|
String streamId = dto.getStream();
|
||||||
|
String ip = dto.getIp();
|
||||||
|
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
|
||||||
|
|
||||||
|
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
|
||||||
|
Optional.ofNullable(handlers.remove(streamId)).ifPresent((handler) -> {
|
||||||
|
handler.handler();
|
||||||
|
try {
|
||||||
|
Thread.sleep(zlmHookConfig.getDelay().toMillis());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,30 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.service.zlm.hook.dto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class ZlmPublishDTO {
|
||||||
|
@JsonProperty("mediaServerId")
|
||||||
|
private String mediaServerId;
|
||||||
|
@JsonProperty("app")
|
||||||
|
private String app;
|
||||||
|
@JsonProperty("id")
|
||||||
|
private String id;
|
||||||
|
@JsonProperty("ip")
|
||||||
|
private String ip;
|
||||||
|
@JsonProperty("params")
|
||||||
|
private String params;
|
||||||
|
@JsonProperty("port")
|
||||||
|
private int port;
|
||||||
|
@JsonProperty("schema")
|
||||||
|
private String schema;
|
||||||
|
@JsonProperty("stream")
|
||||||
|
private String stream;
|
||||||
|
@JsonProperty("vhost")
|
||||||
|
private String vhost;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user