尝试整合sip信令 实现国标级联上级(未完)

This commit is contained in:
shikong 2023-09-20 03:16:28 +08:00
parent 7c6f0a612c
commit 39b65a2aee
33 changed files with 1596 additions and 68 deletions

View File

@ -0,0 +1,29 @@
package cn.skcks.docking.gb28181.wvp.api.gb28181;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService;
import lombok.RequiredArgsConstructor;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
@RestController
@JsonMapping("/gb28181")
public class Gb28181Controller {
private final CatalogService catalogService;
@Bean
public GroupedOpenApi gb28181Api() {
return SwaggerConfig.api("Gb28181Api", "/gb28181");
}
@GetJson("/catalog")
public JsonResponse<Void> catalog(@RequestParam("gbDeviceId") String id){
catalogService.getCatalog(id);
return JsonResponse.success(null);
}
}

View File

@ -1,6 +1,6 @@
package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
@ -15,17 +15,17 @@ import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/video/record")
public class RecordController {
private final RecordService recordService;
private final VideoService videoService;
@Operation(summary = "返回文件下载 http 头信息",description = "禁止多线程下载, 默认文件名为 record.mp4")
@RequestMapping(method = {RequestMethod.HEAD,RequestMethod.OPTIONS})
public void record(HttpServletResponse response){
recordService.header(response);
videoService.header(response);
}
@Operation(summary = "录制flv视频流, 并下载为flv文件")
@GetMapping
public void record(HttpServletRequest request, HttpServletResponse response, @RequestParam String url, @RequestParam long time){
recordService.record(request, response,url,time);
videoService.record(request, response,url,time);
}
}

View File

@ -3,7 +3,7 @@ package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -15,7 +15,6 @@ import org.springdoc.core.annotations.ParameterObject;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -28,7 +27,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
@RequiredArgsConstructor
public class VideoController {
private final ZlmMediaConfig config;
private final RecordService recordService;
private final VideoService videoService;
private final WvpService wvpService;
@Bean

View File

@ -1,5 +1,6 @@
package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper;
import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDeviceDynamicSqlSupport.*;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
@ -31,7 +32,7 @@ import org.mybatis.dynamic.sql.util.mybatis3.MyBatis3Utils;
@Mapper
public interface WvpProxyDeviceMapper extends CommonCountMapper, CommonDeleteMapper, CommonInsertMapper<WvpProxyDevice>, CommonUpdateMapper {
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
BasicColumn[] selectList = BasicColumn.columnList(WvpProxyDeviceDynamicSqlSupport.id, WvpProxyDeviceDynamicSqlSupport.deviceCode, WvpProxyDeviceDynamicSqlSupport.gbDeviceId, WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId, WvpProxyDeviceDynamicSqlSupport.name, WvpProxyDeviceDynamicSqlSupport.address);
BasicColumn[] selectList = BasicColumn.columnList(id, deviceCode, gbDeviceId, gbDeviceChannelId, name, address);
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
@SelectProvider(type=SqlProviderAdapter.class, method="select")
@ -52,125 +53,125 @@ public interface WvpProxyDeviceMapper extends CommonCountMapper, CommonDeleteMap
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default long count(CountDSLCompleter completer) {
return MyBatis3Utils.countFrom(this::count, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.countFrom(this::count, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int delete(DeleteDSLCompleter completer) {
return MyBatis3Utils.deleteFrom(this::delete, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.deleteFrom(this::delete, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int deleteByPrimaryKey(Long id_) {
return delete(c ->
c.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(id_))
c.where(id, isEqualTo(id_))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int insert(WvpProxyDevice row) {
return MyBatis3Utils.insert(this::insert, row, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c ->
c.map(WvpProxyDeviceDynamicSqlSupport.id).toProperty("id")
.map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toProperty("deviceCode")
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toProperty("gbDeviceId")
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toProperty("gbDeviceChannelId")
.map(WvpProxyDeviceDynamicSqlSupport.name).toProperty("name")
.map(WvpProxyDeviceDynamicSqlSupport.address).toProperty("address")
return MyBatis3Utils.insert(this::insert, row, wvpProxyDevice, c ->
c.map(id).toProperty("id")
.map(deviceCode).toProperty("deviceCode")
.map(gbDeviceId).toProperty("gbDeviceId")
.map(gbDeviceChannelId).toProperty("gbDeviceChannelId")
.map(name).toProperty("name")
.map(address).toProperty("address")
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int insertMultiple(Collection<WvpProxyDevice> records) {
return MyBatis3Utils.insertMultiple(this::insertMultiple, records, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c ->
c.map(WvpProxyDeviceDynamicSqlSupport.id).toProperty("id")
.map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toProperty("deviceCode")
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toProperty("gbDeviceId")
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toProperty("gbDeviceChannelId")
.map(WvpProxyDeviceDynamicSqlSupport.name).toProperty("name")
.map(WvpProxyDeviceDynamicSqlSupport.address).toProperty("address")
return MyBatis3Utils.insertMultiple(this::insertMultiple, records, wvpProxyDevice, c ->
c.map(id).toProperty("id")
.map(deviceCode).toProperty("deviceCode")
.map(gbDeviceId).toProperty("gbDeviceId")
.map(gbDeviceChannelId).toProperty("gbDeviceChannelId")
.map(name).toProperty("name")
.map(address).toProperty("address")
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int insertSelective(WvpProxyDevice row) {
return MyBatis3Utils.insert(this::insert, row, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, c ->
c.map(WvpProxyDeviceDynamicSqlSupport.id).toPropertyWhenPresent("id", row::getId)
.map(WvpProxyDeviceDynamicSqlSupport.deviceCode).toPropertyWhenPresent("deviceCode", row::getDeviceCode)
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId)
.map(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).toPropertyWhenPresent("gbDeviceChannelId", row::getGbDeviceChannelId)
.map(WvpProxyDeviceDynamicSqlSupport.name).toPropertyWhenPresent("name", row::getName)
.map(WvpProxyDeviceDynamicSqlSupport.address).toPropertyWhenPresent("address", row::getAddress)
return MyBatis3Utils.insert(this::insert, row, wvpProxyDevice, c ->
c.map(id).toPropertyWhenPresent("id", row::getId)
.map(deviceCode).toPropertyWhenPresent("deviceCode", row::getDeviceCode)
.map(gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId)
.map(gbDeviceChannelId).toPropertyWhenPresent("gbDeviceChannelId", row::getGbDeviceChannelId)
.map(name).toPropertyWhenPresent("name", row::getName)
.map(address).toPropertyWhenPresent("address", row::getAddress)
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default Optional<WvpProxyDevice> selectOne(SelectDSLCompleter completer) {
return MyBatis3Utils.selectOne(this::selectOne, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.selectOne(this::selectOne, selectList, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default List<WvpProxyDevice> select(SelectDSLCompleter completer) {
return MyBatis3Utils.selectList(this::selectMany, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.selectList(this::selectMany, selectList, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default List<WvpProxyDevice> selectDistinct(SelectDSLCompleter completer) {
return MyBatis3Utils.selectDistinct(this::selectMany, selectList, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.selectDistinct(this::selectMany, selectList, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default Optional<WvpProxyDevice> selectByPrimaryKey(Long id_) {
return selectOne(c ->
c.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(id_))
c.where(id, isEqualTo(id_))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int update(UpdateDSLCompleter completer) {
return MyBatis3Utils.update(this::update, WvpProxyDeviceDynamicSqlSupport.wvpProxyDevice, completer);
return MyBatis3Utils.update(this::update, wvpProxyDevice, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
static UpdateDSL<UpdateModel> updateAllColumns(WvpProxyDevice row, UpdateDSL<UpdateModel> dsl) {
return dsl.set(WvpProxyDeviceDynamicSqlSupport.id).equalTo(row::getId)
.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalTo(row::getDeviceCode)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalTo(row::getGbDeviceId)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalTo(row::getGbDeviceChannelId)
.set(WvpProxyDeviceDynamicSqlSupport.name).equalTo(row::getName)
.set(WvpProxyDeviceDynamicSqlSupport.address).equalTo(row::getAddress);
return dsl.set(id).equalTo(row::getId)
.set(deviceCode).equalTo(row::getDeviceCode)
.set(gbDeviceId).equalTo(row::getGbDeviceId)
.set(gbDeviceChannelId).equalTo(row::getGbDeviceChannelId)
.set(name).equalTo(row::getName)
.set(address).equalTo(row::getAddress);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
static UpdateDSL<UpdateModel> updateSelectiveColumns(WvpProxyDevice row, UpdateDSL<UpdateModel> dsl) {
return dsl.set(WvpProxyDeviceDynamicSqlSupport.id).equalToWhenPresent(row::getId)
.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalToWhenPresent(row::getDeviceCode)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId)
.set(WvpProxyDeviceDynamicSqlSupport.name).equalToWhenPresent(row::getName)
.set(WvpProxyDeviceDynamicSqlSupport.address).equalToWhenPresent(row::getAddress);
return dsl.set(id).equalToWhenPresent(row::getId)
.set(deviceCode).equalToWhenPresent(row::getDeviceCode)
.set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId)
.set(name).equalToWhenPresent(row::getName)
.set(address).equalToWhenPresent(row::getAddress);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int updateByPrimaryKey(WvpProxyDevice row) {
return update(c ->
c.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalTo(row::getDeviceCode)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalTo(row::getGbDeviceId)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalTo(row::getGbDeviceChannelId)
.set(WvpProxyDeviceDynamicSqlSupport.name).equalTo(row::getName)
.set(WvpProxyDeviceDynamicSqlSupport.address).equalTo(row::getAddress)
.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(row::getId))
c.set(deviceCode).equalTo(row::getDeviceCode)
.set(gbDeviceId).equalTo(row::getGbDeviceId)
.set(gbDeviceChannelId).equalTo(row::getGbDeviceChannelId)
.set(name).equalTo(row::getName)
.set(address).equalTo(row::getAddress)
.where(id, isEqualTo(row::getId))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_device")
default int updateByPrimaryKeySelective(WvpProxyDevice row) {
return update(c ->
c.set(WvpProxyDeviceDynamicSqlSupport.deviceCode).equalToWhenPresent(row::getDeviceCode)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId)
.set(WvpProxyDeviceDynamicSqlSupport.name).equalToWhenPresent(row::getName)
.set(WvpProxyDeviceDynamicSqlSupport.address).equalToWhenPresent(row::getAddress)
.where(WvpProxyDeviceDynamicSqlSupport.id, isEqualTo(row::getId))
c.set(deviceCode).equalToWhenPresent(row::getDeviceCode)
.set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(gbDeviceChannelId).equalToWhenPresent(row::getGbDeviceChannelId)
.set(name).equalToWhenPresent(row::getName)
.set(address).equalToWhenPresent(row::getAddress)
.where(id, isEqualTo(row::getId))
);
}
}

View File

@ -0,0 +1,38 @@
package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper;
import jakarta.annotation.Generated;
import java.sql.JDBCType;
import org.mybatis.dynamic.sql.AliasableSqlTable;
import org.mybatis.dynamic.sql.SqlColumn;
public final class WvpProxyDockingDynamicSqlSupport {
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
public static final WvpProxyDocking wvpProxyDocking = new WvpProxyDocking();
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id")
public static final SqlColumn<Long> id = wvpProxyDocking.id;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id")
public static final SqlColumn<String> gbDeviceId = wvpProxyDocking.gbDeviceId;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip")
public static final SqlColumn<String> ip = wvpProxyDocking.ip;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port")
public static final SqlColumn<String> port = wvpProxyDocking.port;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
public static final class WvpProxyDocking extends AliasableSqlTable<WvpProxyDocking> {
public final SqlColumn<Long> id = column("id", JDBCType.BIGINT);
public final SqlColumn<String> gbDeviceId = column("gb_device_id", JDBCType.VARCHAR);
public final SqlColumn<String> ip = column("ip", JDBCType.VARCHAR);
public final SqlColumn<String> port = column("port", JDBCType.VARCHAR);
public WvpProxyDocking() {
super("wvp_proxy_docking", WvpProxyDocking::new);
}
}
}

View File

@ -0,0 +1,161 @@
package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper;
import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingDynamicSqlSupport.*;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import jakarta.annotation.Generated;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.type.JdbcType;
import org.mybatis.dynamic.sql.BasicColumn;
import org.mybatis.dynamic.sql.delete.DeleteDSLCompleter;
import org.mybatis.dynamic.sql.select.CountDSLCompleter;
import org.mybatis.dynamic.sql.select.SelectDSLCompleter;
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
import org.mybatis.dynamic.sql.update.UpdateDSL;
import org.mybatis.dynamic.sql.update.UpdateDSLCompleter;
import org.mybatis.dynamic.sql.update.UpdateModel;
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
import org.mybatis.dynamic.sql.util.mybatis3.CommonCountMapper;
import org.mybatis.dynamic.sql.util.mybatis3.CommonDeleteMapper;
import org.mybatis.dynamic.sql.util.mybatis3.CommonInsertMapper;
import org.mybatis.dynamic.sql.util.mybatis3.CommonUpdateMapper;
import org.mybatis.dynamic.sql.util.mybatis3.MyBatis3Utils;
@Mapper
public interface WvpProxyDockingMapper extends CommonCountMapper, CommonDeleteMapper, CommonInsertMapper<WvpProxyDocking>, CommonUpdateMapper {
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
BasicColumn[] selectList = BasicColumn.columnList(id, gbDeviceId, ip, port);
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
@SelectProvider(type=SqlProviderAdapter.class, method="select")
@Results(id="WvpProxyDockingResult", value = {
@Result(column="id", property="id", jdbcType=JdbcType.BIGINT, id=true),
@Result(column="gb_device_id", property="gbDeviceId", jdbcType=JdbcType.VARCHAR),
@Result(column="ip", property="ip", jdbcType=JdbcType.VARCHAR),
@Result(column="port", property="port", jdbcType=JdbcType.VARCHAR)
})
List<WvpProxyDocking> selectMany(SelectStatementProvider selectStatement);
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
@SelectProvider(type=SqlProviderAdapter.class, method="select")
@ResultMap("WvpProxyDockingResult")
Optional<WvpProxyDocking> selectOne(SelectStatementProvider selectStatement);
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default long count(CountDSLCompleter completer) {
return MyBatis3Utils.countFrom(this::count, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int delete(DeleteDSLCompleter completer) {
return MyBatis3Utils.deleteFrom(this::delete, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int deleteByPrimaryKey(Long id_) {
return delete(c ->
c.where(id, isEqualTo(id_))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int insert(WvpProxyDocking row) {
return MyBatis3Utils.insert(this::insert, row, wvpProxyDocking, c ->
c.map(id).toProperty("id")
.map(gbDeviceId).toProperty("gbDeviceId")
.map(ip).toProperty("ip")
.map(port).toProperty("port")
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int insertMultiple(Collection<WvpProxyDocking> records) {
return MyBatis3Utils.insertMultiple(this::insertMultiple, records, wvpProxyDocking, c ->
c.map(id).toProperty("id")
.map(gbDeviceId).toProperty("gbDeviceId")
.map(ip).toProperty("ip")
.map(port).toProperty("port")
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int insertSelective(WvpProxyDocking row) {
return MyBatis3Utils.insert(this::insert, row, wvpProxyDocking, c ->
c.map(id).toPropertyWhenPresent("id", row::getId)
.map(gbDeviceId).toPropertyWhenPresent("gbDeviceId", row::getGbDeviceId)
.map(ip).toPropertyWhenPresent("ip", row::getIp)
.map(port).toPropertyWhenPresent("port", row::getPort)
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default Optional<WvpProxyDocking> selectOne(SelectDSLCompleter completer) {
return MyBatis3Utils.selectOne(this::selectOne, selectList, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default List<WvpProxyDocking> select(SelectDSLCompleter completer) {
return MyBatis3Utils.selectList(this::selectMany, selectList, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default List<WvpProxyDocking> selectDistinct(SelectDSLCompleter completer) {
return MyBatis3Utils.selectDistinct(this::selectMany, selectList, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default Optional<WvpProxyDocking> selectByPrimaryKey(Long id_) {
return selectOne(c ->
c.where(id, isEqualTo(id_))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int update(UpdateDSLCompleter completer) {
return MyBatis3Utils.update(this::update, wvpProxyDocking, completer);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
static UpdateDSL<UpdateModel> updateAllColumns(WvpProxyDocking row, UpdateDSL<UpdateModel> dsl) {
return dsl.set(id).equalTo(row::getId)
.set(gbDeviceId).equalTo(row::getGbDeviceId)
.set(ip).equalTo(row::getIp)
.set(port).equalTo(row::getPort);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
static UpdateDSL<UpdateModel> updateSelectiveColumns(WvpProxyDocking row, UpdateDSL<UpdateModel> dsl) {
return dsl.set(id).equalToWhenPresent(row::getId)
.set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(ip).equalToWhenPresent(row::getIp)
.set(port).equalToWhenPresent(row::getPort);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int updateByPrimaryKey(WvpProxyDocking row) {
return update(c ->
c.set(gbDeviceId).equalTo(row::getGbDeviceId)
.set(ip).equalTo(row::getIp)
.set(port).equalTo(row::getPort)
.where(id, isEqualTo(row::getId))
);
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source Table: wvp_proxy_docking")
default int updateByPrimaryKeySelective(WvpProxyDocking row) {
return update(c ->
c.set(gbDeviceId).equalToWhenPresent(row::getGbDeviceId)
.set(ip).equalToWhenPresent(row::getIp)
.set(port).equalToWhenPresent(row::getPort)
.where(id, isEqualTo(row::getId))
);
}
}

View File

@ -0,0 +1,62 @@
package cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model;
import jakarta.annotation.Generated;
/**
*
* This class was generated by MyBatis Generator.
* This class corresponds to the database table wvp_proxy_docking
*/
public class WvpProxyDocking {
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id")
private Long id;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id")
private String gbDeviceId;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip")
private String ip;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port")
private String port;
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id")
public Long getId() {
return id;
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.id")
public void setId(Long id) {
this.id = id;
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id")
public String getGbDeviceId() {
return gbDeviceId;
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.gb_device_id")
public void setGbDeviceId(String gbDeviceId) {
this.gbDeviceId = gbDeviceId == null ? null : gbDeviceId.trim();
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip")
public String getIp() {
return ip;
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.ip")
public void setIp(String ip) {
this.ip = ip == null ? null : ip.trim();
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port")
public String getPort() {
return port;
}
@Generated(value="org.mybatis.generator.api.MyBatisGenerator", comments="Source field: wvp_proxy_docking.port")
public void setPort(String port) {
this.port = port == null ? null : port.trim();
}
}

View File

@ -6,4 +6,6 @@ import org.apache.ibatis.annotations.Mapper;
public interface WvpProxyOperateTableMapper {
// int createNewTable(@Param("tableName")String tableName);
void createDeviceTable();
void createDockingTable();
}

View File

@ -21,4 +21,13 @@
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci;
</update>
<update id="createDockingTable">
CREATE TABLE IF NOT EXISTS `wvp_proxy_docking` (
`id` bigint NOT NULL AUTO_INCREMENT,
`gb_device_id` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL,
`ip` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL,
`port` varchar(5) COLLATE utf8mb4_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
</update>
</mapper>

View File

@ -29,6 +29,11 @@
<artifactId>common</artifactId>
</dependency>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-service</artifactId>
</dependency>
<dependency>
<groupId>cn.skcks.docking</groupId>
<artifactId>zlmediakit-service</artifactId>

View File

@ -0,0 +1,44 @@
package cn.skcks.docking.gb28181.wvp.config;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.sip.ListeningPoint;
import java.util.List;
@Component
@ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true)
@Order(0)
@Data
public class ProxySipConfig {
private List<String> ip;
private List<String> showIp;
private Integer port;
private String domain;
private String id;
private String password;
private String transport = ListeningPoint.UDP;
@Bean
public SipConfig sipConfig(){
SipConfig sipConfig = new SipConfig();
sipConfig.setIp(ip);
sipConfig.setShowIp(showIp);
sipConfig.setPort(port);
sipConfig.setDomain(domain);
sipConfig.setId(id);
sipConfig.setPassword(password);
return sipConfig;
}
}

View File

@ -24,5 +24,6 @@ public class WvpProxyOrmInitService {
public void init(){
log.info("[orm] 自动建表");
mapper.createDeviceTable();
mapper.createDockingTable();
}
}

View File

@ -0,0 +1,89 @@
package cn.skcks.docking.gb28181.wvp.service.catalog;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.sip.message.Request;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Service
@RequiredArgsConstructor
public class CatalogService {
private final SipSender sipSender;
private final SipSubscribe sipSubscribe;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final DockingService dockingService;
public void getCatalog(String deviceId){
WvpProxyDocking device = dockingService.getDeviceByDeviceCode(deviceId).orElse(null);
if (device == null){
throw new RuntimeException("设备不存在");
}
CatalogRequestDTO catalogRequestDTO = new CatalogRequestDTO();
catalogRequestDTO.setDeviceId(deviceId);
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
catalogRequestDTO.setSn(sn);
String key = GenericSubscribe.Helper.getKey(Request.MESSAGE, deviceId);
sipSubscribe.getCatalogSubscribe().addPublisher(key);
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = catalog(key, device, schedule);
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,1L, XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(),
SipUtil.generateFromTag(), provider.getNewCallId()));
}
private Flow.Subscriber<SIPRequest> catalog(String key, WvpProxyDocking device, ScheduledFuture<?>[] schedule){
return new Flow.Subscriber<>() {
Flow.Subscription subscription;
final AtomicLong getNum = new AtomicLong(0);
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 订阅 {}", key);
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO responseDTO = XmlUtils.parse(item.getRawContent(), CatalogResponseDTO.class, GB28181Constant.CHARSET);
Long sumNum = responseDTO.getSumNum();
log.info("{}",responseDTO);
getNum.getAndAdd(responseDTO.getDeviceList().getDeviceList().size());
if(getNum.get() < sumNum){
subscription.request(1);
} else{
onComplete();
}
}
@Override
public void onError(Throwable throwable) {
onComplete();
}
@Override
public void onComplete() {
sipSubscribe.getCatalogSubscribe().delPublisher(key);
schedule[0].cancel(true);
}
};
}
}

View File

@ -0,0 +1,60 @@
package cn.skcks.docking.gb28181.wvp.service.docking;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingDynamicSqlSupport;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.text.MessageFormat;
import java.util.Optional;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
@Slf4j
@Service
@RequiredArgsConstructor
public class DockingService {
private final WvpProxyDockingMapper wvpProxyDockingMapper;
public Optional<WvpProxyDocking> getDeviceById(Long id){
return wvpProxyDockingMapper.selectOne(s->
s.where(WvpProxyDockingDynamicSqlSupport.id, isEqualTo(id)));
}
public Optional<WvpProxyDocking> getDeviceByDeviceCode(String deviceCode){
return wvpProxyDockingMapper.selectOne(s->
s.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId,isEqualTo(deviceCode)));
}
public Boolean hasDeviceByDeviceCode(String deviceCode){
return getDeviceByDeviceCode(deviceCode).orElse(null) != null;
}
/**
* 添加设备
* @param device 设备
* @return 是否成功
*/
@SneakyThrows
public boolean addDevice(WvpProxyDocking device) {
if(device == null){
return false;
}
String deviceCode = device.getGbDeviceId();
if(StringUtils.isBlank(deviceCode)){
throw new JsonException("设备编码不能为空");
}
if(getDeviceByDeviceCode(deviceCode).isPresent()){
wvpProxyDockingMapper.delete(d->d.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId,isEqualTo(deviceCode)));
}
return wvpProxyDockingMapper.insert(device) > 0;
}
}

View File

@ -17,7 +17,6 @@ import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.FrameGrabber;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.*;
@ -29,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecordService {
public class VideoService {
private final FfmpegSupportService ffmpegSupportService;
private final WvpProxyConfig wvpProxyConfig;
/**

View File

@ -24,7 +24,7 @@ import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
import com.github.rholder.retry.*;
import jakarta.servlet.AsyncContext;
@ -34,7 +34,6 @@ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -53,7 +52,7 @@ public class WvpService {
private final WvpProxyConfig wvpProxyConfig;
private final DeviceService deviceService;
private final DownloadService downloadService;
private final RecordService recordService;
private final VideoService videoService;
public void header(HttpServletResponse response) {
response.setContentType("video/mp4");
@ -214,7 +213,7 @@ public class WvpService {
String streamUrl = streamContent.getFlv();
try {
header(response);
recordService.record(response, streamUrl, DateUtil.between(DateUtil.parseDateTime(startTime), DateUtil.parseDateTime(endTime), DateUnit.SECOND));
videoService.record(response, streamUrl, DateUtil.between(DateUtil.parseDateTime(startTime), DateUtil.parseDateTime(endTime), DateUnit.SECOND));
} finally {
wvpProxyClient.playbackStop(token, deviceId, channelId, stream);
}

View File

@ -0,0 +1,55 @@
package cn.skcks.docking.gb28181.wvp.sip;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@Order(0)
@Slf4j
@RequiredArgsConstructor
@Component
@DependsOn("wvpProxyOrmInitService")
public class SipStarter implements SmartLifecycle {
private final ProxySipConfig proxySipConfig;
private final SipService sipService;
private boolean isRunning;
@Override
public void start() {
if(checkConfig()){
isRunning = true;
log.debug("sip 服务 启动");
sipService.run();
}
}
@Override
public void stop() {
log.debug("sip 服务 关闭");
sipService.stop();
isRunning = false;
}
@Override
public boolean isRunning() {
return isRunning;
}
public boolean checkConfig(){
log.debug("sip 配置信息 => \n{}", JsonUtils.toJson(proxySipConfig));
if(CollectionUtils.isEmpty(proxySipConfig.getIp())){
log.error("sip ip 配置错误, 请检查配置是否正确");
return false;
}
return true;
}
}

View File

@ -0,0 +1,104 @@
package cn.skcks.docking.gb28181.wvp.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@RequiredArgsConstructor
@Component
@Slf4j
public class SipListenerImpl implements SipListener {
private final SipSubscribe sipSubscribe;
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
public void addRequestProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 请求处理器", method);
requestProcessor.put(method, messageProcessor);
}
public void addResponseProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 响应处理器", method);
responseProcessor.put(method, messageProcessor);
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
log.debug("传入请求 method => {}", method);
Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> {
processor.process(requestEvent);
});
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
int status = response.getStatusCode();
CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
log.debug("{} {}", method, response);
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
log.debug("传入响应 method => {}", method);
Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> {
processor.process(responseEvent);
});
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
// 增加其它无需回复的响应如101180等
} else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
if (responseEvent.getDialog() != null) {
responseEvent.getDialog().delete();
}
}
}
@Override
public void processTimeout(TimeoutEvent timeoutEvent) {
ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
if (clientTransaction != null) {
Request request = clientTransaction.getRequest();
if (request != null) {
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
log.debug("会话超时 callId => {}", callIdHeader.getCallId());
}
}
}
}
@Override
public void processIOException(IOExceptionEvent exceptionEvent) {
}
@Override
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
}
@Override
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
CallIdHeader callIdHeader = dialogTerminatedEvent.getDialog().getCallId();
log.debug("会话终止 callId => {}", callIdHeader.getCallId());
}
}

View File

@ -0,0 +1,22 @@
package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@JacksonXmlRootElement(localName = "DeviceList")
@AllArgsConstructor
@NoArgsConstructor
@Data
public class CatalogDeviceListDTO {
@JacksonXmlProperty(isAttribute = true)
private Integer num;
@JacksonXmlProperty(localName = "Item")
@JacksonXmlElementWrapper(useWrapping = false)
private List<CatalogItemDTO> deviceList;
}

View File

@ -0,0 +1,151 @@
package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto;
import cn.hutool.core.date.DatePattern;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@JacksonXmlRootElement(localName = "Item")
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class CatalogItemDTO {
/**
* 设备/区域/系统编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
/**
* 设备/区域/系统名称(必选)
*/
private String name;
/**
* 当为设备时,设备厂商(必选)
*/
private String manufacturer;
/**
* 当为设备时,设备型号(必选)
*/
private String model;
/**
* 当为设备时,设备归属(必选)
*/
private String owner;
/**
* 行政区域(必选)
*/
@JacksonXmlProperty(localName = "CivilCode")
private String civilCode;
/**
* 警区(可选)
*/
private String block;
/**
* 当为设备时,安装地址(必选)
*/
private String address;
/**
* 当为设备时,是否有子设备(必选)1有, 0没有
*/
@Builder.Default
private Integer parental = 0;
/**
* 父设备/区域/系统ID(必选)
*/
@JacksonXmlProperty(localName = "ParentID")
private String parentId;
/**
* 信令安全模式(可选)缺省为0; 0:不采用;2:S/MIME 签名方式;3:S/ MIME加密签名同时采用方式;4:数字摘要方式
*/
@Builder.Default
private Integer safetyWay = 0;
/**
* 注册方式(必选)缺省为1;1:符合IETF RFC3261标准的认证注册模 ;2:基于口令的双向认证注册模式;3:基于数字证书的双向认证注册模式
*/
@Builder.Default
private Integer registerWay = 1;
/**
* 证书序列号(有证书的设备必选)
*/
private String certNum;
/**
* 证书有效标识(有证书的设备必选)缺省为0;证书有效标识:0:无效 1: 有效
*/
@Builder.Default
private Integer certifiable = 0;
/**
* 无效原因码(有证书且证书无效的设备必选)
*/
@Builder.Default
private Integer errCode = 0;
/**
* 证书终止有效期(有证书的设备必选)
*/
@JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE)
private Date endTime;
/**
* 保密属性(必选)缺省为0;0:不涉密,1:涉密
*/
@Builder.Default
private Integer secrecy = 0;
/**
* 设备/区域/系统IP地址(可选)
*/
@JacksonXmlProperty(localName = "IPAddress")
private String ipAddress;
/**
* 设备/区域/系统端口(可选)
*/
private Integer port;
/**
* 设备口令(可选)
*/
private String password;
/**
* 设备状态(必选)
*/
@Builder.Default
private String status = "ON";
/**
* 经度(可选)
*/
@Builder.Default
private String longitude = "0.0";
/**
* 纬度(可选)
*/
@Builder.Default
private String latitude = "0.0";
}

View File

@ -0,0 +1,27 @@
package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Query")
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Data
public class CatalogRequestDTO {
@Builder.Default
private String cmdType = CmdType.CATALOG;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
}

View File

@ -0,0 +1,31 @@
package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@JacksonXmlRootElement(localName = "Response")
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class CatalogResponseDTO {
@Builder.Default
private String cmdType = CmdType.CATALOG;
@JacksonXmlProperty(localName = "SN")
private String sn;
/**
* 目标设备的设备编码(必选)
*/
@JacksonXmlProperty(localName = "DeviceID")
private String deviceId;
private Long sumNum;
private CatalogDeviceListDTO deviceList;
}

View File

@ -0,0 +1,94 @@
package cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.request;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
private final DockingService dockingService;
@Override
public void init() {
}
@Override
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest();
String deviceId = SipUtil.getUserIdFromFromHeader(request);
CallIdHeader callIdHeader = request.getCallIdHeader();
byte[] content = request.getRawContent();
MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET);
log.debug("接收到的消息 => {}", messageDto);
String senderIp = request.getLocalAddress().getHostAddress();
if(dockingService.hasDeviceByDeviceCode(deviceId)){
log.info("未找到相关设备信息 => {}", deviceId);
Response response = response(request,Response.NOT_FOUND,"设备未注册");
sender.send(senderIp,response);
return;
}
Response ok = response(request, Response.OK, "OK");
Response response;
if(messageDto.getCmdType().equalsIgnoreCase(CmdType.KEEPALIVE)){
response = ok;
// 更新设备在线状态
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key))
.ifPresentOrElse(publisher-> publisher.submit(request),
()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto));
} else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
}
sender.send(senderIp, response);
}
@SneakyThrows
public Response response(SIPRequest request, int status, String message){
if (request.getToHeader().getTag() == null) {
request.getToHeader().setTag(SipUtil.generateTag());
}
SIPResponse response = (SIPResponse)getMessageFactory().createResponse(status, request);
if (message != null) {
response.setReasonPhrase(message);
}
return response;
}
}

View File

@ -0,0 +1,164 @@
package cn.skcks.docking.gb28181.wvp.sip.message.register.request;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sip.GbSipDate;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Authorization;
import gov.nist.javax.sip.header.SIPDateHeader;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.ListeningPoint;
import javax.sip.RequestEvent;
import javax.sip.SipProvider;
import javax.sip.address.Address;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@RequiredArgsConstructor
@Component
public class RegisterRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipMessageSender sender;
private final SipConfig sipConfig;
private final DockingService dockingService;
@PostConstruct
@Override
public void init(){
sipListener.addRequestProcessor(Method.REGISTER,this);
}
@SneakyThrows
@Override
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest();
FromHeader fromHeader = request.getFrom();
Address address = fromHeader.getAddress();
log.debug("From {}",address);
SipUri uri = (SipUri)address.getURI();
String deviceId = uri.getUser();
log.debug("请求注册 设备id => {}", deviceId);
Boolean hasDevice = dockingService.hasDeviceByDeviceCode(deviceId);
String senderIp = request.getLocalAddress().getHostAddress();
RemoteInfo remoteInfo = SipUtil.getRemoteInfoFromRequest(request, false);
log.debug("远程连接信息 => {}", remoteInfo);
if(!hasDevice){
log.info("新注册的设备 deviceId => {}", deviceId);
}
String password = sipConfig.getPassword();
Authorization authorization = request.getAuthorization();
if(authorization == null && StringUtils.isNotBlank(password)){
Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
DigestServerAuthenticationHelper.generateChallenge(getHeaderFactory(),response,sipConfig.getDomain());
sender.send(senderIp,response);
return;
}
log.debug("认证信息 => {}", authorization);
boolean authPass = StringUtils.isBlank(password) ||
DigestServerAuthenticationHelper.doAuthenticatePlainTextPassword(request,password);
if(!authPass){
Response response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("认证失败");
log.info("设备注册信息认证失败 deviceId => {}", deviceId);
sender.send(senderIp,response);
return;
}
log.debug("设备 deviceId => {}, 认证通过", deviceId);
registerDevice(deviceId, request, senderIp, remoteInfo);
}
@SneakyThrows
private Response generateRegisterResponse(Request request){
SIPRequest sipRequest = (SIPRequest) request;
ExpiresHeader expires = sipRequest.getExpires();
if(expires == null){
return getMessageFactory().createResponse(Response.BAD_REQUEST, request);
}
Response response = getMessageFactory().createResponse(Response.OK, request);
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// GB28181 日期
GbSipDate gbSipDate = new GbSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(gbSipDate);
response.addHeader(dateHeader);
response.addHeader(sipRequest.getContactHeader());
response.addHeader(expires);
return response;
}
@SneakyThrows
private void registerDevice(String deviceId, SIPRequest request, String senderIp, RemoteInfo remoteInfo) {
WvpProxyDocking device = new WvpProxyDocking();
device.setGbDeviceId(deviceId);
device.setIp(remoteInfo.getIp());
device.setPort(String.valueOf(remoteInfo.getPort()));
dockingService.addDevice(device);
Response response = generateRegisterResponse(request);
sender.send(senderIp, response);
ViaHeader viaHeader = request.getTopmostViaHeader();
String transport = viaHeader.getTransport();
int expires = request.getExpires().getExpires();
// expires == 0 注销
if (expires == 0) {
log.info("设备注销 deviceId => {}", deviceId);
}
}
}

View File

@ -0,0 +1,128 @@
package cn.skcks.docking.gb28181.wvp.sip.request;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@DependsOn("proxySipConfig")
@Component
public class SipRequestBuilder implements ApplicationContextAware {
private static ProxySipConfig sipConfig;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
sipConfig = applicationContext.getBean(ProxySipConfig.class);
}
private static SipFactory getSipFactory(){
return SipFactory.getInstance();
}
@SneakyThrows
private static List<ViaHeader> getViaHeaders(String ip,int port, String transport, String viaTag){
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, transport, viaTag);
viaHeader.setRPort();
return Collections.singletonList(viaHeader);
}
@SneakyThrows
private static CSeqHeader getCSeqHeader(long cSeq, String method){
return getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, method);
}
@SneakyThrows
public static Request createMessageRequest(WvpProxyDocking device, String ip, int port, long cSeq, String content, String fromTag, CallIdHeader callIdHeader) {
Request request;
String target = StringUtils.joinWith(":", device.getIp(), device.getPort());
// sip uri
SipURI requestURI = MessageHelper.createSipURI(device.getGbDeviceId(), target);
// via
List<ViaHeader> viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), null );
String from = StringUtils.joinWith(":", ip, port);
// from
SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from);
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, null);
// Forwards
MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE);
// 使用 GB28181 默认编码 否则中文将会乱码
MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory();
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(content, contentTypeHeader);
return request;
}
@SneakyThrows
public static Request createMessageRequest(WvpProxyDocking device, String ip, int port,long cSeq,String content, String viaTag, String fromTag, CallIdHeader callIdHeader) {
Request request;
String target = StringUtils.joinWith(":", device.getIp(), device.getPort());
// sip uri
SipURI requestURI = MessageHelper.createSipURI(device.getGbDeviceId(), target);
// via
List<ViaHeader> viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), viaTag );
String from = StringUtils.joinWith(":", ip, port);
// from
SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from);
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(sipConfig.getId(), target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, null);
// Forwards
MaxForwardsHeader maxForwards = MessageHelper.createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(cSeq, Request.MESSAGE);
// 使用 GB28181 默认编码 否则中文将会乱码
MessageFactoryImpl messageFactory = (MessageFactoryImpl) getSipFactory().createMessageFactory();
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
request = messageFactory.createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(content, contentTypeHeader);
return request;
}
}

View File

@ -0,0 +1,56 @@
package cn.skcks.docking.gb28181.wvp.sip.response;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.MaxForwardsHeader;
import javax.sip.message.Response;
@Slf4j
public class SipResponseBuilder {
@SneakyThrows
public static Response response(SIPRequest request, int status, String message){
if (request.getToHeader().getTag() == null) {
request.getToHeader().setTag(SipUtil.generateTag());
}
MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory();
// 使用 GB28181 默认编码 否则中文将会乱码
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
SIPResponse response = (SIPResponse)messageFactory.createResponse(status, request);
if (message != null) {
response.setReasonPhrase(message);
}
return response;
}
@SneakyThrows
public static Response responseSdp(SIPRequest request, GB28181Description sdp) {
MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory();
// 使用 GB28181 默认编码 否则中文将会乱码
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
SIPResponse response = (SIPResponse)messageFactory.createResponse(Response.OK, request);
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("application", "sdp");
response.setContent(sdp.toString(), contentTypeHeader);
SipURI sipURI = (SipURI) request.getRequestURI();
SipURI uri = MessageHelper.createSipURI(sipURI.getUser(), StringUtils.joinWith(":", sipURI.getHost() + ":" + sipURI.getPort()));
Address concatAddress = sipFactory.createAddressFactory().createAddress(uri);
MaxForwardsHeader maxForwardsHeader = MessageHelper.createMaxForwardsHeader(70);
response.setMaxForwards(maxForwardsHeader);
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
return response;
}
}

View File

@ -0,0 +1,83 @@
package cn.skcks.docking.gb28181.wvp.sip.sender;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.sip.ListeningPoint;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.List;
import java.util.Objects;
@Slf4j
@RequiredArgsConstructor(onConstructor_ = {@Lazy})
@Component
public class SipSender {
private final SipService sipService;
private final ProxySipConfig sipConfig;
public SipProvider getProvider(String transport, String ip) {
return sipService.getProvider(transport, ip);
}
public List<SipProvider> getProviders() {
return sipConfig.getIp().stream().map(item -> getProvider(sipConfig.getTransport(), item))
.filter(Objects::nonNull)
.toList();
}
public void sendResponse(SipProvider sipProvider, SendResponse response) {
log.info("{}", sipProvider);
ListeningPoint[] listeningPoints = sipProvider.getListeningPoints();
if (listeningPoints == null || listeningPoints.length == 0) {
log.error("发送响应失败, 未找到有效的监听地址");
return;
}
ListeningPoint listeningPoint = listeningPoints[0];
String ip = listeningPoint.getIPAddress();
int port = listeningPoint.getPort();
try {
sipProvider.sendResponse(response.build(sipProvider, ip, port));
} catch (SipException e) {
log.error("向{} {}:{} 发送响应失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage());
}
}
public void sendResponse(String senderIp,String transport, SendResponse response) {
SipProvider sipProvider = getProvider(transport, senderIp);
sendResponse(sipProvider, response);
}
public void sendRequest(SendRequest request) {
getProviders().parallelStream().forEach(sipProvider -> {
log.info("{}", sipProvider);
ListeningPoint[] listeningPoints = sipProvider.getListeningPoints();
if (listeningPoints == null || listeningPoints.length == 0) {
log.error("发送请求失败, 未找到有效的监听地址");
return;
}
ListeningPoint listeningPoint = listeningPoints[0];
String ip = listeningPoint.getIPAddress();
int port = listeningPoint.getPort();
try {
sipProvider.sendRequest(request.build(sipProvider, ip, port));
} catch (SipException e) {
log.error("向{} {}:{} 发送请求失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage());
}
});
}
public interface SendRequest {
Request build(SipProvider provider, String ip, int port);
}
public interface SendResponse {
Response build(SipProvider provider, String ip, int port);
}
}

View File

@ -0,0 +1,39 @@
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class CatalogSubscribe implements GenericSubscribe<SIPRequest> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPRequest> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPRequest> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
Helper.delPublisher(publishers, key);
}
}

View File

@ -0,0 +1,35 @@
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
@Slf4j
@Data
@RequiredArgsConstructor
@Service
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private GenericSubscribe<SIPRequest> catalogSubscribe;
@PostConstruct
private void init() {
catalogSubscribe = new CatalogSubscribe(executor);
}
@PreDestroy
private void destroy() {
catalogSubscribe.close();
}
}

View File

@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableFeignClients(basePackages = {
@ -16,6 +17,23 @@ import org.springframework.scheduling.annotation.EnableAsync;
"cn.skcks.docking.gb28181.common",
"cn.skcks.docking.gb28181.wvp",
"cn.skcks.docking.gb28181.media",
"cn.skcks.docking.gb28181"
}, includeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"cn.skcks.docking.gb28181.service.ssrc.*",
})
},
excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"cn.skcks.docking.gb28181.starter.*",
"cn.skcks.docking.gb28181.config.sip.SipConfig",
"cn.skcks.docking.gb28181.core.sip.listener.*",
"cn.skcks.docking.gb28181.core.sip.message.processor.*",
"cn.skcks.docking.gb28181.core.sip.message.subscribe.*",
"cn.skcks.docking.gb28181.service.play.*",
"cn.skcks.docking.gb28181.service.record.*",
"cn.skcks.docking.gb28181.core.sip.message.request.*"
})
})
@EnableAsync
public class Gb28181WvpProxyStarter {

View File

@ -48,7 +48,15 @@ proxy:
passwd: admi
use-ffmpeg: false
enable: true
gb28181:
sip:
id: 44050100002000000003
domain: 4405010000
password: 123456
port: 5063
ip:
- 10.10.10.20
# - 192.168.1.241
ffmpeg-support:
task:
max: 4

View File

@ -52,6 +52,15 @@ proxy:
use-wvp-assist: false
enable: true
use-ffmpeg: false
gb28181:
sip:
id: 44050100002000000003
domain: 4405010000
password: 123456
port: 5063
ip:
- 192.168.3.10
# - 192.168.1.241
ffmpeg-support:
task:

View File

@ -126,6 +126,12 @@
<version>${gb28181.docking.version}</version>
</dependency>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-service</artifactId>
<version>${gb28181.docking.version}</version>
</dependency>
<!--MapStruct-->
<dependency>
<groupId>org.mapstruct</groupId>