推流结束 发送媒体通知

This commit is contained in:
shikong 2023-09-18 14:47:11 +08:00
parent 117ff7cfb9
commit dc80074447
3 changed files with 95 additions and 13 deletions

View File

@ -1,13 +1,17 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
@ -29,6 +33,7 @@ import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Arrays;
@ -214,9 +219,9 @@ public class InviteRequestProcessor implements MessageProcessor {
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber;
if(!isDownload){
subscriber = placbackSubscriber(callId,device,start,stop,address,port,key,schedule);
subscriber = placbackSubscriber(request, callId,device,start,stop,address,port,key,schedule);
} else {
subscriber = downloadSubscriber(callId,device,start,stop,address,port,key,schedule);
subscriber = downloadSubscriber(request, callId,device,start,stop,address,port,key,schedule);
}
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
@ -229,7 +234,7 @@ public class InviteRequestProcessor implements MessageProcessor {
}, 1,TimeUnit.SECONDS);
}
public Flow.Subscriber<SIPRequest> placbackSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
public Flow.Subscriber<SIPRequest> placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
@ -241,7 +246,18 @@ public class InviteRequestProcessor implements MessageProcessor {
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.playbackTask());
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.playbackTask());
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
onComplete();
}
@ -258,7 +274,7 @@ public class InviteRequestProcessor implements MessageProcessor {
};
}
public Flow.Subscriber<SIPRequest> downloadSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
public Flow.Subscriber<SIPRequest> downloadSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
@ -270,7 +286,18 @@ public class InviteRequestProcessor implements MessageProcessor {
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.downloadTask());
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.downloadTask());
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
onComplete();
}

View File

@ -0,0 +1,30 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Notify")
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MediaStatusRequestDTO {
@Builder.Default
private String cmdType = CmdType.MEDIA_STATUS;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
@JacksonXmlProperty(localName = "NotifyType")
private String notifyType = "121";
}

View File

@ -5,10 +5,14 @@ import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.URLUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
@ -21,6 +25,7 @@ import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.nio.charset.StandardCharsets;
@ -48,13 +53,23 @@ public class DeviceProxyService {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public interface TaskProcessor {
void process(String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time);
void process(SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time);
}
public TaskProcessor playbackTask(){
return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> {
return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> {
Optional.ofNullable(callbackTask.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess();
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
@ -64,9 +79,19 @@ public class DeviceProxyService {
}
public TaskProcessor downloadTask(){
return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{
return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{
Optional.ofNullable(downloadTask.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess();
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
@ -109,13 +134,13 @@ public class DeviceProxyService {
};
}
public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) {
public synchronized void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) {
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode();
map.put("device_id", deviceCode);
map.put("begin_time",DateUtil.format(LocalDateTimeUtil.of(startTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN));
map.put("end_time", DateUtil.format(LocalDateTimeUtil.of(endTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN));
map.put("begin_time",DateUtil.format(LocalDateTimeUtil.of(startTime.toInstant(), ZoneId.of(GB28181Constant.TIME_ZONE)), DatePattern.PURE_DATETIME_PATTERN) );
map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT));
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
@ -124,7 +149,7 @@ public class DeviceProxyService {
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
taskProcessor.process(callId,fromUrl,toUrl,device,key,time);
taskProcessor.process(request, callId,fromUrl,toUrl,device,key,time);
}
@SneakyThrows