添加对 MediaStatus 媒体消息通知的处理

This commit is contained in:
shikong 2024-01-07 22:07:40 +08:00
parent bf5be55ec8
commit 31f1ed957a
4 changed files with 115 additions and 2 deletions

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request;
import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
@ -12,9 +13,12 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.notify.MediaStatusService;
import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO; import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
@ -22,9 +26,11 @@ import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.EventObject; import java.util.EventObject;
import java.util.Optional; import java.util.Optional;
@ -37,6 +43,9 @@ public class MessageRequestProcessor implements MessageProcessor {
private final DockingDeviceService deviceService; private final DockingDeviceService deviceService;
private final SipMessageSender sender; private final SipMessageSender sender;
private final SipSubscribe subscribe; private final SipSubscribe subscribe;
private final SipConfig sipConfig;
private final MediaStatusService mediaStatusService;
@PostConstruct @PostConstruct
@Override @Override
@ -87,6 +96,12 @@ public class MessageRequestProcessor implements MessageProcessor {
response = ok; response = ok;
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.MEDIA_STATUS)){ } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.MEDIA_STATUS)){
response = ok; response = ok;
sender.send(senderIp, response);
MediaStatusRequestDTO mediaStatusRequestDTO = MANSCDPUtils.parse(content, MediaStatusRequestDTO.class);
if(StringUtils.equalsIgnoreCase(mediaStatusRequestDTO.getNotifyType(),"121")){
mediaStatusService.process(request, mediaStatusRequestDTO);
return;
}
} else { } else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
} }

View File

@ -0,0 +1,66 @@
package cn.skcks.docking.gb28181.service.notify;
import cn.hutool.core.collection.CollectionUtil;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
import cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.sip.method.invite.request.InviteRequestBuilder;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.sip.address.SipURI;
import javax.sip.message.Request;
import java.util.Set;
@RequiredArgsConstructor
@Slf4j
@Service
public class MediaStatusService {
private final SipConfig sipConfig;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
public void process(SIPRequest request, MediaStatusRequestDTO dto){
String senderIp = request.getLocalAddress().getHostAddress();
String deviceId = ((SipURI)request.getFromHeader().getAddress().getURI()).getUser();
if(StringUtils.equalsIgnoreCase(dto.getNotifyType(),"121")){
InviteRequestBuilder inviteRequestBuilder = InviteRequestBuilder.builder()
.localIp(request.getLocalAddress().getHostAddress())
.localPort(sipConfig.getPort())
.localId(((SipURI)request.getToHeader().getAddress().getURI()).getUser())
.targetIp(request.getRemoteAddress().getHostAddress())
.targetPort(request.getRemotePort())
.targetId(((SipURI)request.getFromHeader().getAddress().getURI()).getUser())
.transport(request.getTopmostViaHeader().getTransport())
.build();
String keyPattern = CacheUtil.getKey(GB28181SDPBuilder.Action.PLAY_BACK.getAction(), deviceId,"*");
Set<String> keys = RedisUtil.KeyOps.keys(keyPattern);
if (CollectionUtil.isEmpty(keys)){
// 实在找不到就原样发回去 ()
sender.send(senderIp, inviteRequestBuilder.createByeRequest(request.getCallId().getCallId(), request.getCSeq().getSeqNumber() + 1));
} else {
keys.forEach(key -> {
String json = RedisUtil.StringOps.get(key);
if(StringUtils.isNotBlank(json)){
log.debug("{} {}",key,json);
SipTransactionInfo transactionInfo = JsonUtils.parse(json, SipTransactionInfo.class);
String callId = transactionInfo.getCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.BYE, callId);
log.debug("{} {}",callId,subscribeKey);
subscribe.getSipRequestSubscribe().getPublisher(subscribeKey).submit(request);
}
});
}
}
}
}

View File

@ -47,6 +47,7 @@ import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Date; import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Flow; import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -272,6 +273,7 @@ public class PlayService {
Request request = inviteRequestBuilder.createPlaybackInviteRequest(callId, SipRequestBuilder.getCSeq(),channelId,ip,port,ssrc,MediaStreamMode.of(device.getStreamMode()),startTime,endTime); Request request = inviteRequestBuilder.createPlaybackInviteRequest(callId, SipRequestBuilder.getCSeq(),channelId,ip,port,ssrc,MediaStreamMode.of(device.getStreamMode()),startTime,endTime);
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callIdHeader.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callIdHeader.getCallId());
subscribe.getSipResponseSubscribe().addPublisher(subscribeKey); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() { Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription; private Flow.Subscription subscription;
@ -292,7 +294,7 @@ public class PlayService {
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey); log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc)));
RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS); RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND) + 30, TimeUnit.SECONDS);
result.setResult(JsonResponse.success(videoUrl(streamId))); result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete(); onComplete();
} else { } else {
@ -353,7 +355,7 @@ public class PlayService {
@SneakyThrows @SneakyThrows
@Override @Override
public void onComplete() { public void onComplete() {
if(request != null){ if(Objects.equals(request.getMethod(), Request.BYE)){
Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(request, SipUtil.nanoId()); Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(request, SipUtil.nanoId());
provider.sendResponse(byeResponse); provider.sendResponse(byeResponse);
} else { } else {

View File

@ -0,0 +1,30 @@
package cn.skcks.docking.gb28181.sip.manscdp.mediastatus.notify;
import cn.skcks.docking.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Notify")
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class MediaStatusRequestDTO {
@Builder.Default
private String cmdType = CmdType.MEDIA_STATUS;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
@JacksonXmlProperty(localName = "NotifyType")
private String notifyType = "121";
}