添加使用多线程消息处理sip消息
This commit is contained in:
parent
22bf3c04b4
commit
a6ffd322f7
@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
|
|||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
import org.springframework.boot.web.servlet.ServletComponentScan;
|
import org.springframework.boot.web.servlet.ServletComponentScan;
|
||||||
import org.springframework.context.ConfigurableApplicationContext;
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
import springfox.documentation.oas.annotations.EnableOpenApi;
|
import springfox.documentation.oas.annotations.EnableOpenApi;
|
||||||
|
|
||||||
|
@ -0,0 +1,57 @@
|
|||||||
|
package com.genersoft.iot.vmp.conf;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableAsync
|
||||||
|
public class ThreadPoolTaskConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
|
||||||
|
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
|
||||||
|
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 核心线程数(默认线程数)
|
||||||
|
*/
|
||||||
|
private static final int corePoolSize = 5;
|
||||||
|
/**
|
||||||
|
* 最大线程数
|
||||||
|
*/
|
||||||
|
private static final int maxPoolSize = 30;
|
||||||
|
/**
|
||||||
|
* 允许线程空闲时间(单位:默认为秒)
|
||||||
|
*/
|
||||||
|
private static final int keepAliveTime = 30;
|
||||||
|
/**
|
||||||
|
* 缓冲队列大小
|
||||||
|
*/
|
||||||
|
private static final int queueCapacity = 10000;
|
||||||
|
/**
|
||||||
|
* 线程池名前缀
|
||||||
|
*/
|
||||||
|
private static final String threadNamePrefix = "hdl-uhi-service-";
|
||||||
|
|
||||||
|
@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
|
||||||
|
public ThreadPoolTaskExecutor taskExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(corePoolSize);
|
||||||
|
executor.setMaxPoolSize(maxPoolSize);
|
||||||
|
executor.setQueueCapacity(queueCapacity);
|
||||||
|
executor.setKeepAliveSeconds(keepAliveTime);
|
||||||
|
executor.setThreadNamePrefix(threadNamePrefix);
|
||||||
|
|
||||||
|
// 线程池对拒绝任务的处理策略
|
||||||
|
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
// 初始化
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181;
|
|||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
||||||
import gov.nist.javax.sip.SipProviderImpl;
|
import gov.nist.javax.sip.SipProviderImpl;
|
||||||
import gov.nist.javax.sip.SipStackImpl;
|
import gov.nist.javax.sip.SipStackImpl;
|
||||||
@ -28,28 +29,12 @@ public class SipLayer{
|
|||||||
private SipConfig sipConfig;
|
private SipConfig sipConfig;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SIPProcessorObserver sipProcessorObserver;
|
private ISIPProcessorObserver sipProcessorObserver;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private SipSubscribe sipSubscribe;
|
|
||||||
|
|
||||||
private SipStackImpl sipStack;
|
private SipStackImpl sipStack;
|
||||||
|
|
||||||
private SipFactory sipFactory;
|
private SipFactory sipFactory;
|
||||||
|
|
||||||
/**
|
|
||||||
* 消息处理器线程池
|
|
||||||
*/
|
|
||||||
private ThreadPoolExecutor processThreadPool;
|
|
||||||
|
|
||||||
public SipLayer() {
|
|
||||||
int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
|
|
||||||
LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000);
|
|
||||||
processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
|
|
||||||
0L,TimeUnit.MILLISECONDS,processQueue,
|
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Bean("sipFactory")
|
@Bean("sipFactory")
|
||||||
private SipFactory createSipFactory() {
|
private SipFactory createSipFactory() {
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
package com.genersoft.iot.vmp.gb28181.transmit;
|
||||||
|
|
||||||
|
import javax.sip.SipListener;
|
||||||
|
|
||||||
|
public interface ISIPProcessorObserver extends SipListener {
|
||||||
|
}
|
@ -7,6 +7,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.sip.*;
|
import javax.sip.*;
|
||||||
@ -22,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
* @date: 2021年11月5日 下午15:32
|
* @date: 2021年11月5日 下午15:32
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class SIPProcessorObserver implements SipListener {
|
public class SIPProcessorObserver implements ISIPProcessorObserver {
|
||||||
|
|
||||||
private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
|
private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
|
||||||
|
|
||||||
@ -33,6 +36,10 @@ public class SIPProcessorObserver implements SipListener {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private SipSubscribe sipSubscribe;
|
private SipSubscribe sipSubscribe;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier(value = "taskExecutor")
|
||||||
|
private ThreadPoolTaskExecutor poolTaskExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加 request订阅
|
* 添加 request订阅
|
||||||
* @param method 方法名
|
* @param method 方法名
|
||||||
@ -65,13 +72,17 @@ public class SIPProcessorObserver implements SipListener {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void processRequest(RequestEvent requestEvent) {
|
public void processRequest(RequestEvent requestEvent) {
|
||||||
String method = requestEvent.getRequest().getMethod();
|
|
||||||
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
|
poolTaskExecutor.execute(() -> {
|
||||||
if (sipRequestProcessor == null) {
|
String method = requestEvent.getRequest().getMethod();
|
||||||
logger.warn("不支持方法{}的request", method);
|
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
|
||||||
return;
|
if (sipRequestProcessor == null) {
|
||||||
}
|
logger.warn("不支持方法{}的request", method);
|
||||||
requestProcessorMap.get(method).process(requestEvent);
|
return;
|
||||||
|
}
|
||||||
|
requestProcessorMap.get(method).process(requestEvent);
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -90,43 +101,45 @@ public class SIPProcessorObserver implements SipListener {
|
|||||||
// }
|
// }
|
||||||
// sipRequestProcessor.process(responseEvent);
|
// sipRequestProcessor.process(responseEvent);
|
||||||
|
|
||||||
|
poolTaskExecutor.execute(() -> {
|
||||||
Response response = responseEvent.getResponse();
|
Response response = responseEvent.getResponse();
|
||||||
logger.debug(responseEvent.getResponse().toString());
|
logger.debug(responseEvent.getResponse().toString());
|
||||||
int status = response.getStatusCode();
|
int status = response.getStatusCode();
|
||||||
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
|
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
|
||||||
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
|
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
|
||||||
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
|
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
|
||||||
String method = cseqHeader.getMethod();
|
String method = cseqHeader.getMethod();
|
||||||
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
|
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
|
||||||
if (sipRequestProcessor != null) {
|
if (sipRequestProcessor != null) {
|
||||||
sipRequestProcessor.process(responseEvent);
|
sipRequestProcessor.process(responseEvent);
|
||||||
}
|
}
|
||||||
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
|
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
|
||||||
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
|
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
|
||||||
if (callIdHeader != null) {
|
if (callIdHeader != null) {
|
||||||
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
|
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
|
||||||
if (subscribe != null) {
|
if (subscribe != null) {
|
||||||
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
|
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
|
||||||
subscribe.response(eventResult);
|
subscribe.response(eventResult);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if ((status >= 100) && (status < 200)) {
|
||||||
|
// 增加其它无需回复的响应,如101、180等
|
||||||
|
} else {
|
||||||
|
logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
|
||||||
|
if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
|
||||||
|
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
|
||||||
|
if (callIdHeader != null) {
|
||||||
|
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
|
||||||
|
if (subscribe != null) {
|
||||||
|
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
|
||||||
|
subscribe.response(eventResult);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if ((status >= 100) && (status < 200)) {
|
});
|
||||||
// 增加其它无需回复的响应,如101、180等
|
|
||||||
} else {
|
|
||||||
logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
|
|
||||||
if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
|
|
||||||
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
|
|
||||||
if (callIdHeader != null) {
|
|
||||||
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
|
|
||||||
if (subscribe != null) {
|
|
||||||
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
|
|
||||||
subscribe.response(eventResult);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,7 +204,6 @@ public class SIPRequestHeaderProvider {
|
|||||||
|
|
||||||
// Event
|
// Event
|
||||||
EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
|
EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
|
||||||
eventHeader.setEventType("Catalog");
|
|
||||||
request.addHeader(eventHeader);
|
request.addHeader(eventHeader);
|
||||||
|
|
||||||
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
|
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");
|
||||||
|
@ -1496,7 +1496,7 @@ public class SIPCommander implements ISIPCommander {
|
|||||||
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
|
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
|
||||||
: udpSipProvider.getNewCallId();
|
: udpSipProvider.getNewCallId();
|
||||||
|
|
||||||
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader);
|
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
|
||||||
transmitRequest(device, request, errorEvent, okEvent);
|
transmitRequest(device, request, errorEvent, okEvent);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
Loading…
Reference in New Issue
Block a user