diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/deviceinfo/dto/DeviceInfoRequestDTO.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/deviceinfo/dto/DeviceInfoRequestDTO.java index ce60cae..c2368d2 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/deviceinfo/dto/DeviceInfoRequestDTO.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/deviceinfo/dto/DeviceInfoRequestDTO.java @@ -2,11 +2,19 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.requ import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; + @JacksonXmlRootElement(localName = "Query") @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class DeviceInfoRequestDTO { - private String cmdType; + @Builder.Default + private String cmdType = "DeviceInfo"; @JacksonXmlProperty(localName = "SN") private String sn; diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/keepalive/KeepaliveNotifyDTO.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/keepalive/KeepaliveNotifyDTO.java new file mode 100644 index 0000000..2f47b59 --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/keepalive/KeepaliveNotifyDTO.java @@ -0,0 +1,28 @@ +package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.keepalive; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Notify") +@NoArgsConstructor +@AllArgsConstructor +@Builder +@Data +public class KeepaliveNotifyDTO { + @Builder.Default + private String cmdType = CmdType.KEEPALIVE; + + @JacksonXmlProperty(localName = "SN") + private String sn; + + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + @Builder.Default + private String status = "OK"; +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/request/SipRequestBuilder.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/request/SipRequestBuilder.java index 221028c..88b5068 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/request/SipRequestBuilder.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/request/SipRequestBuilder.java @@ -180,4 +180,42 @@ public class SipRequestBuilder implements ApplicationContextAware { request.setContent(content, contentTypeHeader); return request; } + + @SneakyThrows + public static Request createMessageRequest(MockingDevice device, String ip, int port,long cSeq,String content, String viaTag, String fromTag, CallIdHeader callIdHeader) { + Request request; + String target = StringUtils.joinWith(":", serverConfig.getIp(), serverConfig.getPort()); + // sip uri + SipURI requestURI = MessageHelper.createSipURI(serverConfig.getId(), target); + + // via + List viaHeaders = getViaHeaders(serverConfig.getIp(), serverConfig.getPort(), sipConfig.getTransport(), viaTag ); + + String from = StringUtils.joinWith(":", ip, port); + // from + SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(serverConfig.getId(), target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, null); + + // Forwards + MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE); + + // 使用 GB28181 默认编码 否则中文将会乱码 + MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory(); + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + request.setContent(content, contentTypeHeader); + return request; + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/keepalive/KeepaliveService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/keepalive/KeepaliveService.java new file mode 100644 index 0000000..7a4f3fd --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/keepalive/KeepaliveService.java @@ -0,0 +1,52 @@ +package cn.skcks.docking.gb28181.mocking.service.gb28181.keepalive; + +import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.keepalive.KeepaliveNotifyDTO; +import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; +import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.sip.header.CallIdHeader; +import java.util.concurrent.*; + +@Slf4j +@Service +@RequiredArgsConstructor +public class KeepaliveService { + private final SipSender sender; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + private final ConcurrentHashMap> map = new ConcurrentHashMap<>(); + + public void keepalive(MockingDevice mockingDevice){ + unKeepalive(mockingDevice); + scheduledExecutorService.scheduleWithFixedDelay(()->{ + KeepaliveNotifyDTO keepaliveNotifyDTO = KeepaliveNotifyDTO.builder() + .deviceId(mockingDevice.getGbDeviceId()) + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .build(); + sender.sendRequest((provider, ip, port) -> { + CallIdHeader callIdHeader = provider.getNewCallId(); + return SipRequestBuilder.createMessageRequest(mockingDevice, + ip, + port, + 1, + XmlUtils.toXml(keepaliveNotifyDTO), + SipUtil.generateViaTag(), + SipUtil.generateFromTag(), + callIdHeader); + }); + },0,30, TimeUnit.SECONDS); + } + + public void unKeepalive(MockingDevice mockingDevice){ + ScheduledFuture scheduledFuture = map.get(mockingDevice.getDeviceCode()); + if(scheduledFuture != null){ + scheduledFuture.cancel(true); + } + } +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java index ecacd45..ddf9655 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java @@ -2,28 +2,21 @@ package cn.skcks.docking.gb28181.mocking.service.gb28181.register; import cn.skcks.docking.gb28181.common.json.JsonResponse; -import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; -import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; -import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.service.device.DeviceService; +import cn.skcks.docking.gb28181.mocking.service.gb28181.keepalive.KeepaliveService; import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.bouncycastle.cert.ocsp.Req; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.DeferredResult; -import javax.sip.SipProvider; import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; @@ -44,7 +37,7 @@ public class RegisterService { private final SipSubscribe subscribe; - private final MockingExecutor executor; + private final KeepaliveService keepaliveService; private static final int TIMEOUT = 60; @@ -73,6 +66,8 @@ public class RegisterService { @SneakyThrows public CompletableFuture> register(MockingDevice device) { + keepaliveService.unKeepalive(device); + CompletableFuture> result = new CompletableFuture<>(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); sender.sendRequest((provider, ip, port) -> { @@ -126,6 +121,7 @@ public class RegisterService { if (statusCode == Response.OK) { log.info("设备: {}({}), 注册成功", device.getDeviceCode(), device.getGbDeviceId()); result.complete(JsonResponse.success(null)); + keepaliveService.keepalive(device); this.onComplete(); } }