修复级联点播异常

This commit is contained in:
zxb 2024-04-11 20:39:35 +08:00
parent e7339a5f78
commit aa27d2080b
6 changed files with 124 additions and 63 deletions

View File

@ -100,7 +100,10 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
Function<String, Void> ignoreCallBack,
Function<SIPResponse, Void> inviteCallBack,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 请求回放视频流

View File

@ -271,6 +271,8 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
Function<String, Void> ignoreCallBack,
Function<SIPResponse, Void> inviteCallBack,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = ssrcInfo.getStream();
@ -369,8 +371,12 @@ public class SIPCommander implements ISIPCommander {
// content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), newCallIdHeader);
if(ignoreCallBack != null){
ignoreCallBack.apply(newCallIdHeader.getCallId());
}
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
@ -379,6 +385,9 @@ public class SIPCommander implements ISIPCommander {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String callId = response.getCallIdHeader().getCallId();
if(inviteCallBack != null){
inviteCallBack.apply(response);
}
streamSession.put(device.getDeviceId(), channelId, callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
okEvent.response(e);

View File

@ -181,17 +181,20 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
// 开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
logger.debug("mediaInfo {}", mediaInfo==null?"null":mediaInfo);
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param));
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
});
}else {
logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param));
logger.debug("zlmPublishHookService.getHandler(sendRtpItem.getApp()) {} put {}",zlmPublishHookService.getHandler(sendRtpItem.getApp()), sendRtpItem.getStreamId());
zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{
taskExecutor.submit(()->{
JSONObject startSendRtpStreamResult;

View File

@ -654,7 +654,44 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
sendRtpItem.setStreamId(streamId);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, (String deviceCallId) -> {
// 忽略自动回复 ACK
sipSubscribe.addOkSubscribe("ACK_IGNORE_" + deviceCallId, (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_IGNORE_" + deviceCallId);
});
return null;
}, (SIPResponse response) -> {
// 先订阅再发起
// 订阅上级平台的 ACK 请求
sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
try {
String deviceContent = new String(response.getRawContent());
Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent);
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername());
// 收到上级的 ACK , 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
} catch (ParseException | SdpParseException | InvalidArgumentException |
SipException e) {
throw new RuntimeException(e);
}
});
try {
// 向上级平台回复 INVITE OK
responseSdpAck(request, new String(request.getRawContent()), platform);
} catch (SipException | InvalidArgumentException |
ParseException e) {
throw new RuntimeException(e);
}
return null;
}, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
hookEvent.run(code, msg, data);
} else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {

View File

@ -18,9 +18,10 @@ import java.util.function.Function;
*/
public interface IPlayService {
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ErrorCallback<Object> callback);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ErrorCallback<Object> callback);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback);
MediaServerItem getNewMediaServerItem(Device device);
@ -31,7 +32,6 @@ public interface IPlayService {
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);

View File

@ -108,7 +108,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback) {
if(userSetting.getDisableRealtimePlay()){
logger.info("[点播] 实时点播已禁用");
callback.run(Response.FORBIDDEN, "实时点播已禁用", null);
@ -172,14 +172,17 @@ public class PlayServiceImpl implements IPlayService {
null);
return null;
}
play(mediaServerItem, ssrcInfo, device, channelId, callback);
play(mediaServerItem, ssrcInfo, device, channelId, ignoreCallBack, inviteCallBack,callback);
return ssrcInfo;
}
@Override
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {
return play(mediaServerItem, deviceId, channelId, ssrc, null, null, callback);
}
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ErrorCallback<Object> callback) {
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@ -249,7 +252,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout());
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, ignoreCallBack, inviteCallBack,(mediaServerItemInuse, hookParam ) -> {
logger.info("收到订阅消息: " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook响应
@ -311,6 +314,12 @@ public class PlayServiceImpl implements IPlayService {
}
}
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ErrorCallback<Object> callback) {
play(mediaServerItem, ssrcInfo, device, channelId, null, null, callback);
}
private void tcpActiveHandler(Device device, String channelId, String contentString,
MediaServerItem mediaServerItem,
String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){