推流重构 取消 rtsp 换回 rtmp
This commit is contained in:
parent
717ee609bf
commit
89814f3e0d
@ -0,0 +1,12 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.config.sip;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Configuration
|
||||||
|
@ConfigurationProperties(prefix = "media.rtmp")
|
||||||
|
public class ZlmRtmpConfig {
|
||||||
|
int port = 1935;
|
||||||
|
}
|
@ -7,6 +7,7 @@ import org.springframework.context.annotation.Configuration;
|
|||||||
@Data
|
@Data
|
||||||
@Configuration
|
@Configuration
|
||||||
@ConfigurationProperties(prefix = "media.rtsp")
|
@ConfigurationProperties(prefix = "media.rtsp")
|
||||||
|
@Deprecated
|
||||||
public class ZlmRtspConfig {
|
public class ZlmRtspConfig {
|
||||||
int port = 554;
|
int port = 554;
|
||||||
}
|
}
|
||||||
|
@ -283,14 +283,14 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle
|
|||||||
deviceProxyService.pullStreamByZlmFfmpegSource(request, callId, device, start, stop, address, port, ssrc);
|
deviceProxyService.pullStreamByZlmFfmpegSource(request, callId, device, start, stop, address, port, ssrc);
|
||||||
} else {
|
} else {
|
||||||
// RTP 推流
|
// RTP 推流
|
||||||
deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc, deviceProxyService.playbackTask());
|
deviceProxyService.proxyVideo2Rtp(sendOkResponse,request, callId, device, start, stop, address, port, ssrc, deviceProxyService.playbackTask());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void downloadSubscriber(SIPRequest request,Runnable sendOkResponse, String callId,MockingDevice device,Date start,Date stop,String address,int port,String ssrc){
|
public void downloadSubscriber(SIPRequest request,Runnable sendOkResponse, String callId,MockingDevice device,Date start,Date stop,String address,int port,String ssrc){
|
||||||
log.info("收到 下载请求: {} 开始推流",callId);
|
log.info("收到 下载请求: {} 开始推流",callId);
|
||||||
// RTP 推流
|
// RTP 推流
|
||||||
deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask());
|
deviceProxyService.proxyVideo2Rtp(sendOkResponse, request, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,17 +1,15 @@
|
|||||||
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
|
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
|
||||||
|
|
||||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
|
||||||
import gov.nist.javax.sip.message.SIPRequest;
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.Flow;
|
|
||||||
import java.util.concurrent.SubmissionPublisher;
|
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class AckSubscribe implements GenericSubscribe<SIPRequest> {
|
public class AckSubscribe implements GenericTimeoutSubscribe<SIPRequest> {
|
||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
|
|
||||||
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
|
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
|
||||||
@ -41,4 +39,14 @@ public class AckSubscribe implements GenericSubscribe<SIPRequest> {
|
|||||||
public void complete(String key) {
|
public void complete(String key) {
|
||||||
delPublisher(key);
|
delPublisher(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addPublisher(String key, long time, TimeUnit timeUnit) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void refreshPublisher(String key, long time, TimeUnit timeUnit) {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
|
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
|
||||||
|
|
||||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||||
import gov.nist.javax.sip.message.SIPRequest;
|
import gov.nist.javax.sip.message.SIPRequest;
|
||||||
import gov.nist.javax.sip.message.SIPResponse;
|
import gov.nist.javax.sip.message.SIPResponse;
|
||||||
@ -13,6 +15,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Data
|
@Data
|
||||||
@ -21,14 +25,15 @@ import java.util.concurrent.Executor;
|
|||||||
public class SipSubscribe {
|
public class SipSubscribe {
|
||||||
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
|
||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
|
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
|
||||||
private GenericSubscribe<SIPResponse> registerSubscribe;
|
private GenericSubscribe<SIPResponse> registerSubscribe;
|
||||||
private GenericSubscribe<SIPRequest> ackSubscribe;
|
private GenericTimeoutSubscribe<SIPRequest> ackSubscribe;
|
||||||
private GenericSubscribe<SIPRequest> byeSubscribe;
|
private GenericSubscribe<SIPRequest> byeSubscribe;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
registerSubscribe = new RegisterSubscribe(executor);
|
registerSubscribe = new RegisterSubscribe(executor);
|
||||||
ackSubscribe = new AckSubscribe(executor);
|
ackSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService);
|
||||||
byeSubscribe = new ByeSubscribe(executor);
|
byeSubscribe = new ByeSubscribe(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,10 +23,7 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
|
|||||||
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
|
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
|
||||||
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
|
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
|
||||||
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
|
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
|
||||||
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
|
import cn.skcks.docking.gb28181.mocking.config.sip.*;
|
||||||
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
|
|
||||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
|
|
||||||
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtspConfig;
|
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
||||||
@ -91,7 +88,7 @@ public class DeviceProxyService {
|
|||||||
private final ZlmMediaService zlmMediaService;
|
private final ZlmMediaService zlmMediaService;
|
||||||
private final ZlmMediaConfig zlmMediaConfig;
|
private final ZlmMediaConfig zlmMediaConfig;
|
||||||
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
|
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
|
||||||
private final ZlmRtspConfig zlmRtspConfig;
|
private final ZlmRtmpConfig zlmRtmpConfig;
|
||||||
private final VideoCacheManager videoCacheManager;
|
private final VideoCacheManager videoCacheManager;
|
||||||
|
|
||||||
private final String DEFAULT_ZLM_APP = "live";
|
private final String DEFAULT_ZLM_APP = "live";
|
||||||
@ -104,10 +101,10 @@ public class DeviceProxyService {
|
|||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
public interface TaskProcessor {
|
public interface TaskProcessor {
|
||||||
void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
|
void process(Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void requestZlmPushStream(ScheduledFuture<?> schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
|
private void requestZlmPushStream(SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
|
||||||
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
|
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
|
||||||
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
||||||
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
||||||
@ -121,6 +118,7 @@ public class DeviceProxyService {
|
|||||||
// 重试次数
|
// 重试次数
|
||||||
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
||||||
.build();
|
.build();
|
||||||
|
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
try {
|
try {
|
||||||
retryer.call(()->{
|
retryer.call(()->{
|
||||||
StartSendRtp startSendRtp = new StartSendRtp();
|
StartSendRtp startSendRtp = new StartSendRtp();
|
||||||
@ -136,16 +134,13 @@ public class DeviceProxyService {
|
|||||||
return startSendRtpResp;
|
return startSendRtpResp;
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
schedule.cancel(true);
|
log.error("zlm rtp 推流失败",e);
|
||||||
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId))
|
||||||
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// 停止发送 trying
|
|
||||||
schedule.cancel(false);
|
|
||||||
// 响应 sdp ok
|
|
||||||
sendOkResponse.run();
|
|
||||||
// });
|
// });
|
||||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||||
@ -170,57 +165,101 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public TaskProcessor playbackTask(){
|
public TaskProcessor playbackTask(){
|
||||||
return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
|
return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
|
||||||
ScheduledFuture<?> schedule = trying(request);
|
scheduledExecutorService.schedule(() -> {
|
||||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
trying(request);
|
||||||
try {
|
sendOkResponse.run();
|
||||||
String zlmRtpUrl = getZlmRtspUrl(DEFAULT_ZLM_APP, callId);
|
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule,request, device, key);
|
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||||
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||||
requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
@Override
|
||||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
public void onSubscribe(Flow.Subscription subscription) {
|
||||||
callbackTask.put(device.getDeviceCode(), executor);
|
subscription.request(1);
|
||||||
executeResultHandler.waitFor();
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
schedule.cancel(true);
|
@Override
|
||||||
sendBye(request,device,"");
|
public void onNext(SIPRequest item) {
|
||||||
throw new RuntimeException(e);
|
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable throwable) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
||||||
|
try {
|
||||||
|
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||||
|
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||||
|
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||||
|
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||||
|
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||||
|
callbackTask.put(device.getDeviceCode(), executor);
|
||||||
|
executeResultHandler.waitFor();
|
||||||
|
} catch (Exception e) {
|
||||||
|
sendBye(request,device,"");
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, 1, TimeUnit.SECONDS);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public TaskProcessor downloadTask(){
|
public TaskProcessor downloadTask(){
|
||||||
return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
|
return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
|
||||||
ScheduledFuture<?> schedule = trying(request);
|
scheduledExecutorService.schedule(() -> {
|
||||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
trying(request);
|
||||||
try {
|
sendOkResponse.run();
|
||||||
String zlmRtpUrl = getZlmRtspUrl(DEFAULT_ZLM_APP, callId);
|
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
||||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule, request, device, key);
|
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
|
||||||
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
|
||||||
requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
@Override
|
||||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
public void onSubscribe(Flow.Subscription subscription) {
|
||||||
downloadTask.put(device.getDeviceCode(), executor);
|
subscription.request(1);
|
||||||
executeResultHandler.waitFor();
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
schedule.cancel(true);
|
@Override
|
||||||
sendBye(request,device,"");
|
public void onNext(SIPRequest item) {
|
||||||
throw new RuntimeException(e);
|
subscribe.getAckSubscribe().delPublisher(ackKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable throwable) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
||||||
|
try {
|
||||||
|
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
|
||||||
|
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||||
|
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||||
|
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||||
|
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||||
|
downloadTask.put(device.getDeviceCode(), executor);
|
||||||
|
executeResultHandler.waitFor();
|
||||||
|
} catch (Exception e) {
|
||||||
|
sendBye(request, device, "");
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, 1, TimeUnit.SECONDS);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getZlmRtspUrl(String app, String streamId){
|
private String getZlmRtmpUrl(String app, String streamId){
|
||||||
return "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtspConfig.getPort() + "/" + app +"/" + streamId;
|
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ScheduledFuture<?> trying(SIPRequest request){
|
private void trying(SIPRequest request){
|
||||||
return scheduledExecutorService.scheduleAtFixedRate(() -> {
|
InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build();
|
||||||
InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build();
|
Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request);
|
||||||
Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request);
|
String ip = request.getLocalAddress().getHostAddress();
|
||||||
String ip = request.getLocalAddress().getHostAddress();
|
String transPort = request.getTopmostViaHeader().getTransport();
|
||||||
String transPort = request.getTopmostViaHeader().getTransport();
|
sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse));
|
||||||
sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse));
|
|
||||||
}, 0,1, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Flow.Subscriber<SIPRequest> ffmpegByeSubscriber(SIPRequest inviteRequest,String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
|
public Flow.Subscriber<SIPRequest> ffmpegByeSubscriber(SIPRequest inviteRequest,String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
|
||||||
@ -355,7 +394,7 @@ public class DeviceProxyService {
|
|||||||
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtspConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
|
String toUrl = "rtsp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId;
|
||||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
try {
|
try {
|
||||||
ZlmResponse<AddFFmpegSourceResp> sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
|
ZlmResponse<AddFFmpegSourceResp> sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder()
|
||||||
@ -426,7 +465,8 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledFuture<?> schedule = trying(request);
|
trying(request);
|
||||||
|
sendOkResponse.run();
|
||||||
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
|
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
|
||||||
.retryIfResult(resp -> {
|
.retryIfResult(resp -> {
|
||||||
log.info("resp {}", resp);
|
log.info("resp {}", resp);
|
||||||
@ -479,11 +519,6 @@ public class DeviceProxyService {
|
|||||||
log.error("zlm rtp 推流失败",e);
|
log.error("zlm rtp 推流失败",e);
|
||||||
sendBye(request, device, "");
|
sendBye(request, device, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 停止发送 trying
|
|
||||||
schedule.cancel(false);
|
|
||||||
// 响应 sdp ok
|
|
||||||
sendOkResponse.run();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
|
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
|
||||||
@ -501,8 +536,6 @@ public class DeviceProxyService {
|
|||||||
subscribe.getByeSubscribe().addPublisher(key);
|
subscribe.getByeSubscribe().addPublisher(key);
|
||||||
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
|
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 停止发送 trying
|
|
||||||
schedule.cancel(true);
|
|
||||||
|
|
||||||
log.error("zlm 代理拉流失败",e);
|
log.error("zlm 代理拉流失败",e);
|
||||||
sendBye(request, device, "");
|
sendBye(request, device, "");
|
||||||
@ -536,12 +569,12 @@ public class DeviceProxyService {
|
|||||||
return fromUrl;
|
return fromUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void proxyVideo2Rtp(SIPRequest request,Runnable sendOkResponse, String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
|
public void proxyVideo2Rtp(Runnable sendOkResponse,SIPRequest request, String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
|
||||||
String fromUrl = getProxyUrl(device, startTime, endTime);
|
String fromUrl = getProxyUrl(device, startTime, endTime);
|
||||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
subscribe.getByeSubscribe().addPublisher(key);
|
subscribe.getByeSubscribe().addPublisher(key);
|
||||||
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
||||||
taskProcessor.process(request, sendOkResponse, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
|
taskProcessor.process(sendOkResponse,request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@ -560,14 +593,11 @@ public class DeviceProxyService {
|
|||||||
@Setter(AccessLevel.PRIVATE)
|
@Setter(AccessLevel.PRIVATE)
|
||||||
private boolean hasResult = false;
|
private boolean hasResult = false;
|
||||||
|
|
||||||
private final ScheduledFuture<?> tryingSchedule;
|
|
||||||
private final SIPRequest request;
|
private final SIPRequest request;
|
||||||
private final MockingDevice device;
|
private final MockingDevice device;
|
||||||
private final String key;
|
private final String key;
|
||||||
|
|
||||||
private void close(){
|
private void close(){
|
||||||
tryingSchedule.cancel(true);
|
|
||||||
|
|
||||||
CallIdHeader requestCallId = request.getCallId();
|
CallIdHeader requestCallId = request.getCallId();
|
||||||
String callId = requestCallId.getCallId();
|
String callId = requestCallId.getCallId();
|
||||||
callbackTask.remove(callId);
|
callbackTask.remove(callId);
|
||||||
@ -624,8 +654,8 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public FfmpegExecuteResultHandler mediaStatus(ScheduledFuture<?> tryingSchedule,SIPRequest request, MockingDevice device,String key){
|
public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
|
||||||
return new FfmpegExecuteResultHandler(tryingSchedule,request,device,key);
|
return new FfmpegExecuteResultHandler(request,device,key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,6 +77,8 @@ media:
|
|||||||
id: amrWMKmbKqoBjRQ9
|
id: amrWMKmbKqoBjRQ9
|
||||||
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
|
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
|
||||||
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
|
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
|
||||||
|
rtmp:
|
||||||
|
port: 1936
|
||||||
rtsp:
|
rtsp:
|
||||||
port: 554
|
port: 554
|
||||||
proxy:
|
proxy:
|
||||||
@ -104,10 +106,11 @@ ffmpeg-support:
|
|||||||
#input: -thread_queue_size 128 -re -i rtsp://admin:XXXXXX@10.10.11.171/Streaming/Channels/1/
|
#input: -thread_queue_size 128 -re -i rtsp://admin:XXXXXX@10.10.11.171/Streaming/Channels/1/
|
||||||
#input: -hwaccel cuda -re -i rtsp://10.10.11.200/camera/171
|
#input: -hwaccel cuda -re -i rtsp://10.10.11.200/camera/171
|
||||||
input: -re -i
|
input: -re -i
|
||||||
|
output: -c:v copy -an -f flv
|
||||||
# output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtsp #flv #rtp_mpegts
|
# output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtsp #flv #rtp_mpegts
|
||||||
output: -c:v copy -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv #
|
#output: -c:v libx264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp #flv #
|
||||||
# output: -c:v h264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp
|
# output: -c:v h264 -an -preset ultrafast -flvflags no_duration_filesize -bf 0 -max_interleave_delta 0 -f rtsp
|
||||||
download: -thread_queue_size 128 -re -i
|
download: -thread_queue_size 128 -i
|
||||||
download-speed: 0
|
download-speed: 0
|
||||||
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
|
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
|
||||||
# download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS"
|
# download: -i E:\Repository\other\happytime-gb28181-device-x64\666.mp4 -filter:v "setpts=4.0*PTS"
|
||||||
|
Loading…
Reference in New Issue
Block a user