diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 6c06bd3a..be67e7c6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -760,6 +760,29 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } try{ SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray); + SipStack sipStack; + if ("TCP".equals(platform.getTransport())) { + sipStack = tcpSipProvider.getSipStack(); + } else { + sipStack = udpSipProvider.getSipStack(); + } + SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog); + if (dialog != sipDialog) { + dialog = sipDialog; + } + if ("TCP".equals(platform.getTransport())) { + dialog.setSipProvider(tcpSipProvider); + } else { + dialog.setSipProvider(udpSipProvider); + } + + Field sipStackField = SIPDialog.class.getDeclaredField("sipStack"); + sipStackField.setAccessible(true); + sipStackField.set(dialog, sipStack); + Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners"); + eventListenersField.setAccessible(true); + eventListenersField.set(dialog, new HashSet<>()); + SIPRequest messageRequest = (SIPRequest)dialog.createRequest(Request.MESSAGE); String characterSet = platform.getCharacterSet(); StringBuffer mediaStatusXml = new StringBuffer(200); @@ -775,20 +798,23 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { SipURI sipURI = (SipURI) messageRequest.getRequestURI(); sipURI.setHost(platform.getServerIP()); sipURI.setPort(platform.getServerPort()); - - ClientTransaction transaction = null; + ClientTransaction clientTransaction; if ("TCP".equals(platform.getTransport())) { - transaction = tcpSipProvider.getNewClientTransaction(messageRequest); - } else if ("UDP".equals(platform.getTransport())) { - transaction = udpSipProvider.getNewClientTransaction(messageRequest); + clientTransaction = tcpSipProvider.getNewClientTransaction(messageRequest); + }else { + clientTransaction = udpSipProvider.getNewClientTransaction(messageRequest); } - transaction.sendRequest(); + dialog.sendRequest(clientTransaction); } catch (SipException e) { e.printStackTrace(); return false; } catch (ParseException e) { e.printStackTrace(); return false; + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } return true; @@ -811,13 +837,22 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { byte[] dialogByteArray = sendRtpItem.getDialog(); if (dialogByteArray != null) { SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray); - SipStack sipStack = udpSipProvider.getSipStack(); + SipStack sipStack; + if ("TCP".equals(platform.getTransport())) { + sipStack = tcpSipProvider.getSipStack(); + } else { + sipStack = udpSipProvider.getSipStack(); + } SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog); if (dialog != sipDialog) { dialog = sipDialog; } try { - dialog.setSipProvider(udpSipProvider); + if ("TCP".equals(platform.getTransport())) { + dialog.setSipProvider(tcpSipProvider); + } else { + dialog.setSipProvider(udpSipProvider); + } Field sipStackField = SIPDialog.class.getDeclaredField("sipStack"); sipStackField.setAccessible(true); sipStackField.set(dialog, sipStack); @@ -825,17 +860,15 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { eventListenersField.setAccessible(true); eventListenersField.set(dialog, new HashSet<>()); - byte[] transactionByteArray = sendRtpItem.getTransaction(); - ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray); Request byeRequest = dialog.createRequest(Request.BYE); SipURI byeURI = (SipURI) byeRequest.getRequestURI(); - SIPRequest request = (SIPRequest) clientTransaction.getRequest(); - byeURI.setHost(request.getRemoteAddress().getHostAddress()); - byeURI.setPort(request.getRemotePort()); + byeURI.setHost(platform.getServerIP()); + byeURI.setPort(platform.getServerPort()); + ClientTransaction clientTransaction; if ("TCP".equals(platform.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); - } else if ("UDP".equals(platform.getTransport())) { + } else { clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); } dialog.sendRequest(clientTransaction); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index af61ed3f..b3d67ded 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.SerializeUtils; import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +119,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.error("RTP推流失败: 请检查ZLM服务"); } else if (jsonObject.getInteger("code") == 0) { logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index d967fb94..75b4114d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -264,10 +264,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); + Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index b758227e..8a5ef793 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -87,8 +87,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, callIdHeader.getCallId(), null); if (ssrcTransaction != null) { // 兼容海康 媒体通知 消息from字段不是设备ID的问题 cmder.streamByeCmd(device.getDeviceId(), ssrcTransaction.getChannelId(), null, callIdHeader.getCallId()); - // 如果级联播放,需要给上级发送此通知 - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, callIdHeader.getCallId()); + + // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); if (sendRtpItem != null) { ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); if (parentPlatform == null) { @@ -98,7 +99,6 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i sipCommanderFroPlatform.sendMediaStatusNotify(parentPlatform, sendRtpItem); } } - } }