修复 invite 响应顺序错误 并完善
This commit is contained in:
parent
456174533d
commit
e4fec4f2d5
@ -175,20 +175,12 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle
|
|||||||
SdpFactory.getInstance().createTimeDescription(timeField));
|
SdpFactory.getInstance().createTimeDescription(timeField));
|
||||||
// playback(request, device, gb28181Description, mediaDescription, time);
|
// playback(request, device, gb28181Description, mediaDescription, time);
|
||||||
String callId = request.getCallId().getCallId();
|
String callId = request.getCallId().getCallId();
|
||||||
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
|
|
||||||
subscribe.getAckSubscribe().addPublisher(key);
|
|
||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
|
||||||
Flow.Subscriber<SIPRequest> subscriber = playSubscriber(request,callId,device,address,port,key,ssrc,schedule);
|
|
||||||
// 60秒超时计时器
|
|
||||||
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
|
|
||||||
// 推流 ack 事件订阅
|
|
||||||
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
|
|
||||||
|
|
||||||
scheduledExecutorService.schedule(()->{
|
Runnable sendOkResponse = () -> {
|
||||||
// 发送 sdp 响应
|
// 发送 sdp 响应
|
||||||
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, sdp));
|
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, sdp));
|
||||||
}, 1,TimeUnit.SECONDS);
|
};
|
||||||
|
playSubscriber(request,sendOkResponse, callId,device,address,port,ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -268,111 +260,41 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle
|
|||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
||||||
Flow.Subscriber<SIPRequest> subscriber;
|
Flow.Subscriber<SIPRequest> subscriber;
|
||||||
if(!isDownload){
|
|
||||||
subscriber = placbackSubscriber(request, callId,device,start,stop,address,port,key,ssrc,schedule);
|
|
||||||
} else {
|
|
||||||
subscriber = downloadSubscriber(request, callId,device,start,stop,address,port,key,ssrc,schedule);
|
|
||||||
}
|
|
||||||
// 60秒超时计时器
|
|
||||||
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
|
|
||||||
// 推流 ack 事件订阅
|
|
||||||
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
|
|
||||||
|
|
||||||
scheduledExecutorService.schedule(()->{
|
|
||||||
// 发送 sdp 响应
|
// 发送 sdp 响应
|
||||||
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, sdp));
|
Runnable sendOkResponse = () -> {
|
||||||
}, 1,TimeUnit.SECONDS);
|
Response okResponse = SipResponseBuilder.responseSdp(request, sdp);
|
||||||
}
|
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> okResponse);
|
||||||
|
|
||||||
public Flow.Subscriber<SIPRequest> playSubscriber(SIPRequest request,String callId,MockingDevice device,String address,int port,String key, String ssrc,ScheduledFuture<?>[] scheduledFuture){
|
|
||||||
return new Flow.Subscriber<>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Flow.Subscription subscription) {
|
|
||||||
log.info("创建 ack 订阅 {}", key);
|
|
||||||
subscription.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(SIPRequest item) {
|
|
||||||
log.info("收到 ack 确认请求: {} 开始推流",key);
|
|
||||||
// RTP 推流
|
|
||||||
deviceProxyService.pullLiveStream2Rtp(request, callId, device, address, port,ssrc);
|
|
||||||
onComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable throwable) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
subscribe.getAckSubscribe().delPublisher(key);
|
|
||||||
scheduledFuture[0].cancel(true);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if(!isDownload){
|
||||||
|
playbackSubscriber(request, sendOkResponse, callId,device,start,stop,address,port,ssrc);
|
||||||
|
} else {
|
||||||
|
downloadSubscriber(request, sendOkResponse, callId,device,start,stop,address,port,ssrc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Flow.Subscriber<SIPRequest> placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key, String ssrc,ScheduledFuture<?>[] scheduledFuture){
|
public void playSubscriber(SIPRequest request, Runnable sendOkResponse, String callId, MockingDevice device, String address, int port, String ssrc) {
|
||||||
return new Flow.Subscriber<>() {
|
log.info("收到 实时点播请求: {} 开始推流", callId);
|
||||||
@Override
|
// RTP 推流
|
||||||
public void onSubscribe(Flow.Subscription subscription) {
|
deviceProxyService.pullLiveStream2Rtp(request, sendOkResponse, callId, device, address, port, ssrc);
|
||||||
log.info("创建 ack 订阅 {}", key);
|
|
||||||
subscription.request(1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public void playbackSubscriber(SIPRequest request, Runnable sendOkResponse, String callId, MockingDevice device, Date start, Date stop, String address, int port, String ssrc){
|
||||||
public void onNext(SIPRequest item) {
|
log.info("收到 回放 请求: {} 开始推流", callId);
|
||||||
log.info("收到 ack 确认请求: {} 开始推流",key);
|
|
||||||
if (ffmpegConfig.getUseZlmFfmpeg()) {
|
if (ffmpegConfig.getUseZlmFfmpeg()) {
|
||||||
|
sendOkResponse.run();
|
||||||
deviceProxyService.pullStreamByZlmFfmpegSource(request, callId, device, start, stop, address, port, ssrc);
|
deviceProxyService.pullStreamByZlmFfmpegSource(request, callId, device, start, stop, address, port, ssrc);
|
||||||
} else {
|
} else {
|
||||||
// RTP 推流
|
// RTP 推流
|
||||||
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask());
|
deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc, deviceProxyService.playbackTask());
|
||||||
}
|
}
|
||||||
onComplete();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public void downloadSubscriber(SIPRequest request,Runnable sendOkResponse, String callId,MockingDevice device,Date start,Date stop,String address,int port,String ssrc){
|
||||||
public void onError(Throwable throwable) {
|
log.info("收到 下载请求: {} 开始推流",callId);
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
subscribe.getAckSubscribe().delPublisher(key);
|
|
||||||
scheduledFuture[0].cancel(true);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public Flow.Subscriber<SIPRequest> downloadSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,String ssrc,ScheduledFuture<?>[] scheduledFuture){
|
|
||||||
return new Flow.Subscriber<>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Flow.Subscription subscription) {
|
|
||||||
log.info("创建 ack 订阅 {}", key);
|
|
||||||
subscription.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(SIPRequest item) {
|
|
||||||
log.info("收到 ack 确认请求: {} 开始推流",key);
|
|
||||||
// RTP 推流
|
// RTP 推流
|
||||||
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask());
|
deviceProxyService.proxyVideo2Rtp(request, sendOkResponse, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask());
|
||||||
onComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable throwable) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
subscribe.getAckSubscribe().delPublisher(key);
|
|
||||||
scheduledFuture[0].cancel(true);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -98,15 +98,14 @@ public class DeviceProxyService {
|
|||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
public interface TaskProcessor {
|
public interface TaskProcessor {
|
||||||
void process(SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
|
void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String requestZlmPushStream(ScheduledFuture<?> schedule, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
|
private String requestZlmPushStream(ScheduledFuture<?> schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
|
||||||
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
|
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
|
||||||
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
|
||||||
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
|
||||||
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
schedule.cancel(false);
|
|
||||||
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
|
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
|
||||||
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
|
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
|
||||||
.retryIfException()
|
.retryIfException()
|
||||||
@ -136,6 +135,11 @@ public class DeviceProxyService {
|
|||||||
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 停止发送 trying
|
||||||
|
schedule.cancel(false);
|
||||||
|
// 响应 sdp ok
|
||||||
|
sendOkResponse.run();
|
||||||
});
|
});
|
||||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
|
||||||
StopSendRtp stopSendRtp = new StopSendRtp();
|
StopSendRtp stopSendRtp = new StopSendRtp();
|
||||||
@ -161,11 +165,11 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public TaskProcessor playbackTask(){
|
public TaskProcessor playbackTask(){
|
||||||
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
|
return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
|
||||||
ScheduledFuture<?> schedule = trying(request);
|
ScheduledFuture<?> schedule = trying(request);
|
||||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
|
||||||
try {
|
try {
|
||||||
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||||
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||||
@ -180,11 +184,11 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public TaskProcessor downloadTask(){
|
public TaskProcessor downloadTask(){
|
||||||
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
|
return (SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
|
||||||
ScheduledFuture<?> schedule = trying(request);
|
ScheduledFuture<?> schedule = trying(request);
|
||||||
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
|
||||||
try {
|
try {
|
||||||
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
|
||||||
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
|
||||||
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
|
||||||
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
|
||||||
@ -399,7 +403,8 @@ public class DeviceProxyService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){
|
public void pullLiveStream2Rtp(SIPRequest request,Runnable sendOkResponse,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){
|
||||||
|
ScheduledFuture<?> schedule = trying(request);
|
||||||
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
|
Retryer<ZlmResponse<AddStreamProxyResp>> retryer = RetryerBuilder.<ZlmResponse<AddStreamProxyResp>>newBuilder()
|
||||||
.retryIfResult(resp -> {
|
.retryIfResult(resp -> {
|
||||||
log.info("resp {}", resp);
|
log.info("resp {}", resp);
|
||||||
@ -452,6 +457,11 @@ public class DeviceProxyService {
|
|||||||
log.error("zlm rtp 推流失败",e);
|
log.error("zlm rtp 推流失败",e);
|
||||||
sendBye(request, device, "");
|
sendBye(request, device, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 停止发送 trying
|
||||||
|
schedule.cancel(false);
|
||||||
|
// 响应 sdp ok
|
||||||
|
sendOkResponse.run();
|
||||||
});
|
});
|
||||||
|
|
||||||
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
|
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
|
||||||
@ -487,12 +497,12 @@ public class DeviceProxyService {
|
|||||||
return fromUrl;
|
return fromUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
|
public void proxyVideo2Rtp(SIPRequest request,Runnable sendOkResponse, String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
|
||||||
String fromUrl = getProxyUrl(device, startTime, endTime);
|
String fromUrl = getProxyUrl(device, startTime, endTime);
|
||||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
subscribe.getByeSubscribe().addPublisher(key);
|
subscribe.getByeSubscribe().addPublisher(key);
|
||||||
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
|
||||||
taskProcessor.process(request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
|
taskProcessor.process(request, sendOkResponse, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
|
Loading…
Reference in New Issue
Block a user