diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtmpConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtmpConfig.java new file mode 100644 index 0000000..865340e --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtmpConfig.java @@ -0,0 +1,12 @@ +package cn.skcks.docking.gb28181.mocking.config.sip; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "media.rtmp") +public class ZlmRtmpConfig { + int port = 1935; +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtspConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtspConfig.java index dfdb016..9b8ed7b 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtspConfig.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/ZlmRtspConfig.java @@ -7,6 +7,7 @@ import org.springframework.context.annotation.Configuration; @Data @Configuration @ConfigurationProperties(prefix = "media.rtsp") +@Deprecated public class ZlmRtspConfig { int port = 554; } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index a53aa21..e874363 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -283,14 +283,14 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle deviceProxyService.pullStreamByZlmFfmpegSource(request, callId, device, start, stop, address, port, ssrc); } else { // RTP 推流 - deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc, deviceProxyService.playbackTask()); + deviceProxyService.proxyVideo2Rtp(sendOkResponse,request, callId, device, start, stop, address, port, ssrc, deviceProxyService.playbackTask()); } } public void downloadSubscriber(SIPRequest request,Runnable sendOkResponse, String callId,MockingDevice device,Date start,Date stop,String address,int port,String ssrc){ log.info("收到 下载请求: {} 开始推流",callId); - // RTP 推流 - deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask()); + // RTP 推流 + deviceProxyService.proxyVideo2Rtp(sendOkResponse, request, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask()); } @Override diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java index 57069a6..1649341 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java @@ -1,17 +1,15 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe; import gov.nist.javax.sip.message.SIPRequest; import lombok.RequiredArgsConstructor; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Flow; -import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.*; @RequiredArgsConstructor -public class AckSubscribe implements GenericSubscribe { +public class AckSubscribe implements GenericTimeoutSubscribe { private final Executor executor; private static final Map> publishers = new ConcurrentHashMap<>(); @@ -41,4 +39,14 @@ public class AckSubscribe implements GenericSubscribe { public void complete(String key) { delPublisher(key); } + + @Override + public void addPublisher(String key, long time, TimeUnit timeUnit) { + + } + + @Override + public void refreshPublisher(String key, long time, TimeUnit timeUnit) { + + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java index 7f98777..6f7ad97 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java @@ -1,6 +1,8 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; @@ -13,6 +15,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; @Slf4j @Data @@ -21,14 +25,15 @@ import java.util.concurrent.Executor; public class SipSubscribe { @Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private GenericSubscribe registerSubscribe; - private GenericSubscribe ackSubscribe; + private GenericTimeoutSubscribe ackSubscribe; private GenericSubscribe byeSubscribe; @PostConstruct private void init() { registerSubscribe = new RegisterSubscribe(executor); - ackSubscribe = new AckSubscribe(executor); + ackSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService); byeSubscribe = new ByeSubscribe(executor); } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 6a7177b..82eb82e 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -23,10 +23,7 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp; import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp; import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; -import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; -import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig; -import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; -import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtspConfig; +import cn.skcks.docking.gb28181.mocking.config.sip.*; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; @@ -91,7 +88,7 @@ public class DeviceProxyService { private final ZlmMediaService zlmMediaService; private final ZlmMediaConfig zlmMediaConfig; private final ZlmStreamChangeHookService zlmStreamChangeHookService; - private final ZlmRtspConfig zlmRtspConfig; + private final ZlmRtmpConfig zlmRtmpConfig; private final VideoCacheManager videoCacheManager; private final String DEFAULT_ZLM_APP = "live"; @@ -104,10 +101,10 @@ public class DeviceProxyService { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public interface TaskProcessor { - void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); + void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); } - private void requestZlmPushStream(ScheduledFuture schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ + private void requestZlmPushStream(SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); @@ -121,6 +118,7 @@ public class DeviceProxyService { // 重试次数 .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .build(); + zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ try { retryer.call(()->{ StartSendRtp startSendRtp = new StartSendRtp(); @@ -136,16 +134,13 @@ public class DeviceProxyService { return startSendRtpResp; }); } catch (Exception e) { - schedule.cancel(true); + log.error("zlm rtp 推流失败",e); Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)) .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); throw new RuntimeException(e); } + }); - // 停止发送 trying - schedule.cancel(false); - // 响应 sdp ok - sendOkResponse.run(); // }); zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ StopSendRtp stopSendRtp = new StopSendRtp(); @@ -170,57 +165,101 @@ public class DeviceProxyService { } public TaskProcessor playbackTask(){ - return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { - ScheduledFuture schedule = trying(request); - Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); - try { - String zlmRtpUrl = getZlmRtspUrl(DEFAULT_ZLM_APP, callId); - FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule,request, device, key); - Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - callbackTask.put(device.getDeviceCode(), executor); - executeResultHandler.waitFor(); - } catch (Exception e) { - schedule.cancel(true); - sendBye(request,device,""); - throw new RuntimeException(e); - } + return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { + scheduledExecutorService.schedule(() -> { + trying(request); + sendOkResponse.run(); + String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); + subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + subscribe.getAckSubscribe().delPublisher(ackKey); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); + try { + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); + Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + callbackTask.put(device.getDeviceCode(), executor); + executeResultHandler.waitFor(); + } catch (Exception e) { + sendBye(request,device,""); + throw new RuntimeException(e); + } + } + }); + }, 1, TimeUnit.SECONDS); }; } public TaskProcessor downloadTask(){ - return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ - ScheduledFuture schedule = trying(request); - Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); - try { - String zlmRtpUrl = getZlmRtspUrl(DEFAULT_ZLM_APP, callId); - FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule, request, device, key); - Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); - executeResultHandler.waitFor(); - } catch (Exception e) { - schedule.cancel(true); - sendBye(request,device,""); - throw new RuntimeException(e); - } + return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ + scheduledExecutorService.schedule(() -> { + trying(request); + sendOkResponse.run(); + String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); + subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + subscribe.getAckSubscribe().delPublisher(ackKey); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); + try { + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); + Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + executeResultHandler.waitFor(); + } catch (Exception e) { + sendBye(request, device, ""); + throw new RuntimeException(e); + } + } + }); + }, 1, TimeUnit.SECONDS); }; } - private String getZlmRtspUrl(String app, String streamId){ - return "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtspConfig.getPort() + "/" + app +"/" + streamId; + private String getZlmRtmpUrl(String app, String streamId){ + return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId; } - private ScheduledFuture trying(SIPRequest request){ - return scheduledExecutorService.scheduleAtFixedRate(() -> { - InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build(); - Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request); - String ip = request.getLocalAddress().getHostAddress(); - String transPort = request.getTopmostViaHeader().getTransport(); - sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse)); - }, 0,1, TimeUnit.SECONDS); + private void trying(SIPRequest request){ + InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build(); + Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request); + String ip = request.getLocalAddress().getHostAddress(); + String transPort = request.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse)); } public Flow.Subscriber ffmpegByeSubscriber(SIPRequest inviteRequest,String key, MockingDevice device, ConcurrentHashMap task){ @@ -355,7 +394,7 @@ public class DeviceProxyService { .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .build(); - String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtspConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId; + String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId; String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); try { ZlmResponse sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder() @@ -426,7 +465,8 @@ public class DeviceProxyService { } } - ScheduledFuture schedule = trying(request); + trying(request); + sendOkResponse.run(); Retryer> retryer = RetryerBuilder.>newBuilder() .retryIfResult(resp -> { log.info("resp {}", resp); @@ -479,11 +519,6 @@ public class DeviceProxyService { log.error("zlm rtp 推流失败",e); sendBye(request, device, ""); } - - // 停止发送 trying - schedule.cancel(false); - // 响应 sdp ok - sendOkResponse.run(); }); zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> { @@ -501,8 +536,6 @@ public class DeviceProxyService { subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addSubscribe(key, subscriber); } catch (Exception e) { - // 停止发送 trying - schedule.cancel(true); log.error("zlm 代理拉流失败",e); sendBye(request, device, ""); @@ -536,12 +569,12 @@ public class DeviceProxyService { return fromUrl; } - public void proxyVideo2Rtp(SIPRequest request,Runnable sendOkResponse, String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { + public void proxyVideo2Rtp(Runnable sendOkResponse,SIPRequest request, String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { String fromUrl = getProxyUrl(device, startTime, endTime); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); subscribe.getByeSubscribe().addPublisher(key); long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); - taskProcessor.process(request, sendOkResponse, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc); + taskProcessor.process(sendOkResponse,request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc); } @SneakyThrows @@ -560,14 +593,11 @@ public class DeviceProxyService { @Setter(AccessLevel.PRIVATE) private boolean hasResult = false; - private final ScheduledFuture tryingSchedule; private final SIPRequest request; private final MockingDevice device; private final String key; private void close(){ - tryingSchedule.cancel(true); - CallIdHeader requestCallId = request.getCallId(); String callId = requestCallId.getCallId(); callbackTask.remove(callId); @@ -624,8 +654,8 @@ public class DeviceProxyService { } } - public FfmpegExecuteResultHandler mediaStatus(ScheduledFuture tryingSchedule,SIPRequest request, MockingDevice device,String key){ - return new FfmpegExecuteResultHandler(tryingSchedule,request,device,key); + public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ + return new FfmpegExecuteResultHandler(request,device,key); } /** diff --git a/gb28181-mocking-starter/src/main/resources/application-local.yml b/gb28181-mocking-starter/src/main/resources/application-local.yml index 5f406ef..9355dd2 100644 --- a/gb28181-mocking-starter/src/main/resources/application-local.yml +++ b/gb28181-mocking-starter/src/main/resources/application-local.yml @@ -77,6 +77,8 @@ media: id: amrWMKmbKqoBjRQ9 # secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 + rtmp: + port: 1936 rtsp: port: 554 proxy: @@ -104,10 +106,11 @@ ffmpeg-support: #input: -thread_queue_size 128 -re -i rtsp://admin:XXXXXX@10.10.11.171/Streaming/Channels/1/ #input: -hwaccel cuda -re -i rtsp://10.10.11.200/camera/171 input: -re -i + output: -c:v copy -an -f flv # output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtsp #flv #rtp_mpegts - output: -c:v copy -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv # + #output: -c:v libx264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv # # output: -c:v h264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp - download: -thread_queue_size 128 -re -i + download: -thread_queue_size 128 -i download-speed: 0 # output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp # download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS"