diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 5dc80dec..d8fbe4a4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -11,7 +11,8 @@ import org.springframework.context.annotation.ScopedProxyMode; import org.springframework.stereotype.Component; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Optional; +import java.util.concurrent.*; /** * @description: 录像查询结束事件 @@ -25,6 +26,8 @@ public class RecordEndEventListener implements ApplicationListener handlerMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final static Map> scheduleMap = new ConcurrentHashMap<>(); public interface RecordEndEventHandler{ void handler(RecordInfo recordInfo); } @@ -40,13 +43,23 @@ public class RecordEndEventListener implements ApplicationListener {}", handlerMap.size()); if (handlerMap.size() > 0) { + String key = deviceId + channelId; logger.debug("handlerMap.keys => {}", handlerMap.keySet()); RecordEndEventHandler handler = handlerMap.get(deviceId + channelId); logger.debug("handler => {}", handler); if (handler != null){ handler.handler(event.getRecordInfo()); if (count >= sumNum){ - handlerMap.remove(deviceId + channelId); + Optional.ofNullable(scheduleMap.get(key)).ifPresent(scheduledFuture -> { + scheduleMap.remove(key); + scheduledFuture.cancel(true); + }); + + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduleMap.remove(key); + handlerMap.remove(deviceId + channelId); + }, 500, TimeUnit.MILLISECONDS); + scheduleMap.put(key,schedule); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 4f2ba593..ba4fe227 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -31,8 +31,8 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -66,6 +66,14 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private Long recordInfoTtl = 1800L; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + private final static Map> scheduleMap = new ConcurrentHashMap<>(); + + private final static Map atomicMap = new ConcurrentHashMap<>(); + + private final static Map> recordListMap = new ConcurrentHashMap<>(); + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -132,29 +140,47 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent record.setRecorderId(getText(itemRecord, "RecorderID")); recordList.add(record); } - Map map = recordList.stream() - .filter(record -> record.getDeviceId() != null) - .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); +// Map map = recordList.stream() +// .filter(record -> record.getDeviceId() != null) +// .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); // 获取任务结果数据 String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; - redisTemplate.opsForHash().putAll(resKey, map); - redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); +// redisTemplate.opsForHash().putAll(resKey, map); +// redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; - long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); - redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); +// long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); +// redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); + + if(!Optional.ofNullable(atomicMap.get(resKey)).isPresent()){ + atomicMap.put(resKey, new AtomicInteger(0)); + } + AtomicInteger count = atomicMap.get(resKey); + count.addAndGet(recordList.size()); recordInfo.setRecordList(recordList); - recordInfo.setCount(Math.toIntExact(incr)); + recordInfo.setCount(Math.toIntExact(count.get())); eventPublisher.recordEndEventPush(recordInfo); - if (incr < sumNum) { + List recordItems = Optional.ofNullable(recordListMap.get(resKey)).orElse(new ArrayList<>()); + recordItems.addAll(recordList); + recordListMap.put(resKey, recordItems); + logger.info("{}/{}", count.get(), sumNum); + if (count.get() < sumNum) { return; + } else { + Optional.ofNullable(scheduleMap.get(resKey)).ifPresent(scheduledFuture -> { + scheduleMap.remove(resKey); + scheduledFuture.cancel(true); + }); + + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduleMap.remove(resKey); + atomicMap.remove(resKey); + recordListMap.remove(resKey); + recordInfo.setRecordList(recordItems); + logger.info("scheduledExecutorService.schedule recordInfo {}",recordInfo); + releaseRequest(device.getDeviceId(), sn,recordInfo); + }, 500, TimeUnit.MILLISECONDS); + scheduleMap.put(resKey,schedule); } - // 已接收完成 - List resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); - if (resList.size() < sumNum) { - return; - } - recordInfo.setRecordList(resList); - releaseRequest(device.getDeviceId(), sn,recordInfo); } } } catch (Exception e) {