简单整合 sip

This commit is contained in:
shikong 2023-09-11 15:42:51 +08:00
parent 5e6f0dadbe
commit 871ed32f43
9 changed files with 334 additions and 4 deletions

View File

@ -8,11 +8,12 @@
`id` bigint NOT NULL AUTO_INCREMENT,
`device_code` varchar(50) NOT NULL,
`gb_device_id` varbinary(50) NOT NULL,
`gb_channel_id` varbinary(50) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`address` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `device_code` (`device_code`),
UNIQUE KEY `gb_device_id` (`gb_device_id`)
UNIQUE KEY `gb_device_id` (`gb_device_id`,`gb_channel_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
</update>
</mapper>

View File

@ -19,6 +19,11 @@
</properties>
<dependencies>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-service</artifactId>
</dependency>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-mocking-orm</artifactId>

View File

@ -0,0 +1,18 @@
package cn.skcks.docking.gb28181.mocking.config.sip;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "gb28181.server", ignoreInvalidFields = true)
@Order(0)
@Data
public class ServerConfig {
private String id;
private String domain;
private String ip;
private int port;
private String password;
}

View File

@ -0,0 +1,41 @@
package cn.skcks.docking.gb28181.mocking.config.sip;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ConfigurationProperties(prefix = "gb28181.sip", ignoreInvalidFields = true)
@Order(0)
@Data
public class SipConfig {
private List<String> ip;
private List<String> showIp;
private Integer port;
private String domain;
private String id;
private String password;
Integer ptzSpeed = 50;
Integer registerTimeInterval = 120;
private boolean alarm;
public List<String> getShowIp() {
if (this.showIp == null) {
return this.ip;
}
return showIp;
}
}

View File

@ -0,0 +1,102 @@
package cn.skcks.docking.gb28181.mocking.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@RequiredArgsConstructor
@Component
@Slf4j
public class SipListenerImpl implements SipListener {
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
public void addRequestProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 请求处理器", method);
requestProcessor.put(method, messageProcessor);
}
public void addResponseProcessor(String method, MessageProcessor messageProcessor) {
log.debug("[SipListener] 注册 {} 响应处理器", method);
responseProcessor.put(method, messageProcessor);
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
log.debug("传入请求 method => {}", method);
Optional.ofNullable(requestProcessor.get(method)).ifPresent(processor -> {
processor.process(requestEvent);
});
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
int status = response.getStatusCode();
CSeqHeader cseqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
log.debug("{} {}", method, response);
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
log.debug("传入响应 method => {}", method);
Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> {
processor.process(responseEvent);
});
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
// 增加其它无需回复的响应如101180等
} else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
if (responseEvent.getDialog() != null) {
responseEvent.getDialog().delete();
}
}
}
@Override
public void processTimeout(TimeoutEvent timeoutEvent) {
ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
if (clientTransaction != null) {
Request request = clientTransaction.getRequest();
if (request != null) {
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
log.debug("会话超时 callId => {}", callIdHeader.getCallId());
}
}
}
}
@Override
public void processIOException(IOExceptionEvent exceptionEvent) {
}
@Override
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
}
@Override
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
CallIdHeader callIdHeader = dialogTerminatedEvent.getDialog().getCallId();
log.debug("会话终止 callId => {}", callIdHeader.getCallId());
}
}

View File

@ -0,0 +1,106 @@
package cn.skcks.docking.gb28181.mocking.core.sip.service;
import cn.skcks.docking.gb28181.core.sip.message.parser.GbStringMsgParserFactory;
import cn.skcks.docking.gb28181.core.sip.properties.DefaultProperties;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.mocking.config.sip.SipConfig;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.sip.*;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
@Data
@Order(10)
@Service
public class SipServiceImpl implements SipService {
private final SipFactory sipFactory = SipFactory.getInstance();
private final SipConfig sipConfig;
private final SipListener sipListener;
private final List<SipProviderImpl> pool = new ArrayList<>(2);
private SipStackImpl sipStack;
@Override
public void run() {
sipFactory.setPathName("gov.nist");
sipConfig.getIp().parallelStream().forEach(ip -> {
listen(ip, sipConfig.getPort());
});
}
@Override
public void stop() {
if (sipStack == null) {
return;
}
sipStack.closeAllSockets();
pool.parallelStream().forEach(sipProvider -> {
ListeningPoint listen = sipProvider.getListeningPoint();
log.debug("移除监听 {}://{}:{}", listen.getTransport(), listen.getIPAddress(), listen.getPort());
sipProvider.removeSipListener(sipListener);
sipProvider.removeListeningPoints();
try {
sipStack.deleteListeningPoint(listen);
sipStack.deleteSipProvider(sipProvider);
} catch (Exception ignore) {
}
});
pool.clear();
}
@Override
public SipProvider getProvider(String transport, String ip) {
return pool.parallelStream().filter(sipProvider -> {
ListeningPoint listeningPoint = sipProvider.getListeningPoint();
return listeningPoint != null && listeningPoint.getIPAddress().equals(ip) && listeningPoint.getTransport().equalsIgnoreCase(transport);
}).findFirst().orElse(null);
}
public void listen(String ip, int port) {
try {
sipStack = (SipStackImpl) sipFactory.createSipStack(DefaultProperties.getProperties("GB28181_SIP"));
sipStack.setMessageParserFactory(new GbStringMsgParserFactory());
// sipStack.setMessageProcessorFactory();
try {
ListeningPoint tcpListen = sipStack.createListeningPoint(ip, port, ListeningPoint.TCP);
SipProviderImpl tcpSipProvider = (SipProviderImpl) sipStack.createSipProvider(tcpListen);
tcpSipProvider.setDialogErrorsAutomaticallyHandled();
tcpSipProvider.addSipListener(sipListener);
pool.add(tcpSipProvider);
log.info("[sip] 监听 tcp://{}:{}", ip, port);
} catch (TransportNotSupportedException
| ObjectInUseException
| InvalidArgumentException e) {
log.error("[sip] tcp://{}:{} 监听失败, 请检查端口是否被占用, 错误信息 => {}", ip, port, e.getMessage());
}
try {
ListeningPoint udpListen = sipStack.createListeningPoint(ip, port, ListeningPoint.UDP);
SipProviderImpl udpSipProvider = (SipProviderImpl) sipStack.createSipProvider(udpListen);
udpSipProvider.addSipListener(sipListener);
pool.add(udpSipProvider);
log.info("[sip] 监听 udp://{}:{}", ip, port);
} catch (TransportNotSupportedException
| ObjectInUseException
| InvalidArgumentException e) {
log.error("[sip] udp://{}:{} 监听失败, 请检查端口是否被占用, 错误信息 => {}", ip, port, e.getMessage());
}
} catch (Exception e) {
log.error("[sip] {}:{} 监听失败, 请检查端口是否被占用, 错误信息 => {}", ip, port, e.getMessage());
}
}
}

View File

@ -0,0 +1,53 @@
package cn.skcks.docking.gb28181.mocking.starter;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.mocking.config.sip.SipConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@Order(0)
@Slf4j
@RequiredArgsConstructor
@Component
@DependsOn("mockingOrmInitService")
public class SipStarter implements SmartLifecycle {
private final SipService sipService;
private final SipConfig sipConfig;
private boolean isRunning;
@Override
public void start() {
if(checkConfig()){
isRunning = true;
log.debug("sip 服务 启动");
sipService.run();
}
}
@Override
public void stop() {
log.debug("sip 服务 关闭");
sipService.stop();
isRunning = false;
}
@Override
public boolean isRunning() {
return isRunning;
}
public boolean checkConfig(){
log.debug("sip 配置信息 => \n{}", JsonUtils.toJson(sipConfig));
if(CollectionUtils.isEmpty(sipConfig.getIp())){
log.error("sip ip 配置错误, 请检查配置是否正确");
return false;
}
return true;
}
}

View File

@ -47,8 +47,6 @@ gb28181:
id: 44050100002000000002
# [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验
password: 123456
# 是否存储alarm信息
alarm: true
media:
ip: 192.168.10.32

View File

@ -64,7 +64,7 @@
<profile>
<id>jar</id>
<properties>
<skip.docker>true</skip.docker>
<skip.docker>true</skip.docker>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
@ -108,6 +108,12 @@
<type>pom</type>
</dependency>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-service</artifactId>
<version>${gb28181.docking.version}</version>
</dependency>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>common</artifactId>