From 67b90c734144eae6a127ddc3556049a4f6b965c7 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Sat, 2 Sep 2023 17:54:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E5=90=88=20closeStream=20=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/service/play/PlayService.java | 183 +++++++++--------- 1 file changed, 87 insertions(+), 96 deletions(-) diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index b08b009..647c8bf 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -56,8 +56,8 @@ public class PlayService { private final SipMessageSender sender; private final SipSubscribe subscribe; - private String videoUrl(String streamId){ - return StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv"); + private String videoUrl(String streamId) { + return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv"); } private DeferredResult> makeResult(String deviceId, String channelId, long timeout, DockingDevice device) { @@ -71,46 +71,73 @@ public class PlayService { return result; } - /** - * 实时视频点播 - * @param deviceId 设备id - * @param channelId 通道id - */ - @SneakyThrows - public DeferredResult> realTimePlay(String deviceId, String channelId, long timeout){ - DockingDevice device = deviceService.getDevice(deviceId); - DeferredResult> result = makeResult(deviceId,channelId,timeout, device); - if(result.hasResult()){ - return result; - } - - String streamId = MediaSdpHelper.getStreamId(deviceId,channelId); - String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId); - if (RedisUtil.KeyOps.hasKey(key)) { - result.setResult(JsonResponse.success(videoUrl(streamId))); - return result; - } - + private int openRtpServer(DeferredResult> result, String streamId, int streamMode) { GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId); - if(rtpInfo.getExist()){ + if (rtpInfo.getExist()) { result.setResult(JsonResponse.error(MessageFormat.format("流 {0} 已存在", streamId))); - return result; + return -1; } - int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; OpenRtpServer openRtpServer = new OpenRtpServer(); openRtpServer.setPort(0); openRtpServer.setStreamId(streamId); openRtpServer.setTcpMode(streamMode); OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer); log.info("openRtpServerResp => {}", openRtpServerResp); - if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){ + if (!openRtpServerResp.getCode().equals(ResponseStatus.Success)) { result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg())); + return -1; + } + return openRtpServerResp.getPort(); + } + + @SneakyThrows + private JsonResponse closeStream(String streamId, MediaSdpHelper.Action action, DockingDevice device, String channelId) { + zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); + String key = CacheUtil.getKey(action.getAction(), device.getDeviceId(), channelId); + SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class); + if (transactionInfo == null) { + return JsonResponse.error("未找到连接信息"); + } + Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo); + String senderIp = device.getLocalIp(); + sender.send(senderIp, request); + + String ssrc = transactionInfo.getSsrc(); + ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); + RedisUtil.KeyOps.delete(key); + return JsonResponse.success(null); + } + + /** + * 实时视频点播 + * + * @param deviceId 设备id + * @param channelId 通道id + */ + @SneakyThrows + public DeferredResult> realTimePlay(String deviceId, String channelId, long timeout) { + DockingDevice device = deviceService.getDevice(deviceId); + DeferredResult> result = makeResult(deviceId, channelId, timeout, device); + if (result.hasResult()) { return result; } + String streamId = MediaSdpHelper.getStreamId(deviceId, channelId); + String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId); + if (RedisUtil.KeyOps.hasKey(key)) { + result.setResult(JsonResponse.success(videoUrl(streamId))); + return result; + } + + int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; String ip = zlmMediaConfig.getIp(); - int port = openRtpServerResp.getPort(); + int port = openRtpServer(result, streamId, streamMode); + + if (result.hasResult()) { + return result; + } + String ssrc = ssrcService.getPlaySsrc(); GB28181Description description = MediaSdpHelper.play(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode())); @@ -123,10 +150,11 @@ public class PlayService { subscribe.getInviteSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; + @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; - log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey); + log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); } @@ -134,16 +162,16 @@ public class PlayService { public void onNext(SIPResponse item) { int statusCode = item.getStatusCode(); log.debug("{} 收到订阅消息 {}", subscribeKey, item); - if(statusCode == Response.TRYING){ - log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey); + if (statusCode == Response.TRYING) { + log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); - } else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){ - log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey); - RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item,ssrc))); + } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); + RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { - log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey); + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); RedisUtil.KeyOps.delete(key); result.setResult(JsonResponse.error("连接流媒体服务失败")); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); @@ -163,7 +191,7 @@ public class PlayService { }; subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); - result.onTimeout(()->{ + result.onTimeout(() -> { subscribe.getInviteSubscribe().delPublisher(subscribeKey); result.setResult(JsonResponse.error("点播超时")); }); @@ -171,69 +199,44 @@ public class PlayService { } @SneakyThrows - public JsonResponse realTimeStop(String deviceId, String channelId){ + public JsonResponse realTimeStop(String deviceId, String channelId) { DockingDevice device = deviceService.getDevice(deviceId); if (device == null) { log.info("未能找到 编码为 => {} 的设备", deviceId); return JsonResponse.error(null, "未找到设备"); } - String streamId = MediaSdpHelper.getStreamId(deviceId,channelId); - String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), deviceId, channelId); - zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); - SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class); - if(transactionInfo == null){ - return JsonResponse.error("未找到连接信息"); - } - Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo); - String senderIp = device.getLocalIp(); - sender.send(senderIp, request); - - String ssrc = transactionInfo.getSsrc(); - ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc); - RedisUtil.KeyOps.delete(key); - return JsonResponse.success(null); + String streamId = MediaSdpHelper.getStreamId(deviceId, channelId); + return closeStream(streamId, MediaSdpHelper.Action.PLAY, device, channelId); } @SneakyThrows - public DeferredResult> recordPlay(String deviceId, String channelId, Date startTime, Date endTime, long timeout){ + public DeferredResult> recordPlay(String deviceId, String channelId, Date startTime, Date endTime, long timeout) { DockingDevice device = deviceService.getDevice(deviceId); long start = startTime.toInstant().getEpochSecond(); long end = endTime.toInstant().getEpochSecond(); - String streamId = MediaSdpHelper.getStreamId(deviceId,channelId,String.valueOf(start), String.valueOf(end)); - DeferredResult> result = makeResult(deviceId,channelId,timeout,device); - if(result.hasResult()){ + String streamId = MediaSdpHelper.getStreamId(deviceId, channelId, String.valueOf(start), String.valueOf(end)); + DeferredResult> result = makeResult(deviceId, channelId, timeout, device); + if (result.hasResult()) { return result; } String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY_BACK.getAction(), deviceId, channelId); - if(RedisUtil.KeyOps.hasKey(key)){ + if (RedisUtil.KeyOps.hasKey(key)) { result.setResult(JsonResponse.success(videoUrl(streamId))); return result; } - GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId); - if(rtpInfo.getExist()){ - result.setResult(JsonResponse.error(MessageFormat.format("流 {0} 已存在", streamId))); - return result; - } - int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; - OpenRtpServer openRtpServer = new OpenRtpServer(); - openRtpServer.setPort(0); - openRtpServer.setStreamId(streamId); - openRtpServer.setTcpMode(streamMode); - OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer); - log.info("openRtpServerResp => {}", openRtpServerResp); - if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){ - result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg())); + String ip = zlmMediaConfig.getIp(); + int port = openRtpServer(result, streamId, streamMode); + + if (result.hasResult()) { return result; } - String ip = zlmMediaConfig.getIp(); - int port = openRtpServerResp.getPort(); String ssrc = ssrcService.getPlaySsrc(); - GB28181Description description = MediaSdpHelper.playback(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode()),startTime,endTime); + GB28181Description description = MediaSdpHelper.playback(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode()), startTime, endTime); String transport = device.getTransport(); String senderIp = device.getLocalIp(); @@ -245,10 +248,11 @@ public class PlayService { subscribe.getInviteSubscribe().addPublisher(subscribeKey); Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; + @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; - log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey); + log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); } @@ -256,16 +260,16 @@ public class PlayService { public void onNext(SIPResponse item) { int statusCode = item.getStatusCode(); log.debug("{} 收到订阅消息 {}", subscribeKey, item); - if(statusCode == Response.TRYING){ - log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey); + if (statusCode == Response.TRYING) { + log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); - } else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){ - log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey); + } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); result.setResult(JsonResponse.success(videoUrl(streamId))); onComplete(); } else { - log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey); + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); RedisUtil.KeyOps.delete(key); result.setResult(JsonResponse.error("连接流媒体服务失败")); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); @@ -285,7 +289,7 @@ public class PlayService { }; subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); sender.send(senderIp, request); - result.onTimeout(()->{ + result.onTimeout(() -> { subscribe.getInviteSubscribe().delPublisher(subscribeKey); result.setResult(JsonResponse.error("点播超时")); }); @@ -293,7 +297,7 @@ public class PlayService { } @SneakyThrows - public JsonResponse recordStop(String deviceId, String channelId, Date startTime, Date endTime){ + public JsonResponse recordStop(String deviceId, String channelId, Date startTime, Date endTime) { DockingDevice device = deviceService.getDevice(deviceId); if (device == null) { log.info("未能找到 编码为 => {} 的设备", deviceId); @@ -302,21 +306,8 @@ public class PlayService { long start = startTime.toInstant().getEpochSecond(); long end = endTime.toInstant().getEpochSecond(); - String streamId = MediaSdpHelper.getStreamId(deviceId,channelId,String.valueOf(start), String.valueOf(end)); - String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY_BACK.getAction(), deviceId, channelId); - zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); - SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class); - if(transactionInfo == null){ - return JsonResponse.error("未找到连接信息"); - } - Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo); - String senderIp = device.getLocalIp(); - sender.send(senderIp, request); - - String ssrc = transactionInfo.getSsrc(); - ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc); - RedisUtil.KeyOps.delete(key); - return JsonResponse.success(null); + String streamId = MediaSdpHelper.getStreamId(deviceId, channelId, String.valueOf(start), String.valueOf(end)); + return closeStream(streamId, MediaSdpHelper.Action.PLAY_BACK, device, channelId); } }