diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index 136fbd8..feacdb3 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -1,13 +1,17 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request; import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser; +import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; @@ -29,6 +33,7 @@ import org.springframework.stereotype.Component; import javax.sdp.*; import javax.sip.RequestEvent; +import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.util.Arrays; @@ -214,9 +219,9 @@ public class InviteRequestProcessor implements MessageProcessor { final ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber; if(!isDownload){ - subscriber = placbackSubscriber(callId,device,start,stop,address,port,key,schedule); + subscriber = placbackSubscriber(request, callId,device,start,stop,address,port,key,schedule); } else { - subscriber = downloadSubscriber(callId,device,start,stop,address,port,key,schedule); + subscriber = downloadSubscriber(request, callId,device,start,stop,address,port,key,schedule); } // 60秒超时计时器 schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); @@ -229,7 +234,7 @@ public class InviteRequestProcessor implements MessageProcessor { }, 1,TimeUnit.SECONDS); } - public Flow.Subscriber placbackSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture[] scheduledFuture){ + public Flow.Subscriber placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture[] scheduledFuture){ return new Flow.Subscriber<>() { @Override public void onSubscribe(Flow.Subscription subscription) { @@ -241,7 +246,18 @@ public class InviteRequestProcessor implements MessageProcessor { public void onNext(SIPRequest item) { log.info("收到 ack 确认请求: {} 开始推流",key); // RTP 推流 - deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.playbackTask()); + deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.playbackTask()); + + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + CallIdHeader requestCallId = request.getCallId(); + sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); onComplete(); } @@ -258,7 +274,7 @@ public class InviteRequestProcessor implements MessageProcessor { }; } - public Flow.Subscriber downloadSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture[] scheduledFuture){ + public Flow.Subscriber downloadSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture[] scheduledFuture){ return new Flow.Subscriber<>() { @Override public void onSubscribe(Flow.Subscription subscription) { @@ -270,7 +286,18 @@ public class InviteRequestProcessor implements MessageProcessor { public void onNext(SIPRequest item) { log.info("收到 ack 确认请求: {} 开始推流",key); // RTP 推流 - deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.downloadTask()); + deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.downloadTask()); + + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + CallIdHeader requestCallId = request.getCallId(); + sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); onComplete(); } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/notify/dto/MediaStatusRequestDTO.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/notify/dto/MediaStatusRequestDTO.java new file mode 100644 index 0000000..a7ab9eb --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/notify/dto/MediaStatusRequestDTO.java @@ -0,0 +1,30 @@ +package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Notify") +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class MediaStatusRequestDTO { + @Builder.Default + private String cmdType = CmdType.MEDIA_STATUS; + @JacksonXmlProperty(localName = "SN") + private String sn; + + /** + * 目标设备的设备编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + @JacksonXmlProperty(localName = "NotifyType") + private String notifyType = "121"; +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 056fc02..08c3e75 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -5,10 +5,14 @@ import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.URLUtil; +import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; +import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; @@ -21,6 +25,7 @@ import org.apache.commons.exec.Executor; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.nio.charset.StandardCharsets; @@ -48,13 +53,23 @@ public class DeviceProxyService { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public interface TaskProcessor { - void process(String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time); + void process(SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time); } public TaskProcessor playbackTask(){ - return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> { + return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> { Optional.ofNullable(callbackTask.get(device.getDeviceCode())).ifPresent(task->{ task.getWatchdog().destroyProcess(); + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + CallIdHeader requestCallId = request.getCallId(); + sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); }); Flow.Subscriber subscriber = byeSubscriber(key, device, callbackTask); subscribe.getByeSubscribe().addSubscribe(key, subscriber); @@ -64,9 +79,19 @@ public class DeviceProxyService { } public TaskProcessor downloadTask(){ - return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{ + return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{ Optional.ofNullable(downloadTask.get(device.getDeviceCode())).ifPresent(task->{ task.getWatchdog().destroyProcess(); + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + CallIdHeader requestCallId = request.getCallId(); + sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); }); Flow.Subscriber subscriber = byeSubscriber(key, device, downloadTask); subscribe.getByeSubscribe().addSubscribe(key, subscriber); @@ -109,13 +134,13 @@ public class DeviceProxyService { }; } - public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) { + public synchronized void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) { String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); HashMap map = new HashMap<>(3); String deviceCode = device.getDeviceCode(); map.put("device_id", deviceCode); - map.put("begin_time",DateUtil.format(LocalDateTimeUtil.of(startTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN)); - map.put("end_time", DateUtil.format(LocalDateTimeUtil.of(endTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN)); + map.put("begin_time",DateUtil.format(LocalDateTimeUtil.of(startTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN) ); + map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT)); String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8); fromUrl = StringUtils.joinWith("?", fromUrl, query); log.info("设备: {} 视频 url: {}", deviceCode, fromUrl); @@ -124,7 +149,7 @@ public class DeviceProxyService { String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); subscribe.getByeSubscribe().addPublisher(key); - taskProcessor.process(callId,fromUrl,toUrl,device,key,time); + taskProcessor.process(request, callId,fromUrl,toUrl,device,key,time); } @SneakyThrows