From 1d8bbd41e575556aecd5fb33f4852592cf0568a8 Mon Sep 17 00:00:00 2001 From: zxb <919411476@qq.com> Date: Tue, 29 Aug 2023 17:18:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=BC=E5=AE=B9=20=E4=B8=8D=E6=A0=87?= =?UTF-8?q?=E5=87=86=E7=9A=84=20recordInfo=20=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/record/RecordEndEventListener.java | 17 ++++- .../cmd/RecordInfoResponseMessageHandler.java | 62 +++++++++++++------ 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 5dc80dec..d8fbe4a4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -11,7 +11,8 @@ import org.springframework.context.annotation.ScopedProxyMode; import org.springframework.stereotype.Component; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Optional; +import java.util.concurrent.*; /** * @description: 录像查询结束事件 @@ -25,6 +26,8 @@ public class RecordEndEventListener implements ApplicationListener handlerMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final static Map> scheduleMap = new ConcurrentHashMap<>(); public interface RecordEndEventHandler{ void handler(RecordInfo recordInfo); } @@ -40,13 +43,23 @@ public class RecordEndEventListener implements ApplicationListener {}", handlerMap.size()); if (handlerMap.size() > 0) { + String key = deviceId + channelId; logger.debug("handlerMap.keys => {}", handlerMap.keySet()); RecordEndEventHandler handler = handlerMap.get(deviceId + channelId); logger.debug("handler => {}", handler); if (handler != null){ handler.handler(event.getRecordInfo()); if (count >= sumNum){ - handlerMap.remove(deviceId + channelId); + Optional.ofNullable(scheduleMap.get(key)).ifPresent(scheduledFuture -> { + scheduleMap.remove(key); + scheduledFuture.cancel(true); + }); + + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduleMap.remove(key); + handlerMap.remove(deviceId + channelId); + }, 500, TimeUnit.MILLISECONDS); + scheduleMap.put(key,schedule); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index fc0fda8d..c0a277e7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -28,8 +28,8 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -63,6 +63,14 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private Long recordInfoTtl = 1800L; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + private final static Map> scheduleMap = new ConcurrentHashMap<>(); + + private final static Map atomicMap = new ConcurrentHashMap<>(); + + private final static Map> recordListMap = new ConcurrentHashMap<>(); + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -128,29 +136,47 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent record.setRecorderId(getText(itemRecord, "RecorderID")); recordList.add(record); } - Map map = recordList.stream() - .filter(record -> record.getDeviceId() != null) - .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); +// Map map = recordList.stream() +// .filter(record -> record.getDeviceId() != null) +// .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); // 获取任务结果数据 String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; - redisTemplate.opsForHash().putAll(resKey, map); - redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); +// redisTemplate.opsForHash().putAll(resKey, map); +// redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; - long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); - redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); +// long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); +// redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); + + if(!Optional.ofNullable(atomicMap.get(resKey)).isPresent()){ + atomicMap.put(resKey, new AtomicInteger(0)); + } + AtomicInteger count = atomicMap.get(resKey); + count.addAndGet(recordList.size()); recordInfo.setRecordList(recordList); - recordInfo.setCount(Math.toIntExact(incr)); + recordInfo.setCount(Math.toIntExact(count.get())); eventPublisher.recordEndEventPush(recordInfo); - if (incr < sumNum) { + List recordItems = Optional.ofNullable(recordListMap.get(resKey)).orElse(new ArrayList<>()); + recordItems.addAll(recordList); + recordListMap.put(resKey, recordItems); + logger.info("{}/{}", count.get(), sumNum); + if (count.get() < sumNum) { return; + } else { + Optional.ofNullable(scheduleMap.get(resKey)).ifPresent(scheduledFuture -> { + scheduleMap.remove(resKey); + scheduledFuture.cancel(true); + }); + + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduleMap.remove(resKey); + atomicMap.remove(resKey); + recordListMap.remove(resKey); + recordInfo.setRecordList(recordItems); + logger.info("scheduledExecutorService.schedule recordInfo {}",recordInfo); + releaseRequest(device.getDeviceId(), sn,recordInfo); + }, 500, TimeUnit.MILLISECONDS); + scheduleMap.put(resKey,schedule); } - // 已接收完成 - List resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); - if (resList.size() < sumNum) { - return; - } - recordInfo.setRecordList(resList); - releaseRequest(device.getDeviceId(), sn,recordInfo); } } } catch (Exception e) {