模拟设备注册
This commit is contained in:
parent
2cc7894fd3
commit
804032b841
@ -0,0 +1,43 @@
|
|||||||
|
package cn.skcks.docking.gb28181.mocking.core.sip.executor;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@Order(1)
|
||||||
|
@EnableAsync(
|
||||||
|
proxyTargetClass = true
|
||||||
|
)
|
||||||
|
public class MockingExecutor{
|
||||||
|
public static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
|
||||||
|
public static final int MAX_POOL_SIZE;
|
||||||
|
private static final int KEEP_ALIVE_TIME = 30;
|
||||||
|
public static final int TASK_NUM = 10000;
|
||||||
|
public static final String THREAD_NAME_PREFIX = "mocking-executor";
|
||||||
|
public static final String EXECUTOR_BEAN_NAME = "mockingTaskExecutor";
|
||||||
|
|
||||||
|
public MockingExecutor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(EXECUTOR_BEAN_NAME)
|
||||||
|
public ThreadPoolTaskExecutor sipTaskExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(CPU_NUM);
|
||||||
|
executor.setMaxPoolSize(MAX_POOL_SIZE);
|
||||||
|
executor.setQueueCapacity(10000);
|
||||||
|
executor.setKeepAliveSeconds(30);
|
||||||
|
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
MAX_POOL_SIZE = CPU_NUM * 2;
|
||||||
|
}
|
||||||
|
}
|
@ -2,10 +2,12 @@ package cn.skcks.docking.gb28181.mocking.service.gb28181.register;
|
|||||||
|
|
||||||
|
|
||||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||||
|
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
|
||||||
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
||||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
|
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
|
||||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
|
||||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||||
|
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
|
||||||
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
|
||||||
@ -16,6 +18,8 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.bouncycastle.cert.ocsp.Req;
|
import org.bouncycastle.cert.ocsp.Req;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.context.request.async.DeferredResult;
|
import org.springframework.web.context.request.async.DeferredResult;
|
||||||
|
|
||||||
@ -24,7 +28,10 @@ import javax.sip.header.CallIdHeader;
|
|||||||
import javax.sip.header.WWWAuthenticateHeader;
|
import javax.sip.header.WWWAuthenticateHeader;
|
||||||
import javax.sip.message.Request;
|
import javax.sip.message.Request;
|
||||||
import javax.sip.message.Response;
|
import javax.sip.message.Response;
|
||||||
|
import java.text.MessageFormat;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -37,19 +44,38 @@ public class RegisterService {
|
|||||||
|
|
||||||
private final SipSubscribe subscribe;
|
private final SipSubscribe subscribe;
|
||||||
|
|
||||||
|
private final MockingExecutor executor;
|
||||||
|
|
||||||
|
private static final int TIMEOUT = 60;
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public DeferredResult<JsonResponse<Boolean>> register(){
|
@SuppressWarnings("unchecked")
|
||||||
DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>();
|
public DeferredResult<JsonResponse<Boolean>> register() {
|
||||||
|
DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(TIMEOUT));
|
||||||
|
|
||||||
List<MockingDevice> allDevice = deviceService.getAllDevice();
|
List<MockingDevice> allDevice = deviceService.getAllDevice();
|
||||||
allDevice.parallelStream().forEach(this::register);
|
CompletableFuture<JsonResponse<Boolean>>[] array = allDevice.parallelStream().map(this::register).toArray(CompletableFuture[]::new);
|
||||||
result.setResult(JsonResponse.success(true));
|
CompletableFuture.allOf(array).join();
|
||||||
|
|
||||||
|
Optional<JsonResponse<Boolean>> first = Arrays.stream(array).map(item -> {
|
||||||
|
try {
|
||||||
|
return item.get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).filter(item -> item == null || item.getCode() != 200).findFirst();
|
||||||
|
first.ifPresentOrElse(item -> {
|
||||||
|
log.info("执行失败 {}", item);
|
||||||
|
result.setResult(JsonResponse.error(item.getMsg()));
|
||||||
|
}, () -> result.setResult(JsonResponse.success(true)));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public void register(MockingDevice device){
|
public CompletableFuture<JsonResponse<Boolean>> register(MockingDevice device) {
|
||||||
|
CompletableFuture<JsonResponse<Boolean>> result = new CompletableFuture<>();
|
||||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
sender.sendRequest((provider,ip,port)->{
|
sender.sendRequest((provider, ip, port) -> {
|
||||||
CallIdHeader callIdHeader = provider.getNewCallId();
|
CallIdHeader callIdHeader = provider.getNewCallId();
|
||||||
String callId = callIdHeader.getCallId();
|
String callId = callIdHeader.getCallId();
|
||||||
Request request = SipRequestBuilder.createRegisterRequest(device, ip, port, 1, SipUtil.generateFromTag(), null, callIdHeader);
|
Request request = SipRequestBuilder.createRegisterRequest(device, ip, port, 1, SipUtil.generateFromTag(), null, callIdHeader);
|
||||||
@ -71,14 +97,14 @@ public class RegisterService {
|
|||||||
@Override
|
@Override
|
||||||
public void onNext(SIPResponse response) {
|
public void onNext(SIPResponse response) {
|
||||||
int statusCode = response.getStatusCode();
|
int statusCode = response.getStatusCode();
|
||||||
if(statusCode == Response.UNAUTHORIZED && !usedAuthorization){
|
if (statusCode == Response.UNAUTHORIZED && !usedAuthorization) {
|
||||||
usedAuthorization = true;
|
usedAuthorization = true;
|
||||||
WWWAuthenticateHeader authorizationHeader = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME);
|
WWWAuthenticateHeader authorizationHeader = (WWWAuthenticateHeader) response.getHeader(WWWAuthenticateHeader.NAME);
|
||||||
provider.sendRequest(SipRequestBuilder.createRegisterRequestWithAuthorization(
|
provider.sendRequest(SipRequestBuilder.createRegisterRequestWithAuthorization(
|
||||||
device,
|
device,
|
||||||
ip,
|
ip,
|
||||||
port,
|
port,
|
||||||
response.getCSeq().getSeqNumber()+1,
|
response.getCSeq().getSeqNumber() + 1,
|
||||||
response.getFromTag(),
|
response.getFromTag(),
|
||||||
null,
|
null,
|
||||||
callIdHeader,
|
callIdHeader,
|
||||||
@ -90,12 +116,15 @@ public class RegisterService {
|
|||||||
|
|
||||||
if (statusCode == Response.UNAUTHORIZED) {
|
if (statusCode == Response.UNAUTHORIZED) {
|
||||||
this.onComplete();
|
this.onComplete();
|
||||||
log.info("设备: {}({}), 注册失败, 认证失败", device.getDeviceCode(), device.getGbDeviceId());
|
String reason = MessageFormat.format("设备: {0}({1}), 注册失败, 认证失败", device.getDeviceCode(), device.getGbDeviceId());
|
||||||
|
log.error(reason);
|
||||||
|
result.complete(JsonResponse.error(reason));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(statusCode == Response.OK){
|
if (statusCode == Response.OK) {
|
||||||
log.info("设备: {}({}), 注册成功",device.getDeviceCode(), device.getGbDeviceId());
|
log.info("设备: {}({}), 注册成功", device.getDeviceCode(), device.getGbDeviceId());
|
||||||
|
result.complete(JsonResponse.success(null));
|
||||||
this.onComplete();
|
this.onComplete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -114,8 +143,9 @@ public class RegisterService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
subscribe.getRegisterSubscribe().addSubscribe(key, subscriber);
|
subscribe.getRegisterSubscribe().addSubscribe(key, subscriber);
|
||||||
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 30, TimeUnit.SECONDS);
|
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, TIMEOUT / 2, TimeUnit.SECONDS);
|
||||||
return request;
|
return request;
|
||||||
});
|
});
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user