diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index 1a6fd90..90af384 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request; import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.config.sip.SipConfig; import cn.skcks.docking.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; @@ -12,9 +13,12 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; +import cn.skcks.docking.gb28181.service.notify.MediaStatusService; import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO; import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; +import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; +import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder; import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; @@ -22,9 +26,11 @@ import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.sip.RequestEvent; +import javax.sip.address.SipURI; import javax.sip.message.Response; import java.util.EventObject; import java.util.Optional; @@ -37,6 +43,9 @@ public class MessageRequestProcessor implements MessageProcessor { private final DockingDeviceService deviceService; private final SipMessageSender sender; private final SipSubscribe subscribe; + private final SipConfig sipConfig; + + private final MediaStatusService mediaStatusService; @PostConstruct @Override @@ -87,6 +96,12 @@ public class MessageRequestProcessor implements MessageProcessor { response = ok; } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.MEDIA_STATUS)){ response = ok; + sender.send(senderIp, response); + MediaStatusRequestDTO mediaStatusRequestDTO = MANSCDPUtils.parse(content, MediaStatusRequestDTO.class); + if(StringUtils.equalsIgnoreCase(mediaStatusRequestDTO.getNotifyType(),"121")){ + mediaStatusService.process(request, mediaStatusRequestDTO); + return; + } } else { response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/notify/MediaStatusService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/notify/MediaStatusService.java new file mode 100644 index 0000000..8ce7655 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/notify/MediaStatusService.java @@ -0,0 +1,66 @@ +package cn.skcks.docking.gb28181.service.notify; + +import cn.hutool.core.collection.CollectionUtil; +import cn.skcks.docking.gb28181.common.json.JsonUtils; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.config.sip.SipConfig; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder; +import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO; +import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import javax.sip.address.SipURI; +import javax.sip.message.Request; +import java.util.Set; + +@RequiredArgsConstructor +@Slf4j +@Service +public class MediaStatusService { + private final SipConfig sipConfig; + private final SipMessageSender sender; + private final SipSubscribe subscribe; + public void process(SIPRequest request, MediaStatusRequestDTO dto){ + String senderIp = request.getLocalAddress().getHostAddress(); + String deviceId = ((SipURI)request.getFromHeader().getAddress().getURI()).getUser(); + if(StringUtils.equalsIgnoreCase(dto.getNotifyType(),"121")){ + InviteRequestBuilder inviteRequestBuilder = InviteRequestBuilder.builder() + .localIp(request.getLocalAddress().getHostAddress()) + .localPort(sipConfig.getPort()) + .localId(((SipURI)request.getToHeader().getAddress().getURI()).getUser()) + .targetIp(request.getRemoteAddress().getHostAddress()) + .targetPort(request.getRemotePort()) + .targetId(((SipURI)request.getFromHeader().getAddress().getURI()).getUser()) + .transport(request.getTopmostViaHeader().getTransport()) + .build(); + + String keyPattern = CacheUtil.getKey(GB28181SDPBuilder.Action.PLAY_BACK.getAction(), deviceId,"*"); + Set keys = RedisUtil.KeyOps.keys(keyPattern); + if (CollectionUtil.isEmpty(keys)){ + // 实在找不到就原样发回去 ╮(╯▽╰)╭ + sender.send(senderIp, inviteRequestBuilder.createByeRequest(request.getCallId().getCallId(), request.getCSeq().getSeqNumber() + 1)); + } else { + keys.forEach(key -> { + String json = RedisUtil.StringOps.get(key); + if(StringUtils.isNotBlank(json)){ + log.debug("{} {}",key,json); + SipTransactionInfo transactionInfo = JsonUtils.parse(json, SipTransactionInfo.class); + String callId = transactionInfo.getCallId(); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.BYE, callId); + log.debug("{} {}",callId,subscribeKey); + subscribe.getSipRequestSubscribe().getPublisher(subscribeKey).submit(request); + } + }); + } + } + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index 09bc0bf..c0295d7 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -47,6 +47,7 @@ import javax.sip.message.Request; import javax.sip.message.Response; import java.text.MessageFormat; import java.util.Date; +import java.util.Objects; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; @@ -272,6 +273,7 @@ public class PlayService { Request request = inviteRequestBuilder.createPlaybackInviteRequest(callId, SipRequestBuilder.getCSeq(),channelId,ip,port,ssrc,MediaStreamMode.of(device.getStreamMode()),startTime,endTime); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callIdHeader.getCallId()); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); + Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -292,7 +294,7 @@ public class PlayService { } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); - RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS); + RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND) + 30, TimeUnit.SECONDS); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { @@ -353,7 +355,7 @@ public class PlayService { @SneakyThrows @Override public void onComplete() { - if(request != null){ + if(Objects.equals(request.getMethod(), Request.BYE)){ Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(request, SipUtil.nanoId()); provider.sendResponse(byeResponse); } else { diff --git a/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/manscdp/mediastatus/notify/MediaStatusRequestDTO.java b/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/manscdp/mediastatus/notify/MediaStatusRequestDTO.java new file mode 100644 index 0000000..dbdf687 --- /dev/null +++ b/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/manscdp/mediastatus/notify/MediaStatusRequestDTO.java @@ -0,0 +1,30 @@ +package cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify; + +import cn.skcks.docking.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Notify") +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class MediaStatusRequestDTO { + @Builder.Default + private String cmdType = CmdType.MEDIA_STATUS; + @JacksonXmlProperty(localName = "SN") + private String sn; + + /** + * 目标设备的设备编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + @JacksonXmlProperty(localName = "NotifyType") + private String notifyType = "121"; +}