简单实现 实时流播放

(未鉴权 未完全实现 仅可点播)
This commit is contained in:
shikong 2023-08-24 02:41:41 +08:00
parent 7b008248d4
commit 60cb7cf93c
14 changed files with 203 additions and 37 deletions

View File

@ -0,0 +1,39 @@
package cn.skcks.docking.gb28181.api.play;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.api.play.dto.RealTimePlayDTO;
import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.SwaggerConfig;
import cn.skcks.docking.gb28181.service.play.PlayService;
import cn.skcks.docking.gb28181.service.record.RecordService;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springdoc.core.annotations.ParameterObject;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
@Tag(name="播放")
@RestController
@JsonMapping("/device/play")
@RequiredArgsConstructor
public class PlayController {
private final PlayService playService;
@Bean
public GroupedOpenApi playApi() {
return SwaggerConfig.api("Play", "/device/play");
}
@GetJson("/realtime")
public DeferredResult<JsonResponse<String>> getInfo(@ParameterObject @Validated RealTimePlayDTO dto){
return playService.realTimePlay(dto.getDeviceId(), dto.getChannelId(), dto.getTimeout());
}
}

View File

@ -0,0 +1,22 @@
package cn.skcks.docking.gb28181.api.play.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
@Schema(title = "查询历史录像")
@Data
public class RealTimePlayDTO {
@NotBlank
@Schema(description = "设备id", example = "44050100001180000001")
private String deviceId;
@NotBlank
@Schema(description = "通道id", example = "44050100001180000001")
private String channelId;
@Min(30)
@Schema(description = "超时时间(秒)", example = "30")
private long timeout = 30;
}

View File

@ -7,10 +7,7 @@ import lombok.SneakyThrows;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import javax.sdp.Connection;
import javax.sdp.Origin;
import javax.sdp.SessionDescription;
import javax.sdp.URI;
import javax.sdp.*;
import java.util.Optional;
@EqualsAndHashCode(callSuper = false)
@ -22,6 +19,10 @@ public class GB28181Description extends SessionDescriptionImpl implements Sessio
@SneakyThrows
public static GB28181Description convert(SessionDescriptionImpl sessionDescription){
GB28181Description gb28181Description = new GB28181Description();
SessionName sessionName = sessionDescription.getSessionName();
if(sessionName != null){
gb28181Description.setSessionName(sessionName);
}
gb28181Description.setMediaDescriptions(sessionDescription.getMediaDescriptions(true));
gb28181Description.setBandwidths(sessionDescription.getBandwidths(true));

View File

@ -52,8 +52,10 @@ public class MediaSdpHelper {
}
@SneakyThrows
public static GB28181Description build(Action action, String deviceId, String channelId, String netType, String rtpIp, int rtpPort, long ssrc, StreamMode streamMode, TimeDescription timeDescription){
GB28181Description description = GB28181Description.Convertor.convert((SessionDescriptionImpl) SdpFactory.getInstance().createSessionDescription(action.getAction()));
public static GB28181Description build(Action action, String deviceId, String channelId, String netType, String rtpIp, int rtpPort, String ssrc, StreamMode streamMode, TimeDescription timeDescription){
GB28181Description description = new GB28181Description();
description.setSessionName(SdpFactory.getInstance().createSessionName(action.getAction()));
Version version = SdpFactory.getInstance().createVersion(0);
description.setVersion(version);
@ -95,19 +97,19 @@ public class MediaSdpHelper {
}
@SneakyThrows
public static GB28181Description play(Action action, String deviceId, String channelId, String netType, String rtpIp, int rtpPort, long ssrc, StreamMode streamMode){
public static GB28181Description play(String deviceId, String channelId, String netType, String rtpIp, int rtpPort, String ssrc, StreamMode streamMode){
TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription();
return build(action, deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode, timeDescription);
return build(Action.PLAY, deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode, timeDescription);
}
@SneakyThrows
public static GB28181Description playback(Action action, String deviceId, String channelId, String netType, String rtpIp, int rtpPort, long ssrc, StreamMode streamMode, Date start, Date end) {
public static GB28181Description playback(String deviceId, String channelId, String netType, String rtpIp, int rtpPort, String ssrc, StreamMode streamMode, Date start, Date end) {
TimeField timeField = new TimeField();
timeField.setStart(start);
timeField.setStop(end);
TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(timeField);
GB28181Description description = build(action, deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode, timeDescription);
GB28181Description description = build(Action.PLAY_BACK, deviceId, channelId, netType, rtpIp, rtpPort, ssrc, streamMode, timeDescription);
URIField uriField = new URIField();
uriField.setURI(StringUtils.joinWith(":", channelId, "0"));

View File

@ -15,7 +15,7 @@ public class SsrcField extends SDPField {
super(SSRC_FIELD);
}
private long ssrc;
private String ssrc;
@Override
public String encode() {

View File

@ -16,7 +16,7 @@ public enum StreamMode {
private final String mode;
@JsonCreator
public static StreamMode fromCode(String mode) {
public static StreamMode of(String mode) {
for (StreamMode m : values()) {
if (m.getMode().equalsIgnoreCase(mode)) {
return m;

View File

@ -3,5 +3,6 @@ package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
public interface SipListener extends javax.sip.SipListener {
void addProcessor(String method, MessageProcessor messageProcessor);
void addRequestProcessor(String method, MessageProcessor messageProcessor);
void addResponseProcessor(String method, MessageProcessor messageProcessor);
}

View File

@ -22,18 +22,25 @@ import java.util.concurrent.ConcurrentMap;
@Slf4j
public class SipListenerImpl implements SipListener {
private final SipSubscribe sipSubscribe;
private final ConcurrentMap<String, MessageProcessor> processor = new ConcurrentHashMap<>();
public void addProcessor(String method,MessageProcessor messageProcessor){
log.debug("[SipListener] 注册 {} 处理器", method);
processor.put(method, messageProcessor);
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
public void addRequestProcessor(String method, MessageProcessor messageProcessor){
log.debug("[SipListener] 注册 {} 请求处理器", method);
requestProcessor.put(method, messageProcessor);
}
public void addResponseProcessor(String method, MessageProcessor messageProcessor){
log.debug("[SipListener] 注册 {} 响应处理器", method);
responseProcessor.put(method, messageProcessor);
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
log.debug("传入请求 method => {}",method);
Optional.ofNullable(processor.get(method)).ifPresent(processor -> {
Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> {
processor.process(requestEvent);
});
}
@ -42,13 +49,16 @@ public class SipListenerImpl implements SipListener {
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
int status = response.getStatusCode();
// log.debug();
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
log.debug("传入响应 method => {}",method);
Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> {
processor.process(responseEvent);
});
// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
// if (sipRequestProcessor != null) {
// sipRequestProcessor.process(responseEvent);

View File

@ -4,10 +4,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.PeerUnavailableException;
import javax.sip.RequestEvent;
import javax.sip.SipFactory;
import javax.sip.header.HeaderFactory;
import javax.sip.message.MessageFactory;
import java.util.EventObject;
public interface MessageProcessor {
Logger log = LoggerFactory.getLogger(MessageProcessor.class);
@ -15,10 +15,12 @@ public interface MessageProcessor {
class Method {
public static final String REGISTER = "REGISTER";
public static final String MESSAGE = "MESSAGE";
public static final String INVITE = "INVITE";
}
void init();
void process(RequestEvent requestEvent);
void process(EventObject requestEvent);
default MessageFactory getMessageFactory() {
try {

View File

@ -25,6 +25,7 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@ -39,11 +40,12 @@ public class MessageRequestProcessor implements MessageProcessor {
@PostConstruct
@Override
public void init() {
sipListener.addProcessor(Method.MESSAGE, this);
sipListener.addRequestProcessor(Method.MESSAGE, this);
}
@Override
public void process(RequestEvent requestEvent) {
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest();
String deviceId = SipUtil.getUserIdFromFromHeader(request);
CallIdHeader callIdHeader = request.getCallIdHeader();

View File

@ -0,0 +1,66 @@
package cn.skcks.docking.gb28181.core.sip.message.processor.message.response;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.Gb28181Sdp;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.EventObject;
@Slf4j
@Component
@RequiredArgsConstructor
public class InviteResponseProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipMessageSender sender;
@PostConstruct
@Override
public void init() {
sipListener.addResponseProcessor(Method.INVITE, this);
}
public void process(EventObject eventObject) {
ResponseEvent requestEvent = (ResponseEvent) eventObject;
try {
SIPResponse response = (SIPResponse) requestEvent.getResponse();
int statusCode = response.getStatusCode();
// trying不会回复
if (statusCode == Response.TRYING) {
return;
}
// 成功响应
// 下发ack
if (statusCode == Response.OK) {
ResponseEventExt event = (ResponseEventExt) requestEvent;
String contentString = new String(response.getRawContent());
Gb28181Sdp gb28181Sdp = SipUtil.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort());
Request reqAck = SipRequestBuilder.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
log.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
log.debug("{}", reqAck);
sender.send(response.getLocalAddress().getHostAddress(), reqAck);
}
} catch (InvalidArgumentException | ParseException | SipException | SdpParseException e) {
log.info("[点播回复ACK],异常:", e);
}
}
}

View File

@ -34,6 +34,7 @@ import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Calendar;
import java.util.EventObject;
import java.util.Locale;
@Slf4j
@ -48,12 +49,13 @@ public class RegisterRequestProcessor implements MessageProcessor {
@PostConstruct
@Override
public void init(){
sipListener.addProcessor(Method.REGISTER,this);
sipListener.addRequestProcessor(Method.REGISTER,this);
}
@SneakyThrows
@Override
public void process(RequestEvent requestEvent) {
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest();
FromHeader fromHeader = request.getFrom();
Address address = fromHeader.getAddress();

View File

@ -3,6 +3,13 @@ package cn.skcks.docking.gb28181.service.play;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp;
import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer;
@ -12,16 +19,18 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import gov.nist.javax.sdp.fields.SDPField;
import gov.nist.javax.sdp.fields.SDPFormat;
import gov.nist.javax.sdp.fields.SDPObject;
import gov.nist.javax.sdp.parser.SDPParser;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sdp.Connection;
import javax.sip.ListeningPoint;
import javax.sip.SipProvider;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
@ -30,16 +39,19 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class PlayService {
private static final String PREFIX = "RealTimePlay";
private final ZlmMediaConfig mediaConfig;
private final ZlmMediaConfig zlmMediaConfig;
private final DockingDeviceService deviceService;
private final ZlmMediaService zlmMediaService;
private final SsrcService ssrcService;
private final SipService sipService;
private final SipMessageSender sender;
/**
*
* @param deviceId 设备id
* @param channelId 通道id
*/
@SneakyThrows
public DeferredResult<JsonResponse<String>> realTimePlay(String deviceId, String channelId, long timeout){
DeferredResult<JsonResponse<String>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout));
DockingDevice device = deviceService.getDevice(deviceId);
@ -69,18 +81,25 @@ public class PlayService {
openRtpServer.setStreamId(streamId);
openRtpServer.setTcpMode(streamMode);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer);
log.info("openRtpServerResp => {}", openRtpServerResp);
if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){
result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg()));
return result;
}
String ip = mediaConfig.getIp();
StringBuilder sb = new StringBuilder();
sb.append("v=0\r\n");
sb.append("o=").append(channelId).append(" 0 0 IN IP4 ").append(ip).append("\r\n");
String ip = zlmMediaConfig.getIp();
int port = openRtpServerResp.getPort();
String ssrc = ssrcService.getPlaySsrc();
GB28181Description description = MediaSdpHelper.play(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode()));
// new SDPField();
String transport = device.getTransport();
String senderIp = device.getLocalIp();
SipProvider provider = sipService.getProvider(transport, senderIp);
CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
sender.send(senderIp, request);
result.setResult(JsonResponse.success(StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv")));
return result;
// zlmMediaService.getRtpInfo();
// GetMediaList getMediaList = new GetMediaList();

View File

@ -167,7 +167,7 @@ public class SipEventTest {
// GB28181Description description = (GB28181Description) description;
description.setSsrcField(new SsrcField(12345678));
description.setSsrcField(new SsrcField("12345678"));
SessionDescription sessionDescription = description;
sessionDescription.setSessionName(SdpFactory.getInstance().createSessionName("PlayBack"));
log.info("\n{}", sessionDescription);
@ -181,7 +181,7 @@ public class SipEventTest {
int rtpPort = 5080;
String rtpIp = "10.10.10.20";
long ssrc = RandomUtil.randomLong(10000000,100000000);
GB28181Description description = MediaSdpHelper.build(MediaSdpHelper.Action.PLAY, deviceId, channel, Connection.IP4, rtpIp, rtpPort, ssrc, StreamMode.UDP, SdpFactory.getInstance().createTimeDescription());
GB28181Description description = MediaSdpHelper.build(MediaSdpHelper.Action.PLAY, deviceId, channel, Connection.IP4, rtpIp, rtpPort, String.valueOf(ssrc), StreamMode.UDP, SdpFactory.getInstance().createTimeDescription());
log.info("\n{}", description);
}
}