From 39b65a2aeea9bf89f22e3e5e43cd05b18c2e969a Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 20 Sep 2023 03:16:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E6=95=B4=E5=90=88sip?= =?UTF-8?q?=E4=BF=A1=E4=BB=A4=20=E5=AE=9E=E7=8E=B0=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E4=B8=8A=E7=BA=A7(=E6=9C=AA=E5=AE=8C)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/gb28181/Gb28181Controller.java | 29 ++++ .../wvp/api/video/RecordController.java | 8 +- .../wvp/api/video/VideoController.java | 5 +- .../dynamic/mapper/WvpProxyDeviceMapper.java | 109 ++++++------ .../WvpProxyDockingDynamicSqlSupport.java | 38 ++++ .../dynamic/mapper/WvpProxyDockingMapper.java | 161 +++++++++++++++++ .../dynamic/model/WvpProxyDocking.java | 62 +++++++ .../operation/WvpProxyOperateTableMapper.java | 2 + .../operation/WvpProxyOperateTableMapper.xml | 9 + gb28181-wvp-proxy-service/pom.xml | 5 + .../gb28181/wvp/config/ProxySipConfig.java | 44 +++++ .../wvp/orm/WvpProxyOrmInitService.java | 1 + .../wvp/service/catalog/CatalogService.java | 89 ++++++++++ .../wvp/service/docking/DockingService.java | 60 +++++++ .../{RecordService.java => VideoService.java} | 3 +- .../gb28181/wvp/service/wvp/WvpService.java | 7 +- .../docking/gb28181/wvp/sip/SipStarter.java | 55 ++++++ .../wvp/sip/listener/SipListenerImpl.java | 104 +++++++++++ .../catalog/dto/CatalogDeviceListDTO.java | 22 +++ .../message/catalog/dto/CatalogItemDTO.java | 151 ++++++++++++++++ .../catalog/dto/CatalogRequestDTO.java | 27 +++ .../catalog/dto/CatalogResponseDTO.java | 31 ++++ .../dto/request/MessageRequestProcessor.java | 94 ++++++++++ .../request/RegisterRequestProcessor.java | 164 ++++++++++++++++++ .../wvp/sip/request/SipRequestBuilder.java | 128 ++++++++++++++ .../wvp/sip/response/SipResponseBuilder.java | 56 ++++++ .../gb28181/wvp/sip/sender/SipSender.java | 83 +++++++++ .../wvp/sip/subscribe/CatalogSubscribe.java | 39 +++++ .../wvp/sip/subscribe/SipSubscribe.java | 35 ++++ .../gb28181/wvp/Gb28181WvpProxyStarter.java | 18 ++ .../src/main/resources/application-local.yml | 10 +- .../src/main/resources/application.yml | 9 + pom.xml | 6 + 33 files changed, 1596 insertions(+), 68 deletions(-) create mode 100644 gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java create mode 100644 gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingDynamicSqlSupport.java create mode 100644 gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingMapper.java create mode 100644 gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/model/WvpProxyDocking.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java rename gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/{RecordService.java => VideoService.java} (98%) create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/SipStarter.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogDeviceListDTO.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogItemDTO.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogRequestDTO.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogResponseDTO.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/response/SipResponseBuilder.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/sender/SipSender.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/CatalogSubscribe.java create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java new file mode 100644 index 0000000..1b7fcc6 --- /dev/null +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java @@ -0,0 +1,29 @@ +package cn.skcks.docking.gb28181.wvp.api.gb28181; + +import cn.skcks.docking.gb28181.annotation.web.JsonMapping; +import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; +import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService; +import lombok.RequiredArgsConstructor; +import org.springdoc.core.models.GroupedOpenApi; +import org.springframework.context.annotation.Bean; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RequiredArgsConstructor +@RestController +@JsonMapping("/gb28181") +public class Gb28181Controller { + private final CatalogService catalogService; + @Bean + public GroupedOpenApi gb28181Api() { + return SwaggerConfig.api("Gb28181Api", "/gb28181"); + } + + @GetJson("/catalog") + public JsonResponse catalog(@RequestParam("gbDeviceId") String id){ + catalogService.getCatalog(id); + return JsonResponse.success(null); + } +} diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java index ddaea82..362e9cd 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java @@ -1,6 +1,6 @@ package cn.skcks.docking.gb28181.wvp.api.video; -import cn.skcks.docking.gb28181.wvp.service.video.RecordService; +import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; @@ -15,17 +15,17 @@ import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/video/record") public class RecordController { - private final RecordService recordService; + private final VideoService videoService; @Operation(summary = "返回文件下载 http 头信息",description = "禁止多线程下载, 默认文件名为 record.mp4") @RequestMapping(method = {RequestMethod.HEAD,RequestMethod.OPTIONS}) public void record(HttpServletResponse response){ - recordService.header(response); + videoService.header(response); } @Operation(summary = "录制flv视频流, 并下载为flv文件") @GetMapping public void record(HttpServletRequest request, HttpServletResponse response, @RequestParam String url, @RequestParam long time){ - recordService.record(request, response,url,time); + videoService.record(request, response,url,time); } } diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java index 10ce2c8..cb1d19e 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java @@ -3,7 +3,7 @@ package cn.skcks.docking.gb28181.wvp.api.video; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; -import cn.skcks.docking.gb28181.wvp.service.video.RecordService; +import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -15,7 +15,6 @@ import org.springdoc.core.annotations.ParameterObject; import org.springdoc.core.models.GroupedOpenApi; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -28,7 +27,7 @@ import org.springframework.web.bind.annotation.ResponseBody; @RequiredArgsConstructor public class VideoController { private final ZlmMediaConfig config; - private final RecordService recordService; + private final VideoService videoService; private final WvpService wvpService; @Bean diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDeviceMapper.java b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDeviceMapper.java index 2496087..399f184 100644 --- a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDeviceMapper.java +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDeviceMapper.java @@ -1,5 +1,6 @@ package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper; +import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDeviceDynamicSqlSupport.*; import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; @@ -31,7 +32,7 @@ import org.mybatis.dynamic.sql.util.mybatis3.MyBatis3Utils; @Mapper public interface WvpProxyDeviceMapper extends CommonCountMapper, CommonDeleteMapper, CommonInsertMapper, CommonUpdateMapper { @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") - BasicColumn[] selectList = BasicColumn.columnList(WvpProxyDeviceDynamicSqlSupport.id, WvpProxyDeviceDynamicSqlSupport.deviceCode, WvpProxyDeviceDynamicSqlSupport.gbDeviceId, WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId, WvpProxyDeviceDynamicSqlSupport.name, WvpProxyDeviceDynamicSqlSupport.address); + BasicColumn[] selectList = BasicColumn.columnList(id, deviceCode, gbDeviceId, gbDeviceChannelId, name, address); @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") @SelectProvider(type=SqlProviderAdapter.class, method="select") @@ -52,125 +53,125 @@ public interface WvpProxyDeviceMapper extends CommonCountMapper, CommonDeleteMap @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default long count(CountDSLCompleter completer) { - return MyBatis3Utils.countFrom(this::count, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.countFrom(this::count, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int delete(DeleteDSLCompleter completer) { - return MyBatis3Utils.deleteFrom(this::delete, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.deleteFrom(this::delete, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int deleteByPrimaryKey(Long id_) { return delete(c -> - c.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(id_)) + c.where(id, isEqualTo(id_)) ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int insert(WvpProxyDevice row) { - return MyBatis3Utils.insert(this::insert, row, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c -> - c.map(WvpProxyDeviceDynamicSqlSupport.id).toProperty("id") - .map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toProperty("deviceCode") - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toProperty("gbDeviceId") - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toProperty("gbDeviceChannelId") - .map(WvpProxyDeviceDynamicSqlSupport.name).toProperty("name") - .map(WvpProxyDeviceDynamicSqlSupport.address).toProperty("address") + return MyBatis3Utils.insert(this::insert, row, wvpProxyDevice, c -> + c.map(id).toProperty("id") + .map(deviceCode).toProperty("deviceCode") + .map(gbDeviceId).toProperty("gbDeviceId") + .map(gbDeviceChannelId).toProperty("gbDeviceChannelId") + .map(name).toProperty("name") + .map(address).toProperty("address") ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int insertMultiple(Collection records) { - return MyBatis3Utils.insertMultiple(this::insertMultiple, records, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c -> - c.map(WvpProxyDeviceDynamicSqlSupport.id).toProperty("id") - .map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toProperty("deviceCode") - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toProperty("gbDeviceId") - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toProperty("gbDeviceChannelId") - .map(WvpProxyDeviceDynamicSqlSupport.name).toProperty("name") - .map(WvpProxyDeviceDynamicSqlSupport.address).toProperty("address") + return MyBatis3Utils.insertMultiple(this::insertMultiple, records, wvpProxyDevice, c -> + c.map(id).toProperty("id") + .map(deviceCode).toProperty("deviceCode") + .map(gbDeviceId).toProperty("gbDeviceId") + .map(gbDeviceChannelId).toProperty("gbDeviceChannelId") + .map(name).toProperty("name") + .map(address).toProperty("address") ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int insertSelective(WvpProxyDevice row) { - return MyBatis3Utils.insert(this::insert, row, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c -> - c.map(WvpProxyDeviceDynamicSqlSupport.id).toPropertyWhenPresent("id", row::getId) - .map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toPropertyWhenPresent("deviceCode", row::getDeviceCode) - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId) - .map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toPropertyWhenPresent("gbDeviceChannelId", row::getGbDeviceChannelId) - .map(WvpProxyDeviceDynamicSqlSupport.name).toPropertyWhenPresent("name", row::getName) - .map(WvpProxyDeviceDynamicSqlSupport.address).toPropertyWhenPresent("address", row::getAddress) + return MyBatis3Utils.insert(this::insert, row, wvpProxyDevice, c -> + c.map(id).toPropertyWhenPresent("id", row::getId) + .map(deviceCode).toPropertyWhenPresent("deviceCode", row::getDeviceCode) + .map(gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId) + .map(gbDeviceChannelId).toPropertyWhenPresent("gbDeviceChannelId", row::getGbDeviceChannelId) + .map(name).toPropertyWhenPresent("name", row::getName) + .map(address).toPropertyWhenPresent("address", row::getAddress) ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default Optional selectOne(SelectDSLCompleter completer) { - return MyBatis3Utils.selectOne(this::selectOne, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.selectOne(this::selectOne, selectList, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default List select(SelectDSLCompleter completer) { - return MyBatis3Utils.selectList(this::selectMany, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.selectList(this::selectMany, selectList, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default List selectDistinct(SelectDSLCompleter completer) { - return MyBatis3Utils.selectDistinct(this::selectMany, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.selectDistinct(this::selectMany, selectList, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default Optional selectByPrimaryKey(Long id_) { return selectOne(c -> - c.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(id_)) + c.where(id, isEqualTo(id_)) ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int update(UpdateDSLCompleter completer) { - return MyBatis3Utils.update(this::update, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer); + return MyBatis3Utils.update(this::update, wvpProxyDevice, completer); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") static UpdateDSL updateAllColumns(WvpProxyDevice row, UpdateDSL dsl) { - return dsl.set(WvpProxyDeviceDynamicSqlSupport.id).equalTo(row::getId) - .set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalTo(row::getDeviceCode) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalTo(row::getGbDeviceId) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalTo(row::getGbDeviceChannelId) - .set(WvpProxyDeviceDynamicSqlSupport.name).equalTo(row::getName) - .set(WvpProxyDeviceDynamicSqlSupport.address).equalTo(row::getAddress); + return dsl.set(id).equalTo(row::getId) + .set(deviceCode).equalTo(row::getDeviceCode) + .set(gbDeviceId).equalTo(row::getGbDeviceId) + .set(gbDeviceChannelId).equalTo(row::getGbDeviceChannelId) + .set(name).equalTo(row::getName) + .set(address).equalTo(row::getAddress); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") static UpdateDSL updateSelectiveColumns(WvpProxyDevice row, UpdateDSL dsl) { - return dsl.set(WvpProxyDeviceDynamicSqlSupport.id).equalToWhenPresent(row::getId) - .set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalToWhenPresent(row::getDeviceCode) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalToWhenPresent(row::getGbDeviceId) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId) - .set(WvpProxyDeviceDynamicSqlSupport.name).equalToWhenPresent(row::getName) - .set(WvpProxyDeviceDynamicSqlSupport.address).equalToWhenPresent(row::getAddress); + return dsl.set(id).equalToWhenPresent(row::getId) + .set(deviceCode).equalToWhenPresent(row::getDeviceCode) + .set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId) + .set(gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId) + .set(name).equalToWhenPresent(row::getName) + .set(address).equalToWhenPresent(row::getAddress); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int updateByPrimaryKey(WvpProxyDevice row) { return update(c -> - c.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalTo(row::getDeviceCode) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalTo(row::getGbDeviceId) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalTo(row::getGbDeviceChannelId) - .set(WvpProxyDeviceDynamicSqlSupport.name).equalTo(row::getName) - .set(WvpProxyDeviceDynamicSqlSupport.address).equalTo(row::getAddress) - .where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(row::getId)) + c.set(deviceCode).equalTo(row::getDeviceCode) + .set(gbDeviceId).equalTo(row::getGbDeviceId) + .set(gbDeviceChannelId).equalTo(row::getGbDeviceChannelId) + .set(name).equalTo(row::getName) + .set(address).equalTo(row::getAddress) + .where(id, isEqualTo(row::getId)) ); } @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device") default int updateByPrimaryKeySelective(WvpProxyDevice row) { return update(c -> - c.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalToWhenPresent(row::getDeviceCode) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalToWhenPresent(row::getGbDeviceId) - .set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId) - .set(WvpProxyDeviceDynamicSqlSupport.name).equalToWhenPresent(row::getName) - .set(WvpProxyDeviceDynamicSqlSupport.address).equalToWhenPresent(row::getAddress) - .where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(row::getId)) + c.set(deviceCode).equalToWhenPresent(row::getDeviceCode) + .set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId) + .set(gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId) + .set(name).equalToWhenPresent(row::getName) + .set(address).equalToWhenPresent(row::getAddress) + .where(id, isEqualTo(row::getId)) ); } } \ No newline at end of file diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingDynamicSqlSupport.java b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingDynamicSqlSupport.java new file mode 100644 index 0000000..2d1f979 --- /dev/null +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingDynamicSqlSupport.java @@ -0,0 +1,38 @@ +package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper; + +import jakarta.annotation.Generated; +import java.sql.JDBCType; +import org.mybatis.dynamic.sql.AliasableSqlTable; +import org.mybatis.dynamic.sql.SqlColumn; + +public final class WvpProxyDockingDynamicSqlSupport { + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + public static final WvpProxyDocking wvpProxyDocking = new WvpProxyDocking(); + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id") + public static final SqlColumn id = wvpProxyDocking.id; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id") + public static final SqlColumn gbDeviceId = wvpProxyDocking.gbDeviceId; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip") + public static final SqlColumn ip = wvpProxyDocking.ip; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port") + public static final SqlColumn port = wvpProxyDocking.port; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + public static final class WvpProxyDocking extends AliasableSqlTable { + public final SqlColumn id = column("id", JDBCType.BIGINT); + + public final SqlColumn gbDeviceId = column("gb_device_id", JDBCType.VARCHAR); + + public final SqlColumn ip = column("ip", JDBCType.VARCHAR); + + public final SqlColumn port = column("port", JDBCType.VARCHAR); + + public WvpProxyDocking() { + super("wvp_proxy_docking", WvpProxyDocking::new); + } + } +} \ No newline at end of file diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingMapper.java b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingMapper.java new file mode 100644 index 0000000..5519514 --- /dev/null +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/mapper/WvpProxyDockingMapper.java @@ -0,0 +1,161 @@ +package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper; + +import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingDynamicSqlSupport.*; +import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; + +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import jakarta.annotation.Generated; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Result; +import org.apache.ibatis.annotations.ResultMap; +import org.apache.ibatis.annotations.Results; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.type.JdbcType; +import org.mybatis.dynamic.sql.BasicColumn; +import org.mybatis.dynamic.sql.delete.DeleteDSLCompleter; +import org.mybatis.dynamic.sql.select.CountDSLCompleter; +import org.mybatis.dynamic.sql.select.SelectDSLCompleter; +import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; +import org.mybatis.dynamic.sql.update.UpdateDSL; +import org.mybatis.dynamic.sql.update.UpdateDSLCompleter; +import org.mybatis.dynamic.sql.update.UpdateModel; +import org.mybatis.dynamic.sql.util.SqlProviderAdapter; +import org.mybatis.dynamic.sql.util.mybatis3.CommonCountMapper; +import org.mybatis.dynamic.sql.util.mybatis3.CommonDeleteMapper; +import org.mybatis.dynamic.sql.util.mybatis3.CommonInsertMapper; +import org.mybatis.dynamic.sql.util.mybatis3.CommonUpdateMapper; +import org.mybatis.dynamic.sql.util.mybatis3.MyBatis3Utils; + +@Mapper +public interface WvpProxyDockingMapper extends CommonCountMapper, CommonDeleteMapper, CommonInsertMapper, CommonUpdateMapper { + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + BasicColumn[] selectList = BasicColumn.columnList(id, gbDeviceId, ip, port); + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + @SelectProvider(type=SqlProviderAdapter.class, method="select") + @Results(id="WvpProxyDockingResult", value = { + @Result(column="id", property="id", jdbcType=JdbcType.BIGINT, id=true), + @Result(column="gb_device_id", property="gbDeviceId", jdbcType=JdbcType.VARCHAR), + @Result(column="ip", property="ip", jdbcType=JdbcType.VARCHAR), + @Result(column="port", property="port", jdbcType=JdbcType.VARCHAR) + }) + List selectMany(SelectStatementProvider selectStatement); + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + @SelectProvider(type=SqlProviderAdapter.class, method="select") + @ResultMap("WvpProxyDockingResult") + Optional selectOne(SelectStatementProvider selectStatement); + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default long count(CountDSLCompleter completer) { + return MyBatis3Utils.countFrom(this::count, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int delete(DeleteDSLCompleter completer) { + return MyBatis3Utils.deleteFrom(this::delete, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int deleteByPrimaryKey(Long id_) { + return delete(c -> + c.where(id, isEqualTo(id_)) + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int insert(WvpProxyDocking row) { + return MyBatis3Utils.insert(this::insert, row, wvpProxyDocking, c -> + c.map(id).toProperty("id") + .map(gbDeviceId).toProperty("gbDeviceId") + .map(ip).toProperty("ip") + .map(port).toProperty("port") + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int insertMultiple(Collection records) { + return MyBatis3Utils.insertMultiple(this::insertMultiple, records, wvpProxyDocking, c -> + c.map(id).toProperty("id") + .map(gbDeviceId).toProperty("gbDeviceId") + .map(ip).toProperty("ip") + .map(port).toProperty("port") + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int insertSelective(WvpProxyDocking row) { + return MyBatis3Utils.insert(this::insert, row, wvpProxyDocking, c -> + c.map(id).toPropertyWhenPresent("id", row::getId) + .map(gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId) + .map(ip).toPropertyWhenPresent("ip", row::getIp) + .map(port).toPropertyWhenPresent("port", row::getPort) + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default Optional selectOne(SelectDSLCompleter completer) { + return MyBatis3Utils.selectOne(this::selectOne, selectList, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default List select(SelectDSLCompleter completer) { + return MyBatis3Utils.selectList(this::selectMany, selectList, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default List selectDistinct(SelectDSLCompleter completer) { + return MyBatis3Utils.selectDistinct(this::selectMany, selectList, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default Optional selectByPrimaryKey(Long id_) { + return selectOne(c -> + c.where(id, isEqualTo(id_)) + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int update(UpdateDSLCompleter completer) { + return MyBatis3Utils.update(this::update, wvpProxyDocking, completer); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + static UpdateDSL updateAllColumns(WvpProxyDocking row, UpdateDSL dsl) { + return dsl.set(id).equalTo(row::getId) + .set(gbDeviceId).equalTo(row::getGbDeviceId) + .set(ip).equalTo(row::getIp) + .set(port).equalTo(row::getPort); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + static UpdateDSL updateSelectiveColumns(WvpProxyDocking row, UpdateDSL dsl) { + return dsl.set(id).equalToWhenPresent(row::getId) + .set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId) + .set(ip).equalToWhenPresent(row::getIp) + .set(port).equalToWhenPresent(row::getPort); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int updateByPrimaryKey(WvpProxyDocking row) { + return update(c -> + c.set(gbDeviceId).equalTo(row::getGbDeviceId) + .set(ip).equalTo(row::getIp) + .set(port).equalTo(row::getPort) + .where(id, isEqualTo(row::getId)) + ); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking") + default int updateByPrimaryKeySelective(WvpProxyDocking row) { + return update(c -> + c.set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId) + .set(ip).equalToWhenPresent(row::getIp) + .set(port).equalToWhenPresent(row::getPort) + .where(id, isEqualTo(row::getId)) + ); + } +} \ No newline at end of file diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/model/WvpProxyDocking.java b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/model/WvpProxyDocking.java new file mode 100644 index 0000000..f5898fa --- /dev/null +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/dynamic/model/WvpProxyDocking.java @@ -0,0 +1,62 @@ +package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model; + +import jakarta.annotation.Generated; + +/** + * + * This class was generated by MyBatis Generator. + * This class corresponds to the database table wvp_proxy_docking + */ +public class WvpProxyDocking { + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id") + private Long id; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id") + private String gbDeviceId; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip") + private String ip; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port") + private String port; + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id") + public Long getId() { + return id; + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id") + public void setId(Long id) { + this.id = id; + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id") + public String getGbDeviceId() { + return gbDeviceId; + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id") + public void setGbDeviceId(String gbDeviceId) { + this.gbDeviceId = gbDeviceId == null ? null : gbDeviceId.trim(); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip") + public String getIp() { + return ip; + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip") + public void setIp(String ip) { + this.ip = ip == null ? null : ip.trim(); + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port") + public String getPort() { + return port; + } + + @Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port") + public void setPort(String port) { + this.port = port == null ? null : port.trim(); + } +} \ No newline at end of file diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.java b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.java index 6154d65..dd2120c 100644 --- a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.java +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.java @@ -6,4 +6,6 @@ import org.apache.ibatis.annotations.Mapper; public interface WvpProxyOperateTableMapper { // int createNewTable(@Param("tableName")String tableName); void createDeviceTable(); + + void createDockingTable(); } diff --git a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.xml b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.xml index edfb28e..487024d 100644 --- a/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.xml +++ b/gb28181-wvp-proxy-orm/src/main/java/cn/skcks/docking/gb28181/wvp/orm/mybatis/operation/WvpProxyOperateTableMapper.xml @@ -21,4 +21,13 @@ DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; + + CREATE TABLE IF NOT EXISTS `wvp_proxy_docking` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `gb_device_id` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL, + `ip` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL, + `port` varchar(5) COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + diff --git a/gb28181-wvp-proxy-service/pom.xml b/gb28181-wvp-proxy-service/pom.xml index 2ce2758..3af12fe 100644 --- a/gb28181-wvp-proxy-service/pom.xml +++ b/gb28181-wvp-proxy-service/pom.xml @@ -29,6 +29,11 @@ common + + cn.skcks.docking.gb28181 + gb28181-service + + cn.skcks.docking zlmediakit-service diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java new file mode 100644 index 0000000..27c19e2 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java @@ -0,0 +1,44 @@ +package cn.skcks.docking.gb28181.wvp.config; + +import cn.skcks.docking.gb28181.config.sip.SipConfig; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.sip.ListeningPoint; +import java.util.List; + +@Component +@ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true) +@Order(0) +@Data +public class ProxySipConfig { + private List ip; + + private List showIp; + + private Integer port; + + private String domain; + + private String id; + + private String password; + + private String transport = ListeningPoint.UDP; + + @Bean + public SipConfig sipConfig(){ + SipConfig sipConfig = new SipConfig(); + sipConfig.setIp(ip); + sipConfig.setShowIp(showIp); + sipConfig.setPort(port); + sipConfig.setDomain(domain); + sipConfig.setId(id); + sipConfig.setPassword(password); + return sipConfig; + + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/orm/WvpProxyOrmInitService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/orm/WvpProxyOrmInitService.java index 4c4d00e..73702d8 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/orm/WvpProxyOrmInitService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/orm/WvpProxyOrmInitService.java @@ -24,5 +24,6 @@ public class WvpProxyOrmInitService { public void init(){ log.info("[orm] 自动建表"); mapper.createDeviceTable(); + mapper.createDockingTable(); } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java new file mode 100644 index 0000000..3fc43e1 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java @@ -0,0 +1,89 @@ +package cn.skcks.docking.gb28181.wvp.service.catalog; + +import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.sip.message.Request; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CatalogService { + private final SipSender sipSender; + private final SipSubscribe sipSubscribe; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final DockingService dockingService; + + public void getCatalog(String deviceId){ + WvpProxyDocking device = dockingService.getDeviceByDeviceCode(deviceId).orElse(null); + if (device == null){ + throw new RuntimeException("设备不存在"); + } + CatalogRequestDTO catalogRequestDTO = new CatalogRequestDTO(); + catalogRequestDTO.setDeviceId(deviceId); + String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); + catalogRequestDTO.setSn(sn); + String key = GenericSubscribe.Helper.getKey(Request.MESSAGE, deviceId); + sipSubscribe.getCatalogSubscribe().addPublisher(key); + final ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = catalog(key, device, schedule); + // 60秒超时计时器 + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); + sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,1L, XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(), + SipUtil.generateFromTag(), provider.getNewCallId())); + } + + private Flow.Subscriber catalog(String key, WvpProxyDocking device, ScheduledFuture[] schedule){ + return new Flow.Subscriber<>() { + Flow.Subscription subscription; + + final AtomicLong getNum = new AtomicLong(0); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("创建 订阅 {}", key); + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + CatalogResponseDTO responseDTO = XmlUtils.parse(item.getRawContent(), CatalogResponseDTO.class, GB28181Constant.CHARSET); + Long sumNum = responseDTO.getSumNum(); + log.info("{}",responseDTO); + getNum.getAndAdd(responseDTO.getDeviceList().getDeviceList().size()); + if(getNum.get() < sumNum){ + subscription.request(1); + } else{ + onComplete(); + } + } + + @Override + public void onError(Throwable throwable) { + onComplete(); + } + + @Override + public void onComplete() { + sipSubscribe.getCatalogSubscribe().delPublisher(key); + schedule[0].cancel(true); + } + }; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java new file mode 100644 index 0000000..cc0f063 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/docking/DockingService.java @@ -0,0 +1,60 @@ +package cn.skcks.docking.gb28181.wvp.service.docking; + +import cn.skcks.docking.gb28181.common.json.JsonException; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingDynamicSqlSupport; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.text.MessageFormat; +import java.util.Optional; + +import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DockingService { + private final WvpProxyDockingMapper wvpProxyDockingMapper; + + public Optional getDeviceById(Long id){ + return wvpProxyDockingMapper.selectOne(s-> + s.where(WvpProxyDockingDynamicSqlSupport.id, isEqualTo(id))); + } + + public Optional getDeviceByDeviceCode(String deviceCode){ + return wvpProxyDockingMapper.selectOne(s-> + s.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId,isEqualTo(deviceCode))); + } + + public Boolean hasDeviceByDeviceCode(String deviceCode){ + return getDeviceByDeviceCode(deviceCode).orElse(null) != null; + } + + /** + * 添加设备 + * @param device 设备 + * @return 是否成功 + */ + @SneakyThrows + public boolean addDevice(WvpProxyDocking device) { + if(device == null){ + return false; + } + + String deviceCode = device.getGbDeviceId(); + if(StringUtils.isBlank(deviceCode)){ + throw new JsonException("设备编码不能为空"); + } + if(getDeviceByDeviceCode(deviceCode).isPresent()){ + wvpProxyDockingMapper.delete(d->d.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId,isEqualTo(deviceCode))); + } + + return wvpProxyDockingMapper.insert(device) > 0; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java similarity index 98% rename from gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java rename to gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java index c512a9c..5b444aa 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/VideoService.java @@ -17,7 +17,6 @@ import org.bytedeco.ffmpeg.global.avutil; import org.bytedeco.javacv.FFmpegFrameGrabber; import org.bytedeco.javacv.FFmpegFrameRecorder; import org.bytedeco.javacv.FrameGrabber; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.io.*; @@ -29,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Service @RequiredArgsConstructor -public class RecordService { +public class VideoService { private final FfmpegSupportService ffmpegSupportService; private final WvpProxyConfig wvpProxyConfig; /** diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java index 7f5790b..25c3406 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java @@ -24,7 +24,7 @@ import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.download.DownloadService; -import cn.skcks.docking.gb28181.wvp.service.video.RecordService; +import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.utils.RetryUtil; import com.github.rholder.retry.*; import jakarta.servlet.AsyncContext; @@ -34,7 +34,6 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -53,7 +52,7 @@ public class WvpService { private final WvpProxyConfig wvpProxyConfig; private final DeviceService deviceService; private final DownloadService downloadService; - private final RecordService recordService; + private final VideoService videoService; public void header(HttpServletResponse response) { response.setContentType("video/mp4"); @@ -214,7 +213,7 @@ public class WvpService { String streamUrl = streamContent.getFlv(); try { header(response); - recordService.record(response, streamUrl, DateUtil.between(DateUtil.parseDateTime(startTime), DateUtil.parseDateTime(endTime), DateUnit.SECOND)); + videoService.record(response, streamUrl, DateUtil.between(DateUtil.parseDateTime(startTime), DateUtil.parseDateTime(endTime), DateUnit.SECOND)); } finally { wvpProxyClient.playbackStop(token, deviceId, channelId, stream); } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/SipStarter.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/SipStarter.java new file mode 100644 index 0000000..b9d1395 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/SipStarter.java @@ -0,0 +1,55 @@ +package cn.skcks.docking.gb28181.wvp.sip; + +import cn.skcks.docking.gb28181.common.json.JsonUtils; +import cn.skcks.docking.gb28181.core.sip.service.SipService; +import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.SmartLifecycle; +import org.springframework.context.annotation.DependsOn; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +@Order(0) +@Slf4j +@RequiredArgsConstructor +@Component +@DependsOn("wvpProxyOrmInitService") +public class SipStarter implements SmartLifecycle { + private final ProxySipConfig proxySipConfig; + private final SipService sipService; + + private boolean isRunning; + + @Override + public void start() { + if(checkConfig()){ + isRunning = true; + log.debug("sip 服务 启动"); + sipService.run(); + } + } + + @Override + public void stop() { + log.debug("sip 服务 关闭"); + sipService.stop(); + isRunning = false; + } + + @Override + public boolean isRunning() { + return isRunning; + } + + public boolean checkConfig(){ + log.debug("sip 配置信息 => \n{}", JsonUtils.toJson(proxySipConfig)); + if(CollectionUtils.isEmpty(proxySipConfig.getIp())){ + log.error("sip ip 配置错误, 请检查配置是否正确"); + return false; + } + + return true; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java new file mode 100644 index 0000000..7891487 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/listener/SipListenerImpl.java @@ -0,0 +1,104 @@ +package cn.skcks.docking.gb28181.wvp.sip.listener; + +import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.sip.*; +import javax.sip.header.CSeqHeader; +import javax.sip.header.CallIdHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@RequiredArgsConstructor +@Component +@Slf4j +public class SipListenerImpl implements SipListener { + private final SipSubscribe sipSubscribe; + private final ConcurrentMap requestProcessor = new ConcurrentHashMap<>(); + private final ConcurrentMap responseProcessor = new ConcurrentHashMap<>(); + + public void addRequestProcessor(String method, MessageProcessor messageProcessor) { + log.debug("[SipListener] 注册 {} 请求处理器", method); + requestProcessor.put(method, messageProcessor); + } + + public void addResponseProcessor(String method, MessageProcessor messageProcessor) { + log.debug("[SipListener] 注册 {} 响应处理器", method); + responseProcessor.put(method, messageProcessor); + } + + + @Override + @Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME) + public void processRequest(RequestEvent requestEvent) { + String method = requestEvent.getRequest().getMethod(); + log.debug("传入请求 method => {}", method); + Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> { + processor.process(requestEvent); + }); + } + + @Override + @Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME) + public void processResponse(ResponseEvent responseEvent) { + Response response = responseEvent.getResponse(); + int status = response.getStatusCode(); + CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + log.debug("{} {}", method, response); + + // Success + if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { + log.debug("传入响应 method => {}", method); + Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> { + processor.process(responseEvent); + }); + } else if ((status >= Response.TRYING) && (status < Response.OK)) { + // 增加其它无需回复的响应,如101、180等 + } else { + log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); + if (responseEvent.getDialog() != null) { + responseEvent.getDialog().delete(); + } + } + } + + @Override + public void processTimeout(TimeoutEvent timeoutEvent) { + ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); + if (clientTransaction != null) { + Request request = clientTransaction.getRequest(); + if (request != null) { + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + log.debug("会话超时 callId => {}", callIdHeader.getCallId()); + } + } + } + } + + @Override + public void processIOException(IOExceptionEvent exceptionEvent) { + + } + + @Override + public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) { + + } + + @Override + public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { + CallIdHeader callIdHeader = dialogTerminatedEvent.getDialog().getCallId(); + log.debug("会话终止 callId => {}", callIdHeader.getCallId()); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogDeviceListDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogDeviceListDTO.java new file mode 100644 index 0000000..9a0cbea --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogDeviceListDTO.java @@ -0,0 +1,22 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@JacksonXmlRootElement(localName = "DeviceList") +@AllArgsConstructor +@NoArgsConstructor +@Data +public class CatalogDeviceListDTO { + @JacksonXmlProperty(isAttribute = true) + private Integer num; + @JacksonXmlProperty(localName = "Item") + @JacksonXmlElementWrapper(useWrapping = false) + private List deviceList; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogItemDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogItemDTO.java new file mode 100644 index 0000000..be90905 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogItemDTO.java @@ -0,0 +1,151 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto; + + +import cn.hutool.core.date.DatePattern; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + + +@JacksonXmlRootElement(localName = "Item") +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Data +public class CatalogItemDTO { + /** + * 设备/区域/系统编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + /** + * 设备/区域/系统名称(必选) + */ + private String name; + + /** + * 当为设备时,设备厂商(必选) + */ + private String manufacturer; + + /** + * 当为设备时,设备型号(必选) + */ + private String model; + + /** + * 当为设备时,设备归属(必选) + */ + private String owner; + + /** + * 行政区域(必选) + */ + @JacksonXmlProperty(localName = "CivilCode") + private String civilCode; + + /** + * 警区(可选) + */ + private String block; + + /** + * 当为设备时,安装地址(必选) + */ + private String address; + + /** + * 当为设备时,是否有子设备(必选)1有, 0没有 + */ + @Builder.Default + private Integer parental = 0; + + /** + * 父设备/区域/系统ID(必选) + */ + @JacksonXmlProperty(localName = "ParentID") + private String parentId; + + /** + * 信令安全模式(可选)缺省为0; 0:不采用;2:S/MIME 签名方式;3:S/ MIME加密签名同时采用方式;4:数字摘要方式 + */ + @Builder.Default + private Integer safetyWay = 0; + + /** + * 注册方式(必选)缺省为1;1:符合IETF RFC3261标准的认证注册模 式;2:基于口令的双向认证注册模式;3:基于数字证书的双向认证注册模式 + */ + @Builder.Default + private Integer registerWay = 1; + + /** + * 证书序列号(有证书的设备必选) + */ + private String certNum; + + /** + * 证书有效标识(有证书的设备必选)缺省为0;证书有效标识:0:无效 1: 有效 + */ + @Builder.Default + private Integer certifiable = 0; + + /** + * 无效原因码(有证书且证书无效的设备必选) + */ + @Builder.Default + private Integer errCode = 0; + + /** + * 证书终止有效期(有证书的设备必选) + */ + @JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE) + private Date endTime; + + /** + * 保密属性(必选)缺省为0;0:不涉密,1:涉密 + */ + @Builder.Default + private Integer secrecy = 0; + + /** + * 设备/区域/系统IP地址(可选) + */ + @JacksonXmlProperty(localName = "IPAddress") + private String ipAddress; + + /** + * 设备/区域/系统端口(可选) + */ + private Integer port; + + /** + * 设备口令(可选) + */ + private String password; + + /** + * 设备状态(必选) + */ + @Builder.Default + private String status = "ON"; + + /** + * 经度(可选) + */ + @Builder.Default + private String longitude = "0.0"; + + /** + * 纬度(可选) + */ + @Builder.Default + private String latitude = "0.0"; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogRequestDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogRequestDTO.java new file mode 100644 index 0000000..1a6d579 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogRequestDTO.java @@ -0,0 +1,27 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Query") +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Data +public class CatalogRequestDTO { + @Builder.Default + private String cmdType = CmdType.CATALOG; + @JacksonXmlProperty(localName = "SN") + private String sn; + + /** + * 目标设备的设备编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogResponseDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogResponseDTO.java new file mode 100644 index 0000000..7971bf7 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/CatalogResponseDTO.java @@ -0,0 +1,31 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@JacksonXmlRootElement(localName = "Response") +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Data +public class CatalogResponseDTO { + @Builder.Default + private String cmdType = CmdType.CATALOG; + @JacksonXmlProperty(localName = "SN") + private String sn; + + /** + * 目标设备的设备编码(必选) + */ + @JacksonXmlProperty(localName = "DeviceID") + private String deviceId; + + private Long sumNum; + + private CatalogDeviceListDTO deviceList; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java new file mode 100644 index 0000000..afa9893 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java @@ -0,0 +1,94 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.request; + +import cn.skcks.docking.gb28181.common.json.ResponseStatus; +import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; +import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; +import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.header.CallIdHeader; +import javax.sip.message.Response; +import java.util.EventObject; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class MessageRequestProcessor implements MessageProcessor { + private final SipListener sipListener; + private final SipMessageSender sender; + private final SipSubscribe subscribe; + private final DockingService dockingService; + + @Override + public void init() { + + } + + @Override + public void process(EventObject eventObject) { + RequestEvent requestEvent = (RequestEvent) eventObject; + SIPRequest request = (SIPRequest)requestEvent.getRequest(); + String deviceId = SipUtil.getUserIdFromFromHeader(request); + CallIdHeader callIdHeader = request.getCallIdHeader(); + + byte[] content = request.getRawContent(); + MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET); + log.debug("接收到的消息 => {}", messageDto); + + String senderIp = request.getLocalAddress().getHostAddress(); + + if(dockingService.hasDeviceByDeviceCode(deviceId)){ + log.info("未找到相关设备信息 => {}", deviceId); + Response response = response(request,Response.NOT_FOUND,"设备未注册"); + sender.send(senderIp,response); + return; + } + + Response ok = response(request, Response.OK, "OK"); + Response response; + if(messageDto.getCmdType().equalsIgnoreCase(CmdType.KEEPALIVE)){ + response = ok; + // 更新设备在线状态 + } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ + response = ok; + RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); + String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); + Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key)) + .ifPresentOrElse(publisher-> publisher.submit(request), + ()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); + } else { + response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); + } + sender.send(senderIp, response); + } + + @SneakyThrows + public Response response(SIPRequest request, int status, String message){ + if (request.getToHeader().getTag() == null) { + request.getToHeader().setTag(SipUtil.generateTag()); + } + SIPResponse response = (SIPResponse)getMessageFactory().createResponse(status, request); + if (message != null) { + response.setReasonPhrase(message); + } + return response; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java new file mode 100644 index 0000000..4500d1a --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/register/request/RegisterRequestProcessor.java @@ -0,0 +1,164 @@ +package cn.skcks.docking.gb28181.wvp.sip.message.register.request; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.config.sip.SipConfig; +import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.gb28181.sip.GbSipDate; +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; +import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; +import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import gov.nist.javax.sip.address.SipUri; +import gov.nist.javax.sip.header.Authorization; +import gov.nist.javax.sip.header.SIPDateHeader; +import gov.nist.javax.sip.message.SIPRequest; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import javax.sip.ListeningPoint; +import javax.sip.RequestEvent; +import javax.sip.SipProvider; +import javax.sip.address.Address; +import javax.sip.header.ExpiresHeader; +import javax.sip.header.FromHeader; +import javax.sip.header.ViaHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +@RequiredArgsConstructor +@Component +public class RegisterRequestProcessor implements MessageProcessor { + private final SipListener sipListener; + private final SipMessageSender sender; + + private final SipConfig sipConfig; + private final DockingService dockingService; + + + @PostConstruct + @Override + public void init(){ + sipListener.addRequestProcessor(Method.REGISTER,this); + } + + @SneakyThrows + @Override + public void process(EventObject eventObject) { + RequestEvent requestEvent = (RequestEvent) eventObject; + SIPRequest request = (SIPRequest)requestEvent.getRequest(); + FromHeader fromHeader = request.getFrom(); + Address address = fromHeader.getAddress(); + log.debug("From {}",address); + + SipUri uri = (SipUri)address.getURI(); + String deviceId = uri.getUser(); + log.debug("请求注册 设备id => {}", deviceId); + + + Boolean hasDevice = dockingService.hasDeviceByDeviceCode(deviceId); + String senderIp = request.getLocalAddress().getHostAddress(); + RemoteInfo remoteInfo = SipUtil.getRemoteInfoFromRequest(request, false); + log.debug("远程连接信息 => {}", remoteInfo); + if(!hasDevice){ + log.info("新注册的设备 deviceId => {}", deviceId); + } + + String password = sipConfig.getPassword(); + Authorization authorization = request.getAuthorization(); + if(authorization == null && StringUtils.isNotBlank(password)){ + Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); + DigestServerAuthenticationHelper.generateChallenge(getHeaderFactory(),response,sipConfig.getDomain()); + sender.send(senderIp,response); + return; + } + + log.debug("认证信息 => {}", authorization); + boolean authPass = StringUtils.isBlank(password) || + DigestServerAuthenticationHelper.doAuthenticatePlainTextPassword(request,password); + if(!authPass){ + Response response = getMessageFactory().createResponse(Response.FORBIDDEN, request); + response.setReasonPhrase("认证失败"); + log.info("设备注册信息认证失败 deviceId => {}", deviceId); + sender.send(senderIp,response); + return; + } + + + log.debug("设备 deviceId => {}, 认证通过", deviceId); + registerDevice(deviceId, request, senderIp, remoteInfo); + } + + @SneakyThrows + private Response generateRegisterResponse(Request request){ + SIPRequest sipRequest = (SIPRequest) request; + ExpiresHeader expires = sipRequest.getExpires(); + if(expires == null){ + return getMessageFactory().createResponse(Response.BAD_REQUEST, request); + } + + Response response = getMessageFactory().createResponse(Response.OK, request); + // 添加date头 + SIPDateHeader dateHeader = new SIPDateHeader(); + // GB28181 日期 + GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(gbSipDate); + + response.addHeader(dateHeader); + response.addHeader(sipRequest.getContactHeader()); + response.addHeader(expires); + + return response; + } + + @SneakyThrows + private void registerDevice(String deviceId, SIPRequest request, String senderIp, RemoteInfo remoteInfo) { + WvpProxyDocking device = new WvpProxyDocking(); + device.setGbDeviceId(deviceId); + device.setIp(remoteInfo.getIp()); + device.setPort(String.valueOf(remoteInfo.getPort())); + dockingService.addDevice(device); + + Response response = generateRegisterResponse(request); + sender.send(senderIp, response); + + + ViaHeader viaHeader = request.getTopmostViaHeader(); + String transport = viaHeader.getTransport(); + + int expires = request.getExpires().getExpires(); + + // expires == 0 时 注销 + if (expires == 0) { + log.info("设备注销 deviceId => {}", deviceId); + } + } + + +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java new file mode 100644 index 0000000..2e27807 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -0,0 +1,128 @@ +package cn.skcks.docking.gb28181.wvp.sip.request; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; +import gov.nist.javax.sip.message.MessageFactoryImpl; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Component; +import org.springframework.util.DigestUtils; + +import javax.sip.SipFactory; +import javax.sip.address.Address; +import javax.sip.address.SipURI; +import javax.sip.header.*; +import javax.sip.message.Request; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +@DependsOn("proxySipConfig") +@Component +public class SipRequestBuilder implements ApplicationContextAware { + private static ProxySipConfig sipConfig; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + sipConfig = applicationContext.getBean(ProxySipConfig.class); + } + + private static SipFactory getSipFactory(){ + return SipFactory.getInstance(); + } + + @SneakyThrows + private static List getViaHeaders(String ip,int port, String transport, String viaTag){ + ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, transport, viaTag); + viaHeader.setRPort(); + return Collections.singletonList(viaHeader); + } + + @SneakyThrows + private static CSeqHeader getCSeqHeader(long cSeq, String method){ + return getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, method); + } + + @SneakyThrows + public static Request createMessageRequest(WvpProxyDocking device, String ip, int port, long cSeq, String content, String fromTag, CallIdHeader callIdHeader) { + Request request; + String target = StringUtils.joinWith(":", device.getIp(), device.getPort()); + // sip uri + SipURI requestURI = MessageHelper.createSipURI(device.getGbDeviceId(), target); + + // via + List viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), null ); + + String from = StringUtils.joinWith(":", ip, port); + // from + SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, null); + + // Forwards + MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE); + + // 使用 GB28181 默认编码 否则中文将会乱码 + MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory(); + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + request.setContent(content, contentTypeHeader); + return request; + } + + @SneakyThrows + public static Request createMessageRequest(WvpProxyDocking device, String ip, int port,long cSeq,String content, String viaTag, String fromTag, CallIdHeader callIdHeader) { + Request request; + String target = StringUtils.joinWith(":", device.getIp(), device.getPort()); + // sip uri + SipURI requestURI = MessageHelper.createSipURI(device.getGbDeviceId(), target); + + // via + List viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), viaTag ); + + String from = StringUtils.joinWith(":", ip, port); + // from + SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from); + Address fromAddress = MessageHelper.createAddress(fromSipURI); + FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = MessageHelper.createSipURI(sipConfig.getId(), target); + Address toAddress = MessageHelper.createAddress(toSipURI); + ToHeader toHeader = MessageHelper.createToHeader(toAddress, null); + + // Forwards + MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE); + + // 使用 GB28181 默认编码 否则中文将会乱码 + MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory(); + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards); + + request.addHeader(SipUtil.createUserAgentHeader()); + + ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + request.setContent(content, contentTypeHeader); + return request; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/response/SipResponseBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/response/SipResponseBuilder.java new file mode 100644 index 0000000..62f01c2 --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/response/SipResponseBuilder.java @@ -0,0 +1,56 @@ +package cn.skcks.docking.gb28181.wvp.sip.response; + +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; +import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; +import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import gov.nist.javax.sip.message.MessageFactoryImpl; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import javax.sip.SipFactory; +import javax.sip.address.Address; +import javax.sip.address.SipURI; +import javax.sip.header.ContentTypeHeader; +import javax.sip.header.MaxForwardsHeader; +import javax.sip.message.Response; + +@Slf4j +public class SipResponseBuilder { + @SneakyThrows + public static Response response(SIPRequest request, int status, String message){ + if (request.getToHeader().getTag() == null) { + request.getToHeader().setTag(SipUtil.generateTag()); + } + + MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory(); + // 使用 GB28181 默认编码 否则中文将会乱码 + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + SIPResponse response = (SIPResponse)messageFactory.createResponse(status, request); + if (message != null) { + response.setReasonPhrase(message); + } + return response; + } + + @SneakyThrows + public static Response responseSdp(SIPRequest request, GB28181Description sdp) { + MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory(); + // 使用 GB28181 默认编码 否则中文将会乱码 + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + SIPResponse response = (SIPResponse)messageFactory.createResponse(Response.OK, request); + SipFactory sipFactory = SipFactory.getInstance(); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("application", "sdp"); + response.setContent(sdp.toString(), contentTypeHeader); + SipURI sipURI = (SipURI) request.getRequestURI(); + SipURI uri = MessageHelper.createSipURI(sipURI.getUser(), StringUtils.joinWith(":", sipURI.getHost() + ":" + sipURI.getPort())); + Address concatAddress = sipFactory.createAddressFactory().createAddress(uri); + MaxForwardsHeader maxForwardsHeader = MessageHelper.createMaxForwardsHeader(70); + response.setMaxForwards(maxForwardsHeader); + response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + return response; + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/sender/SipSender.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/sender/SipSender.java new file mode 100644 index 0000000..71e4cbf --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/sender/SipSender.java @@ -0,0 +1,83 @@ +package cn.skcks.docking.gb28181.wvp.sip.sender; + +import cn.skcks.docking.gb28181.core.sip.service.SipService; +import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import javax.sip.ListeningPoint; +import javax.sip.SipException; +import javax.sip.SipProvider; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.util.List; +import java.util.Objects; + +@Slf4j +@RequiredArgsConstructor(onConstructor_ = {@Lazy}) +@Component +public class SipSender { + private final SipService sipService; + private final ProxySipConfig sipConfig; + + public SipProvider getProvider(String transport, String ip) { + return sipService.getProvider(transport, ip); + } + + public List getProviders() { + return sipConfig.getIp().stream().map(item -> getProvider(sipConfig.getTransport(), item)) + .filter(Objects::nonNull) + .toList(); + } + + public void sendResponse(SipProvider sipProvider, SendResponse response) { + log.info("{}", sipProvider); + ListeningPoint[] listeningPoints = sipProvider.getListeningPoints(); + if (listeningPoints == null || listeningPoints.length == 0) { + log.error("发送响应失败, 未找到有效的监听地址"); + return; + } + ListeningPoint listeningPoint = listeningPoints[0]; + String ip = listeningPoint.getIPAddress(); + int port = listeningPoint.getPort(); + try { + sipProvider.sendResponse(response.build(sipProvider, ip, port)); + } catch (SipException e) { + log.error("向{} {}:{} 发送响应失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage()); + } + } + + public void sendResponse(String senderIp,String transport, SendResponse response) { + SipProvider sipProvider = getProvider(transport, senderIp); + sendResponse(sipProvider, response); + } + + public void sendRequest(SendRequest request) { + getProviders().parallelStream().forEach(sipProvider -> { + log.info("{}", sipProvider); + ListeningPoint[] listeningPoints = sipProvider.getListeningPoints(); + if (listeningPoints == null || listeningPoints.length == 0) { + log.error("发送请求失败, 未找到有效的监听地址"); + return; + } + ListeningPoint listeningPoint = listeningPoints[0]; + String ip = listeningPoint.getIPAddress(); + int port = listeningPoint.getPort(); + try { + sipProvider.sendRequest(request.build(sipProvider, ip, port)); + } catch (SipException e) { + log.error("向{} {}:{} 发送请求失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage()); + } + }); + } + + public interface SendRequest { + Request build(SipProvider provider, String ip, int port); + } + + public interface SendResponse { + Response build(SipProvider provider, String ip, int port); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/CatalogSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/CatalogSubscribe.java new file mode 100644 index 0000000..4f6a1eb --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/CatalogSubscribe.java @@ -0,0 +1,39 @@ +package cn.skcks.docking.gb28181.wvp.sip.subscribe; + +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@RequiredArgsConstructor +public class CatalogSubscribe implements GenericSubscribe { + private final Executor executor; + + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + Helper.delPublisher(publishers, key); + } +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java new file mode 100644 index 0000000..4b25c1c --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java @@ -0,0 +1,35 @@ +package cn.skcks.docking.gb28181.wvp.sip.subscribe; + +import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import java.util.concurrent.Executor; + +@Slf4j +@Data +@RequiredArgsConstructor +@Service +public class SipSubscribe { + @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) + private final Executor executor; + private GenericSubscribe catalogSubscribe; + + @PostConstruct + private void init() { + catalogSubscribe = new CatalogSubscribe(executor); + } + + @PreDestroy + private void destroy() { + catalogSubscribe.close(); + } +} diff --git a/gb28181-wvp-proxy-starter/src/main/java/cn/skcks/docking/gb28181/wvp/Gb28181WvpProxyStarter.java b/gb28181-wvp-proxy-starter/src/main/java/cn/skcks/docking/gb28181/wvp/Gb28181WvpProxyStarter.java index 3cace52..c216350 100644 --- a/gb28181-wvp-proxy-starter/src/main/java/cn/skcks/docking/gb28181/wvp/Gb28181WvpProxyStarter.java +++ b/gb28181-wvp-proxy-starter/src/main/java/cn/skcks/docking/gb28181/wvp/Gb28181WvpProxyStarter.java @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; import org.springframework.scheduling.annotation.EnableAsync; @EnableFeignClients(basePackages = { @@ -16,6 +17,23 @@ import org.springframework.scheduling.annotation.EnableAsync; "cn.skcks.docking.gb28181.common", "cn.skcks.docking.gb28181.wvp", "cn.skcks.docking.gb28181.media", + "cn.skcks.docking.gb28181" +}, includeFilters = { + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "cn.skcks.docking.gb28181.service.ssrc.*", + }) +}, +excludeFilters = { + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "cn.skcks.docking.gb28181.starter.*", + "cn.skcks.docking.gb28181.config.sip.SipConfig", + "cn.skcks.docking.gb28181.core.sip.listener.*", + "cn.skcks.docking.gb28181.core.sip.message.processor.*", + "cn.skcks.docking.gb28181.core.sip.message.subscribe.*", + "cn.skcks.docking.gb28181.service.play.*", + "cn.skcks.docking.gb28181.service.record.*", + "cn.skcks.docking.gb28181.core.sip.message.request.*" + }) }) @EnableAsync public class Gb28181WvpProxyStarter { diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index afa629a..f16da3e 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -48,7 +48,15 @@ proxy: passwd: admi use-ffmpeg: false enable: true - + gb28181: + sip: + id: 44050100002000000003 + domain: 4405010000 + password: 123456 + port: 5063 + ip: + - 10.10.10.20 +# - 192.168.1.241 ffmpeg-support: task: max: 4 diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml index d0dca2f..7d3bdad 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml @@ -52,6 +52,15 @@ proxy: use-wvp-assist: false enable: true use-ffmpeg: false + gb28181: + sip: + id: 44050100002000000003 + domain: 4405010000 + password: 123456 + port: 5063 + ip: + - 192.168.3.10 +# - 192.168.1.241 ffmpeg-support: task: diff --git a/pom.xml b/pom.xml index e8c96ad..f59f210 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,12 @@ ${gb28181.docking.version} + + cn.skcks.docking.gb28181 + gb28181-service + ${gb28181.docking.version} + + org.mapstruct