优化设备注册,支持到期续订,优化国标级联到期续订。

This commit is contained in:
648540858 2023-03-28 14:09:41 +08:00
parent 58d1f0ea16
commit 91bfbc36f1
19 changed files with 205 additions and 90 deletions

View File

@ -11,7 +11,7 @@
<groupId>com.genersoft</groupId> <groupId>com.genersoft</groupId>
<artifactId>wvp-pro</artifactId> <artifactId>wvp-pro</artifactId>
<version>2.6.7</version> <version>2.6.8</version>
<name>web video platform</name> <name>web video platform</name>
<description>国标28181视频平台</description> <description>国标28181视频平台</description>
<packaging>${project.packaging}</packaging> <packaging>${project.packaging}</packaging>

View File

@ -40,17 +40,20 @@ public class SipPlatformRunner implements CommandLineRunner {
List<ParentPlatform> parentPlatforms = storager.queryEnableParentPlatformList(true); List<ParentPlatform> parentPlatforms = storager.queryEnableParentPlatformList(true);
for (ParentPlatform parentPlatform : parentPlatforms) { for (ParentPlatform parentPlatform : parentPlatforms) {
ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
// 更新缓存 // 更新缓存
ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch(); ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId()); parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 设置所有平台离线
platformService.offline(parentPlatform, true);
// 取消订阅 // 取消订阅
sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ sipCommanderForPlatform.unregister(parentPlatform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{
platformService.login(parentPlatform); platformService.login(parentPlatform);
}); });
// 设置所有平台离线
platformService.offline(parentPlatform, true);
} }
} }
} }

View File

@ -191,6 +191,9 @@ public class Device {
@Schema(description = "是否作为消息通道") @Schema(description = "是否作为消息通道")
private boolean asMessageChannel; private boolean asMessageChannel;
@Schema(description = "设备注册的事务信息")
private SipTransactionInfo sipTransactionInfo;
public String getDeviceId() { public String getDeviceId() {
return deviceId; return deviceId;
@ -439,4 +442,12 @@ public class Device {
public void setAsMessageChannel(boolean asMessageChannel) { public void setAsMessageChannel(boolean asMessageChannel) {
this.asMessageChannel = asMessageChannel; this.asMessageChannel = asMessageChannel;
} }
public SipTransactionInfo getSipTransactionInfo() {
return sipTransactionInfo;
}
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo;
}
} }

View File

@ -16,6 +16,8 @@ public class ParentPlatformCatch {
private ParentPlatform parentPlatform; private ParentPlatform parentPlatform;
private SipTransactionInfo sipTransactionInfo;
public String getId() { public String getId() {
return id; return id;
} }
@ -55,4 +57,12 @@ public class ParentPlatformCatch {
public void setCallId(String callId) { public void setCallId(String callId) {
this.callId = callId; this.callId = callId;
} }
public SipTransactionInfo getSipTransactionInfo() {
return sipTransactionInfo;
}
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo;
}
} }

View File

@ -63,7 +63,7 @@ public class SipRunner implements CommandLineRunner {
if (deviceService.expire(device)){ if (deviceService.expire(device)){
deviceService.offline(device.getDeviceId(), "注册已过期"); deviceService.offline(device.getDeviceId(), "注册已过期");
}else { }else {
deviceService.online(device); deviceService.online(device, null);
} }
} }
// 重置cseq计数 // 重置cseq计数

View File

@ -18,14 +18,16 @@ public interface ISIPCommanderForPlatform {
* @return * @return
*/ */
void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException;
/** /**
* 向上级平台注销 * 向上级平台注销
* @param parentPlatform * @param parentPlatform
* @return * @return
*/ */
void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException; void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
/** /**

View File

@ -14,7 +14,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils; import org.springframework.util.DigestUtils;
import javax.sip.*; import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.address.Address; import javax.sip.address.Address;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.*; import javax.sip.header.*;
@ -22,7 +23,6 @@ import javax.sip.message.Request;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
/** /**
@ -45,7 +45,7 @@ public class SIPRequestHeaderPlarformProvider {
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException { public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, long CSeq, String fromTag, String toTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null; Request request = null;
String sipAddress = parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort(); String sipAddress = parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort();
//请求行 //请求行
@ -53,7 +53,8 @@ public class SIPRequestHeaderPlarformProvider {
parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
//via //via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getServerIP(), parentPlatform.getServerPort(), parentPlatform.getTransport(), viaTag); ViaHeader viaHeader = sipLayer.getSipFactory().createHeaderFactory().createViaHeader(parentPlatform.getServerIP(),
parentPlatform.getServerPort(), parentPlatform.getTransport(), SipUtils.getNewViaTag());
viaHeader.setRPort(); viaHeader.setRPort();
viaHeaders.add(viaHeader); viaHeaders.add(viaHeader);
//from //from
@ -63,7 +64,7 @@ public class SIPRequestHeaderPlarformProvider {
//to //to
SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), sipConfig.getDomain()); SipURI toSipURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), sipConfig.getDomain());
Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI); Address toAddress = sipLayer.getSipFactory().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,null); ToHeader toHeader = sipLayer.getSipFactory().createHeaderFactory().createToHeader(toAddress,toTag);
//Forwards //Forwards
MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = sipLayer.getSipFactory().createHeaderFactory().createMaxForwardsHeader(70);
@ -85,11 +86,11 @@ public class SIPRequestHeaderPlarformProvider {
return request; return request;
} }
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag, public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String toTag,
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException { WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader, isRegister); Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, isRegister);
SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); SipURI requestURI = sipLayer.getSipFactory().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) { if (www == null) {
AuthorizationHeader authorizationHeader = sipLayer.getSipFactory().createHeaderFactory().createAuthorizationHeader("Digest"); AuthorizationHeader authorizationHeader = sipLayer.getSipFactory().createHeaderFactory().createAuthorizationHeader("Digest");
@ -107,8 +108,6 @@ public class SIPRequestHeaderPlarformProvider {
// qop 保护质量 包含auth默认的和auth-int增加了报文完整性检测两种策略 // qop 保护质量 包含auth默认的和auth-int增加了报文完整性检测两种策略
String qop = www.getQop(); String qop = www.getQop();
callIdHeader.setCallId(callId);
String cNonce = null; String cNonce = null;
String nc = "00000001"; String nc = "00000001";
if (qop != null) { if (qop != null) {

View File

@ -75,20 +75,40 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public void unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { public void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, null, null, errorEvent, okEvent, false, false);
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, true);
} }
@Override @Override
public void register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, public void unregister(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, sipTransactionInfo, null, errorEvent, okEvent, false, false);
}
@Override
public void register(ParentPlatform parentPlatform, @Nullable SipTransactionInfo sipTransactionInfo, @Nullable WWWAuthenticateHeader www,
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException { SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) throws SipException, InvalidArgumentException, ParseException {
Request request; Request request;
if (!registerAgain ) {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
String fromTag = SipUtils.getNewFromTag();
String toTag = null;
if (sipTransactionInfo != null ) {
if (sipTransactionInfo.getCallId() != null) {
callIdHeader.setCallId(sipTransactionInfo.getCallId());
}
if (sipTransactionInfo.getFromTag() != null) {
fromTag = sipTransactionInfo.getFromTag();
}
if (sipTransactionInfo.getToTag() != null) {
toTag = sipTransactionInfo.getToTag();
}
}
if (!registerAgain ) {
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(), SipUtils.getNewFromTag(), redisCatchStorage.getCSEQ(), fromTag,
SipUtils.getNewViaTag(), callIdHeader, isRegister); toTag, callIdHeader, isRegister);
// callid 写入缓存 等注册成功可以更新状态 // callid 写入缓存 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId(); String callIdFromHeader = callIdHeader.getCallId();
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
@ -106,8 +126,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}); });
}else { }else {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister);
request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, SipUtils.getNewFromTag(), null, callId, www, callIdHeader, isRegister);
} }
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent); sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent);

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper; import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate; import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@ -18,6 +19,7 @@ import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.SIPDateHeader; import gov.nist.javax.sip.header.SIPDateHeader;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -31,6 +33,7 @@ import javax.sip.header.AuthorizationHeader;
import javax.sip.header.ContactHeader; import javax.sip.header.ContactHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.ViaHeader; import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.text.ParseException; import java.text.ParseException;
@ -102,6 +105,30 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
SipUri uri = (SipUri) address.getURI(); SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser(); String deviceId = uri.getUser();
Device device = deviceService.getDevice(deviceId); Device device = deviceService.getDevice(deviceId);
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request,
userSetting.getSipUseSourceIpAsRemoteAddress());
if (device != null &&
device.getSipTransactionInfo() != null &&
request.getCallIdHeader().getCallId().equals(device.getSipTransactionInfo().getCallId())) {
logger.info("[注册请求] 注册续订: {}", device.getDeviceId());
device.setExpires(request.getExpires().getExpires());
device.setIp(remoteAddressInfo.getIp());
device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));
device.setLocalIp(request.getLocalAddress().getHostAddress());
Response registerOkResponse = getRegisterOkResponse(request);
// 判断TCP还是UDP
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String transport = reqViaHeader.getTransport();
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse);
device.setRegisterTime(DateUtil.getNow());
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse)registerOkResponse);
deviceService.online(device, sipTransactionInfo);
return;
}
String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword();
AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
if (authHead == null && !ObjectUtils.isEmpty(password)) { if (authHead == null && !ObjectUtils.isEmpty(password)) {
@ -144,9 +171,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 添加Expires头 // 添加Expires头
response.addHeader(request.getExpires()); response.addHeader(request.getExpires());
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request,
userSetting.getSipUseSourceIpAsRemoteAddress());
if (device == null) { if (device == null) {
device = new Device(); device = new Device();
device.setStreamMode("UDP"); device.setStreamMode("UDP");
@ -179,7 +203,8 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
if (registerFlag) { if (registerFlag) {
logger.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress); logger.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress);
device.setRegisterTime(DateUtil.getNow()); device.setRegisterTime(DateUtil.getNow());
deviceService.online(device); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse)response);
deviceService.online(device, sipTransactionInfo);
} else { } else {
logger.info("[注销成功] deviceId: {}->{}" ,deviceId, requestAddress); logger.info("[注销成功] deviceId: {}->{}" ,deviceId, requestAddress);
deviceService.offline(deviceId, "主动注销"); deviceService.offline(deviceId, "主动注销");
@ -188,4 +213,23 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
logger.error("未处理的异常 ", e); logger.error("未处理的异常 ", e);
} }
} }
private Response getRegisterOkResponse(Request request) throws ParseException {
// 携带授权头并且密码正确
Response response = getMessageFactory().createResponse(Response.OK, request);
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(wvpSipDate);
response.addHeader(dateHeader);
// 添加Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));
// 添加Expires头
response.addHeader(request.getExpires());
return response;
}
} }

View File

@ -73,35 +73,38 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
String channelId = getText(rootElement, "DeviceID"); String channelId = getText(rootElement, "DeviceID");
// 远程启动功能 // 远程启动功能
if (!ObjectUtils.isEmpty(getText(rootElement, "TeleBoot"))) { if (!ObjectUtils.isEmpty(getText(rootElement, "TeleBoot"))) {
if (parentPlatform.getServerGBId().equals(targetGBId)) { // TODO 拒绝远程启动命令
// 远程启动本平台需要在重新启动程序后先对SipStack解绑 logger.warn("[国标级联]收到平台的远程启动命令, 不处理");
logger.info("执行远程启动本平台命令");
try { // if (parentPlatform.getServerGBId().equals(targetGBId)) {
cmderFroPlatform.unregister(parentPlatform, null, null); // // 远程启动本平台需要在重新启动程序后先对SipStack解绑
} catch (InvalidArgumentException | ParseException | SipException e) { // logger.info("执行远程启动本平台命令");
logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); // try {
} // cmderFroPlatform.unregister(parentPlatform, null, null);
taskExecutor.execute(() -> { // } catch (InvalidArgumentException | ParseException | SipException e) {
// 远程启动 // logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
// try { // }
// Thread.sleep(3000); // taskExecutor.execute(() -> {
// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); // // 远程启动
// SipStackImpl stack = (SipStackImpl)up.getSipStack(); //// try {
// stack.stop(); //// Thread.sleep(3000);
// Iterator listener = stack.getListeningPoints(); //// SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider");
// while (listener.hasNext()) { //// SipStackImpl stack = (SipStackImpl)up.getSipStack();
// stack.deleteListeningPoint((ListeningPoint) listener.next()); //// stack.stop();
// } //// Iterator listener = stack.getListeningPoints();
// Iterator providers = stack.getSipProviders(); //// while (listener.hasNext()) {
// while (providers.hasNext()) { //// stack.deleteListeningPoint((ListeningPoint) listener.next());
// stack.deleteSipProvider((SipProvider) providers.next()); //// }
// } //// Iterator providers = stack.getSipProviders();
// VManageBootstrap.restart(); //// while (providers.hasNext()) {
// } catch (InterruptedException | ObjectInUseException e) { //// stack.deleteSipProvider((SipProvider) providers.next());
// logger.error("[任务执行失败] 服务重启: {}", e.getMessage()); //// }
// } //// VManageBootstrap.restart();
}); //// } catch (InterruptedException | ObjectInUseException e) {
} //// logger.error("[任务执行失败] 服务重启: {}", e.getMessage());
//// }
// });
// }
} }
DeviceControlType deviceControlType = DeviceControlType.typeOf(rootElement); DeviceControlType deviceControlType = DeviceControlType.typeOf(rootElement);
logger.info("[接受deviceControl命令] 命令: {}", deviceControlType); logger.info("[接受deviceControl命令] 命令: {}", deviceControlType);

View File

@ -88,7 +88,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
// 对于已经离线的设备判断他的注册是否已经过期 // 对于已经离线的设备判断他的注册是否已经过期
if (!deviceService.expire(device)){ if (!deviceService.expire(device)){
device.setOnline(0); device.setOnline(0);
deviceService.online(device); deviceService.online(device, null);
} }
} }
// 刷新过期任务 // 刷新过期任务

View File

@ -71,7 +71,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
} }
String text = onlineElement.getText(); String text = onlineElement.getText();
if ("ONLINE".equalsIgnoreCase(text.trim())) { if ("ONLINE".equalsIgnoreCase(text.trim())) {
deviceService.online(device); deviceService.online(device, null);
}else { }else {
deviceService.offline(device.getDeviceId(), "设备状态查询结果:" + text.trim()); deviceService.offline(device.getDeviceId(), "设备状态查询结果:" + text.trim());
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -10,6 +11,7 @@ import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -18,7 +20,6 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent; import javax.sip.ResponseEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader; import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
@ -65,9 +66,8 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
*/ */
@Override @Override
public void process(ResponseEvent evt) { public void process(ResponseEvent evt) {
Response response = evt.getResponse(); SIPResponse response = (SIPResponse)evt.getResponse();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME); String callId = response.getCallIdHeader().getCallId();
String callId = callIdHeader.getCallId();
PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId); PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId);
if (platformRegisterInfo == null) { if (platformRegisterInfo == null) {
logger.info(String.format("[国标级联]未找到callId %s 的注册/注销平台id", callId )); logger.info(String.format("[国标级联]未找到callId %s 的注册/注销平台id", callId ));
@ -90,15 +90,17 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
if (response.getStatusCode() == Response.UNAUTHORIZED) { if (response.getStatusCode() == Response.UNAUTHORIZED) {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
try { try {
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true, platformRegisterInfo.isRegister()); sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, true, platformRegisterInfo.isRegister());
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage()); logger.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage());
} }
}else if (response.getStatusCode() == Response.OK){ }else if (response.getStatusCode() == Response.OK){
if (platformRegisterInfo.isRegister()) { if (platformRegisterInfo.isRegister()) {
platformService.online(parentPlatform); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response);
platformService.online(parentPlatform, sipTransactionInfo);
}else { }else {
platformService.offline(parentPlatform, false); platformService.offline(parentPlatform, false);
} }

View File

@ -116,7 +116,7 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) {
logger.info("[ZLM HOOK] 收到zlm心跳" + param.getMediaServerId()); // logger.info("[ZLM HOOK] 收到zlm心跳" + param.getMediaServerId());
taskExecutor.execute(() -> { taskExecutor.execute(() -> {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.vmanager.bean.BaseTree; import com.genersoft.iot.vmp.vmanager.bean.BaseTree;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
@ -18,7 +19,7 @@ public interface IDeviceService {
* 设备上线 * 设备上线
* @param device 设备信息 * @param device 设备信息
*/ */
void online(Device device); void online(Device device, SipTransactionInfo sipTransactionInfo);
/** /**
* 设备下线 * 设备下线

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service; package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
/** /**
@ -35,7 +36,7 @@ public interface IPlatformService {
* 平台上线 * 平台上线
* @param parentPlatform 平台信息 * @param parentPlatform 平台信息
*/ */
void online(ParentPlatform parentPlatform); void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo);
/** /**
* 平台离线 * 平台离线

View File

@ -89,7 +89,7 @@ public class DeviceServiceImpl implements IDeviceService {
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Override @Override
public void online(Device device) { public void online(Device device, SipTransactionInfo sipTransactionInfo) {
logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId()); Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
@ -104,6 +104,14 @@ public class DeviceServiceImpl implements IDeviceService {
// 默认心跳间隔60 // 默认心跳间隔60
device.setKeepaliveIntervalTime(60); device.setKeepaliveIntervalTime(60);
} }
if (sipTransactionInfo != null) {
device.setSipTransactionInfo(sipTransactionInfo);
}else {
if (deviceInRedis != null) {
device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo());
}
}
// 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询 // 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
if (device.getCreateTime() == null) { if (device.getCreateTime() == null) {
device.setOnline(1); device.setOnline(1);

View File

@ -123,8 +123,10 @@ public class PlatformServiceImpl implements IPlatformService {
@Override @Override
public boolean update(ParentPlatform parentPlatform) { public boolean update(ParentPlatform parentPlatform) {
logger.info("[国标级联]更新平台 {}", parentPlatform.getDeviceGBId());
parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase()); parentPlatform.setCharacterSet(parentPlatform.getCharacterSet().toUpperCase());
ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId()); ParentPlatform parentPlatformOld = platformMapper.getParentPlatById(parentPlatform.getId());
ParentPlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatformOld.getServerGBId());
parentPlatform.setUpdateTime(DateUtil.getNow()); parentPlatform.setUpdateTime(DateUtil.getNow());
if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) { if (!parentPlatformOld.getTreeType().equals(parentPlatform.getTreeType())) {
// 目录结构发生变化清空之前的关联关系 // 目录结构发生变化清空之前的关联关系
@ -134,6 +136,7 @@ public class PlatformServiceImpl implements IPlatformService {
platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId()); platformGbStreamMapper.delByPlatformId(parentPlatformOld.getServerGBId());
} }
// 停止心跳定时 // 停止心跳定时
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId(); final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatformOld.getServerGBId();
dynamicTask.stop(keepaliveTaskKey); dynamicTask.stop(keepaliveTaskKey);
@ -142,9 +145,13 @@ public class PlatformServiceImpl implements IPlatformService {
dynamicTask.stop(registerTaskKey); dynamicTask.stop(registerTaskKey);
// 注销旧的 // 注销旧的
try { try {
commanderForPlatform.unregister(parentPlatformOld, null, eventResult -> { if (parentPlatformOld.isStatus()) {
logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId()); logger.info("保存平台{}时发现救平台在线,发送注销命令", parentPlatform.getDeviceGBId());
}); commanderForPlatform.unregister(parentPlatformOld, parentPlatformCatchOld.getSipTransactionInfo(), null, eventResult -> {
logger.info("[国标级联] 注销成功, 平台:{}", parentPlatformOld.getServerGBId());
});
}
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); logger.error("[命令发送失败] 国标级联 注销: {}", e.getMessage());
} }
@ -185,36 +192,36 @@ public class PlatformServiceImpl implements IPlatformService {
@Override @Override
public void online(ParentPlatform parentPlatform) { public void online(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo) {
logger.info("[国标级联]{}, 平台上线/更新注册", parentPlatform.getServerGBId()); logger.info("[国标级联]{}, 平台上线", parentPlatform.getServerGBId());
platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true);
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (parentPlatformCatch != null) { if (parentPlatformCatch == null) {
parentPlatformCatch.getParentPlatform().setStatus(true);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}else {
parentPlatformCatch = new ParentPlatformCatch(); parentPlatformCatch = new ParentPlatformCatch();
parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId()); parentPlatformCatch.setId(parentPlatform.getServerGBId());
parentPlatform.setStatus(true); parentPlatform.setStatus(true);
parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
} }
parentPlatformCatch.getParentPlatform().setStatus(true);
parentPlatformCatch.setSipTransactionInfo(sipTransactionInfo);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.isAlive(registerTaskKey)) { if (!dynamicTask.isAlive(registerTaskKey)) {
logger.info("[国标级联]{}, 添加定时注册任务", parentPlatform.getServerGBId());
// 添加注册任务 // 添加注册任务
dynamicTask.startCron(registerTaskKey, dynamicTask.startCron(registerTaskKey,
// 注册失败注册成功时由程序直接调用了online方法 // 注册失败注册成功时由程序直接调用了online方法
()-> { ()-> registerTask(parentPlatform, sipTransactionInfo),
registerTask(parentPlatform); parentPlatform.getExpires() * 1000);
},
(parentPlatform.getExpires()) *1000);
} }
final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId();
if (!dynamicTask.contains(keepaliveTaskKey)) { if (!dynamicTask.contains(keepaliveTaskKey)) {
logger.info("[国标级联]{}, 添加定时心跳任务", parentPlatform.getServerGBId());
// 添加心跳任务 // 添加心跳任务
dynamicTask.startCron(keepaliveTaskKey, dynamicTask.startCron(keepaliveTaskKey,
()-> { ()-> {
@ -259,7 +266,7 @@ public class PlatformServiceImpl implements IPlatformService {
} }
} }
private void registerTask(ParentPlatform parentPlatform){ private void registerTask(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo){
try { try {
// 设置超时重发 后续从底层支持消息重发 // 设置超时重发 后续从底层支持消息重发
String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout"; String key = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId() + "_timeout";
@ -267,10 +274,10 @@ public class PlatformServiceImpl implements IPlatformService {
return; return;
} }
dynamicTask.startDelay(key, ()->{ dynamicTask.startDelay(key, ()->{
registerTask(parentPlatform); registerTask(parentPlatform, sipTransactionInfo);
}, 1000); }, 1000);
logger.info("[国标级联] 平台:{}注册即将到期,重新注册", parentPlatform.getServerGBId()); logger.info("[国标级联] 平台:{}注册即将到期,开始续订", parentPlatform.getServerGBId());
commanderForPlatform.register(parentPlatform, eventResult -> { commanderForPlatform.register(parentPlatform, sipTransactionInfo, eventResult -> {
dynamicTask.stop(key); dynamicTask.stop(key);
offline(parentPlatform, false); offline(parentPlatform, false);
},eventResult -> { },eventResult -> {

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -229,12 +230,16 @@ public class PlatformController {
throw new ControllerException(ErrorCode.ERROR400); throw new ControllerException(ErrorCode.ERROR400);
} }
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId);
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(serverGBId);
if (parentPlatform == null) { if (parentPlatform == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台不存在");
} }
if (parentPlatformCatch == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台不存在");
}
// 发送离线消息,无论是否成功都删除缓存 // 发送离线消息,无论是否成功都删除缓存
try { try {
commanderForPlatform.unregister(parentPlatform, (event -> { commanderForPlatform.unregister(parentPlatform, parentPlatformCatch.getSipTransactionInfo(), (event -> {
// 清空redis缓存 // 清空redis缓存
redisCatchStorage.delPlatformCatchInfo(parentPlatform.getServerGBId()); redisCatchStorage.delPlatformCatchInfo(parentPlatform.getServerGBId());
redisCatchStorage.delPlatformKeepalive(parentPlatform.getServerGBId()); redisCatchStorage.delPlatformKeepalive(parentPlatform.getServerGBId());