diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 36ee8f32..3f437c03 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -16,26 +16,26 @@ import javax.sip.SipException; import java.text.ParseException; import java.util.function.Function; -/** - * @description:设备能力接口,用于定义设备的控制、查询能力 +/** + * @description:设备能力接口,用于定义设备的控制、查询能力 * @author: swwheihei - * @date: 2020年5月3日 下午9:16:34 + * @date: 2020年5月3日 下午9:16:34 */ public interface ISIPCommander { /** * 云台方向放控制,使用配置文件中的默认镜头移动速度 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 * @param upDown 镜头上移下移 0:停止 1:上移 2:下移 */ void ptzdirectCmd(Device device,String channelId,int leftRight, int upDown) throws InvalidArgumentException, ParseException, SipException; - + /** * 云台方向放控制 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 @@ -43,28 +43,28 @@ public interface ISIPCommander { * @param moveSpeed 镜头移动速度 */ void ptzdirectCmd(Device device,String channelId,int leftRight, int upDown, int moveSpeed) throws InvalidArgumentException, ParseException, SipException; - + /** * 云台缩放控制,使用配置文件中的默认镜头缩放速度 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大 */ void ptzZoomCmd(Device device,String channelId,int inOut) throws InvalidArgumentException, ParseException, SipException; - + /** * 云台缩放控制 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大 */ void ptzZoomCmd(Device device,String channelId,int inOut, int moveSpeed) throws InvalidArgumentException, ParseException, SipException; - + /** * 云台控制,支持方向与缩放控制 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 @@ -74,10 +74,10 @@ public interface ISIPCommander { * @param zoomSpeed 镜头缩放速度 */ void ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) throws InvalidArgumentException, SipException, ParseException; - + /** * 前端控制,包括PTZ指令、FI指令、预置位指令、巡航指令、扫描指令和辅助开关指令 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param cmdCode 指令码 @@ -86,7 +86,7 @@ public interface ISIPCommander { * @param combineCode2 组合码2 */ void frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2) throws SipException, InvalidArgumentException, ParseException; - + /** * 前端控制指令(用于转发上级指令) * @param device 控制设备 @@ -100,11 +100,14 @@ public interface ISIPCommander { * @param device 视频设备 * @param channelId 预览通道 */ - void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + Function ignoreCallBack, + Function inviteCallBack, + ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 请求回放视频流 - * + * * @param device 视频设备 * @param channelId 预览通道 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss @@ -117,13 +120,13 @@ public interface ISIPCommander { /** * 请求历史媒体下载 - * + * * @param device 视频设备 * @param channelId 预览通道 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param downloadSpeed 下载倍速参数 - */ + */ void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, int downloadSpeed, Function ignoreCallBack, Function inviteCallBack, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; @@ -155,7 +158,7 @@ public interface ISIPCommander { * 回放倍速播放 */ void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) throws InvalidArgumentException, ParseException, SipException; - + /** * 回放控制 * @param device @@ -167,55 +170,55 @@ public interface ISIPCommander { /** * 语音广播 - * + * * @param device 视频设备 * @param channelId 预览通道 */ void audioBroadcastCmd(Device device,String channelId); - + /** * 语音广播 - * + * * @param device 视频设备 */ void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException; - + /** * 音视频录像控制 - * + * * @param device 视频设备 * @param channelId 预览通道 * @param recordCmdStr 录像命令:Record / StopRecord */ void recordCmd(Device device, String channelId, String recordCmdStr, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 远程启动控制命令 - * + * * @param device 视频设备 */ void teleBootCmd(Device device) throws InvalidArgumentException, SipException, ParseException; /** * 报警布防/撤防命令 - * + * * @param device 视频设备 */ void guardCmd(Device device, String guardCmdStr, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 报警复位命令 - * + * * @param device 视频设备 * @param alarmMethod 报警方式(可选) * @param alarmType 报警类型(可选) */ void alarmCmd(Device device, String alarmMethod, String alarmType, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧 - * + * * @param device 视频设备 * @param channelId 预览通道 */ @@ -234,58 +237,58 @@ public interface ISIPCommander { /** * 设备配置命令 - * + * * @param device 视频设备 */ void deviceConfigCmd(Device device); - + /** * 设备配置命令:basicParam - * + * * @param device 视频设备 * @param channelId 通道编码(可选) * @param name 设备/通道名称(可选) * @param expiration 注册过期时间(可选) * @param heartBeatInterval 心跳间隔时间(可选) * @param heartBeatCount 心跳超时次数(可选) - */ + */ void deviceBasicConfigCmd(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 查询设备状态 - * + * * @param device 视频设备 */ void deviceStatusQuery(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备信息 - * + * * @param device 视频设备 - * @return + * @return */ void deviceInfoQuery(Device device) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询目录列表 - * + * * @param device 视频设备 */ void catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) throws SipException, InvalidArgumentException, ParseException; - + /** * 查询录像信息 - * + * * @param device 视频设备 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param sn */ void recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, Integer Secrecy, String type, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询报警信息 - * + * * @param device 视频设备 * @param startPriority 报警起始级别(可选) * @param endPriority 报警终止级别(可选) @@ -297,33 +300,33 @@ public interface ISIPCommander { */ void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备配置 - * + * * @param device 视频设备 * @param channelId 通道编码(可选) * @param configType 配置类型: */ void deviceConfigQuery(Device device, String channelId, String configType, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备预置位置 - * + * * @param device 视频设备 */ void presetQuery(Device device, String channelId, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询移动设备位置数据 - * + * * @param device 视频设备 */ void mobilePostitionQuery(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 订阅、取消订阅移动位置 - * + * * @param device 视频设备 * @return true = 命令发送成功 */ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index f22bba82..0ce07082 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -271,6 +271,8 @@ public class SIPCommander implements ISIPCommander { */ @Override public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + Function ignoreCallBack, + Function inviteCallBack, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { String stream = ssrcInfo.getStream(); @@ -369,8 +371,12 @@ public class SIPCommander implements ISIPCommander { // content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备 + CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); - Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); + Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(), newCallIdHeader); + if(ignoreCallBack != null){ + ignoreCallBack.apply(newCallIdHeader.getCallId()); + } sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); @@ -379,6 +385,9 @@ public class SIPCommander implements ISIPCommander { ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); String callId = response.getCallIdHeader().getCallId(); + if(inviteCallBack != null){ + inviteCallBack.apply(response); + } streamSession.put(device.getDeviceId(), channelId, callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY); okEvent.response(e); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 9179c3da..cd971873 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -181,17 +181,20 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 开启rtcp保活 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); } + logger.debug("mediaInfo {}", mediaInfo==null?"null":mediaInfo); if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param)); redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{ startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); }); }else { logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param)); + logger.debug("zlmPublishHookService.getHandler(sendRtpItem.getApp()) {} put {}",zlmPublishHookService.getHandler(sendRtpItem.getApp()), sendRtpItem.getStreamId()); zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{ taskExecutor.submit(()->{ JSONObject startSendRtpStreamResult; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index c84e4c27..0ecb6f40 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -654,7 +654,44 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String streamId = String.format("%s_%s", device.getDeviceId(), channelId); sendRtpItem.setStreamId(streamId); redisCatchStorage.updateSendRTPSever(sendRtpItem); - SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> { + SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, (String deviceCallId) -> { + // 忽略自动回复 ACK + sipSubscribe.addOkSubscribe("ACK_IGNORE_" + deviceCallId, (eventResult) -> { + sipSubscribe.removeOkSubscribe("ACK_IGNORE_" + deviceCallId); + }); + + return null; + }, (SIPResponse response) -> { + + // 先订阅再发起 + // 订阅上级平台的 ACK 请求 + sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> { + sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId()); + try { + String deviceContent = new String(response.getRawContent()); + Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent); + SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb(); + SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort()); + + logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername()); + // 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发 + Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); + sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck); + } catch (ParseException | SdpParseException | InvalidArgumentException | + SipException e) { + throw new RuntimeException(e); + } + }); + + try { + // 向上级平台回复 INVITE OK + responseSdpAck(request, new String(request.getRawContent()), platform); + } catch (SipException | InvalidArgumentException | + ParseException e) { + throw new RuntimeException(e); + } + return null; + }, ((code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { hookEvent.run(code, msg, data); } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 2f866894..ecb3e9fe 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -18,9 +18,10 @@ import java.util.function.Function; */ public interface IPlayService { - void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, - ErrorCallback callback); + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ErrorCallback callback); + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, Function ignoreCallBack, Function inviteCallBack, ErrorCallback callback); SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); + SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, Function ignoreCallBack, Function inviteCallBack, ErrorCallback callback); MediaServerItem getNewMediaServerItem(Device device); @@ -31,7 +32,6 @@ public interface IPlayService { void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); - void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, Function ignoreCallBack, Function inviteCallBack, ErrorCallback callback); StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 8e84488e..681a8291 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -108,7 +108,7 @@ public class PlayServiceImpl implements IPlayService { @Override - public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { + public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, Function ignoreCallBack, Function inviteCallBack, ErrorCallback callback) { if(userSetting.getDisableRealtimePlay()){ logger.info("[点播] 实时点播已禁用"); callback.run(Response.FORBIDDEN, "实时点播已禁用", null); @@ -172,14 +172,17 @@ public class PlayServiceImpl implements IPlayService { null); return null; } - play(mediaServerItem, ssrcInfo, device, channelId, callback); + play(mediaServerItem, ssrcInfo, device, channelId, ignoreCallBack, inviteCallBack,callback); return ssrcInfo; } + @Override + public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { + return play(mediaServerItem, deviceId, channelId, ssrc, null, null, callback); + } @Override - public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, - ErrorCallback callback) { + public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, Function ignoreCallBack, Function inviteCallBack, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -249,7 +252,7 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout()); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> { + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, ignoreCallBack, inviteCallBack,(mediaServerItemInuse, hookParam ) -> { logger.info("收到订阅消息: " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook响应 @@ -311,6 +314,12 @@ public class PlayServiceImpl implements IPlayService { } } + @Override + public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ErrorCallback callback) { + play(mediaServerItem, ssrcInfo, device, channelId, null, null, callback); + } + private void tcpActiveHandler(Device device, String channelId, String contentString, MediaServerItem mediaServerItem, String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback callback){