From 8c35b8ae416bcd8a26e0543340cec59ce167a043 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 20 Sep 2023 17:06:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=B3=A8=E5=86=8C=E8=B0=83?= =?UTF-8?q?=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/register/RegisterService.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java index ddf9655..a8d0f24 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/gb28181/register/RegisterService.java @@ -1,6 +1,7 @@ package cn.skcks.docking.gb28181.mocking.service.gb28181.register; +import cn.hutool.core.collection.ListUtil; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; @@ -22,6 +23,7 @@ import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -41,16 +43,29 @@ public class RegisterService { private static final int TIMEOUT = 60; + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + @SneakyThrows @SuppressWarnings("unchecked") public DeferredResult> register() { DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(TIMEOUT)); List allDevice = deviceService.getAllDevice(); - CompletableFuture>[] array = allDevice.parallelStream().map(this::register).toArray(CompletableFuture[]::new); - CompletableFuture.allOf(array).join(); - Optional> first = Arrays.stream(array).map(item -> { + + List>[]> completableFutures = ListUtil.split(allDevice, 10).stream().map(items -> { + CompletableFuture>[] array = allDevice.stream().map(this::register).toArray(CompletableFuture[]::new); + CompletableFuture.allOf(array); + return array; + }).toList(); + + List>> reduce = completableFutures.stream().map(item -> Arrays.stream(item).toList()) + .reduce(new ArrayList<>(), (prev, cur) -> { + prev.addAll(cur); + return prev; + }); + + Optional> first = reduce.stream().map(item -> { try { return item.get(); } catch (InterruptedException | ExecutionException e) { @@ -69,7 +84,6 @@ public class RegisterService { keepaliveService.unKeepalive(device); CompletableFuture> result = new CompletableFuture<>(); - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); sender.sendRequest((provider, ip, port) -> { CallIdHeader callIdHeader = provider.getNewCallId(); String callId = callIdHeader.getCallId();