From 6ecbf83cdc4498b77a42fa78ca4317e9efbf9565 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 22 Sep 2023 13:50:04 +0800 Subject: [PATCH 1/4] docking api --- .../wvp/api/docking/DockingController.java | 34 +++++++++++++++++++ .../wvp/api/docking/dto/DockingPageDTO.java | 19 +++++++++++ .../wvp/service/docking/DockingService.java | 30 ++++++++++++++++ 3 files changed, 83 insertions(+) create mode 100644 gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java create mode 100644 gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/dto/DockingPageDTO.java diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java new file mode 100644 index 0000000..f59bc1b --- /dev/null +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java @@ -0,0 +1,34 @@ +package cn.skcks.docking.gb28181.wvp.api.docking; + +import cn.skcks.docking.gb28181.annotation.web.JsonMapping; +import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.page.PageWrapper; +import cn.skcks.docking.gb28181.wvp.api.docking.dto.DockingPageDTO; +import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import io.swagger.v3.oas.annotations.Operation; +import lombok.RequiredArgsConstructor; +import org.springdoc.core.annotations.ParameterObject; +import org.springdoc.core.models.GroupedOpenApi; +import org.springframework.context.annotation.Bean; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@JsonMapping("/docking") +@RequiredArgsConstructor +public class DockingController { + private final DockingService dockingService; + @Bean + public GroupedOpenApi dockingApi() { + return SwaggerConfig.api("Docking", "/docking"); + } + + @Operation(summary = "分页查询对接的设备/平台列表") + @GetJson("/page") + private JsonResponse> page(@ParameterObject @Validated DockingPageDTO dto){ + return JsonResponse.success(PageWrapper.of(dockingService.getDockingWithPage(dto.getPage(), dto.getSize()))); + } +} diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/dto/DockingPageDTO.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/dto/DockingPageDTO.java new file mode 100644 index 0000000..42f2d00 --- /dev/null +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/dto/DockingPageDTO.java @@ -0,0 +1,19 @@ +package cn.skcks.docking.gb28181.wvp.api.docking.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +@Data +public class DockingPageDTO { + @Schema(description = "页数", example = "1") + @NotNull(message = "page 不能为空") + @Min(value = 1, message = "page 必须为正整数") + Integer page; + + @Schema(description = "每页条数", example = "10") + @NotNull(message = "size 不能为空") + @Min(value = 1, message = "size 必须为正整数") + Integer size; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java index ea7011f..66a1530 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java @@ -4,6 +4,10 @@ import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingDynamicSqlSupport; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import com.github.pagehelper.ISelect; +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -66,4 +70,30 @@ public class DockingService { return wvpProxyDockingMapper.insert(device) > 0; } + + /** + * 分页查询对接设备/平台 + * @param page 页数 + * @param size 数量 + * @return 分页设备/平台 + */ + public PageInfo getDockingWithPage(int page, int size){ + ISelect select = () -> wvpProxyDockingMapper.select(s -> s.orderBy(WvpProxyDockingDynamicSqlSupport.id.descending())); + return getDockingWithPage(page,size, select); + } + + /** + * 分页查询对接设备/平台 + * @param page 页数 + * @param size 数量 + * @param select 查询语句 + * @return 分页设备/平台 + */ + public PageInfo getDockingWithPage(int page, int size, ISelect select){ + PageInfo pageInfo; + try (Page startPage = PageHelper.startPage(page, size)) { + pageInfo = startPage.doSelectPageInfo(select); + } + return pageInfo; + } } From 3e422e384b5dd9df789523d0cb00925e27d3b7e3 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 22 Sep 2023 13:50:18 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=8E=A8=E6=B5=81=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-local.yml | 2 +- .../src/main/resources/application.yml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index b5c9926..19a18b5 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -65,7 +65,7 @@ ffmpeg-support: rtp: # input: -i http://10.10.10.200:5080/live/test.live.flv input: -re -i - output: -vcodec copy -acodec copy -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp + output: -vcodec copy -acodec copy -preset ultrafast -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp debug: download: false input: false diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml index 532955f..3d9ac26 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml @@ -23,7 +23,7 @@ spring: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456a - url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai + url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform_dev?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai # profiles: # active: local cloud: @@ -56,7 +56,7 @@ proxy: use-ffmpeg: false gb28181: sip: - id: 44050100002000000004 + id: 44050100002000000005 domain: 4405010000 password: 123456 port: 5063 @@ -73,7 +73,7 @@ ffmpeg-support: ffprobe: /usr/bin/ffmpeg/ffprobe rtp: input: -i - output: -vcodec h264 -acodec aac -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp + output: -vcodec h264 -acodec aac -preset ultrafast -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp debug: download: false input: false From c11f5c9ef081173eaa9ab4c4a82e6f624f863c63 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 22 Sep 2023 15:31:39 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=BF=AE=E6=94=B9api=20=E5=AF=B9=E6=8E=A5=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/device/DeviceController.java | 15 +++++-- .../wvp/api/docking/DockingController.java | 2 + .../wvp/service/device/DeviceService.java | 41 +++++++++++++++++-- .../gb28181/wvp/service/wvp/WvpService.java | 5 ++- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/device/DeviceController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/device/DeviceController.java index d239048..fdf8f48 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/device/DeviceController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/device/DeviceController.java @@ -22,6 +22,7 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import java.util.List; +import java.util.Optional; @Slf4j @Tag(name = "设备信息") @@ -50,9 +51,9 @@ public class DeviceController { @Operation(summary = "根据设备编码(21位) 查询指定设备信息") @GetJson("/info/deviceCode") - public JsonResponse> infoByDeviceCode(@RequestParam String deviceCode) { - List wvpProxyDevice = deviceService.getDeviceByDeviceCode(deviceCode); - return JsonResponse.success(wvpProxyDevice); + public JsonResponse infoByDeviceCode(@RequestParam String deviceCode) { + Optional wvpProxyDevice = deviceService.getDeviceByDeviceCode(deviceCode); + return JsonResponse.success(wvpProxyDevice.orElse(null)); } @Operation(summary = "根据国标id(20位) 查询指定设备信息") @@ -80,9 +81,15 @@ public class DeviceController { @Operation(summary = "根据主键 id 删除指定设备") @JsonMapping(value = "/delete/id",method = {RequestMethod.GET,RequestMethod.DELETE}) - public JsonResponse deleteByGbDeviceId(@RequestParam Long id){ + public JsonResponse deleteById(@RequestParam Long id){ WvpProxyDevice wvpProxyDevice = new WvpProxyDevice(); wvpProxyDevice.setId(id); return JsonResponse.success(deviceService.deleteDevice(wvpProxyDevice)); } + + @Operation(summary = "根据主键id修改设备信息") + @PostJson("/modify") + public JsonResponse updateById(@RequestBody WvpProxyDevice device){ + return JsonResponse.success(deviceService.modifyDevice(device)); + } } diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java index f59bc1b..37ed51c 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/docking/DockingController.java @@ -9,6 +9,7 @@ import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; import org.springdoc.core.annotations.ParameterObject; import org.springdoc.core.models.GroupedOpenApi; @@ -16,6 +17,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; +@Tag(name = "设备/平台对接") @RestController @JsonMapping("/docking") @RequiredArgsConstructor diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java index 6258397..0be4dfb 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/device/DeviceService.java @@ -17,6 +17,7 @@ import org.springframework.transaction.annotation.Transactional; import java.text.MessageFormat; import java.util.List; +import java.util.Objects; import java.util.Optional; import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; @@ -32,9 +33,8 @@ public class DeviceService { s.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(id))); } - public List getDeviceByDeviceCode(String deviceCode){ - return deviceMapper.select(s-> - s.where(WvpProxyDeviceDynamicSqlSupport.deviceCode,isEqualTo(deviceCode))); + public Optional getDeviceByDeviceCode(String deviceCode){ + return deviceMapper.selectOne(s->s.where(WvpProxyDeviceDynamicSqlSupport.deviceCode,isEqualTo(deviceCode)).limit(1)); } public List getDeviceByGbDeviceId(String gbDeviceId){ @@ -163,4 +163,39 @@ public class DeviceService { } return pageInfo; } + + @SneakyThrows + public boolean modifyDevice(WvpProxyDevice device){ + if(device == null){ + return false; + } + Long id = device.getId(); + if(id == null){ + throw new JsonException("id 不能为空"); + } + + String deviceCode = device.getDeviceCode(); + if(StringUtils.isNotBlank(deviceCode)){ + WvpProxyDevice existDevice = getDeviceByDeviceCode(deviceCode).orElse(null); + if(existDevice != null && !Objects.equals(existDevice.getId(), device.getId())){ + throw new JsonException(MessageFormat.format("设备编码 {0} 已存在", deviceCode)); + } + } + + String gbDeviceId = device.getGbDeviceId(); + String gbDeviceChannelId = device.getGbDeviceChannelId(); + if(gbDeviceId != null && StringUtils.isBlank(gbDeviceId)){ + throw new JsonException("国标id 不能为空"); + } + if(gbDeviceChannelId != null && StringUtils.isBlank(gbDeviceChannelId)){ + throw new JsonException("通道 不能为空"); + } + if(StringUtils.isNotBlank(gbDeviceId) && StringUtils.isNotBlank(gbDeviceChannelId)){ + WvpProxyDevice existDevice = getDeviceByGbDeviceIdAndChannel(gbDeviceId, gbDeviceChannelId).orElse(null); + if(existDevice != null && !Objects.equals(existDevice.getId(), device.getId())){ + throw new JsonException(MessageFormat.format("国标id {0} ,通道 {1} 已存在", gbDeviceId, gbDeviceChannelId)); + } + } + return deviceMapper.updateByPrimaryKeySelective(device) > 0; + } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java index 6d88af9..43f9338 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java @@ -41,6 +41,7 @@ import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -71,12 +72,12 @@ public class WvpService { @SneakyThrows public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) { - List wvpProxyDeviceList = deviceService.getDeviceByDeviceCode(deviceCode); + Optional wvpProxyDeviceList = deviceService.getDeviceByDeviceCode(deviceCode); if (wvpProxyDeviceList.isEmpty()) { writeErrorToResponse(response, JsonResponse.error("设备不存在")); return; } - WvpProxyDevice wvpProxyDevice = wvpProxyDeviceList.get(0); + WvpProxyDevice wvpProxyDevice = wvpProxyDeviceList.get(); String deviceId = wvpProxyDevice.getGbDeviceId(); String channelId = wvpProxyDevice.getGbDeviceChannelId(); log.info("设备编码 (deviceCode=>{}) 查询到的设备信息 国标id(gbDeviceId => {}), 通道(channelId => {})", deviceCode, deviceId, channelId); From d847e06ec33842d5a34a3a7af072be35273fbf49 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 22 Sep 2023 17:30:06 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=B8=8B=E8=BD=BD=20=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/wvp/config/ZlmRtspConfig.java | 12 +++ .../service/ffmpeg/FfmpegSupportService.java | 2 +- .../gb28181/Gb28181DownloadService.java | 86 +++++++++++++++---- .../wvp/service/video/VideoService.java | 8 +- .../request/request/ByeRequestProcessor.java | 1 + .../src/main/resources/application-local.yml | 3 + .../src/main/resources/application.yml | 9 +- 7 files changed, 95 insertions(+), 26 deletions(-) create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ZlmRtspConfig.java diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ZlmRtspConfig.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ZlmRtspConfig.java new file mode 100644 index 0000000..72f7ad3 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ZlmRtspConfig.java @@ -0,0 +1,12 @@ +package cn.skcks.docking.gb28181.wvp.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +@ConfigurationProperties(prefix = "media.rtsp") +public class ZlmRtspConfig { + private String url; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/ffmpeg/FfmpegSupportService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/ffmpeg/FfmpegSupportService.java index 9a4227b..2327ed3 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/ffmpeg/FfmpegSupportService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/ffmpeg/FfmpegSupportService.java @@ -21,7 +21,7 @@ public class FfmpegSupportService { public Executor downloadToStream(String input, long time, TimeUnit unit, ExecuteStreamHandler streamHandler, ExecuteResultHandler executeResultHandler) { FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Debug debug = ffmpegConfig.getDebug(); - String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", rtp.getInput(), input); + String inputParam = debug.getDownload() ? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input); log.info("视频输入参数 {}", inputParam); String outputParam = debug.getOutput() ? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), "-"); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 1f6914c..30a3427 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -5,6 +5,7 @@ import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.io.IoUtil; +import cn.hutool.core.util.IdUtil; import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; @@ -15,6 +16,7 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServerResp; @@ -23,17 +25,20 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig; +import cn.skcks.docking.gb28181.wvp.config.ZlmRtspConfig; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import gov.nist.javax.sdp.MediaDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; +import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.servlet.AsyncContext; import jakarta.servlet.http.HttpServletRequest; @@ -56,7 +61,6 @@ import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.time.ZoneId; import java.util.Date; -import java.util.List; import java.util.Optional; import java.util.Vector; import java.util.concurrent.*; @@ -74,8 +78,8 @@ public class Gb28181DownloadService { private final SipSender sender; private final SipSubscribe subscribe; private final VideoService videoService; - private final WvpProxyConfig wvpProxyConfig; + private final ZlmRtspConfig zlmRtspConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public void header(HttpServletResponse response) { @@ -128,16 +132,8 @@ public class Gb28181DownloadService { asyncContext.start(()->{ HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); try{ - if(fileHeader){ - header(response, StringUtils.joinWith("_", - deviceCode, - DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT), - DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT))); - } else { - header(response); - } - download(deviceCode, startTime,endTime).whenComplete((url, e)->{ + writeFileHeader(response,deviceCode,startTime,endTime,fileHeader); if(e != null){ writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); } else if(StringUtils.isBlank(url)){ @@ -156,15 +152,26 @@ public class Gb28181DownloadService { }); } + private void writeFileHeader(HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader){ + if(fileHeader){ + header(response, StringUtils.joinWith("_", + deviceCode, + DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT), + DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT))); + } else { + header(response); + } + } + @SneakyThrows public CompletableFuture download(String deviceCode, Date startTime, Date endTime) { - List deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); + Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); if (deviceByDeviceCode.isEmpty()) { String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode); log.error("{}",reason); throw new JsonException(reason); } else { - WvpProxyDevice device = deviceByDeviceCode.get(0); + WvpProxyDevice device = deviceByDeviceCode.get(); return download(device.getGbDeviceId(), device.getGbDeviceChannelId(), startTime, endTime); } } @@ -192,7 +199,7 @@ public class Gb28181DownloadService { ZoneId zoneId = ZoneId.of(GB28181Constant.TIME_ZONE); long start = LocalDateTimeUtil.of(startTime.toInstant(), zoneId).atZone(zoneId).toEpochSecond(); long end = LocalDateTimeUtil.of(endTime.toInstant(), zoneId).atZone(zoneId).toEpochSecond(); - String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, String.valueOf(start), String.valueOf(end)); + String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, String.valueOf(start), String.valueOf(end), IdUtil.getSnowflakeNextIdStr()); int streamMode = proxySipConfig.getTransport().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; String ip = zlmMediaConfig.getIp(); int port = openRtpServer(streamId, streamMode); @@ -223,15 +230,15 @@ public class Gb28181DownloadService { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); - scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); }; } - public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result){ - return new Flow.Subscriber<>() { + public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ + ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @Override @@ -255,11 +262,15 @@ public class Gb28181DownloadService { String fromTag = item.getFromTag(); String toTag = item.getToTag(); String callId = item.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + subscribe.getByeSubscribe().addPublisher(key); + subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,streamId, time, unit)); return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); })); result.complete(videoUrl(streamId)); } else { log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); + zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); result.complete(""); ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); onComplete(); @@ -274,7 +285,46 @@ public class Gb28181DownloadService { @Override public void onComplete() { subscribe.getInviteSubscribe().delPublisher(subscribeKey); + schedule[0].cancel(true); } }; + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + return subscriber; + } + + public Flow.Subscriber byeSubscriber(String key,String streamId, long time, TimeUnit unit){ + ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("创建订阅 {}", key); + subscription.request(1); + } + + @SneakyThrows + @Override + public void onNext(SIPRequest request) { + String transport = request.getTopmostViaHeader().getTransport(); + String ip = request.getLocalAddress().getHostAddress(); + sender.getProvider(transport,ip) + .sendResponse(SipResponseBuilder.response(request, Response.OK, "OK")); + onComplete(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + onComplete(); + } + + @Override + public void onComplete() { + subscribe.getByeSubscribe().delPublisher(key); + schedule[0].cancel(true); + zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); + } + }; + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + return subscriber; } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java index 2bb1b4e..d5e9ec7 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import java.io.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -163,14 +164,13 @@ public class VideoService { Executor executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,defaultExecuteResultHandler); log.info("开始录制 {}", url); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - scheduledExecutorService.schedule(() -> { + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { log.info("到达结束时间, 结束录制 {}", url); executor.getWatchdog().destroyProcess(); log.info("结束录制 {}", url); }, time, TimeUnit.SECONDS); defaultExecuteResultHandler.waitFor(); - if(errorStream.size() > 0){ - log.error("{}", errorStream); - } + schedule.cancel(true); + log.info("结束录制 {}", url); } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java index 3fec558..9611563 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java @@ -40,6 +40,7 @@ public class ByeRequestProcessor implements MessageProcessor { SIPRequest request = (SIPRequest) requestEvent.getRequest(); String callId = request.getCallId().getCallId(); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + log.info("key {}", key); String ip = request.getLocalAddress().getHostAddress(); String transport = request.getTopmostViaHeader().getTransport(); Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key)) diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index 19a18b5..2a31399 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -40,6 +40,8 @@ media: id: amrWMKmbKqoBjRQ9 # secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 + rtsp: + url: 'rtmp://10.10.10.200:1935' proxy: wvp: @@ -65,6 +67,7 @@ ffmpeg-support: rtp: # input: -i http://10.10.10.200:5080/live/test.live.flv input: -re -i + download: -i output: -vcodec copy -acodec copy -preset ultrafast -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp debug: download: false diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml index 3d9ac26..c09cbaa 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml @@ -39,6 +39,8 @@ spring: media: ip: 192.168.1.241 url: 'http://192.168.1.241:5080' + rtsp: + url: 'rtmp://192.168.1.241:1935' # url: 'http://10.10.10.200:12580/anything/' id: amrWMKmbKqoBjRQ9 secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc @@ -53,7 +55,7 @@ proxy: # 是否使用 wvp 的 api(wvp 的 并发有问题,仅保留用于兼容), 否则使用sip 信令直接操作设备 enable: false # 是否使用 ffmpeg 编/解码, 否则使用内置 javacv - use-ffmpeg: false + use-ffmpeg: true gb28181: sip: id: 44050100002000000005 @@ -72,8 +74,9 @@ ffmpeg-support: ffmpeg: C:\ffmpeg\bin\ffmpeg.exe ffprobe: /usr/bin/ffmpeg/ffprobe rtp: - input: -i - output: -vcodec h264 -acodec aac -preset ultrafast -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp + input: -re -i + download: -i + output: -vcodec libx264 -acodec aac -preset ultrafast -movflags empty_moov+frag_keyframe+default_base_moof -f mp4 # -rtsp_transport tcp debug: download: false input: false