级联回放增加MediaStatus消息 #377

This commit is contained in:
648540858 2022-06-04 00:02:39 +08:00
parent d6a44a03df
commit 075c6ad7f8
4 changed files with 57 additions and 21 deletions

View File

@ -760,6 +760,29 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
try{
SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
SipStack sipStack;
if ("TCP".equals(platform.getTransport())) {
sipStack = tcpSipProvider.getSipStack();
} else {
sipStack = udpSipProvider.getSipStack();
}
SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
if (dialog != sipDialog) {
dialog = sipDialog;
}
if ("TCP".equals(platform.getTransport())) {
dialog.setSipProvider(tcpSipProvider);
} else {
dialog.setSipProvider(udpSipProvider);
}
Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
sipStackField.setAccessible(true);
sipStackField.set(dialog, sipStack);
Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
eventListenersField.setAccessible(true);
eventListenersField.set(dialog, new HashSet<>());
SIPRequest messageRequest = (SIPRequest)dialog.createRequest(Request.MESSAGE);
String characterSet = platform.getCharacterSet();
StringBuffer mediaStatusXml = new StringBuffer(200);
@ -775,20 +798,23 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
SipURI sipURI = (SipURI) messageRequest.getRequestURI();
sipURI.setHost(platform.getServerIP());
sipURI.setPort(platform.getServerPort());
ClientTransaction transaction = null;
ClientTransaction clientTransaction;
if ("TCP".equals(platform.getTransport())) {
transaction = tcpSipProvider.getNewClientTransaction(messageRequest);
} else if ("UDP".equals(platform.getTransport())) {
transaction = udpSipProvider.getNewClientTransaction(messageRequest);
clientTransaction = tcpSipProvider.getNewClientTransaction(messageRequest);
}else {
clientTransaction = udpSipProvider.getNewClientTransaction(messageRequest);
}
transaction.sendRequest();
dialog.sendRequest(clientTransaction);
} catch (SipException e) {
e.printStackTrace();
return false;
} catch (ParseException e) {
e.printStackTrace();
return false;
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
return true;
@ -811,13 +837,22 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
byte[] dialogByteArray = sendRtpItem.getDialog();
if (dialogByteArray != null) {
SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
SipStack sipStack = udpSipProvider.getSipStack();
SipStack sipStack;
if ("TCP".equals(platform.getTransport())) {
sipStack = tcpSipProvider.getSipStack();
} else {
sipStack = udpSipProvider.getSipStack();
}
SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
if (dialog != sipDialog) {
dialog = sipDialog;
}
try {
if ("TCP".equals(platform.getTransport())) {
dialog.setSipProvider(tcpSipProvider);
} else {
dialog.setSipProvider(udpSipProvider);
}
Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
sipStackField.setAccessible(true);
sipStackField.set(dialog, sipStack);
@ -825,17 +860,15 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
eventListenersField.setAccessible(true);
eventListenersField.set(dialog, new HashSet<>());
byte[] transactionByteArray = sendRtpItem.getTransaction();
ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI();
SIPRequest request = (SIPRequest) clientTransaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostAddress());
byeURI.setPort(request.getRemotePort());
byeURI.setHost(platform.getServerIP());
byeURI.setPort(platform.getServerPort());
ClientTransaction clientTransaction;
if ("TCP".equals(platform.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
} else if ("UDP".equals(platform.getTransport())) {
} else {
clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
}
dialog.sendRequest(clientTransaction);

View File

@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -118,6 +119,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
logger.error("RTP推流失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) {
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
} else {
logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
if (sendRtpItem.isOnlyAudio()) {

View File

@ -264,10 +264,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK);
byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
Long finalStartTime = startTime;
Long finalStopTime = stopTime;
ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{

View File

@ -87,8 +87,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
if (ssrcTransaction != null) { // 兼容海康 媒体通知 消息from字段不是设备ID的问题
cmder.streamByeCmd(device.getDeviceId(), ssrcTransaction.getChannelId(), null, callIdHeader.getCallId());
// 如果级联播放需要给上级发送此通知
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId());
// 如果级联播放需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
if (sendRtpItem != null) {
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
if (parentPlatform == null) {
@ -98,7 +99,6 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
sipCommanderFroPlatform.sendMediaStatusNotify(parentPlatform, sendRtpItem);
}
}
}
}