From abe3194c5f7b553a433e52cc5a29da15e4e5b389 Mon Sep 17 00:00:00 2001 From: zxb <919411476@qq.com> Date: Thu, 14 Mar 2024 15:51:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 75 ++++++------------- .../request/impl/InviteRequestProcessor.java | 2 + .../vmp/service/ZlmPublishHookService.java | 10 ++- 3 files changed, 34 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index cbd2f3a1..3b408ed6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -24,11 +24,12 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.github.rholder.retry.*; -import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -39,7 +40,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; -import javax.sip.message.Response; import java.text.ParseException; import java.util.HashMap; import java.util.Map; @@ -107,6 +107,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + /** * 处理 ACK请求 * @@ -133,55 +137,22 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In return; } - // -// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> { -// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId()); -// try { -// responseAck((SIPRequest) evt.getRequest(), Response.OK); -// } catch (SipException | InvalidArgumentException | ParseException e) { -// logger.error(e.getMessage()); -// } -// }); + taskExecutor.execute(()->{ + long retry = 50; + while (retry > 0){ + SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId()); + if(platformAckSubscribe != null) { + platformAckSubscribe.response(null); + break; + } + retry -=1; - // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId()); -// if( okSubscribe != null) { -// okSubscribe.response(new SipSubscribe.EventResult<>(evt)); -// return; -// } - - // String channelId = id; - // if(storager.queryVideoDeviceByChannelId(id) != null){ - // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id); - // if(okSubscribe!=null){ - // okSubscribe.response(null); - // } - // } else { - // Device device = storager.queryVideoDevice(id); - // if(device != null){ - // List deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId()); - // deviceChannels.forEach(channel -> { - // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId()); - // if(okSubscribe!=null){ - // okSubscribe.response(null); - // } - // }); - // } else { - // logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); - // sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck); - // } - // } -// okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId()); -// if( okSubscribe != null) { -// okSubscribe.response(new SipSubscribe.EventResult<>(evt)); -// return; -// } - - scheduledExecutorService.schedule(()->{ - SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId()); - if(platformAckSubscribe != null) { - platformAckSubscribe.response(null); + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException ignored) { + } } - }, 100, TimeUnit.MILLISECONDS); + }); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -220,7 +191,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param)); zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{ - scheduledExecutorService.submit(()->{ + taskExecutor.submit(()->{ JSONObject startSendRtpStreamResult; Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(resp -> resp == null || resp.getInteger("code") != 0) @@ -229,12 +200,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 重试间隔 .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS)) // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(15 * 1000)) + .withStopStrategy(StopStrategies.stopAfterAttempt(20 * 1000)) .build(); try { startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param)); } catch (ExecutionException | RetryException e) { - logger.error(e.getMessage()); + logger.error("rtp转推失败 {}",e.getMessage()); startSendRtpStreamResult = null; } startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index d06fd22b..c8111e1d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -552,6 +552,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb(); SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort()); + logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername()); // 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发 Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck); @@ -618,6 +619,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb(); SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort()); + logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername()); // 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发 Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck); diff --git a/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java b/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java index 542a2e99..d0d6e29b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java @@ -6,6 +6,9 @@ import lombok.Data; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.util.Optional; @@ -17,6 +20,9 @@ import java.util.concurrent.ConcurrentMap; @Service @RequiredArgsConstructor public class ZlmPublishHookService { + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; public interface ZlmPublishHookHandler { void handler(); @@ -38,6 +44,8 @@ public class ZlmPublishHookService { log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip); ConcurrentMap handlers = getHandler(app); - Optional.ofNullable(handlers.remove(streamId)).ifPresent(ZlmPublishHookHandler::handler); + Optional.ofNullable(handlers.remove(streamId)).ifPresent(handler -> { + taskExecutor.execute(handler::handler); + }); } }