实时视频

This commit is contained in:
shikong 2023-12-14 14:43:27 +08:00
parent ff690aaaef
commit 5ae7fc9972
3 changed files with 233 additions and 15 deletions

View File

@ -4,6 +4,7 @@ import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.wvp.api.video.dto.RealtimeVideoReq;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoMp4Req;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq;
import cn.skcks.docking.gb28181.wvp.config.Gb28181DeviceVideoApiConfig;
@ -76,4 +77,27 @@ public class VideoController {
log.info("偏移后的时间范围 {} ~ {}", startTime, endTime);
return gb28181DownloadService.videoUrl(req.getDeviceCode(), startTime, endTime);
}
@Operation(summary = "获取实时视频 (返回视频url)")
@GetMapping(value = "/device/realtime")
@ResponseBody
public DeferredResult<JsonResponse<String>> realtime(@ParameterObject RealtimeVideoReq req) {
return gb28181DownloadService.realtimeVideoUrl(req.getDeviceCode());
}
@Operation(summary = "获取实时视频 (返回视频url)")
@GetMapping(value = "/device/realtime/renew")
@ResponseBody
public JsonResponse<Void> renew(@ParameterObject RealtimeVideoReq req) {
gb28181DownloadService.renewRealtimeVideoUrl(req.getDeviceCode());
return JsonResponse.success(null);
}
@Operation(summary = "获取实时视频 (返回视频url)")
@GetMapping(value = "/device/realtime/close")
@ResponseBody
public JsonResponse<Void> close(@ParameterObject RealtimeVideoReq req) {
gb28181DownloadService.autoCloseReadtimeVideo(req.getDeviceCode());
return JsonResponse.success(null);
}
}

View File

@ -0,0 +1,14 @@
package cn.skcks.docking.gb28181.wvp.api.video.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class RealtimeVideoReq {
@NotBlank(message = "设备编码 不能为空")
@Schema(description = "设备编码")
private String deviceCode;
}

View File

@ -65,6 +65,8 @@ import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.*;
import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDeviceDynamicSqlSupport.deviceCode;
@Slf4j
@Service
@RequiredArgsConstructor
@ -81,6 +83,11 @@ public class Gb28181DownloadService {
private final WvpProxyConfig wvpProxyConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ScheduledFuture<?>> realtimeMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, VideoInfo> realtimeVideoInfoMap = new ConcurrentHashMap<>();
@Getter
private final RealTime realtime = new RealTime();
@NoArgsConstructor
@AllArgsConstructor
@ -186,21 +193,7 @@ public class Gb28181DownloadService {
result.setResult(JsonResponse.error("设备(通道)不存在"));
return result;
}
requestMap.computeIfPresent(deviceCode,(key,requestResult)->{
if(!requestResult.hasResult()){
requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束"));
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
if(StringUtils.isNotBlank(existCallId)){
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
sender.sendRequest((provider,localIp,localPort)->
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
RedisUtil.KeyOps.delete(cacheKey);
}
}
return null;
});
closeExistRequest(deviceCode, device, docking);
requestMap.put(deviceCode, result);
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
@ -233,6 +226,115 @@ public class Gb28181DownloadService {
return result;
}
@SneakyThrows
public DeferredResult<JsonResponse<String>> realtimeVideoUrl(String deviceCode){
DeferredResult<JsonResponse<String>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60));
result.onTimeout(()->{
result.setResult(JsonResponse.error("请求超时"));
});
WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
if(device == null){
result.setResult(JsonResponse.error("设备不存在"));
return result;
}
WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null);
if(docking == null){
result.setResult(JsonResponse.error("设备(通道)不存在"));
return result;
}
closeExistRequest(deviceCode, device, docking);
requestMap.put(deviceCode, result);
// 间隔一定时间(200ms) 给设备足够的时间结束前次请求
scheduledExecutorService.schedule(()->{
realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{
log.info("获取媒体信息 {}", videoInfo);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
realtimeVideoInfoMap.put(cacheKey, videoInfo);
String existCallId = RedisUtil.StringOps.get(cacheKey);
autoCloseReadtimeVideo(docking,device,videoInfo,cacheKey,existCallId);
String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ?
StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()):
videoInfo.getUrl();
result.setResult(JsonResponse.success(url));
});
}, 200, TimeUnit.MILLISECONDS);
return result;
}
public void renewRealtimeVideoUrl(String deviceCode){
WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
if(device == null){
return;
}
WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null);
if(docking == null){
return;
}
closeExistRequest(deviceCode, device, docking);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
realtimeMap.computeIfPresent(cacheKey,(key, scheduledFuture) -> {
scheduledFuture.cancel(true);
return null;
});
autoCloseReadtimeVideo(deviceCode);
}
public void autoCloseReadtimeVideo(String deviceCode){
WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
if(device == null){
return;
}
WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null);
if(docking == null){
return;
}
closeExistRequest(deviceCode, device, docking);
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
realtimeVideoInfoMap.computeIfPresent(cacheKey, (key, videoInfo) -> {
autoCloseReadtimeVideo(docking,device,videoInfo,cacheKey,existCallId);
return null;
});
}
private void closeExistRequest(String deviceCode, WvpProxyDevice device, WvpProxyDocking docking) {
requestMap.computeIfPresent(deviceCode,(key,requestResult)->{
if(!requestResult.hasResult()){
requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束"));
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
if(StringUtils.isNotBlank(existCallId)){
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
sender.sendRequest((provider,localIp,localPort)->
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
RedisUtil.KeyOps.delete(cacheKey);
}
}
return null;
});
}
public void autoCloseReadtimeVideo(WvpProxyDocking docking,WvpProxyDevice device,Gb28181DownloadService.VideoInfo videoInfo,String cacheKey, String existCallId){
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId());
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
if (StringUtils.isNotBlank(existCallId)) {
sender.sendRequest((provider, localIp, localPort) ->
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
}
RedisUtil.KeyOps.delete(cacheKey);
zlmMediaService.closeRtpServer(CloseRtpServer.builder()
.streamId(videoInfo.streamId)
.build());
}, 60, TimeUnit.SECONDS);
realtimeMap.put(cacheKey,schedule);
}
@SneakyThrows
public void videoStream(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) {
AsyncContext asyncContext = request.startAsync();
@ -271,6 +373,84 @@ public class Gb28181DownloadService {
}
}
private class RealTime {
@SneakyThrows
public CompletableFuture<VideoInfo> realtime(String deviceCode) {
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) {
String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
log.error("{}",reason);
throw new JsonException(reason);
} else {
WvpProxyDevice device = deviceByDeviceCode.get();
return realtime(device.getGbDeviceId(), device.getGbDeviceChannelId());
}
}
@SneakyThrows
public CompletableFuture<VideoInfo> realtime(String gbDeviceId, String channel){
CompletableFuture<VideoInfo> result = new CompletableFuture<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
if(deviceByGbDeviceId.isEmpty()){
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
result.complete(null);
return result;
}
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
if (deviceByGbDeviceIdAndChannel.isEmpty()) {
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
result.complete(null);
return result;
}
WvpProxyDevice device = deviceByGbDeviceIdAndChannel.get();
WvpProxyDocking docking = deviceByGbDeviceId.get();
String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, IdUtil.getSnowflakeNextIdStr());
int isTcp = proxySipConfig.getStreamMode() == StreamMode.UDP ? 0 : 1;
StreamMode streamMode = proxySipConfig.getStreamMode();
String ip = zlmMediaConfig.getIp();
int port = openRtpServer(streamId, isTcp);
if(port <= 0){
log.error("zlm 暂无可用端口");
result.complete(null);
return result;
}
String ssrc = ssrcService.getPlaySsrc();
GB28181Description gb28181Description = MediaSdpHelper.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode);
sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result));
return result;
}
@SneakyThrows
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture<VideoInfo> result) {
String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId());
String existCallId = RedisUtil.StringOps.get(cacheKey);
// 限制单个设备/通道 只能 点播 一路
if(StringUtils.isNotBlank(existCallId)){
String deviceIp = docking.getIp();
int devicePort = Integer.parseInt(docking.getPort());
sender.sendRequest((provider,localIp,localPort)->
SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId));
RedisUtil.KeyOps.delete(cacheKey);
log.info("关闭已存在的 点播请求 {} {}", cacheKey, existCallId);
Thread.sleep(500);
}
return (provider, ip, port) -> {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 60, TimeUnit.SECONDS);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
RedisUtil.StringOps.set(cacheKey, callId.getCallId());
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
};
}
}
@SneakyThrows
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime) {
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);