diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/notify/MediaStatusRequestDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/notify/MediaStatusRequestDTO.java new file mode 100644 index 0000000..21c80de --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/notify/MediaStatusRequestDTO.java @@ -0,0 +1,30 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.notify; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Notify") +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class MediaStatusRequestDTO { + @Builder.Default + private String cmdType = CmdType.MEDIA_STATUS; + @JacksonXmlProperty(localName = "SN") + private String sn; + + /** + * 目标设备的设备编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + @JacksonXmlProperty(localName = "NotifyType") + private String notifyType = "121"; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java index 2956ff5..2e14d56 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java @@ -12,6 +12,10 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; +import cn.skcks.docking.gb28181.wvp.sip.message.message.notify.MediaStatusRequestDTO; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; @@ -23,11 +27,15 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.sip.RequestEvent; +import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.util.EventObject; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; @Slf4j @RequiredArgsConstructor @@ -35,9 +43,12 @@ import java.util.Optional; public class MessageRequestProcessor implements MessageProcessor { private final SipListener sipListener; private final SipMessageSender sender; + private final SipSender sipSender; private final SipSubscribe subscribe; private final DockingService dockingService; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + @PostConstruct @Override public void init() { @@ -77,7 +88,26 @@ public class MessageRequestProcessor implements MessageProcessor { Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key)) .ifPresentOrElse(publisher-> publisher.submit(request), ()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); - } else { + } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.MEDIA_STATUS)){ + response = ok; + + MediaStatusRequestDTO parse = XmlUtils.parse(content, MediaStatusRequestDTO.class, GB28181Constant.CHARSET); + if(StringUtils.equalsIgnoreCase(parse.getNotifyType(),"121")){ + String ip = request.getLocalAddress().getHostAddress(); + String transport = request.getTopmostViaHeader().getTransport(); + scheduledExecutorService.schedule(()->{ + sipSender.sendResponse(ip, transport, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK"))); + + String targetIp = request.getRemoteAddress().getHostAddress(); + int targetPort = request.getRemotePort(); + String targetId = ((SipURI)request.getFromHeader().getAddress().getURI()).getUser(); + String callId = request.getCallIdHeader().getCallId(); + Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, targetId, SipUtil.generateFromTag(), null, callId); + sipSender.sendRequest(((provider, ip1, port) -> byeRequest)); + },100, TimeUnit.MILLISECONDS); + } + }else { response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); } sender.send(senderIp, response); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java index 57662cc..13dc19f 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -235,6 +235,42 @@ public class SipRequestBuilder implements ApplicationContextAware { return request; } + @SneakyThrows + public static Request createByeRequest(String ip, int port, long cSeq, String targetId, String fromTag, String toTag, String callId) { + Request request; + // 请求行 + String target = StringUtils.joinWith(":", ip, port); + SipURI requestLine = MessageHelper.createSipURI(targetId, target); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag()); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain()); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(targetId, target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.BYE); + CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId); + request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port)); + request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress)); + request.addHeader(SipUtil.createUserAgentHeader()); + + return request; + } + @SneakyThrows public static Request createByeRequest(String ip, int port, String targetId, String fromTag, String toTag, String callId) { Request request;