diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 9f29f72..ade7ee8 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -26,6 +26,7 @@ import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService; import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService; import cn.skcks.docking.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.sdp.parser.GB28181DescriptionParser; +import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder; import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; @@ -92,11 +93,14 @@ public class DeviceProxyService { subscribe.getByeSubscribe().addSubscribe(key, subscriber); int num = taskNum.incrementAndGet(); log.info("当前任务数 {}", num); + + ScheduledFuture schedule = trying(request); try { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + schedule.cancel(false); Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) .retryIfException() @@ -121,6 +125,7 @@ public class DeviceProxyService { return startSendRtpResp; }); } catch (Exception e) { + schedule.cancel(true); Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); throw new RuntimeException(e); @@ -136,6 +141,7 @@ public class DeviceProxyService { callbackTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); } catch (Exception e) { + schedule.cancel(true); throw new RuntimeException(e); } }; @@ -150,11 +156,13 @@ public class DeviceProxyService { subscribe.getByeSubscribe().addSubscribe(key, subscriber); int num = taskNum.incrementAndGet(); log.info("当前任务数 {}", num); + ScheduledFuture schedule = trying(request); try { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + schedule.cancel(false); Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) .retryIfException() @@ -179,6 +187,7 @@ public class DeviceProxyService { return startSendRtpResp; }); } catch (Exception e) { + schedule.cancel(true); Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); throw new RuntimeException(e); @@ -194,11 +203,22 @@ public class DeviceProxyService { downloadTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); } catch (Exception e) { + schedule.cancel(true); throw new RuntimeException(e); } }; } + private ScheduledFuture trying(SIPRequest request){ + return scheduledExecutorService.schedule(() -> { + InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build(); + Response tryingInviteResponse = inviteRequestBuilder.createTryingInviteResponse(request); + String ip = request.getLocalAddress().getHostAddress(); + String transPort = request.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> tryingInviteResponse)); + }, 200, TimeUnit.MILLISECONDS); + } + public Flow.Subscriber byeSubscriber(String key, MockingDevice device, ConcurrentHashMap task){ return new Flow.Subscriber<>() { @Override