实时流点播与关闭

This commit is contained in:
shikong 2023-08-24 16:30:25 +08:00
parent 0d5e134943
commit d2ae05e135
10 changed files with 148 additions and 69 deletions

View File

@ -3,12 +3,10 @@ package cn.skcks.docking.gb28181.api.play;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.api.play.dto.RealTimePlayDTO;
import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO;
import cn.skcks.docking.gb28181.api.play.dto.RealTimeStopDTO;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.SwaggerConfig;
import cn.skcks.docking.gb28181.service.play.PlayService;
import cn.skcks.docking.gb28181.service.record.RecordService;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springdoc.core.annotations.ParameterObject;
@ -18,8 +16,6 @@ import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
@Tag(name="播放")
@RestController
@JsonMapping("/device/play")
@ -32,8 +28,13 @@ public class PlayController {
return SwaggerConfig.api("Play", "/device/play");
}
@GetJson("/realtime")
public DeferredResult<JsonResponse<String>> getInfo(@ParameterObject @Validated RealTimePlayDTO dto){
@GetJson("/realTimePlay")
public DeferredResult<JsonResponse<String>> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto){
return playService.realTimePlay(dto.getDeviceId(), dto.getChannelId(), dto.getTimeout());
}
@GetJson("/realtimeStop")
public JsonResponse<Void> realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto){
return playService.realTimeStop(dto.getDeviceId(), dto.getChannelId());
}
}

View File

@ -5,7 +5,7 @@ import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
@Schema(title = "查询历史录像")
@Schema(title = "点播")
@Data
public class RealTimePlayDTO {
@NotBlank

View File

@ -0,0 +1,17 @@
package cn.skcks.docking.gb28181.api.play.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
@Schema(title = "关闭点播")
@Data
public class RealTimeStopDTO {
@NotBlank
@Schema(description = "设备id", example = "44050100001180000001")
private String deviceId;
@NotBlank
@Schema(description = "通道id", example = "44050100001180000001")
private String channelId;
}

View File

@ -24,12 +24,13 @@ public class SipListenerImpl implements SipListener {
private final SipSubscribe sipSubscribe;
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
public void addRequestProcessor(String method, MessageProcessor messageProcessor){
public void addRequestProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 请求处理器", method);
requestProcessor.put(method, messageProcessor);
}
public void addResponseProcessor(String method, MessageProcessor messageProcessor){
public void addResponseProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 响应处理器", method);
responseProcessor.put(method, messageProcessor);
}
@ -39,7 +40,7 @@ public class SipListenerImpl implements SipListener {
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
log.debug("传入请求 method => {}",method);
log.debug("传入请求 method => {}", method);
Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> {
processor.process(requestEvent);
});
@ -49,49 +50,20 @@ public class SipListenerImpl implements SipListener {
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
int status = response.getStatusCode();
// log.debug();
CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
log.debug("{} {}", method, response);
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
log.debug("传入响应 method => {}",method);
log.debug("传入响应 method => {}", method);
Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> {
processor.process(responseEvent);
});
// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
// if (sipRequestProcessor != null) {
// sipRequestProcessor.process(responseEvent);
// }
// if(status != Response.UNAUTHORIZED && responseEvent.getResponse() != null){}
// if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
// CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
// if (callIdHeader != null) {
// SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
// if (subscribe != null) {
// SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
// sipSubscribe.removeOkSubscribe(callIdHeader.getCallId());
// subscribe.response(eventResult);
// }
// }
// }
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
// 增加其它无需回复的响应如101180等
} else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
// if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
// CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
// if (callIdHeader != null) {
// SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
// if (subscribe != null) {
// SipSubscribe.EventResult<?> eventResult = new SipSubscribe.EventResult(responseEvent);
// subscribe.response(eventResult);
// sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId());
// }
// }
// }
if (responseEvent.getDialog() != null) {
responseEvent.getDialog().delete();
}

View File

@ -5,6 +5,8 @@ import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
@ -15,12 +17,17 @@ import org.springframework.stereotype.Component;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import javax.sip.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@Component
@ -28,6 +35,7 @@ import java.util.EventObject;
public class InviteResponseProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
@PostConstruct
@Override
@ -40,15 +48,22 @@ public class InviteResponseProcessor implements MessageProcessor {
try {
SIPResponse response = (SIPResponse) requestEvent.getResponse();
int statusCode = response.getStatusCode();
CallIdHeader callId = response.getCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Method.INVITE, callId.getCallId());
// trying不会回复
if (statusCode == Response.TRYING) {
subscribe.getInviteSubscribe().getPublisher(subscribeKey).submit(response);
return;
}
// 成功响应
// 下发ack
if (statusCode == Response.OK) {
ResponseEventExt event = (ResponseEventExt) requestEvent;
Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey))
.ifPresentOrElse(publisher-> publisher.submit(response),
()-> log.warn("对应订阅 {} 已结束",callId.getCallId()));
ResponseEventExt event = (ResponseEventExt) requestEvent;
String contentString = new String(response.getRawContent());
Gb28181Sdp gb28181Sdp = SipUtil.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();

View File

@ -176,7 +176,7 @@ public class SipRequestBuilder implements ApplicationContextAware {
return request;
}
public static Request createByteRequest(DockingDevice device, String channelId, SipTransactionInfo transactionInfo) throws ParseException, InvalidArgumentException, PeerUnavailableException {
public static Request createByeRequest(DockingDevice device, String channelId, SipTransactionInfo transactionInfo) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
// 请求行
SipURI requestLine = getSipURI(channelId, device.getHostAddress());

View File

@ -1,6 +1,6 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import java.util.Map;
@ -10,9 +10,9 @@ import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class InviteSubscribe implements GenericSubscribe<SipTransactionInfo> {
public class InviteSubscribe implements GenericSubscribe<SIPResponse> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SipTransactionInfo>> publishers = new ConcurrentHashMap<>();
private static final Map<String, SubmissionPublisher<SIPResponse>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
@ -22,11 +22,11 @@ public class InviteSubscribe implements GenericSubscribe<SipTransactionInfo> {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SipTransactionInfo> getPublisher(String key) {
public SubmissionPublisher<SIPResponse> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SipTransactionInfo> subscribe) {
public void addSubscribe(String key, Flow.Subscriber<SIPResponse> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}

View File

@ -1,8 +1,8 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
@ -21,7 +21,7 @@ public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private GenericSubscribe<RecordInfoResponseDTO> recordInfoSubscribe;
private GenericSubscribe<SipTransactionInfo> inviteSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
@PostConstruct
private void init() {

View File

@ -1,7 +1,9 @@
package cn.skcks.docking.gb28181.service.play;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
@ -14,6 +16,7 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer;
import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp;
import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer;
import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServerResp;
@ -22,6 +25,7 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -34,14 +38,15 @@ import javax.sip.ListeningPoint;
import javax.sip.SipProvider;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.MessageFormat;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@RequiredArgsConstructor
public class PlayService {
private static final String PREFIX = "RealTimePlay";
private final ZlmMediaConfig zlmMediaConfig;
private final DockingDeviceService deviceService;
private final ZlmMediaService zlmMediaService;
@ -50,6 +55,10 @@ public class PlayService {
private final SipMessageSender sender;
private final SipSubscribe subscribe;
private String videoUrl(String streamId){
return StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv");
}
/**
* 实时视频点播
* @param deviceId 设备id
@ -66,10 +75,9 @@ public class PlayService {
}
String streamId = MediaSdpHelper.getStreamId(deviceId,channelId);
String key = CacheUtil.getKey(PREFIX, streamId);
String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId);
if(RedisUtil.KeyOps.hasKey(key)){
String url = RedisUtil.StringOps.get(key);
result.setResult(JsonResponse.success(url));
result.setResult(JsonResponse.success(videoUrl(streamId)));
return result;
}
@ -101,15 +109,83 @@ public class PlayService {
SipProvider provider = sipService.getProvider(transport, senderIp);
CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey);
subscription.request(1);
}
@Override
public void onNext(SIPResponse item) {
int statusCode = item.getStatusCode();
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
if(statusCode == Response.TRYING){
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey);
subscription.request(1);
} else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item)));
RedisUtil.StringOps.set(CacheUtil.getKey(key,"ssrc"), ssrc);
result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey);
RedisUtil.KeyOps.delete(key);
RedisUtil.KeyOps.delete(CacheUtil.getKey(key,"ssrc"));
result.setResult(JsonResponse.error("连接流媒体服务失败"));
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
onComplete();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey);
}
};
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request);
result.onTimeout(()->{
subscribe.getInviteSubscribe().delPublisher(subscribeKey);
result.setResult(JsonResponse.error("点播超时"));
});
return result;
}
@SneakyThrows
public JsonResponse<Void> realTimeStop(String deviceId, String channelId){
DockingDevice device = deviceService.getDevice(deviceId);
if (device == null) {
log.info("未能找到 编码为 => {} 的设备", deviceId);
return JsonResponse.error(null, "未找到设备");
}
String streamId = MediaSdpHelper.getStreamId(deviceId,channelId);
String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId);
String ssrcKey = CacheUtil.getKey(key,"ssrc");
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class);
if(transactionInfo == null){
return JsonResponse.error("未找到连接信息");
}
Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo);
String senderIp = device.getLocalIp();
sender.send(senderIp, request);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, deviceId, streamId);
// subscribe.getInviteSubscribe().addPublisher(subscribeKey);
result.setResult(JsonResponse.success(StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv")));
return result;
// zlmMediaService.getRtpInfo();
// GetMediaList getMediaList = new GetMediaList();
// getMediaList.set
// zlmMediaService.getMediaList()
String ssrc = RedisUtil.StringOps.get(ssrcKey);
ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc);
RedisUtil.KeyOps.delete(ssrcKey);
RedisUtil.KeyOps.delete(key);
return JsonResponse.success(null);
}
}

View File

@ -81,7 +81,6 @@ public class RecordService {
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, deviceId, sn);
subscribe.getRecordInfoSubscribe().addPublisher(key);
sender.send(senderIp, request);
List<RecordInfoItemDTO> list = new ArrayList<>();
AtomicLong atomicSum = new AtomicLong(0);
AtomicLong atomicNum = new AtomicLong(0);
@ -135,9 +134,8 @@ public class RecordService {
log.debug("订阅结束 => {}", key);
}
};
subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber);
sender.send(senderIp, request);
result.onTimeout(() -> {
result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT,
RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)),