批量注册调整

This commit is contained in:
shikong 2023-09-20 17:06:27 +08:00
parent 8ad7c5a15f
commit 8c35b8ae41

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.mocking.service.gb28181.register; package cn.skcks.docking.gb28181.mocking.service.gb28181.register;
import cn.hutool.core.collection.ListUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
@ -22,6 +23,7 @@ import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -41,16 +43,29 @@ public class RegisterService {
private static final int TIMEOUT = 60; private static final int TIMEOUT = 60;
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@SneakyThrows @SneakyThrows
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public DeferredResult<JsonResponse<Boolean>> register() { public DeferredResult<JsonResponse<Boolean>> register() {
DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(TIMEOUT)); DeferredResult<JsonResponse<Boolean>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(TIMEOUT));
List<MockingDevice> allDevice = deviceService.getAllDevice(); List<MockingDevice> allDevice = deviceService.getAllDevice();
CompletableFuture<JsonResponse<Boolean>>[] array = allDevice.parallelStream().map(this::register).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(array).join();
Optional<JsonResponse<Boolean>> first = Arrays.stream(array).map(item -> {
List<CompletableFuture<JsonResponse<Boolean>>[]> completableFutures = ListUtil.split(allDevice, 10).stream().map(items -> {
CompletableFuture<JsonResponse<Boolean>>[] array = allDevice.stream().map(this::register).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(array);
return array;
}).toList();
List<CompletableFuture<JsonResponse<Boolean>>> reduce = completableFutures.stream().map(item -> Arrays.stream(item).toList())
.reduce(new ArrayList<>(), (prev, cur) -> {
prev.addAll(cur);
return prev;
});
Optional<JsonResponse<Boolean>> first = reduce.stream().map(item -> {
try { try {
return item.get(); return item.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
@ -69,7 +84,6 @@ public class RegisterService {
keepaliveService.unKeepalive(device); keepaliveService.unKeepalive(device);
CompletableFuture<JsonResponse<Boolean>> result = new CompletableFuture<>(); CompletableFuture<JsonResponse<Boolean>> result = new CompletableFuture<>();
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
sender.sendRequest((provider, ip, port) -> { sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId(); CallIdHeader callIdHeader = provider.getNewCallId();
String callId = callIdHeader.getCallId(); String callId = callIdHeader.getCallId();