This commit is contained in:
shikong 2023-12-14 17:09:00 +08:00
parent a5faf10138
commit 2bb56f5bbc

View File

@ -81,6 +81,7 @@ public class Gb28181DownloadService {
private final WvpProxyConfig wvpProxyConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> realtimeVideo = new ConcurrentHashMap<>();
private final ConcurrentMap<String, VideoInfo> realtimeVideoInfoMap = new ConcurrentHashMap<>();
@Getter
@ -224,7 +225,7 @@ public class Gb28181DownloadService {
}
@SneakyThrows
public DeferredResult<JsonResponse<String>> realtimeVideoUrl(String deviceCode){
public synchronized DeferredResult<JsonResponse<String>> realtimeVideoUrl(String deviceCode){
DeferredResult<JsonResponse<String>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60));
result.onTimeout(()->{
result.setResult(JsonResponse.error("请求超时"));
@ -239,9 +240,11 @@ public class Gb28181DownloadService {
result.setResult(JsonResponse.error("设备(通道)不存在"));
return result;
}
closeExistRequest(deviceCode, device, docking);
requestMap.put(deviceCode, result);
String existUrl = realtimeVideo.get(deviceCode);
if(Optional.ofNullable(existUrl).isPresent()){
result.setResult(JsonResponse.success(existUrl));
return result;
}
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
scheduledExecutorService.schedule(()->{
realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{
@ -416,7 +419,7 @@ public class Gb28181DownloadService {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 60, TimeUnit.SECONDS);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
@ -575,7 +578,12 @@ public class Gb28181DownloadService {
schedule[0].cancel(true);
}
};
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit);
if(time == 0){
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60, unit);
} else {
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit);
}
return subscriber;
}
@ -608,11 +616,16 @@ public class Gb28181DownloadService {
@Override
public void onComplete() {
subscribe.getByeSubscribe().delPublisher(key);
schedule[0].cancel(true);
if(time > 0){
schedule[0].cancel(true);
}
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
}
};
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit);
if(time > 0){
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit);
}
return subscriber;
}
}