This commit is contained in:
shikong 2023-09-13 15:38:11 +08:00
parent 69b554c679
commit ee9b8ce08c
5 changed files with 132 additions and 10 deletions

View File

@ -2,11 +2,19 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.requ
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Query")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceInfoRequestDTO {
private String cmdType;
@Builder.Default
private String cmdType = "DeviceInfo";
@JacksonXmlProperty(localName = "SN")
private String sn;

View File

@ -0,0 +1,28 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.keepalive;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Notify")
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
public class KeepaliveNotifyDTO {
@Builder.Default
private String cmdType = CmdType.KEEPALIVE;
@JacksonXmlProperty(localName = "SN")
private String sn;
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
@Builder.Default
private String status = "OK";
}

View File

@ -180,4 +180,42 @@ public class SipRequestBuilder implements ApplicationContextAware {
request.setContent(content, contentTypeHeader);
return request;
}
@SneakyThrows
public static Request createMessageRequest(MockingDevice device, String ip, int port,long cSeq,String content, String viaTag, String fromTag, CallIdHeader callIdHeader) {
Request request;
String target = StringUtils.joinWith(":", serverConfig.getIp(), serverConfig.getPort());
// sip uri
SipURI requestURI = MessageHelper.createSipURI(serverConfig.getId(), target);
// via
List<ViaHeader> viaHeaders = getViaHeaders(serverConfig.getIp(), serverConfig.getPort(), sipConfig.getTransport(), viaTag );
String from = StringUtils.joinWith(":", ip, port);
// from
SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from);
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(serverConfig.getId(), target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, null);
// Forwards
MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE);
// 使用 GB28181 默认编码 否则中文将会乱码
MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory();
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(content, contentTypeHeader);
return request;
}
}

View File

@ -0,0 +1,52 @@
package cn.skcks.docking.gb28181.mocking.service.gb28181.keepalive;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.keepalive.KeepaliveNotifyDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.sip.header.CallIdHeader;
import java.util.concurrent.*;
@Slf4j
@Service
@RequiredArgsConstructor
public class KeepaliveService {
private final SipSender sender;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentHashMap<String, ScheduledFuture<?>> map = new ConcurrentHashMap<>();
public void keepalive(MockingDevice mockingDevice){
unKeepalive(mockingDevice);
scheduledExecutorService.scheduleWithFixedDelay(()->{
KeepaliveNotifyDTO keepaliveNotifyDTO = KeepaliveNotifyDTO.builder()
.deviceId(mockingDevice.getGbDeviceId())
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.build();
sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId();
return SipRequestBuilder.createMessageRequest(mockingDevice,
ip,
port,
1,
XmlUtils.toXml(keepaliveNotifyDTO),
SipUtil.generateViaTag(),
SipUtil.generateFromTag(),
callIdHeader);
});
},0,30, TimeUnit.SECONDS);
}
public void unKeepalive(MockingDevice mockingDevice){
ScheduledFuture<?> scheduledFuture = map.get(mockingDevice.getDeviceCode());
if(scheduledFuture != null){
scheduledFuture.cancel(true);
}
}
}

View File

@ -2,28 +2,21 @@ package cn.skcks.docking.gb28181.mocking.service.gb28181.register;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.device.DeviceService;
import cn.skcks.docking.gb28181.mocking.service.gb28181.keepalive.KeepaliveService;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.cert.ocsp.Req;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.SipProvider;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request;
@ -44,7 +37,7 @@ public class RegisterService {
private final SipSubscribe subscribe;
private final MockingExecutor executor;
private final KeepaliveService keepaliveService;
private static final int TIMEOUT = 60;
@ -73,6 +66,8 @@ public class RegisterService {
@SneakyThrows
public CompletableFuture<JsonResponse<Boolean>> register(MockingDevice device) {
keepaliveService.unKeepalive(device);
CompletableFuture<JsonResponse<Boolean>> result = new CompletableFuture<>();
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
sender.sendRequest((provider, ip, port) -> {
@ -126,6 +121,7 @@ public class RegisterService {
if (statusCode == Response.OK) {
log.info("设备: {}({}), 注册成功", device.getDeviceCode(), device.getGbDeviceId());
result.complete(JsonResponse.success(null));
keepaliveService.keepalive(device);
this.onComplete();
}
}