调整 ffmpeg 启动时机
This commit is contained in:
parent
fd8a711662
commit
e37e9b2677
@ -570,8 +570,8 @@ public class Gb28181DownloadService {
|
|||||||
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
|
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS);
|
||||||
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
|
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
|
||||||
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
|
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
|
||||||
// 用以 提前 启动 ffmpeg 预备录制
|
// 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流
|
||||||
// result.completeAsync(() -> new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device), executor);
|
result.completeAsync(() -> new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device), executor);
|
||||||
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
|
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -580,7 +580,6 @@ public class Gb28181DownloadService {
|
|||||||
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
|
||||||
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
|
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
|
||||||
private Flow.Subscription subscription;
|
private Flow.Subscription subscription;
|
||||||
private boolean isStart = false;
|
|
||||||
@Override
|
@Override
|
||||||
public void onSubscribe(Flow.Subscription subscription) {
|
public void onSubscribe(Flow.Subscription subscription) {
|
||||||
this.subscription = subscription;
|
this.subscription = subscription;
|
||||||
@ -592,14 +591,6 @@ public class Gb28181DownloadService {
|
|||||||
public void onNext(SIPResponse item) {
|
public void onNext(SIPResponse item) {
|
||||||
int statusCode = item.getStatusCode();
|
int statusCode = item.getStatusCode();
|
||||||
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
|
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
|
||||||
if(statusCode == Response.OK){
|
|
||||||
String callId = item.getCallId().getCallId();
|
|
||||||
if(!isStart){
|
|
||||||
isStart = true;
|
|
||||||
result.completeAsync(() -> new VideoInfo(streamId,videoRtmpUrl(streamId), callId, device), executor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (statusCode == Response.TRYING) {
|
if (statusCode == Response.TRYING) {
|
||||||
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey);
|
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey);
|
||||||
subscription.request(1);
|
subscription.request(1);
|
||||||
@ -607,20 +598,14 @@ public class Gb28181DownloadService {
|
|||||||
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
|
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
|
||||||
log.info("收到响应状态 {}", statusCode);
|
log.info("收到响应状态 {}", statusCode);
|
||||||
String callId = item.getCallId().getCallId();
|
String callId = item.getCallId().getCallId();
|
||||||
if(!isStart){
|
sender.sendRequest(((provider, ip, port) -> {
|
||||||
isStart = true;
|
String fromTag = item.getFromTag();
|
||||||
result.completeAsync(() -> new VideoInfo(streamId,videoRtmpUrl(streamId), callId, device), executor);
|
String toTag = item.getToTag();
|
||||||
}
|
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
||||||
scheduledExecutorService.schedule(()->{
|
subscribe.getByeSubscribe().addPublisher(key);
|
||||||
sender.sendRequest(((provider, ip, port) -> {
|
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
|
||||||
String fromTag = item.getFromTag();
|
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
|
||||||
String toTag = item.getToTag();
|
}));
|
||||||
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
|
|
||||||
subscribe.getByeSubscribe().addPublisher(key);
|
|
||||||
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit));
|
|
||||||
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
|
|
||||||
}));
|
|
||||||
},1000, TimeUnit.MILLISECONDS);
|
|
||||||
} else {
|
} else {
|
||||||
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
|
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
|
||||||
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
|
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
|
||||||
|
Loading…
Reference in New Issue
Block a user