初步 添加 zlm ffmpegSource 支持

This commit is contained in:
shikong 2024-01-16 17:28:33 +08:00
parent 73a84c2f93
commit e9820fdbcd
5 changed files with 169 additions and 28 deletions

View File

@ -11,6 +11,8 @@ public class FfmpegConfig {
private String ffmpeg; private String ffmpeg;
private String ffprobe; private String ffprobe;
private Boolean useZlmFfmpeg = false;
private Rtp rtp; private Rtp rtp;
@Data @Data

View File

@ -324,8 +324,12 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key); log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流 if(ffmpegConfig.getUseZlmFfmpeg()){
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask()); deviceProxyService.pullStreamByZlmFfmpegSource(request,callId,device, start, stop, address, port,ssrc);
} else {
// RTP 推流
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask());
}
onComplete(); onComplete();
} }

View File

@ -12,6 +12,8 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.proxy.AddFFmpegSource;
import cn.skcks.docking.gb28181.media.dto.proxy.AddFFmpegSourceResp;
import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxy; import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxy;
import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp; import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse; import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
@ -54,6 +56,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Date; import java.util.Date;
@ -84,6 +87,10 @@ public class DeviceProxyService {
private final ZlmMediaConfig zlmMediaConfig; private final ZlmMediaConfig zlmMediaConfig;
private final ZlmStreamChangeHookService zlmStreamChangeHookService; private final ZlmStreamChangeHookService zlmStreamChangeHookService;
private final ZlmRtmpConfig zlmRtmpConfig; private final ZlmRtmpConfig zlmRtmpConfig;
private final String DEFAULT_ZLM_APP = "live";
private final String ZLM_FFMPEG_PROXY_APP = "ffmpeg_proxy";
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public interface TaskProcessor { public interface TaskProcessor {
@ -94,7 +101,7 @@ public class DeviceProxyService {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
schedule.cancel(false); schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder() Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
@ -108,7 +115,7 @@ public class DeviceProxyService {
try { try {
retryer.call(()->{ retryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live"); startSendRtp.setApp(DEFAULT_ZLM_APP);
startSendRtp.setStream(callId); startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc); startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr); startSendRtp.setDstUrl(toAddr);
@ -121,15 +128,15 @@ public class DeviceProxyService {
}); });
} catch (Exception e) { } catch (Exception e) {
schedule.cancel(true); schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
sendBye(request,device,key); sendBye(request,device,key);
}); });
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + DEFAULT_ZLM_APP +"/" + callId;
} }
private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){ private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
@ -232,6 +239,45 @@ public class DeviceProxyService {
}; };
} }
public Flow.Subscriber<SIPRequest> zlmFfmpegByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){
return new Flow.Subscriber<>() {
private SIPRequest request;
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("订阅 bye {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
request = item;
subscribe.getByeSubscribe().delPublisher(key);
}
@Override
public void onError(Throwable throwable) {}
@Override
public void onComplete() {
log.info("bye 订阅结束 {}", key);
if(request == null){
sendBye(inviteRequest,device,"");
} else {
String ip = request.getLocalAddress().getHostAddress();
String transPort = request.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK")));
}
String cacheKey = CacheUtil.getKey("ZLM","FFMPEG", "PROXY", key);
String proxyKey = RedisUtil.StringOps.get(cacheKey);
log.info("关闭拉流代理 {}", zlmMediaService.delFfmpegSource(proxyKey));
RedisUtil.KeyOps.delete(cacheKey);
}
};
}
public Flow.Subscriber<SIPRequest> zlmByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){ public Flow.Subscriber<SIPRequest> zlmByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){
return new Flow.Subscriber<>() { return new Flow.Subscriber<>() {
private SIPRequest request; private SIPRequest request;
@ -270,6 +316,69 @@ public class DeviceProxyService {
}; };
} }
public void pullStreamByZlmFfmpegSource(SIPRequest request,String callId, MockingDevice device, Date start, Date stop,String rtpAddr, int rtpPort, String ssrc){
Retryer<ZlmResponse<AddFFmpegSourceResp>> retryer = RetryerBuilder.<ZlmResponse<AddFFmpegSourceResp>>newBuilder()
.retryIfResult(resp -> {
log.info("resp {}", resp);
return !resp.getCode().equals(ResponseStatus.Success);
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
String toUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
try {
ZlmResponse<AddFFmpegSourceResp> sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
.srcUrl(getProxyUrl(device,start,stop))
.dstUrl(toUrl)
.enableHls(false)
.enableMp4(false)
.timeoutMs(Duration.ofSeconds(30).toMillis())
.build()));
String proxyKey = sourceResp.getData().getKey();
String cacheKey = CacheUtil.getKey("ZLM","FFMPEG", "PROXY", key);
RedisUtil.StringOps.set(cacheKey, proxyKey);
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
Retryer<StartSendRtpResp> rtpRetryer = rtpRetryer();
zlmStreamChangeHookService.getRegistHandler(ZLM_FFMPEG_PROXY_APP).put(callId,()->{
try {
rtpRetryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp(DEFAULT_ZLM_APP);
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(rtpAddr);
startSendRtp.setDstPort(rtpPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e){
log.error("zlm rtp 推流失败",e);
sendBye(request, device, "");
}
});
Flow.Subscriber<SIPRequest> subscriber = zlmFfmpegByeSubscriber(key,request,device);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
}catch (Exception e){
log.error("zlm ffmpeg 拉/推流失败",e);
sendBye(request, device, "");
}
}
@SneakyThrows @SneakyThrows
public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){ public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder() Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
@ -290,7 +399,7 @@ public class DeviceProxyService {
try { try {
ZlmResponse<AddStreamProxyResp> proxy = retryer.call(() -> zlmMediaService.addStreamProxy(AddStreamProxy.builder() ZlmResponse<AddStreamProxyResp> proxy = retryer.call(() -> zlmMediaService.addStreamProxy(AddStreamProxy.builder()
.url(liveUrl) .url(liveUrl)
.app("live") .app(DEFAULT_ZLM_APP)
.stream(callId) .stream(callId)
.build())); .build()));
@ -304,24 +413,12 @@ public class DeviceProxyService {
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
Retryer<StartSendRtpResp> rtpRetryer = RetryerBuilder.<StartSendRtpResp>newBuilder() Retryer<StartSendRtpResp> rtpRetryer = rtpRetryer();
.retryIfResult(resp -> { zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
log.info("resp {}", resp);
return resp.getLocalPort() == null || resp.getLocalPort() <= 0;
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
try { try {
rtpRetryer.call(()->{ rtpRetryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live"); startSendRtp.setApp(DEFAULT_ZLM_APP);
startSendRtp.setStream(callId); startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc); startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(rtpAddr); startSendRtp.setDstUrl(rtpAddr);
@ -348,7 +445,7 @@ public class DeviceProxyService {
} }
} }
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { private String getProxyUrl(MockingDevice device, Date startTime, Date endTime){
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3); HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode(); String deviceCode = device.getDeviceCode();
@ -360,10 +457,14 @@ public class DeviceProxyService {
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8); String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query); fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl); log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); return fromUrl;
}
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
String fromUrl = getProxyUrl(device, startTime, endTime);
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addPublisher(key);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
taskProcessor.process(request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc); taskProcessor.process(request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
} }
@ -397,7 +498,7 @@ public class DeviceProxyService {
String callId = requestCallId.getCallId(); String callId = requestCallId.getCallId();
callbackTask.remove(callId); callbackTask.remove(callId);
Optional<ZlmStreamChangeHookService.ZlmStreamChangeHookHandler> optionalZlmStreamChangeHookHandler = Optional<ZlmStreamChangeHookService.ZlmStreamChangeHookHandler> optionalZlmStreamChangeHookHandler =
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)); Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId));
// 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束
if(optionalZlmStreamChangeHookHandler.isEmpty()){ if(optionalZlmStreamChangeHookHandler.isEmpty()){
return; return;
@ -471,4 +572,19 @@ public class DeviceProxyService {
log.error("bye 请求发送失败 {}",e.getMessage()); log.error("bye 请求发送失败 {}",e.getMessage());
} }
} }
private Retryer<StartSendRtpResp> rtpRetryer(){
return RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> {
log.info("resp {}", resp);
return resp.getLocalPort() == null || resp.getLocalPort() <= 0;
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
}
} }

View File

@ -1,7 +1,9 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook; package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import lombok.AccessLevel;
import lombok.Data; import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -20,13 +22,26 @@ public class ZlmStreamChangeHookService {
void handler(); void handler();
} }
public ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = new ConcurrentHashMap<>(); @Getter(AccessLevel.PRIVATE)
public ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = new ConcurrentHashMap<>(); private ConcurrentMap<String,ConcurrentMap<String, ZlmStreamChangeHookHandler>> registHandler = new ConcurrentHashMap<>();
@Getter(AccessLevel.PRIVATE)
private ConcurrentMap<String,ConcurrentMap<String, ZlmStreamChangeHookHandler>> unregistHandler = new ConcurrentHashMap<>();
public ConcurrentMap<String, ZlmStreamChangeHookHandler> getRegistHandler(String app){
this.registHandler.putIfAbsent(app,new ConcurrentHashMap<>());
return this.registHandler.get(app);
}
public ConcurrentMap<String, ZlmStreamChangeHookHandler> getUnregistHandler(String app){
this.unregistHandler.putIfAbsent(app,new ConcurrentHashMap<>());
return this.unregistHandler.get(app);
}
public void processEvent(String app,String streamId, Boolean regist){ public void processEvent(String app,String streamId, Boolean regist){
log.debug("app {}, streamId {}, regist {}", app,streamId, regist); log.debug("app {}, streamId {}, regist {}", app,streamId, regist);
if(regist){ if(regist){
ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = getRegistHandler(app);
Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{ Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{
try { try {
Thread.sleep(zlmHookConfig.getDelay().toMillis()); Thread.sleep(zlmHookConfig.getDelay().toMillis());
@ -36,6 +51,7 @@ public class ZlmStreamChangeHookService {
handler.handler(); handler.handler();
}); });
} else { } else {
ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = getUnregistHandler(app);
Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{ Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{
try { try {
Thread.sleep(zlmHookConfig.getDelay().toMillis()); Thread.sleep(zlmHookConfig.getDelay().toMillis());

View File

@ -78,6 +78,7 @@ proxy:
# 参数 device_id, begin_time, end_time # 参数 device_id, begin_time, end_time
#url: http://192.168.2.3:18183 #url: http://192.168.2.3:18183
url: http://10.10.10.20:18183 url: http://10.10.10.20:18183
#url: http://127.0.0.1:18183
ffmpeg-support: ffmpeg-support:
task: task:
# 最大同时推流任务数, <= 0 时不做限制 # 最大同时推流任务数, <= 0 时不做限制
@ -112,6 +113,8 @@ ffmpeg-support:
download: false download: false
input: false input: false
output: false output: false
# 是否通过 zlm 调用 ffmpeg
use-zlm-ffmpeg: false
# [可选] 日志配置, 一般不需要改 # [可选] 日志配置, 一般不需要改
logging: logging: