diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/executor/DefaultSipExecutor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/executor/DefaultSipExecutor.java new file mode 100644 index 0000000..5e27e6a --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/executor/DefaultSipExecutor.java @@ -0,0 +1,55 @@ +package cn.skcks.docking.gb28181.core.sip.executor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.*; + +@Configuration +@Order(1) +@EnableAsync(proxyTargetClass = true) +public class DefaultSipExecutor { + /** + * cpu 核心数 + */ + public static final int CPU_NUM = Runtime.getRuntime().availableProcessors(); + /** + * 最大线程数 + */ + public static final int MAX_POOL_SIZE = CPU_NUM * 2; + /** + * 允许线程空闲时间(单位:默认为秒) + */ + private static final int KEEP_ALIVE_TIME = 30; + /** + * 队列长度 + */ + public static final int TASK_NUM = 10000; + /** + * 线程名称(前缀) + */ + public static final String THREAD_NAME_PREFIX = "sip-executor"; + + public static final String EXECUTOR_BEAN_NAME = "sipTaskExecutor"; + + + @Bean(EXECUTOR_BEAN_NAME) + public Executor sipTaskExecutor(){ + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(CPU_NUM); + executor.setMaxPoolSize(MAX_POOL_SIZE); + executor.setQueueCapacity(TASK_NUM); + executor.setKeepAliveSeconds(KEEP_ALIVE_TIME); + executor.setThreadNamePrefix(THREAD_NAME_PREFIX); + + // 线程池对拒绝任务的处理策略 + // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 初始化 + executor.initialize(); + return executor; + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipObserverImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipObserverImpl.java new file mode 100644 index 0000000..11cc587 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipObserverImpl.java @@ -0,0 +1,43 @@ +package cn.skcks.docking.gb28181.core.sip.listener; + +import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.sip.*; + +@Component +@Slf4j +public class SipObserverImpl implements SipObserver{ + @Override + @Async(DefaultSipExecutor.EXECUTOR_BEAN_NAME) + public void processRequest(RequestEvent requestEvent) { + log.debug("method => {}",requestEvent.getRequest().getMethod()); + } + + @Override + public void processResponse(ResponseEvent responseEvent) { + + } + + @Override + public void processTimeout(TimeoutEvent timeoutEvent) { + + } + + @Override + public void processIOException(IOExceptionEvent exceptionEvent) { + + } + + @Override + public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) { + + } + + @Override + public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { + + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java index a8f06cb..2e13054 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java @@ -1,6 +1,7 @@ package cn.skcks.docking.gb28181.core.sip.service; import cn.skcks.docking.gb28181.config.sip.SipConfig; +import cn.skcks.docking.gb28181.core.sip.listener.SipObserver; import cn.skcks.docking.gb28181.core.sip.message.parser.GbStringMsgParserFactory; import cn.skcks.docking.gb28181.core.sip.properties.DefaultProperties; import gov.nist.javax.sip.SipProviderImpl; @@ -21,6 +22,8 @@ import java.util.List; public class SipServiceImpl implements SipService{ private final SipFactory sipFactory = SipFactory.getInstance(); private final SipConfig sipConfig; + private final SipObserver sipObserver; + private final List pool = new ArrayList<>(2); private SipStackImpl sipStack; @@ -62,7 +65,7 @@ public class SipServiceImpl implements SipService{ ListeningPoint tcpListen = sipStack.createListeningPoint(ip, port, "TCP"); SipProviderImpl tcpSipProvider = (SipProviderImpl) sipStack.createSipProvider(tcpListen); tcpSipProvider.setDialogErrorsAutomaticallyHandled(); - // tcpSipProvider.addSipListener(); + tcpSipProvider.addSipListener(sipObserver); pool.add(tcpSipProvider); log.info("[sip] 监听 tcp://{}:{}", ip, port); } catch (TransportNotSupportedException @@ -74,7 +77,7 @@ public class SipServiceImpl implements SipService{ try { ListeningPoint udpListen = sipStack.createListeningPoint(ip, port, "UDP"); SipProviderImpl udpSipProvider = (SipProviderImpl) sipStack.createSipProvider(udpListen); - // udpSipProvider.addSipListener(); + udpSipProvider.addSipListener(sipObserver); pool.add(udpSipProvider); log.info("[sip] 监听 udp://{}:{}", ip, port); } catch (TransportNotSupportedException