添加 RegisterSubscribe 订阅器

This commit is contained in:
shikong 2023-09-12 20:33:16 +08:00
parent 137b96bbf7
commit 801479c248
7 changed files with 207 additions and 44 deletions

View File

@ -1,5 +0,0 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.register.request;
public class RegisterRequest {
}

View File

@ -0,0 +1,39 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class RegisterSubscribe implements GenericSubscribe<SIPResponse> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SIPResponse>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPResponse> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPResponse> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
Helper.delPublisher(publishers, key);
}
}

View File

@ -0,0 +1,34 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
@Slf4j
@Data
@RequiredArgsConstructor
@Service
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private GenericSubscribe<SIPResponse> registerSubscribe;
@PostConstruct
private void init() {
registerSubscribe = new RegisterSubscribe(executor);
}
@PreDestroy
private void destroy() {
registerSubscribe.close();
}
}

View File

@ -41,38 +41,6 @@ public class SipRequestBuilder implements ApplicationContextAware {
return SipFactory.getInstance();
}
@SneakyThrows
private static SipURI getSipURI(String id, String address){
return MessageHelper.createSipURI(id, address);
}
@SneakyThrows
private static Address getAddress(SipURI uri){
return MessageHelper.createAddress(uri);
}
@SneakyThrows
private static FromHeader getFromHeader(Address fromAddress, String fromTag){
return MessageHelper.createFromHeader(fromAddress, fromTag);
}
@SneakyThrows
private static ToHeader getToHeader(Address toAddress, String toTag){
return MessageHelper.createToHeader(toAddress, toTag);
}
@SneakyThrows
private static MaxForwardsHeader getMaxForwardsHeader(int maxForwards){
return getSipFactory().createHeaderFactory().createMaxForwardsHeader(maxForwards);
}
@SneakyThrows
private static List<ViaHeader> getDeviceViaHeaders(DockingDevice device, String viaTag){
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(device.getLocalIp(), serverConfig.getPort(), device.getTransport(), viaTag);
viaHeader.setRPort();
return Collections.singletonList(viaHeader);
}
@SneakyThrows
private static List<ViaHeader> getViaHeaders(String ip,int port, String transport, String viaTag){
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, transport, viaTag);
@ -88,21 +56,21 @@ public class SipRequestBuilder implements ApplicationContextAware {
@SneakyThrows
public static Request createRegisterRequest(MockingDevice device, String ip, int port, long cSeq, String fromTag, String viaTag, CallIdHeader callIdHeader) {
String target = StringUtils.joinWith(":", serverConfig.getIp(), serverConfig.getPort());
SipURI requestURI = getSipURI(serverConfig.getId(), target);
SipURI requestURI = MessageHelper.createSipURI(serverConfig.getId(), target);
// via
List<ViaHeader> viaHeaders = getViaHeaders(serverConfig.getIp(), serverConfig.getPort(), sipConfig.getTransport(), viaTag);
// from
String from = StringUtils.joinWith(":", ip, port);
SipURI fromSipURI = getSipURI(device.getGbDeviceId(), from);
Address fromAddress = getAddress(fromSipURI);
FromHeader fromHeader = getFromHeader(fromAddress, fromTag);
SipURI fromSipURI = MessageHelper.createSipURI(device.getGbDeviceId(), from);
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
ToHeader toHeader = getToHeader(fromAddress, null);
ToHeader toHeader = MessageHelper.createToHeader(fromAddress, null);
// forwards
MaxForwardsHeader maxForwardsHeader = getMaxForwardsHeader(70);
MaxForwardsHeader maxForwardsHeader = MessageHelper.createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getCSeqHeader(cSeq, Request.REGISTER);
@ -129,7 +97,7 @@ public class SipRequestBuilder implements ApplicationContextAware {
String qop = www.getQop();
String target = StringUtils.joinWith(":", serverConfig.getIp(), serverConfig.getPort());
SipURI requestURI = getSipURI(serverConfig.getId(), target);
SipURI requestURI = MessageHelper.createSipURI(serverConfig.getId(), target);
String cNonce = null;
String nc = "00000001";
if (qop != null) {

View File

@ -0,0 +1,71 @@
server:
port: 18182
project:
version: @project.version@
spring:
data:
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1
# host: 192.168.1.241
host: 10.10.10.200
# [必须修改] 端口号
port: 6379
# [可选] 数据库 DB
database: 15
# [可选] 访问密码,若你的redis服务器没有设置密码就不需要用密码去连接
password: 12341234
# [可选] 超时时间
timeout: 10000
datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 12341234
url: jdbc:mysql://10.10.10.200:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
config:
activate:
on-profile: local
gb28181:
# 作为28181服务器的配置
sip:
# [必须修改] 本机的IP对应你的网卡监听什么ip就是使用什么网卡
# 如果不明白就使用0.0.0.0,大部分情况都是可以的
# 请不要使用127.0.0.1任何包括localhost在内的域名都是不可以的。
ip:
# - 10.27.0.1
# - 192.168.0.195
# - 192.168.10.195
- 10.10.10.20
# - 10.27.0.6
# [可选] 28181服务监听的端口
port: 15060
# 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007
# 后两位为行业编码定义参照附录D.3
# 3701020049标识山东济南历下区 信息行业接入
# [可选]
domain: 4405010000
# [可选]
id: 44050100002000000002
# [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验
password: 123456
expire: 3600
transport: "UDP"
server:
ip: 10.10.10.20
# ip: 192.168.10.32
# ip: 192.168.3.12
port: 5060
password: 123456
domain: 4405010000
id: 44050100002000000001
media:
ip: 10.10.10.200
url: 'http://10.10.10.200:5080'
# url: 'http://10.10.10.200:12580/anything/'
id: amrWMKmbKqoBjRQ9
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333

View File

@ -24,6 +24,8 @@ spring:
username: root
password: 123456a
url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
profiles:
active: local
gb28181:
# 作为28181服务器的配置

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<contextName>logback</contextName>
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<!--<property name="log.path" value="./log/business_Log" />-->
<!--输出到控制台-->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">-->
<!-- <level>INFO</level>-->
<!-- </filter>-->
<!-- <withJansi>true</withJansi>-->
<encoder>
<!--<pattern>%d %p (%file:%line\)- %m%n</pattern>-->
<!--格式化输出:%d:表示日期 %thread:表示线程名 %-5level:级别从左显示5个字符宽度 %msg:日志消息 %n:是换行符-->
<pattern>%red(%d{yyyy-MM-dd HH:mm:ss.SSS}) %green([%thread]) %highlight(%-5level) %yellow(at %class.%method) (%file:%line\) - %cyan(%msg%n)</pattern>
<!--<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %magenta(%-5level) %green([%-50.50class]) >>> %cyan(%msg) %n</pattern>-->
<charset>UTF-8</charset>
</encoder>
</appender>
<!--&lt;!&ndash;输出到文件&ndash;&gt;-->
<!--<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">-->
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">-->
<!-- <level>INFO</level>-->
<!-- </filter>-->
<!-- <file>${log.path}/logback.log</file>-->
<!-- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">-->
<!-- <fileNamePattern>${log.path}/logback-%d{yyyy-MM-dd-HH-mm}.log</fileNamePattern>-->
<!-- <maxHistory>365</maxHistory>-->
<!-- &lt;!&ndash; <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">&ndash;&gt;-->
<!-- &lt;!&ndash; <maxFileSize>100kB</maxFileSize>&ndash;&gt;-->
<!-- &lt;!&ndash; </timeBasedFileNamingAndTriggeringPolicy>&ndash;&gt;-->
<!-- </rollingPolicy>-->
<!-- <encoder>-->
<!-- &lt;!&ndash;格式化输出:%d:表示日期 %thread:表示线程名 %-5level:级别从左显示5个字符宽度 %msg:日志消息 %n:是换行符&ndash;&gt;-->
<!-- <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
<!-- <charset>UTF-8</charset>-->
<!-- </encoder>-->
<!--</appender>-->
<!-- 如果appender里没有限定日志级别那么root可以统一设置如果没有配置那么控制台和文件不会输出任何日志这里root的level不做限制-->
<root level="INFO">
<!-- 允许控制台输出-->
<appender-ref ref="console" />
<!--&lt;!&ndash; 允许文件输出&ndash;&gt;-->
<!--<appender-ref ref="file" />-->
</root>
<logger name="cn.skcks.docking.gb28181.core.sip.logger" level="INFO" />
<logger name="cn.skcks.docking.gb28181" level="DEBUG" />
</configuration>