增加对水星IPC的兼容

增加对SIP错误的订阅,刷新通道或点播或回放出现sip错误时及时返回给页面
优化UI,增加按钮loading
This commit is contained in:
panlinlin 2020-12-26 16:44:27 +08:00
parent 17f4fe254a
commit f2279859b3
13 changed files with 176 additions and 46 deletions

View File

@ -8,8 +8,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.sip.*;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -34,6 +36,9 @@ public class SipLayer implements SipListener {
@Autowired
private SIPProcessorFactory processorFactory;
@Autowired
private SipSubscribe sipSubscribe;
private SipStack sipStack;
private SipFactory sipFactory;
@ -139,11 +144,19 @@ public class SipLayer implements SipListener {
// 增加其它无需回复的响应如101180等
} else {
logger.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId());
if (subscribe != null) {
subscribe.response(evt);
}
}
}
}
// trying不会回复
// if (status == Response.TRYING) {
// }
}
/**

View File

@ -21,6 +21,6 @@ public class RegisterLogicHandler {
// TODO 后续处理只有第一次注册时调用查询设备信息如需更新调用更新API接口
cmder.deviceInfoQuery(device);
cmder.catalogQuery(device);
cmder.catalogQuery(device, null);
}
}

View File

@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
import javax.sip.message.Request;
import java.util.EventObject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SipSubscribe {
private final static Logger logger = LoggerFactory.getLogger(SipSubscribe.class);
private Map<String, SipSubscribe.Event> allSubscribes = new ConcurrentHashMap<>();
public interface Event {
void response(ResponseEvent event);
}
public void addSubscribe(String key, SipSubscribe.Event event) {
allSubscribes.put(key, event);
}
public SipSubscribe.Event getSubscribe(String key) {
return allSubscribes.get(key);
}
public int getSize(){
return allSubscribes.size();
}
}

View File

@ -4,10 +4,13 @@ import javax.sip.RequestEvent;
import javax.sip.ResponseEvent;
import javax.sip.SipProvider;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -83,7 +86,8 @@ public class SIPProcessorFactory {
@Autowired
private OtherResponseProcessor otherResponseProcessor;
// 这里使用注解会导致循环依赖注入暂用springBean
private SipProvider tcpSipProvider;
@ -94,6 +98,7 @@ public class SIPProcessorFactory {
Request request = evt.getRequest();
String method = request.getMethod();
// logger.info("接收到消息:"+request.getMethod());
// sipSubscribe.getSubscribe(evt.getServerTransaction().getBranchId()).response(evt);
if (Request.INVITE.equals(method)) {
InviteRequestProcessor processor = new InviteRequestProcessor();
processor.setRequestEvent(evt);
@ -145,6 +150,7 @@ public class SIPProcessorFactory {
}
public ISIPResponseProcessor createResponseProcessor(ResponseEvent evt) {
Response response = evt.getResponse();
CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
/**
@ -83,7 +84,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event);
void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
/**
* 请求回放视频流
@ -93,7 +94,7 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event);
void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
/**
* 视频流停止
@ -175,7 +176,7 @@ public interface ISIPCommander {
*
* @param device 视频设备
*/
boolean catalogQuery(Device device);
boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
/**
* 查询录像信息

View File

@ -4,22 +4,22 @@ import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.sip.ClientTransaction;
import javax.sip.Dialog;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.TransactionDoesNotExistException;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@ -39,6 +39,8 @@ import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
*/
@Component
public class SIPCommander implements ISIPCommander {
private final Logger logger = LoggerFactory.getLogger(SIPCommander.class);
@Autowired
private SipConfig sipConfig;
@ -69,6 +71,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Autowired
private SipSubscribe sipSubscribe;
/**
@ -221,7 +226,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag");
transmitRequest(device, request);
transmitRequest(device, request, null);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@ -256,22 +261,23 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("</Control>\r\n");
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag");
transmitRequest(device, request);
transmitRequest(device, request, null);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
return false;
}
/**
* 请求预览视频流
*
* 请求预览视频流
* @param device 视频设备
* @param channelId 预览通道
* @param event hook订阅
* @param errorEvent sip错误订阅
*/
@Override
public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event) {
public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
try {
String ssrc = streamSession.createPlaySsrc();
@ -300,7 +306,8 @@ public class SIPCommander implements ISIPCommander {
//
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
// content.append("o="+channelId+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n");
content.append("t=0 0\r\n");
@ -332,7 +339,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc);
ClientTransaction transaction = transmitRequest(device, request);
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
streamSession.put(streamId, transaction);
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel != null) {
@ -357,7 +364,8 @@ public class SIPCommander implements ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/
@Override
public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event) {
public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event
, SipSubscribe.Event errorEvent) {
try {
MediaServerConfig mediaInfo = storager.getMediaInfo();
String ssrc = streamSession.createPlayBackSsrc();
@ -413,8 +421,8 @@ public class SIPCommander implements ISIPCommander {
content.append("y="+ssrc+"\r\n");//ssrc
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "playback", null);
ClientTransaction transaction = transmitRequest(device, request);
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
streamSession.put(streamId, transaction);
} catch ( SipException | ParseException | InvalidArgumentException e) {
@ -575,7 +583,8 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDeviceInfoTag", "ToDeviceInfoTag");
transmitRequest(device, request);
transmitRequest(device, request, null);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@ -590,7 +599,7 @@ public class SIPCommander implements ISIPCommander {
* @param device 视频设备
*/
@Override
public boolean catalogQuery(Device device) {
public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
// 清空通道
storager.cleanChannelsForDevice(device.getDeviceId());
try {
@ -602,8 +611,9 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
catalogXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCatalogTag", "ToCatalogTag");
transmitRequest(device, request);
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCatalogTag", null);
transmitRequest(device, request, errorEvent);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
@ -636,7 +646,9 @@ public class SIPCommander implements ISIPCommander {
recordInfoXml.append("</Query>\r\n");
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "FromRecordInfoTag", "ToRecordInfoTag");
transmitRequest(device, request);
transmitRequest(device, request, null);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
@ -688,13 +700,20 @@ public class SIPCommander implements ISIPCommander {
return false;
}
private ClientTransaction transmitRequest(Device device, Request request) throws SipException {
private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent) throws SipException {
ClientTransaction clientTransaction = null;
if("TCP".equals(device.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(request);
} else if("UDP".equals(device.getTransport())) {
clientTransaction = udpSipProvider.getNewClientTransaction(request);
}
// 添加订阅
if (errorEvent != null) {
CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME);
sipSubscribe.addSubscribe(callIdHeader.getCallId(), errorEvent);
}
clientTransaction.sendRequest();
return clientTransaction;
}

View File

@ -294,7 +294,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
device.setStreamMode("UDP");
}
storager.updateDevice(device);
cmder.catalogQuery(device);
cmder.catalogQuery(device, null);
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {

View File

@ -323,7 +323,7 @@ public class ZLMHttpHookListener {
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
});
}, null);
}
}

View File

@ -34,6 +34,7 @@ public class SpringBeanFactory implements ApplicationContextAware {
* 获取对象 这里重写了bean方法起主要作用
*/
public static Object getBean(String beanId) throws BeansException {
if (applicationContext == null) return null;
return applicationContext.getBean(beanId);
}

View File

@ -4,6 +4,7 @@ import java.util.List;
import com.genersoft.iot.vmp.common.PageResult;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -19,6 +20,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import javax.sip.message.Response;
@CrossOrigin
@RestController
@RequestMapping("/api")
@ -86,11 +89,25 @@ public class DeviceController {
if (logger.isDebugEnabled()) {
}
logger.debug("设备信息同步API调用deviceId" + deviceId);
logger.debug("设备通道信息同步API调用deviceId" + deviceId);
Device device = storager.queryVideoDevice(deviceId);
cmder.catalogQuery(device);
DeferredResult<ResponseEntity<Device>> result = new DeferredResult<ResponseEntity<Device>>();
cmder.catalogQuery(device, event -> {
Response response = event.getResponse();
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId);
msg.setData(String.format("同步通道失败,错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
DeferredResult<ResponseEntity<Device>> result = new DeferredResult<ResponseEntity<Device>>(2*1000L);
result.onTimeout(()->{
logger.warn(String.format("设备通道信息同步超时"));
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId);
msg.setData("Timeout");
resultHolder.invokeResult(msg);
});
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_CATALOG+deviceId, result);
return result;
}

View File

@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.text.DecimalFormat;
import java.util.UUID;
@ -72,6 +73,12 @@ public class PlayController {
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
}, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
Response response = event.getResponse();
msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
} else {
String streamId = streamInfo.getStreamId();
@ -86,6 +93,12 @@ public class PlayController {
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
}, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
Response response = event.getResponse();
msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
}
}

View File

@ -27,6 +27,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.util.UUID;
@CrossOrigin
@ -78,6 +79,12 @@ public class PlaybackController {
cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlayBack(response, deviceId, channelId, uuid.toString());
}, event -> {
Response response = event.getResponse();
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData(String.format("回放失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
return result;

View File

@ -8,7 +8,7 @@
<div style="background-color: #FFFFFF; margin-bottom: 1rem; position: relative; padding: 0.5rem; text-align: left;">
<span style="font-size: 1rem; font-weight: bold;">设备列表</span>
<div style="position: absolute; right: 1rem; top: 0.3rem;">
<el-button icon="el-icon-refresh-right" circle size="mini" @click="getDeviceList()"></el-button>
<el-button icon="el-icon-refresh-right" circle size="mini" :loading="getDeviceListLoading" @click="getDeviceList()"></el-button>
</div>
</div>
<devicePlayer ref="devicePlayer"></devicePlayer>
@ -51,7 +51,7 @@
<el-table-column label="操作" width="240" align="center" fixed="right">
<template slot-scope="scope">
<el-button size="mini" icon="el-icon-refresh" @click="refDevice(scope.row)">刷新通道</el-button>
<el-button size="mini" :ref="scope.row.deviceId + 'refbtn' " icon="el-icon-refresh" @click="refDevice(scope.row)">刷新通道</el-button>
<el-button size="mini" icon="el-icon-s-open" type="primary" @click="showChannelList(scope.row)">查看通道</el-button>
</template>
</el-table-column>
@ -90,7 +90,8 @@
winHeight: window.innerHeight - 200,
currentPage:1,
count:15,
total:0
total:0,
getDeviceListLoading: false
};
},
computed: {
@ -130,7 +131,7 @@
},
getDeviceList: function() {
let that = this;
this.getDeviceListLoading = true;
this.$axios.get(`/api/devices`,{
params: {
page: that.currentPage - 1,
@ -141,9 +142,11 @@
console.log(res);
that.total = res.data.total;
that.deviceList = res.data.data;
that.getDeviceListLoading = false;
})
.catch(function (error) {
console.log(error);
that.getDeviceListLoading = false;
});
},
@ -158,17 +161,30 @@
refDevice: function(itemData) {
///api/devices/{deviceId}/sync
console.log("刷新对应设备:" + itemData.deviceId);
var that = this;
that.$refs[itemData.deviceId + 'refbtn' ].loading = true;
this.$axios({
method: 'post',
url: '/api/devices/' + itemData.deviceId + '/sync'
}).then(function(res) {
// console.log(""+JSON.stringify(res));
console.log("刷新设备结果:"+JSON.stringify(res));
if (!res.data.deviceId) {
that.$message({
showClose: true,
message: res.data,
type: 'error'
});
}else{
that.$message({
showClose: true,
message: '请求成功',
type: 'success'
});
}
that.$refs[itemData.deviceId + 'refbtn' ].loading = false;
}).catch(function(e) {
that.$message({
showClose: true,
message: '请求成功',
type: 'success'
});
console.error(e)
that.$refs[itemData.deviceId + 'refbtn' ].loading = false;
});;
},
//