wvp 国标级联 视频下载 推流 顺序流程 粗略修正

This commit is contained in:
zxb 2024-03-04 23:16:35 +08:00
parent c871fc5a61
commit fabc564fc8
5 changed files with 169 additions and 27 deletions

View File

@ -61,7 +61,7 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private SIPSender sipSender;
@Autowired
private SIPRequestHeaderProvider headerProvider;
@ -190,7 +190,7 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("<ControlPriority>5</ControlPriority>\r\n");
ptzXml.append("</Info>\r\n");
ptzXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request);
@ -253,8 +253,8 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("<ControlPriority>5</ControlPriority>\r\n");
ptzXml.append("</Info>\r\n");
ptzXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request, errorEvent, okEvent);
@ -667,7 +667,7 @@ public class SIPCommander implements ISIPCommander {
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
@ -687,7 +687,7 @@ public class SIPCommander implements ISIPCommander {
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
@ -718,7 +718,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<RecordCmd>" + recordCmdStr + "</RecordCmd>\r\n");
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent);
@ -742,7 +742,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<TeleBoot>Boot</TeleBoot>\r\n");
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
@ -801,7 +801,7 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent);
@ -830,7 +830,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<IFameCmd>Send</IFameCmd>\r\n");
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
@ -878,7 +878,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("</HomePosition>\r\n");
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent,okEvent);
@ -941,7 +941,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("</BasicParam>\r\n");
cmdXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
@ -986,7 +986,7 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
catalogXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
@ -1011,7 +1011,7 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append(" <DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
catalogXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
@ -1056,7 +1056,7 @@ public class SIPCommander implements ISIPCommander {
}
recordInfoXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
@ -1107,7 +1107,7 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
@ -1137,7 +1137,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<ConfigType>" + configType + "</ConfigType>\r\n");
cmdXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
@ -1187,7 +1187,7 @@ public class SIPCommander implements ISIPCommander {
mobilePostitionXml.append("<Interval>60</Interval>\r\n");
mobilePostitionXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
@ -1269,7 +1269,7 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), null, expires, "presence",sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
@ -1319,14 +1319,14 @@ public class SIPCommander implements ISIPCommander {
}
dragXml.append(cmdString);
dragXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, dragXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
logger.debug("拉框信令: " + request.toString());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request);
}
/**
@ -1430,7 +1430,7 @@ public class SIPCommander implements ISIPCommander {
deviceStatusXml.append("</info>\r\n");
deviceStatusXml.append("</Notify>\r\n");
Request request = headerProvider.createMessageRequest(device, deviceStatusXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request);

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@ -35,6 +37,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
@ -90,10 +93,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private RedisGbPlayMsgListener redisGbPlayMsgListener;
@Autowired
private SipSubscribe sipSubscribe;
/**
/**
* 处理 ACK请求
*
*
* @param evt
*/
@Override
@ -116,6 +121,47 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("收到ACKrtp/{} TCP主动方式后续处理", sendRtpItem.getStreamId());
return;
}
// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
// try {
// responseAck((SIPRequest) evt.getRequest(), Response.OK);
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error(e.getMessage());
// }
// });
SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
if( okSubscribe != null) {
okSubscribe.response(new SipSubscribe.EventResult<>(evt));
}
// String channelId = id;
// if(storager.queryVideoDeviceByChannelId(id) != null){
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id);
// if(okSubscribe!=null){
// okSubscribe.response(null);
// }
// } else {
// Device device = storager.queryVideoDevice(id);
// if(device != null){
// List<DeviceChannel> deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId());
// deviceChannels.forEach(channel -> {
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId());
// if(okSubscribe!=null){
// okSubscribe.response(null);
// }
// });
// } else {
// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// }
// }
okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
if( okSubscribe != null) {
okSubscribe.response(new SipSubscribe.EventResult<>(evt));
}
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("收到ACKrtp/{}开始向上级推流, 目标={}:{}SSRC={}, 协议:{}",

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@ -125,6 +126,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SIPCommander cmder;
@Autowired
private SipSubscribe sipSubscribe;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@ -445,7 +449,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}, 60 * 1000);
sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId());
SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId);
if(okSubscribe != null) {
okSubscribe.response(null);
//sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId);
}
// responseAck(streamInfo.getTransactionInfo())
});
responseSdpAck(request, content.toString(), platform);
// sipSubscribe.addOkSubscribe("ACK_" + channelId, (eventResult) -> {});
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@ -537,6 +554,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// sipSubscribe.addOkSubscribe("DEVICE_INVITE_OK_" + channelId, (eventResult) -> {
// sipSubscribe.removeOkSubscribe("DEVICE_INVITE_OK_" + channelId);
// try {
// logger.info("国标级联 [录像下载] 向设备 => ({}) {} 发送 INVITE OK", channelId, callIdHeader.getCallId());
// responseSdpAck(request, new String(request.getRawContent()), platform);
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
// if(okSubscribe != null){
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
// }
// } catch (SipException | InvalidArgumentException | ParseException e) {
// throw new RuntimeException(e);
// }
// });
playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),
(code, msg, data) -> {

View File

@ -1,14 +1,20 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -21,9 +27,15 @@ import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -50,6 +62,13 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
@Autowired
private SIPRequestHeaderProvider headerProvider;
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private IVideoManagerStorage storager;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
@Override
public void afterPropertiesSet() throws Exception {
@ -61,7 +80,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
/**
* 处理invite响应
*
*
* @param evt 响应消息
* @throws ParseException
*/
@ -85,8 +104,54 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort());
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
String id = sdp.getOrigin().getUsername();
// String channelId = id;
// if(storager.queryVideoDeviceByChannelId(id) != null){
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id);
// if(okSubscribe!=null){
// okSubscribe.response(null);
// }
// } else {
// Device device = storager.queryVideoDevice(id);
// if(device != null){
// List<DeviceChannel> deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId());
// deviceChannels.forEach(channel -> {
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId());
// if(okSubscribe!=null){
// okSubscribe.response(null);
// }
// });
// } else {
// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// }
// }
String callId = response.getCallIdHeader().getCallId();
// SipSubscribe.Event ackDeviceEvent = sipSubscribe.getOkSubscribe("ACK_DEVICE_EVENT_" + callId);
// if(ackDeviceEvent != null){
//
// }
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
try {
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
} catch (SdpParseException | ParseException | SipException e) {
throw new RuntimeException(e);
}
}, 3, TimeUnit.SECONDS);
sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{
schedule.cancel(true);
sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
try {
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
} catch (SdpParseException | ParseException | SipException e) {
throw new RuntimeException(e);
}
});
}
} catch (InvalidArgumentException | ParseException | SipException | SdpParseException e) {
logger.info("[点播回复ACK],异常:", e );

View File

@ -729,6 +729,7 @@ public class PlayServiceImpl implements IPlayService {
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
logger.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime);
logger.debug("streamInfo => {}",streamInfo);