From fb5f42baee7582ca6a5a4728bdb56f5abb538e05 Mon Sep 17 00:00:00 2001 From: zxb <919411476@qq.com> Date: Sat, 16 Mar 2024 23:45:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=86=E9=A2=91=E4=B8=8B=E8=BD=BD=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E4=BD=BF=E7=94=A8=E5=8A=A8=E6=80=81=E7=AB=AF=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 18 +++++++++++++++--- .../request/impl/InviteRequestProcessor.java | 1 + 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 3b408ed6..a77b294b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * SIP命令类型: ACK请求 @@ -193,19 +194,30 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{ taskExecutor.submit(()->{ JSONObject startSendRtpStreamResult; + AtomicReference failMag = new AtomicReference<>(""); Retryer retryer = RetryerBuilder.newBuilder() - .retryIfResult(resp -> resp == null || resp.getInteger("code") != 0) + .retryIfResult(resp -> { + if(resp != null && resp.getInteger("code") == -1){ + String msg = resp.getString("msg"); + if(!msg.equalsIgnoreCase(failMag.get())){ + failMag.set(msg); + logger.debug(msg); + } + } + return resp == null || resp.getInteger("code") != 0; + }) .retryIfException() .retryIfRuntimeException() // 重试间隔 .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS)) // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(20 * 1000)) + .withStopStrategy(StopStrategies.stopAfterAttempt(15 * 1000)) .build(); try { startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param)); + logger.info("rtp转推成功 {} {} {}", channelId, mediaInfo, param); } catch (ExecutionException | RetryException e) { - logger.error("rtp转推失败 {}",e.getMessage()); + logger.error("rtp转推失败 {} {} {} {}", channelId, mediaInfo, param, e.getMessage()); startSendRtpStreamResult = null; } startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index c8111e1d..c84e4c27 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -596,6 +596,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); sendRtpItem.setStreamId(ssrcInfo.getStream()); + sendRtpItem.setLocalPort(0); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem);