通过代理拉取视频流 实现实时视频播放

This commit is contained in:
shikong 2024-01-12 17:02:11 +08:00
parent 5a68c819cb
commit 83ae4ebb4d
4 changed files with 200 additions and 16 deletions

View File

@ -21,6 +21,6 @@ public class ZlmHookApi {
@PostJson("/on_stream_changed")
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
zlmStreamChangeHookService.processEvent(dto.getStream(),dto.getStream(), dto.getRegist());
zlmStreamChangeHookService.processEvent(dto.getApp(),dto.getStream(), dto.getRegist());
}
}

View File

@ -1,6 +1,5 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
@ -37,6 +36,7 @@ import java.util.concurrent.*;
@Slf4j
@RequiredArgsConstructor
@Component
@SuppressWarnings("Duplicates")
public class InviteRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
@ -127,17 +127,56 @@ public class InviteRequestProcessor implements MessageProcessor {
}
/**
* 模拟设备不支持实时 故直接回放 最近15分钟 当前时间录像
* 视频点播
*
* @param gb28181Description gb28181 sdp
* @param mediaDescription 媒体描述符
*/
@SneakyThrows
private void play(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeField time = new TimeField();
time.setStart(DateUtil.offsetMinute(DateUtil.date(), -5));
time.setStop(DateUtil.date());
playback(request, device, gb28181Description, mediaDescription, time);
TimeField timeField = new TimeField();
timeField.setZero();
SdpFactory.getInstance().createTimeDescription(timeField);
String channelId = gb28181Description.getOrigin().getUsername();
log.info("通道id: {}", channelId);
String address = gb28181Description.getOrigin().getAddress();
log.info("目标地址: {}", address);
Media media = mediaDescription.getMedia();
int port = media.getMediaPort();
log.info("目标端口号: {}", port);
String senderIp = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
if(StringUtils.isBlank(device.getLiveStream())){
log.warn("设备({} => {}) 无可用实时流地址, 返回 418", device.getGbDeviceId(), channelId);
sender.sendResponse(senderIp, transport, unsupported(request));
return;
}
String ssrc = gb28181Description.getSsrcField().getSsrc();
GB28181Description sdp = GB28181SDPBuilder.Sender.build(GB28181SDPBuilder.Action.PLAY,
device.getGbDeviceId(),
channelId, Connection.IP4, address, port,
ssrc,
MediaStreamMode.of(((MediaDescription) gb28181Description.getMediaDescriptions(true).get(0)).getMedia().getProtocol()),
SdpFactory.getInstance().createTimeDescription(timeField));
// playback(request, device, gb28181Description, mediaDescription, time);
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = playSubscriber(request,callId,device,address,port,key,ssrc,schedule);
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
// 推流 ack 事件订阅
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
scheduledExecutorService.schedule(()->{
// 发送 sdp 响应
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, sdp));
}, 1,TimeUnit.SECONDS);
}
/**
@ -192,14 +231,15 @@ public class InviteRequestProcessor implements MessageProcessor {
GB28181SDPBuilder.Action action = isDownload ? GB28181SDPBuilder.Action.DOWNLOAD : GB28181SDPBuilder.Action.PLAY_BACK;
TimeField timeField = new TimeField();
timeField.setZero();
String ssrc = gb28181Description.getSsrcField().getSsrc();
GB28181Description sdp = GB28181SDPBuilder.Sender.build(action,
device.getGbDeviceId(),
channelId, Connection.IP4, address, port,
gb28181Description.getSsrcField().getSsrc(),
ssrc,
MediaStreamMode.of(((MediaDescription) gb28181Description.getMediaDescriptions(true).get(0)).getMedia().getProtocol()),
SdpFactory.getInstance().createTimeDescription(timeField));
String ssrc = gb28181Description.getSsrcField().getSsrc();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key);
@ -222,6 +262,35 @@ public class InviteRequestProcessor implements MessageProcessor {
}, 1,TimeUnit.SECONDS);
}
public Flow.Subscriber<SIPRequest> playSubscriber(SIPRequest request,String callId,MockingDevice device,String address,int port,String key, String ssrc,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 ack 订阅 {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.pullLiveStream2Rtp(request, callId, device, address, port,ssrc);
onComplete();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getAckSubscribe().delPublisher(key);
scheduledFuture[0].cancel(true);
}
};
}
public Flow.Subscriber<SIPRequest> placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key, String ssrc,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override

View File

@ -5,13 +5,19 @@ import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.URLUtil;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxy;
import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtp;
import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
@ -126,11 +132,11 @@ public class DeviceProxyService {
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
}
private Flow.Subscriber<SIPRequest> task(ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
private Flow.Subscriber<SIPRequest> ffmpegTask(ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
Optional.ofNullable(tasks.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, tasks);
Flow.Subscriber<SIPRequest> subscriber = ffmpegByeSubscriber(key, device, tasks);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num);
@ -139,7 +145,7 @@ public class DeviceProxyService {
public TaskProcessor playbackTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
Flow.Subscriber<SIPRequest> task = task(callbackTask, callId, key, device);
Flow.Subscriber<SIPRequest> task = ffmpegTask(callbackTask, callId, key, device);
ScheduledFuture<?> schedule = trying(request);
try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
@ -157,7 +163,7 @@ public class DeviceProxyService {
public TaskProcessor downloadTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
Flow.Subscriber<SIPRequest> task = task(downloadTask, callId, key, device);
Flow.Subscriber<SIPRequest> task = ffmpegTask(downloadTask, callId, key, device);
ScheduledFuture<?> schedule = trying(request);
try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
@ -183,7 +189,7 @@ public class DeviceProxyService {
}, 200, TimeUnit.MILLISECONDS);
}
public Flow.Subscriber<SIPRequest> byeSubscriber(String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
public Flow.Subscriber<SIPRequest> ffmpegByeSubscriber(String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
@ -217,6 +223,115 @@ public class DeviceProxyService {
};
}
public Flow.Subscriber<SIPRequest> zlmByeSubscriber(String key, MockingDevice device){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("订阅 bye {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
String ip = item.getLocalAddress().getHostAddress();
String transPort = item.getTopmostViaHeader().getTransport();
sender.sendResponse(ip, transPort, ((provider, ip1, port) ->
SipResponseBuilder.response(item, Response.OK, "OK")));
subscribe.getByeSubscribe().delPublisher(key);
}
@Override
public void onError(Throwable throwable) {}
@Override
public void onComplete() {
log.info("bye 订阅结束 {}", key);
String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key);
String proxyKey = RedisUtil.StringOps.get(cacheKey);
log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey));
RedisUtil.KeyOps.delete(cacheKey);
}
};
}
@SneakyThrows
public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
.retryIfResult(resp -> {
log.info("resp {}", resp);
return !resp.getCode().equals(ResponseStatus.Success);
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
String liveUrl = device.getLiveStream();
try {
ZlmResponse<AddStreamProxyResp> proxy = retryer.call(() -> zlmMediaService.addStreamProxy(AddStreamProxy.builder()
.url(liveUrl)
.app("live")
.stream(callId)
.build()));
log.info("使用 zlm 代理拉流 {}", proxy);
String proxyKey = proxy.getData().getKey();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key);
RedisUtil.StringOps.set(cacheKey, proxyKey);
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
Retryer<StartSendRtpResp> rtpRetryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> {
log.info("resp {}", resp);
return resp.getLocalPort() == null || resp.getLocalPort() <= 0;
})
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
try {
rtpRetryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live");
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(rtpAddr);
startSendRtp.setDstPort(rtpPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e){
log.error("zlm rtp 推流失败",e);
sendBye(request, device, "");
}
});
// zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key));
Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,device);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
} catch (Exception e) {
log.error("zlm 代理拉流失败",e);
sendBye(request, device, "");
}
}
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3);

View File

@ -23,8 +23,8 @@ public class ZlmStreamChangeHookService {
public ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = new ConcurrentHashMap<>();
public ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = new ConcurrentHashMap<>();
public void processEvent(String stream,String streamId, Boolean regist){
log.debug("stream {}, streamId {}, regist {}", stream,streamId, regist);
public void processEvent(String app,String streamId, Boolean regist){
log.debug("app {}, streamId {}, regist {}", app,streamId, regist);
if(regist){
Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{