Merge remote-tracking branch 'origin/dev-2.6.9_整理' into dev-2.6.9_整理

This commit is contained in:
zxb 2023-09-05 11:49:04 +08:00
commit 279d789c12
2 changed files with 62 additions and 22 deletions

View File

@ -11,7 +11,8 @@ import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Optional;
import java.util.concurrent.*;
/**
* @description: 录像查询结束事件
@ -25,6 +26,8 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
private final static Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final static Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
public interface RecordEndEventHandler{
void handler(RecordInfo recordInfo);
}
@ -40,13 +43,23 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
event.getRecordInfo().getChannelId(), count,sumNum);
logger.debug("handlerMap.size => {}", handlerMap.size());
if (handlerMap.size() > 0) {
String key = deviceId + channelId;
logger.debug("handlerMap.keys => {}", handlerMap.keySet());
RecordEndEventHandler handler = handlerMap.get(deviceId + channelId);
logger.debug("handler => {}", handler);
if (handler != null){
handler.handler(event.getRecordInfo());
if (count >= sumNum){
handlerMap.remove(deviceId + channelId);
Optional.ofNullable(scheduleMap.get(key)).ifPresent(scheduledFuture -> {
scheduleMap.remove(key);
scheduledFuture.cancel(true);
});
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
scheduleMap.remove(key);
handlerMap.remove(deviceId + channelId);
}, 500, TimeUnit.MILLISECONDS);
scheduleMap.put(key,schedule);
}
}
}

View File

@ -31,8 +31,8 @@ import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@ -66,6 +66,14 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private Long recordInfoTtl = 1800L;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final static Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
private final static Map<String, AtomicInteger> atomicMap = new ConcurrentHashMap<>();
private final static Map<String, List<RecordItem>> recordListMap = new ConcurrentHashMap<>();
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
@ -132,29 +140,48 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
Map<String, String> map = recordList.stream()
.filter(record -> record.getDeviceId() != null)
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// Map<String, String> map = recordList.stream()
// .filter(record -> record.getDeviceId() != null)
// .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
// 获取任务结果数据
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
redisTemplate.opsForHash().putAll(resKey, map);
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
// redisTemplate.opsForHash().putAll(resKey, map);
// redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
// long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
// redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
if(!Optional.ofNullable(atomicMap.get(resKey)).isPresent()){
atomicMap.put(resKey, new AtomicInteger(0));
}
AtomicInteger count = atomicMap.get(resKey);
count.addAndGet(recordList.size());
recordInfo.setRecordList(recordList);
recordInfo.setCount(Math.toIntExact(incr));
recordInfo.setCount(Math.toIntExact(count.get()));
eventPublisher.recordEndEventPush(recordInfo);
if (incr < sumNum) {
return;
}
// 已接收完成
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
if (resList.size() < sumNum) {
return;
}
recordInfo.setRecordList(resList);
releaseRequest(device.getDeviceId(), sn,recordInfo);
List<RecordItem> recordItems = Optional.ofNullable(recordListMap.get(resKey)).orElse(new ArrayList<>());
recordItems.addAll(recordList);
recordListMap.put(resKey, recordItems);
logger.info("{}/{}", count.get(), sumNum);
// if (count.get() < sumNum) {
// return;
// } else {
// 不管有没有拿全 超过 时间间隔就直接返回
Optional.ofNullable(scheduleMap.get(resKey)).ifPresent(scheduledFuture -> {
scheduleMap.remove(resKey);
scheduledFuture.cancel(true);
});
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
scheduleMap.remove(resKey);
atomicMap.remove(resKey);
recordListMap.remove(resKey);
recordInfo.setRecordList(recordItems);
logger.info("scheduledExecutorService.schedule recordInfo {}",recordInfo);
releaseRequest(device.getDeviceId(), sn,recordInfo);
}, 500, TimeUnit.MILLISECONDS);
scheduleMap.put(resKey,schedule);
// }
}
}
} catch (Exception e) {