sipObserver 简单实现

This commit is contained in:
shikong 2023-08-10 02:41:24 +08:00
parent d73d758c62
commit 0359edd83a
3 changed files with 103 additions and 2 deletions

View File

@ -0,0 +1,55 @@
package cn.skcks.docking.gb28181.core.sip.executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
@Configuration
@Order(1)
@EnableAsync(proxyTargetClass = true)
public class DefaultSipExecutor {
/**
* cpu 核心数
*/
public static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
/**
* 最大线程数
*/
public static final int MAX_POOL_SIZE = CPU_NUM * 2;
/**
* 允许线程空闲时间单位默认为秒
*/
private static final int KEEP_ALIVE_TIME = 30;
/**
* 队列长度
*/
public static final int TASK_NUM = 10000;
/**
* 线程名称(前缀)
*/
public static final String THREAD_NAME_PREFIX = "sip-executor";
public static final String EXECUTOR_BEAN_NAME = "sipTaskExecutor";
@Bean(EXECUTOR_BEAN_NAME)
public Executor sipTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CPU_NUM);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(TASK_NUM);
executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy由调用线程提交任务的线程处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,43 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.sip.*;
@Component
@Slf4j
public class SipObserverImpl implements SipObserver{
@Override
@Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
public void processRequest(RequestEvent requestEvent) {
log.debug("method => {}",requestEvent.getRequest().getMethod());
}
@Override
public void processResponse(ResponseEvent responseEvent) {
}
@Override
public void processTimeout(TimeoutEvent timeoutEvent) {
}
@Override
public void processIOException(IOExceptionEvent exceptionEvent) {
}
@Override
public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
}
@Override
public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
}
}

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.service;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.listener.SipObserver;
import cn.skcks.docking.gb28181.core.sip.message.parser.GbStringMsgParserFactory;
import cn.skcks.docking.gb28181.core.sip.properties.DefaultProperties;
import gov.nist.javax.sip.SipProviderImpl;
@ -21,6 +22,8 @@ import java.util.List;
public class SipServiceImpl implements SipService{
private final SipFactory sipFactory = SipFactory.getInstance();
private final SipConfig sipConfig;
private final SipObserver sipObserver;
private final List<SipProviderImpl> pool = new ArrayList<>(2);
private SipStackImpl sipStack;
@ -62,7 +65,7 @@ public class SipServiceImpl implements SipService{
ListeningPoint tcpListen = sipStack.createListeningPoint(ip, port, "TCP");
SipProviderImpl tcpSipProvider = (SipProviderImpl) sipStack.createSipProvider(tcpListen);
tcpSipProvider.setDialogErrorsAutomaticallyHandled();
// tcpSipProvider.addSipListener();
tcpSipProvider.addSipListener(sipObserver);
pool.add(tcpSipProvider);
log.info("[sip] 监听 tcp://{}:{}", ip, port);
} catch (TransportNotSupportedException
@ -74,7 +77,7 @@ public class SipServiceImpl implements SipService{
try {
ListeningPoint udpListen = sipStack.createListeningPoint(ip, port, "UDP");
SipProviderImpl udpSipProvider = (SipProviderImpl) sipStack.createSipProvider(udpListen);
// udpSipProvider.addSipListener();
udpSipProvider.addSipListener(sipObserver);
pool.add(udpSipProvider);
log.info("[sip] 监听 udp://{}:{}", ip, port);
} catch (TransportNotSupportedException