From d2ae05e13578f7dbd7cb79b52699e1ef52bfc904 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 24 Aug 2023 16:30:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=B5=81=E7=82=B9=E6=92=AD?= =?UTF-8?q?=E4=B8=8E=E5=85=B3=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/api/play/PlayController.java | 15 +-- .../gb28181/api/play/dto/RealTimePlayDTO.java | 2 +- .../gb28181/api/play/dto/RealTimeStopDTO.java | 17 +++ .../core/sip/listener/SipListenerImpl.java | 44 ++------ .../response/InviteResponseProcessor.java | 19 +++- .../message/request/SipRequestBuilder.java | 2 +- .../message/subscribe/InviteSubscribe.java | 10 +- .../sip/message/subscribe/SipSubscribe.java | 4 +- .../gb28181/service/play/PlayService.java | 100 +++++++++++++++--- .../gb28181/service/record/RecordService.java | 4 +- 10 files changed, 148 insertions(+), 69 deletions(-) create mode 100644 api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimeStopDTO.java diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java index 3eafaa4..deace72 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java @@ -3,12 +3,10 @@ package cn.skcks.docking.gb28181.api.play; import cn.skcks.docking.gb28181.annotation.web.JsonMapping; import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.api.play.dto.RealTimePlayDTO; -import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO; +import cn.skcks.docking.gb28181.api.play.dto.RealTimeStopDTO; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.config.SwaggerConfig; import cn.skcks.docking.gb28181.service.play.PlayService; -import cn.skcks.docking.gb28181.service.record.RecordService; -import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; import org.springdoc.core.annotations.ParameterObject; @@ -18,8 +16,6 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import java.util.List; - @Tag(name="播放") @RestController @JsonMapping("/device/play") @@ -32,8 +28,13 @@ public class PlayController { return SwaggerConfig.api("Play", "/device/play"); } - @GetJson("/realtime") - public DeferredResult> getInfo(@ParameterObject @Validated RealTimePlayDTO dto){ + @GetJson("/realTimePlay") + public DeferredResult> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto){ return playService.realTimePlay(dto.getDeviceId(), dto.getChannelId(), dto.getTimeout()); } + + @GetJson("/realtimeStop") + public JsonResponse realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto){ + return playService.realTimeStop(dto.getDeviceId(), dto.getChannelId()); + } } diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimePlayDTO.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimePlayDTO.java index 9bc3d39..b45cb07 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimePlayDTO.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimePlayDTO.java @@ -5,7 +5,7 @@ import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotBlank; import lombok.Data; -@Schema(title = "查询历史录像") +@Schema(title = "点播") @Data public class RealTimePlayDTO { @NotBlank diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimeStopDTO.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimeStopDTO.java new file mode 100644 index 0000000..b38b299 --- /dev/null +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RealTimeStopDTO.java @@ -0,0 +1,17 @@ +package cn.skcks.docking.gb28181.api.play.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; + +@Schema(title = "关闭点播") +@Data +public class RealTimeStopDTO { + @NotBlank + @Schema(description = "设备id", example = "44050100001180000001") + private String deviceId; + + @NotBlank + @Schema(description = "通道id", example = "44050100001180000001") + private String channelId; +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java index efa158a..48a525d 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java @@ -24,12 +24,13 @@ public class SipListenerImpl implements SipListener { private final SipSubscribe sipSubscribe; private final ConcurrentMap requestProcessor = new ConcurrentHashMap<>(); private final ConcurrentMap responseProcessor = new ConcurrentHashMap<>(); - public void addRequestProcessor(String method, MessageProcessor messageProcessor){ + + public void addRequestProcessor(String method, MessageProcessor messageProcessor) { log.debug("[SipListener] 注册 {} 请求处理器", method); requestProcessor.put(method, messageProcessor); } - public void addResponseProcessor(String method, MessageProcessor messageProcessor){ + public void addResponseProcessor(String method, MessageProcessor messageProcessor) { log.debug("[SipListener] 注册 {} 响应处理器", method); responseProcessor.put(method, messageProcessor); } @@ -39,7 +40,7 @@ public class SipListenerImpl implements SipListener { @Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME) public void processRequest(RequestEvent requestEvent) { String method = requestEvent.getRequest().getMethod(); - log.debug("传入请求 method => {}",method); + log.debug("传入请求 method => {}", method); Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> { processor.process(requestEvent); }); @@ -49,49 +50,20 @@ public class SipListenerImpl implements SipListener { public void processResponse(ResponseEvent responseEvent) { Response response = responseEvent.getResponse(); int status = response.getStatusCode(); - // log.debug(); + CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + log.debug("{} {}", method, response); // Success if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { - CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); - String method = cseqHeader.getMethod(); - log.debug("传入响应 method => {}",method); + log.debug("传入响应 method => {}", method); Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> { processor.process(responseEvent); }); - // ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); - // if (sipRequestProcessor != null) { - // sipRequestProcessor.process(responseEvent); - // } - - // if(status != Response.UNAUTHORIZED && responseEvent.getResponse() != null){} - - // if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { - // CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - // if (callIdHeader != null) { - // SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); - // if (subscribe != null) { - // SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - // sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); - // subscribe.response(eventResult); - // } - // } - // } } else if ((status >= Response.TRYING) && (status < Response.OK)) { // 增加其它无需回复的响应,如101、180等 } else { log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); - // if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { - // CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - // if (callIdHeader != null) { - // SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - // if (subscribe != null) { - // SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - // subscribe.response(eventResult); - // sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); - // } - // } - // } if (responseEvent.getDialog() != null) { responseEvent.getDialog().delete(); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java index c13166b..2b77c41 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/response/InviteResponseProcessor.java @@ -5,6 +5,8 @@ import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.message.SIPResponse; @@ -15,12 +17,17 @@ import org.springframework.stereotype.Component; import javax.sdp.SdpParseException; import javax.sdp.SessionDescription; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import javax.sip.SipFactory; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.util.EventObject; +import java.util.Optional; @Slf4j @Component @@ -28,6 +35,7 @@ import java.util.EventObject; public class InviteResponseProcessor implements MessageProcessor { private final SipListener sipListener; private final SipMessageSender sender; + private final SipSubscribe subscribe; @PostConstruct @Override @@ -40,15 +48,22 @@ public class InviteResponseProcessor implements MessageProcessor { try { SIPResponse response = (SIPResponse) requestEvent.getResponse(); int statusCode = response.getStatusCode(); + CallIdHeader callId = response.getCallId(); + String subscribeKey = GenericSubscribe.Helper.getKey(Method.INVITE, callId.getCallId()); + // trying不会回复 if (statusCode == Response.TRYING) { + subscribe.getInviteSubscribe().getPublisher(subscribeKey).submit(response); return; } // 成功响应 // 下发ack if (statusCode == Response.OK) { - ResponseEventExt event = (ResponseEventExt) requestEvent; + Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey)) + .ifPresentOrElse(publisher-> publisher.submit(response), + ()-> log.warn("对应订阅 {} 已结束",callId.getCallId())); + ResponseEventExt event = (ResponseEventExt) requestEvent; String contentString = new String(response.getRawContent()); Gb28181Sdp gb28181Sdp = SipUtil.parseSDP(contentString); SessionDescription sdp = gb28181Sdp.getBaseSdb(); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/request/SipRequestBuilder.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/request/SipRequestBuilder.java index 2639753..e165fda 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/request/SipRequestBuilder.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/request/SipRequestBuilder.java @@ -176,7 +176,7 @@ public class SipRequestBuilder implements ApplicationContextAware { return request; } - public static Request createByteRequest(DockingDevice device, String channelId, SipTransactionInfo transactionInfo) throws ParseException, InvalidArgumentException, PeerUnavailableException { + public static Request createByeRequest(DockingDevice device, String channelId, SipTransactionInfo transactionInfo) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; // 请求行 SipURI requestLine = getSipURI(channelId, device.getHostAddress()); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java index 1e57695..5e6e839 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java @@ -1,6 +1,6 @@ package cn.skcks.docking.gb28181.core.sip.message.subscribe; -import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; +import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import java.util.Map; @@ -10,9 +10,9 @@ import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; @RequiredArgsConstructor -public class InviteSubscribe implements GenericSubscribe { +public class InviteSubscribe implements GenericSubscribe { private final Executor executor; - private static final Map> publishers = new ConcurrentHashMap<>(); + private static final Map> publishers = new ConcurrentHashMap<>(); public void close() { Helper.close(publishers); @@ -22,11 +22,11 @@ public class InviteSubscribe implements GenericSubscribe { Helper.addPublisher(executor, publishers, key); } - public SubmissionPublisher getPublisher(String key) { + public SubmissionPublisher getPublisher(String key) { return Helper.getPublisher(publishers, key); } - public void addSubscribe(String key, Flow.Subscriber subscribe) { + public void addSubscribe(String key, Flow.Subscriber subscribe) { Helper.addSubscribe(publishers, key, subscribe); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java index eb6b81a..9bc0e10 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java @@ -1,8 +1,8 @@ package cn.skcks.docking.gb28181.core.sip.message.subscribe; -import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; +import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; @@ -21,7 +21,7 @@ public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; private GenericSubscribe recordInfoSubscribe; - private GenericSubscribe inviteSubscribe; + private GenericSubscribe inviteSubscribe; @PostConstruct private void init() { diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index ec7bff0..588c204 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -1,7 +1,9 @@ package cn.skcks.docking.gb28181.service.play; import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; @@ -14,6 +16,7 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServerResp; @@ -22,6 +25,7 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; +import gov.nist.javax.sip.message.SIPResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -34,14 +38,15 @@ import javax.sip.ListeningPoint; import javax.sip.SipProvider; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; +import javax.sip.message.Response; import java.text.MessageFormat; +import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; @Slf4j @Service @RequiredArgsConstructor public class PlayService { - private static final String PREFIX = "RealTimePlay"; private final ZlmMediaConfig zlmMediaConfig; private final DockingDeviceService deviceService; private final ZlmMediaService zlmMediaService; @@ -50,6 +55,10 @@ public class PlayService { private final SipMessageSender sender; private final SipSubscribe subscribe; + private String videoUrl(String streamId){ + return StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv"); + } + /** * 实时视频点播 * @param deviceId 设备id @@ -66,10 +75,9 @@ public class PlayService { } String streamId = MediaSdpHelper.getStreamId(deviceId,channelId); - String key = CacheUtil.getKey(PREFIX, streamId); + String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId); if(RedisUtil.KeyOps.hasKey(key)){ - String url = RedisUtil.StringOps.get(key); - result.setResult(JsonResponse.success(url)); + result.setResult(JsonResponse.success(videoUrl(streamId))); return result; } @@ -101,15 +109,83 @@ public class PlayService { SipProvider provider = sipService.getProvider(transport, senderIp); CallIdHeader callId = provider.getNewCallId(); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); + String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); + subscribe.getInviteSubscribe().addPublisher(subscribeKey); + Flow.Subscriber subscriber = new Flow.Subscriber<>() { + private Flow.Subscription subscription; + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey); + subscription.request(1); + } + + @Override + public void onNext(SIPResponse item) { + int statusCode = item.getStatusCode(); + log.debug("{} 收到订阅消息 {}", subscribeKey, item); + if(statusCode == Response.TRYING){ + log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey); + subscription.request(1); + } else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){ + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey); + RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item))); + RedisUtil.StringOps.set(CacheUtil.getKey(key,"ssrc"), ssrc); + result.setResult(JsonResponse.success(videoUrl(streamId))); + onComplete(); + } else { + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey); + RedisUtil.KeyOps.delete(key); + RedisUtil.KeyOps.delete(CacheUtil.getKey(key,"ssrc")); + result.setResult(JsonResponse.error("连接流媒体服务失败")); + ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); + onComplete(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); + } + }; + subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + sender.send(senderIp, request); + result.onTimeout(()->{ + subscribe.getInviteSubscribe().delPublisher(subscribeKey); + result.setResult(JsonResponse.error("点播超时")); + }); + return result; + } + + @SneakyThrows + public JsonResponse realTimeStop(String deviceId, String channelId){ + DockingDevice device = deviceService.getDevice(deviceId); + if (device == null) { + log.info("未能找到 编码为 => {} 的设备", deviceId); + return JsonResponse.error(null, "未找到设备"); + } + + String streamId = MediaSdpHelper.getStreamId(deviceId,channelId); + String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId); + String ssrcKey = CacheUtil.getKey(key,"ssrc"); + zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); + SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class); + if(transactionInfo == null){ + return JsonResponse.error("未找到连接信息"); + } + Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo); + String senderIp = device.getLocalIp(); sender.send(senderIp, request); - String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, deviceId, streamId); -// subscribe.getInviteSubscribe().addPublisher(subscribeKey); - result.setResult(JsonResponse.success(StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv"))); - return result; -// zlmMediaService.getRtpInfo(); -// GetMediaList getMediaList = new GetMediaList(); -// getMediaList.set -// zlmMediaService.getMediaList() + String ssrc = RedisUtil.StringOps.get(ssrcKey); + ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc); + RedisUtil.KeyOps.delete(ssrcKey); + RedisUtil.KeyOps.delete(key); + return JsonResponse.success(null); } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java index e03f16b..d70eb9c 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java @@ -81,7 +81,6 @@ public class RecordService { String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, deviceId, sn); subscribe.getRecordInfoSubscribe().addPublisher(key); - sender.send(senderIp, request); List list = new ArrayList<>(); AtomicLong atomicSum = new AtomicLong(0); AtomicLong atomicNum = new AtomicLong(0); @@ -135,9 +134,8 @@ public class RecordService { log.debug("订阅结束 => {}", key); } }; - subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber); - + sender.send(senderIp, request); result.onTimeout(() -> { result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT, RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)),