完善 SipListener, MessageProcessor

This commit is contained in:
shikong 2023-08-10 15:16:33 +08:00
parent 0359edd83a
commit b62f8ac75a
18 changed files with 465 additions and 59 deletions

View File

@ -89,6 +89,17 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<delimiters>
<delimiter>@</delimiter>
</delimiters>
<useDefaultDelimiters>false</useDefaultDelimiters>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
@ -107,5 +118,15 @@
<filtering>true</filtering>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>**/**</include>
</includes>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</project>

View File

@ -0,0 +1,23 @@
package cn.skcks.docking.gb28181.common.config;
import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@Slf4j
@Data
@Order(0)
@Configuration
@ConfigurationProperties(prefix = "project")
public class ProjectConfig {
private String version;
@PostConstruct
private void init(){
log.info("项目版本号 {}", version);
}
}

View File

@ -1,3 +1,6 @@
spring:
session:
store-type: none
project:
version: '@project.version@'

View File

@ -0,0 +1,13 @@
package cn.skcks.docking.gb28181.core.sip.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class RemoteInfo {
private String ip;
private int port;
}

View File

@ -0,0 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
public interface SipListener extends javax.sip.SipListener {
void addProcessor(String method, MessageProcessor messageProcessor);
}

View File

@ -0,0 +1,69 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Component
@Slf4j
public class SipListenerImpl implements SipListener {
private final ConcurrentMap<String, MessageProcessor> processor = new ConcurrentHashMap<>();
public void addProcessor(String method,MessageProcessor messageProcessor){
log.debug("[SipListener] 注册 {} 处理器", method);
processor.put(method, messageProcessor);
}
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
log.debug("method => {}",method);
Optional.ofNullable(processor.get(method)).ifPresent(processor -> {
processor.process(requestEvent);
});
}
@Override
public void processResponse(ResponseEvent responseEvent) {
}
@Override
public void processTimeout(TimeoutEvent timeoutEvent) {
ClientTransaction clientTransaction = timeoutEvent.getClientTransaction();
if (clientTransaction != null) {
Request request = clientTransaction.getRequest();
if (request != null) {
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
log.debug("会话超时 callId => {}", callIdHeader.getCallId());
}
}
}
}
@Override
public void processIOException(IOExceptionEvent exceptionEvent) {
}
@Override
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
}
@Override
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
CallIdHeader callIdHeader = dialogTerminatedEvent.getDialog().getCallId();
log.debug("会话终止 callId => {}", callIdHeader.getCallId());
}
}

View File

@ -1,6 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import javax.sip.SipListener;
public interface SipObserver extends SipListener {
}

View File

@ -1,43 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
@Component
@Slf4j
public class SipObserverImpl implements SipObserver{
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
log.debug("method => {}",requestEvent.getRequest().getMethod());
}
@Override
public void processResponse(ResponseEvent responseEvent) {
}
@Override
public void processTimeout(TimeoutEvent timeoutEvent) {
}
@Override
public void processIOException(IOExceptionEvent exceptionEvent) {
}
@Override
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
}
@Override
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
}
}

View File

@ -1,5 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message;
public class MessageProcessor {
}

View File

@ -0,0 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.message.processor;
import javax.sip.RequestEvent;
public interface MessageProcessor {
void process(RequestEvent requestEvent);
}

View File

@ -0,0 +1,29 @@
package cn.skcks.docking.gb28181.core.sip.message.processor.request;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
@Slf4j
@RequiredArgsConstructor
@Component
public class RegisterRequestProcessor implements MessageProcessor {
private final static String METHOD = "REGISTER";
private final SipListener sipListener;
@PostConstruct
private void init(){
sipListener.addProcessor(METHOD,this);
}
@Override
public void process(RequestEvent requestEvent) {
}
}

View File

@ -0,0 +1,13 @@
package cn.skcks.docking.gb28181.core.sip.message.send;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class SipMessageSender {
private final SipService sipService;
}

View File

@ -0,0 +1,16 @@
package cn.skcks.docking.gb28181.core.sip.sdp;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.sdp.SessionDescription;
@Builder
@Data
public class Gb28181Sdp {
private SessionDescription baseSdb;
private String ssrc;
private String mediaDescription;
}

View File

@ -1,7 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.service;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.listener.SipObserver;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.parser.GbStringMsgParserFactory;
import cn.skcks.docking.gb28181.core.sip.properties.DefaultProperties;
import gov.nist.javax.sip.SipProviderImpl;
@ -9,6 +9,7 @@ import gov.nist.javax.sip.SipStackImpl;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.sip.*;
@ -18,11 +19,12 @@ import java.util.List;
@Slf4j
@RequiredArgsConstructor
@Data
@Order(10)
@Service
public class SipServiceImpl implements SipService{
public class SipServiceImpl implements SipService {
private final SipFactory sipFactory = SipFactory.getInstance();
private final SipConfig sipConfig;
private final SipObserver sipObserver;
private final SipListener sipListener;
private final List<SipProviderImpl> pool = new ArrayList<>(2);
private SipStackImpl sipStack;
@ -45,6 +47,7 @@ public class SipServiceImpl implements SipService{
pool.parallelStream().forEach(sipProvider -> {
ListeningPoint listen = sipProvider.getListeningPoint();
log.debug("移除监听 {}://{}:{}",listen.getTransport(),listen.getIPAddress(),listen.getPort());
sipProvider.removeSipListener(sipListener);
sipProvider.removeListeningPoints();
try{
@ -65,7 +68,7 @@ public class SipServiceImpl implements SipService{
ListeningPoint tcpListen = sipStack.createListeningPoint(ip, port, "TCP");
SipProviderImpl tcpSipProvider = (SipProviderImpl) sipStack.createSipProvider(tcpListen);
tcpSipProvider.setDialogErrorsAutomaticallyHandled();
tcpSipProvider.addSipListener(sipObserver);
tcpSipProvider.addSipListener(sipListener);
pool.add(tcpSipProvider);
log.info("[sip] 监听 tcp://{}:{}", ip, port);
} catch (TransportNotSupportedException
@ -77,7 +80,7 @@ public class SipServiceImpl implements SipService{
try {
ListeningPoint udpListen = sipStack.createListeningPoint(ip, port, "UDP");
SipProviderImpl udpSipProvider = (SipProviderImpl) sipStack.createSipProvider(udpListen);
udpSipProvider.addSipListener(sipObserver);
udpSipProvider.addSipListener(sipListener);
pool.add(udpSipProvider);
log.info("[sip] 监听 udp://{}:{}", ip, port);
} catch (TransportNotSupportedException

View File

@ -0,0 +1,233 @@
package cn.skcks.docking.gb28181.core.sip.utils;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.common.config.ProjectConfig;
import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo;
import cn.skcks.docking.gb28181.core.sip.sdp.Gb28181Sdp;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Subject;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sdp.SdpFactory;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.header.FromHeader;
import javax.sip.header.Header;
import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
@SuppressWarnings({"unused"})
@Slf4j
@Component
public class SipUtil implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SipUtil.projectConfig = applicationContext.getBean(ProjectConfig.class);
}
private static ProjectConfig projectConfig;
public static String getUserIdFromFromHeader(FromHeader fromHeader) {
AddressImpl address = (AddressImpl)fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
return uri.getUser();
}
public static String getUserIdFromFromHeader(Request request) {
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
return getUserIdFromFromHeader(fromHeader);
}
/**
* 从subject读取channelId
* */
public static String getChannelIdFromRequest(Request request) {
Header subject = request.getHeader("subject");
if (subject == null) {
// 如果缺失subject
return null;
}
return ((Subject) subject).getSubject().split(":")[0];
}
public static String generateViaTag() {
return "z9hG4bK" + RandomStringUtils.randomNumeric(10);
}
public static UserAgentHeader createUserAgentHeader() throws PeerUnavailableException, ParseException {
List<String> agentParam = new ArrayList<>();
agentParam.add("GB28181-Docking-Platform");
agentParam.add(" ");
agentParam.add(StringUtils.replace(projectConfig.getVersion(),"-SNAPSHOT",""));
return SipFactory.getInstance().createHeaderFactory().createUserAgentHeader(agentParam);
}
public static String generateFromTag(){
return IdUtil.fastSimpleUUID();
}
public static String generateTag(){
return String.valueOf(System.currentTimeMillis());
}
/**
* 从请求中获取设备ip地址和端口号
* @param request 请求
* @param sipUseSourceIpAsRemoteAddress false 从via中获取地址 true 直接获取远程地址
* @return 地址信息
*/
public static RemoteInfo getRemoteInfoFromRequest(SIPRequest request, boolean sipUseSourceIpAsRemoteAddress) {
String remoteAddress;
int remotePort;
if (sipUseSourceIpAsRemoteAddress) {
remoteAddress = request.getPeerPacketSourceAddress().getHostAddress();
remotePort = request.getPeerPacketSourcePort();
}else {
// 判断RPort是否改变改变则说明路由nat信息变化修改设备信息
// 获取到通信地址等信息
remoteAddress = request.getTopmostViaHeader().getReceived();
remotePort = request.getTopmostViaHeader().getRPort();
// 解析本地地址替代
if (ObjectUtils.isEmpty(remoteAddress) || remotePort == -1) {
remoteAddress = request.getPeerPacketSourceAddress().getHostAddress();
remotePort = request.getPeerPacketSourcePort();
}
}
return new RemoteInfo(remoteAddress, remotePort);
}
/**
* 云台指令码计算
*
* @param leftRight 镜头左移右移 0:停止 1:左移 2:右移
* @param upDown 镜头上移下移 0:停止 1:上移 2:下移
* @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大
* @param moveSpeed 镜头移动速度 默认 0XFF (0-255)
* @param zoomSpeed 镜头缩放速度 默认 0X1 (0-255)
*/
public static String cmdString(int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) {
int cmdCode = 0;
if (leftRight == 2) {
cmdCode|=0x01; // 右移
} else if(leftRight == 1) {
cmdCode|=0x02; // 左移
}
if (upDown == 2) {
cmdCode|=0x04; // 下移
} else if(upDown == 1) {
cmdCode|=0x08; // 上移
}
if (inOut == 2) {
cmdCode |= 0x10; // 放大
} else if(inOut == 1) {
cmdCode |= 0x20; // 缩小
}
StringBuilder builder = new StringBuilder("A50F01");
String strTmp;
strTmp = String.format("%02X", cmdCode);
builder.append(strTmp, 0, 2);
strTmp = String.format("%02X", moveSpeed);
builder.append(strTmp, 0, 2);
builder.append(strTmp, 0, 2);
//优化zoom低倍速下的变倍速率
if ((zoomSpeed > 0) && (zoomSpeed <16))
{
zoomSpeed = 16;
}
strTmp = String.format("%X", zoomSpeed);
builder.append(strTmp, 0, 1).append("0");
//计算校验码
int checkCode = (0XA5 + 0X0F + 0X01 + cmdCode + moveSpeed + moveSpeed + (zoomSpeed /*<< 4*/ & 0XF0)) % 0X100;
strTmp = String.format("%02X", checkCode);
builder.append(strTmp, 0, 2);
return builder.toString();
}
public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException {
// jainSip不支持y= f=字段 移除以解析
int ssrcIndex = sdpStr.indexOf("y=");
int mediaDescriptionIndex = sdpStr.indexOf("f=");
// 检查是否有y字段
SessionDescription sdp;
String ssrc = null;
String mediaDescription = null;
if (mediaDescriptionIndex == 0 && ssrcIndex == 0) {
sdp = SdpFactory.getInstance().createSessionDescription(sdpStr);
}else {
String[] lines = sdpStr.split("\\r?\\n");
StringBuilder sdpBuffer = new StringBuilder();
for (String line : lines) {
if (line.trim().startsWith("y=")) {
ssrc = line.substring(2);
}else if (line.trim().startsWith("f=")) {
mediaDescription = line.substring(2);
}else {
sdpBuffer.append(line.trim()).append("\r\n");
}
}
sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString());
}
return Gb28181Sdp.builder()
.baseSdb(sdp)
.ssrc(ssrc)
.mediaDescription(mediaDescription)
.build();
}
public static String getSsrcFromSdp(String sdpStr) {
// jainSip不支持y= f=字段 移除以解析
int ssrcIndex = sdpStr.indexOf("y=");
if (ssrcIndex == 0) {
return null;
}
String[] lines = sdpStr.split("\\r?\\n");
for (String line : lines) {
if (line.trim().startsWith("y=")) {
return line.substring(2);
}
}
return null;
}
public static String parseTime(String timeStr) {
if (ObjectUtils.isEmpty(timeStr)){
return null;
}
LocalDateTime localDateTime;
try {
localDateTime = LocalDateTime.parse(timeStr);
}catch (DateTimeParseException e) {
try {
localDateTime = LocalDateTimeUtil.parse(timeStr, DatePattern.UTC_SIMPLE_PATTERN);
}catch (DateTimeParseException e2) {
log.error("[格式化时间] 无法格式化时间: {}", timeStr);
return null;
}
}
return localDateTime.format(DatePattern.NORM_DATETIME_FORMATTER.withZone(ZoneId.of("Asia/Shanghai")));
}
}

10
pom.xml
View File

@ -207,5 +207,15 @@
<filtering>true</filtering>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>**/**</include>
</includes>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</project>

View File

@ -120,5 +120,15 @@
<filtering>true</filtering>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>**/**</include>
</includes>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</project>

View File

@ -1,6 +1,9 @@
server:
port: 28181
project:
version: @project.version@
gb28181:
# 作为28181服务器的配置
sip: