diff --git a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java index 93a3121..50ac764 100644 --- a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java +++ b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java @@ -21,6 +21,6 @@ public class ZlmHookApi { @PostJson("/on_stream_changed") public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){ - zlmStreamChangeHookService.processEvent(dto.getStream(),dto.getStream(), dto.getRegist()); + zlmStreamChangeHookService.processEvent(dto.getApp(),dto.getStream(), dto.getRegist()); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index 39c79fc..3ed1222 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -1,6 +1,5 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request; -import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; @@ -37,6 +36,7 @@ import java.util.concurrent.*; @Slf4j @RequiredArgsConstructor @Component +@SuppressWarnings("Duplicates") public class InviteRequestProcessor implements MessageProcessor { private final SipListener sipListener; @@ -127,17 +127,56 @@ public class InviteRequestProcessor implements MessageProcessor { } /** - * 模拟设备不支持实时 故直接回放 最近15分钟 至 当前时间录像 + * 视频点播 * * @param gb28181Description gb28181 sdp * @param mediaDescription 媒体描述符 */ @SneakyThrows private void play(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) { - TimeField time = new TimeField(); - time.setStart(DateUtil.offsetMinute(DateUtil.date(), -5)); - time.setStop(DateUtil.date()); - playback(request, device, gb28181Description, mediaDescription, time); + TimeField timeField = new TimeField(); + timeField.setZero(); + SdpFactory.getInstance().createTimeDescription(timeField); + + String channelId = gb28181Description.getOrigin().getUsername(); + log.info("通道id: {}", channelId); + String address = gb28181Description.getOrigin().getAddress(); + log.info("目标地址: {}", address); + Media media = mediaDescription.getMedia(); + int port = media.getMediaPort(); + log.info("目标端口号: {}", port); + + String senderIp = request.getLocalAddress().getHostAddress(); + String transport = request.getTopmostViaHeader().getTransport(); + if(StringUtils.isBlank(device.getLiveStream())){ + log.warn("设备({} => {}) 无可用实时流地址, 返回 418", device.getGbDeviceId(), channelId); + sender.sendResponse(senderIp, transport, unsupported(request)); + return; + } + + String ssrc = gb28181Description.getSsrcField().getSsrc(); + GB28181Description sdp = GB28181SDPBuilder.Sender.build(GB28181SDPBuilder.Action.PLAY, + device.getGbDeviceId(), + channelId, Connection.IP4, address, port, + ssrc, + MediaStreamMode.of(((MediaDescription) gb28181Description.getMediaDescriptions(true).get(0)).getMedia().getProtocol()), + SdpFactory.getInstance().createTimeDescription(timeField)); + // playback(request, device, gb28181Description, mediaDescription, time); + String callId = request.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(key); + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = playSubscriber(request,callId,device,address,port,key,ssrc,schedule); + // 60秒超时计时器 + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); + // 推流 ack 事件订阅 + subscribe.getAckSubscribe().addSubscribe(key, subscriber); + + scheduledExecutorService.schedule(()->{ + // 发送 sdp 响应 + sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, sdp)); + }, 1,TimeUnit.SECONDS); } /** @@ -192,14 +231,15 @@ public class InviteRequestProcessor implements MessageProcessor { GB28181SDPBuilder.Action action = isDownload ? GB28181SDPBuilder.Action.DOWNLOAD : GB28181SDPBuilder.Action.PLAY_BACK; TimeField timeField = new TimeField(); timeField.setZero(); + + String ssrc = gb28181Description.getSsrcField().getSsrc(); GB28181Description sdp = GB28181SDPBuilder.Sender.build(action, device.getGbDeviceId(), channelId, Connection.IP4, address, port, - gb28181Description.getSsrcField().getSsrc(), + ssrc, MediaStreamMode.of(((MediaDescription) gb28181Description.getMediaDescriptions(true).get(0)).getMedia().getProtocol()), SdpFactory.getInstance().createTimeDescription(timeField)); - String ssrc = gb28181Description.getSsrcField().getSsrc(); String callId = request.getCallId().getCallId(); String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); subscribe.getAckSubscribe().addPublisher(key); @@ -222,6 +262,35 @@ public class InviteRequestProcessor implements MessageProcessor { }, 1,TimeUnit.SECONDS); } + public Flow.Subscriber playSubscriber(SIPRequest request,String callId,MockingDevice device,String address,int port,String key, String ssrc,ScheduledFuture[] scheduledFuture){ + return new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("创建 ack 订阅 {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + log.info("收到 ack 确认请求: {} 开始推流",key); + // RTP 推流 + deviceProxyService.pullLiveStream2Rtp(request, callId, device, address, port,ssrc); + onComplete(); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getAckSubscribe().delPublisher(key); + scheduledFuture[0].cancel(true); + } + }; + } + public Flow.Subscriber placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key, String ssrc,ScheduledFuture[] scheduledFuture){ return new Flow.Subscriber<>() { @Override diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 7272017..ad52678 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -5,13 +5,19 @@ import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.URLUtil; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxy; +import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp; +import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse; import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtp; import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp; +import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; @@ -126,11 +132,11 @@ public class DeviceProxyService { return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; } - private Flow.Subscriber task(ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ + private Flow.Subscriber ffmpegTask(ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ Optional.ofNullable(tasks.get(callId)).ifPresent(task->{ task.getWatchdog().destroyProcess(); }); - Flow.Subscriber subscriber = byeSubscriber(key, device, tasks); + Flow.Subscriber subscriber = ffmpegByeSubscriber(key, device, tasks); subscribe.getByeSubscribe().addSubscribe(key, subscriber); int num = taskNum.incrementAndGet(); log.info("当前任务数 {}", num); @@ -139,7 +145,7 @@ public class DeviceProxyService { public TaskProcessor playbackTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { - Flow.Subscriber task = task(callbackTask, callId, key, device); + Flow.Subscriber task = ffmpegTask(callbackTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); @@ -157,7 +163,7 @@ public class DeviceProxyService { public TaskProcessor downloadTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ - Flow.Subscriber task = task(downloadTask, callId, key, device); + Flow.Subscriber task = ffmpegTask(downloadTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); @@ -183,7 +189,7 @@ public class DeviceProxyService { }, 200, TimeUnit.MILLISECONDS); } - public Flow.Subscriber byeSubscriber(String key, MockingDevice device, ConcurrentHashMap task){ + public Flow.Subscriber ffmpegByeSubscriber(String key, MockingDevice device, ConcurrentHashMap task){ return new Flow.Subscriber<>() { @Override public void onSubscribe(Flow.Subscription subscription) { @@ -217,6 +223,115 @@ public class DeviceProxyService { }; } + public Flow.Subscriber zlmByeSubscriber(String key, MockingDevice device){ + return new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("订阅 bye {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + String ip = item.getLocalAddress().getHostAddress(); + String transPort = item.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> + SipResponseBuilder.response(item, Response.OK, "OK"))); + subscribe.getByeSubscribe().delPublisher(key); + } + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onComplete() { + log.info("bye 订阅结束 {}", key); + String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key); + String proxyKey = RedisUtil.StringOps.get(cacheKey); + log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey)); + RedisUtil.KeyOps.delete(cacheKey); + } + }; + } + + @SneakyThrows + public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){ + Retryer> retryer = RetryerBuilder.>newBuilder() + .retryIfResult(resp -> { + log.info("resp {}", resp); + return !resp.getCode().equals(ResponseStatus.Success); + }) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + + String liveUrl = device.getLiveStream(); + + try { + ZlmResponse proxy = retryer.call(() -> zlmMediaService.addStreamProxy(AddStreamProxy.builder() + .url(liveUrl) + .app("live") + .stream(callId) + .build())); + + log.info("使用 zlm 代理拉流 {}", proxy); + String proxyKey = proxy.getData().getKey(); + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key); + RedisUtil.StringOps.set(cacheKey, proxyKey); + + GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); + MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); + boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); + + Retryer rtpRetryer = RetryerBuilder.newBuilder() + .retryIfResult(resp -> { + log.info("resp {}", resp); + return resp.getLocalPort() == null || resp.getLocalPort() <= 0; + }) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + try { + rtpRetryer.call(()->{ + StartSendRtp startSendRtp = new StartSendRtp(); + startSendRtp.setApp("live"); + startSendRtp.setStream(callId); + startSendRtp.setSsrc(ssrc); + startSendRtp.setDstUrl(rtpAddr); + startSendRtp.setDstPort(rtpPort); + startSendRtp.setUdp(!tcp); + log.info("startSendRtp {}",startSendRtp); + StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); + log.info("startSendRtpResp {}",startSendRtpResp); + return startSendRtpResp; + }); + } catch (Exception e){ + log.error("zlm rtp 推流失败",e); + sendBye(request, device, ""); + } + }); + + // zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key)); + Flow.Subscriber subscriber = zlmByeSubscriber(key,device); + subscribe.getByeSubscribe().addPublisher(key); + subscribe.getByeSubscribe().addSubscribe(key, subscriber); + } catch (Exception e) { + log.error("zlm 代理拉流失败",e); + sendBye(request, device, ""); + } + } + public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); HashMap map = new HashMap<>(3); diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java index 1d5cc58..56ea172 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java @@ -23,8 +23,8 @@ public class ZlmStreamChangeHookService { public ConcurrentMap registHandler = new ConcurrentHashMap<>(); public ConcurrentMap unregistHandler = new ConcurrentHashMap<>(); - public void processEvent(String stream,String streamId, Boolean regist){ - log.debug("stream {}, streamId {}, regist {}", stream,streamId, regist); + public void processEvent(String app,String streamId, Boolean regist){ + log.debug("app {}, streamId {}, regist {}", app,streamId, regist); if(regist){ Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{