From fabc564fc8bc1565c522ddba5bb6cba529837b05 Mon Sep 17 00:00:00 2001 From: zxb <919411476@qq.com> Date: Mon, 4 Mar 2024 23:16:35 +0800 Subject: [PATCH] =?UTF-8?q?wvp=20=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94=20?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E4=B8=8B=E8=BD=BD=20=E6=8E=A8=E6=B5=81=20?= =?UTF-8?q?=E9=A1=BA=E5=BA=8F=E6=B5=81=E7=A8=8B=20=E7=B2=97=E7=95=A5?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transmit/cmd/impl/SIPCommander.java | 44 ++++++------ .../request/impl/AckRequestProcessor.java | 50 ++++++++++++- .../request/impl/InviteRequestProcessor.java | 30 ++++++++ .../impl/InviteResponseProcessor.java | 71 ++++++++++++++++++- .../iot/vmp/service/impl/PlayServiceImpl.java | 1 + 5 files changed, 169 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 7b340f8d..0f03d761 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -61,7 +61,7 @@ public class SIPCommander implements ISIPCommander { @Autowired private SIPSender sipSender; - + @Autowired private SIPRequestHeaderProvider headerProvider; @@ -190,7 +190,7 @@ public class SIPCommander implements ISIPCommander { ptzXml.append("5\r\n"); ptzXml.append("\r\n"); ptzXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request); @@ -253,8 +253,8 @@ public class SIPCommander implements ISIPCommander { ptzXml.append("5\r\n"); ptzXml.append("\r\n"); ptzXml.append("\r\n"); - - + + Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request, errorEvent, okEvent); @@ -667,7 +667,7 @@ public class SIPCommander implements ISIPCommander { broadcastXml.append("" + device.getDeviceId() + "\r\n"); broadcastXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); @@ -687,7 +687,7 @@ public class SIPCommander implements ISIPCommander { broadcastXml.append("" + device.getDeviceId() + "\r\n"); broadcastXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); @@ -718,7 +718,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("" + recordCmdStr + "\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent); @@ -742,7 +742,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("Boot\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); @@ -801,7 +801,7 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent); @@ -830,7 +830,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("Send\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); @@ -878,7 +878,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent); @@ -941,7 +941,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); @@ -986,7 +986,7 @@ public class SIPCommander implements ISIPCommander { catalogXml.append("" + device.getDeviceId() + "\r\n"); catalogXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); @@ -1011,7 +1011,7 @@ public class SIPCommander implements ISIPCommander { catalogXml.append(" " + device.getDeviceId() + "\r\n"); catalogXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); @@ -1056,7 +1056,7 @@ public class SIPCommander implements ISIPCommander { } recordInfoXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); @@ -1107,7 +1107,7 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); @@ -1137,7 +1137,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("" + configType + "\r\n"); cmdXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); @@ -1187,7 +1187,7 @@ public class SIPCommander implements ISIPCommander { mobilePostitionXml.append("60\r\n"); mobilePostitionXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); @@ -1269,7 +1269,7 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); - + Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), null, expires, "presence",sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); @@ -1319,14 +1319,14 @@ public class SIPCommander implements ISIPCommander { } dragXml.append(cmdString); dragXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, dragXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); logger.debug("拉框信令: " + request.toString()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request); } - + /** @@ -1430,7 +1430,7 @@ public class SIPCommander implements ISIPCommander { deviceStatusXml.append("\r\n"); deviceStatusXml.append("\r\n"); - + Request request = headerProvider.createMessageRequest(device, deviceStatusXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index d189048b..22303d17 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -35,6 +37,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; +import javax.sip.message.Response; import java.text.ParseException; import java.util.HashMap; import java.util.Map; @@ -90,10 +93,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private SipSubscribe sipSubscribe; - /** + /** * 处理 ACK请求 - * + * * @param evt */ @Override @@ -116,6 +121,47 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.info("收到ACK,rtp/{} TCP主动方式后续处理", sendRtpItem.getStreamId()); return; } + + // sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> { + // sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId()); + // try { + // responseAck((SIPRequest) evt.getRequest(), Response.OK); + // } catch (SipException | InvalidArgumentException | ParseException e) { + // logger.error(e.getMessage()); + // } + // }); + + SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId()); + if( okSubscribe != null) { + okSubscribe.response(new SipSubscribe.EventResult<>(evt)); + } + + // String channelId = id; + // if(storager.queryVideoDeviceByChannelId(id) != null){ + // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id); + // if(okSubscribe!=null){ + // okSubscribe.response(null); + // } + // } else { + // Device device = storager.queryVideoDevice(id); + // if(device != null){ + // List deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId()); + // deviceChannels.forEach(channel -> { + // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId()); + // if(okSubscribe!=null){ + // okSubscribe.response(null); + // } + // }); + // } else { + // logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); + // sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck); + // } + // } + okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId()); + if( okSubscribe != null) { + okSubscribe.response(new SipSubscribe.EventResult<>(evt)); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, 协议:{}", diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index fa61d7df..872ee4e2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -125,6 +126,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SIPCommander cmder; + @Autowired + private SipSubscribe sipSubscribe; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -445,7 +449,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } }, 60 * 1000); + + sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> { + sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId()); + + SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId); + if(okSubscribe != null) { + okSubscribe.response(null); + //sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId); + } + // responseAck(streamInfo.getTransactionInfo()) + }); responseSdpAck(request, content.toString(), platform); + // sipSubscribe.addOkSubscribe("ACK_" + channelId, (eventResult) -> {}); + // tcp主动模式,回复sdp后开启监听 if (sendRtpItem.isTcpActive()) { MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -537,6 +554,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); + // sipSubscribe.addOkSubscribe("DEVICE_INVITE_OK_" + channelId, (eventResult) -> { + // sipSubscribe.removeOkSubscribe("DEVICE_INVITE_OK_" + channelId); + // try { + // logger.info("国标级联 [录像下载] 向设备 => ({}) {} 发送 INVITE OK", channelId, callIdHeader.getCallId()); + // responseSdpAck(request, new String(request.getRawContent()), platform); + // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId()); + // if(okSubscribe != null){ + // okSubscribe.response(new SipSubscribe.EventResult<>(evt)); + // } + // } catch (SipException | InvalidArgumentException | ParseException e) { + // throw new RuntimeException(e); + // } + // }); playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(), (code, msg, data) -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 436d2a43..3a01c65e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -1,14 +1,20 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.message.SIPResponse; +import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -21,9 +27,15 @@ import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.sip.SipFactory; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** @@ -50,6 +62,13 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private SIPRequestHeaderProvider headerProvider; + @Autowired + private SipSubscribe sipSubscribe; + + @Autowired + private IVideoManagerStorage storager; + + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); @Override public void afterPropertiesSet() throws Exception { @@ -61,7 +80,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { /** * 处理invite响应 - * + * * @param evt 响应消息 * @throws ParseException */ @@ -85,8 +104,54 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort()); Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); - logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); - sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck); + String id = sdp.getOrigin().getUsername(); + // String channelId = id; + // if(storager.queryVideoDeviceByChannelId(id) != null){ + // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id); + // if(okSubscribe!=null){ + // okSubscribe.response(null); + // } + // } else { + // Device device = storager.queryVideoDevice(id); + // if(device != null){ + // List deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId()); + // deviceChannels.forEach(channel -> { + // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId()); + // if(okSubscribe!=null){ + // okSubscribe.response(null); + // } + // }); + // } else { + // logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); + // sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck); + // } + // } + + + String callId = response.getCallIdHeader().getCallId(); + // SipSubscribe.Event ackDeviceEvent = sipSubscribe.getOkSubscribe("ACK_DEVICE_EVENT_" + callId); + // if(ackDeviceEvent != null){ + // + // } + + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + try { + logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); + sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck); + } catch (SdpParseException | ParseException | SipException e) { + throw new RuntimeException(e); + } + }, 3, TimeUnit.SECONDS); + sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{ + schedule.cancel(true); + sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id); + try { + logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort()); + sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck); + } catch (SdpParseException | ParseException | SipException e) { + throw new RuntimeException(e); + } + }); } } catch (InvalidArgumentException | ParseException | SipException | SdpParseException e) { logger.info("[点播回复ACK],异常:", e ); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 634b35d4..8cc9498e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -729,6 +729,7 @@ public class PlayServiceImpl implements IPlayService { InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); return; } + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime); logger.debug("streamInfo => {}",streamInfo);