wvp 国标级联 历史视频点播 推流流程修正

This commit is contained in:
zxb 2024-03-07 15:01:15 +08:00
parent a3d4bf9899
commit 19524e9289
5 changed files with 62 additions and 20 deletions

View File

@ -110,7 +110,10 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,
Function<String, Void> ignoreCallBack,
Function<SIPResponse, Void> inviteCallBack,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 请求历史媒体下载

View File

@ -395,7 +395,10 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, ZlmHttpHookSubscribe.Event hookEvent,
String startTime, String endTime,
Function<String, Void> ignoreCallBack,
Function<SIPResponse, Void> inviteCallBack,
ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
@ -476,11 +479,17 @@ public class SIPCommander implements ISIPCommander {
}
subscribe.removeSubscribe(hookSubscribe);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport());
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
if(ignoreCallBack != null){
ignoreCallBack.apply(newCallIdHeader.getCallId());
}
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
if(inviteCallBack != null){
inviteCallBack.apply(response);
}
streamSession.put(device.getDeviceId(), channelId,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
okEvent.response(event);
});

View File

@ -533,6 +533,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end),
(String deviceCallId)->{
// 忽略自动回复 ACK
sipSubscribe.addOkSubscribe("ACK_IGNORE_" + deviceCallId, (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_IGNORE_" + deviceCallId);
});
return null;
},(SIPResponse response)->{
// 先订阅再发起
// 订阅上级平台的 ACK 请求
sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
try {
String deviceContent = new String(response.getRawContent());
Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent);
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
// 收到上级的 ACK , 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
} catch (ParseException | SdpParseException | InvalidArgumentException | SipException e) {
throw new RuntimeException(e);
}
});
try {
// 向上级平台回复 INVITE OK
responseSdpAck(request, new String(request.getRawContent()), platform);
} catch (SipException | InvalidArgumentException |
ParseException e) {
throw new RuntimeException(e);
}
return null;
},
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()){
hookEvent.run(code, msg, data);
@ -561,19 +597,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// sipSubscribe.addOkSubscribe("DEVICE_INVITE_OK_" + channelId, (eventResult) -> {
// sipSubscribe.removeOkSubscribe("DEVICE_INVITE_OK_" + channelId);
// try {
// logger.info("国标级联 [录像下载] 向设备 => ({}) {} 发送 INVITE OK", channelId, callIdHeader.getCallId());
// responseSdpAck(request, new String(request.getRawContent()), platform);
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
// if(okSubscribe != null){
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
// }
// } catch (SipException | InvalidArgumentException | ParseException e) {
// throw new RuntimeException(e);
// }
// });
playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),(String deviceCallId)->{
@ -585,12 +608,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return null;
},(SIPResponse response)->{
// 先订阅再发起
// 订阅上级平台的 ACK 请求
sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
try {
String deviceContent = new String(response.getRawContent());
Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent);

View File

@ -26,6 +26,7 @@ public interface IPlayService {
void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback<Object> callback);
void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback);
void zlmServerOffline(String mediaServerId);
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);

View File

@ -471,6 +471,13 @@ public class PlayServiceImpl implements IPlayService {
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, ErrorCallback<Object> callback) {
playBack(mediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, null, null,callback);
}
@Override
public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@ -546,6 +553,7 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime,
ignoreCallBack, inviteCallBack,
hookEvent, eventResult -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,