兼容 不标准的 recordInfo 协议
This commit is contained in:
parent
17278523d9
commit
8c559a8de1
@ -11,7 +11,8 @@ import org.springframework.context.annotation.ScopedProxyMode;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* @description: 录像查询结束事件
|
||||
@ -25,6 +26,8 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
|
||||
private final static Logger logger = LoggerFactory.getLogger(RecordEndEventListener.class);
|
||||
|
||||
private final static Map<String, RecordEndEventHandler> handlerMap = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final static Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
|
||||
public interface RecordEndEventHandler{
|
||||
void handler(RecordInfo recordInfo);
|
||||
}
|
||||
@ -40,13 +43,23 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
|
||||
event.getRecordInfo().getChannelId(), count,sumNum);
|
||||
logger.debug("handlerMap.size => {}", handlerMap.size());
|
||||
if (handlerMap.size() > 0) {
|
||||
String key = deviceId + channelId;
|
||||
logger.debug("handlerMap.keys => {}", handlerMap.keySet());
|
||||
RecordEndEventHandler handler = handlerMap.get(deviceId + channelId);
|
||||
logger.debug("handler => {}", handler);
|
||||
if (handler != null){
|
||||
handler.handler(event.getRecordInfo());
|
||||
if (count >= sumNum){
|
||||
handlerMap.remove(deviceId + channelId);
|
||||
Optional.ofNullable(scheduleMap.get(key)).ifPresent(scheduledFuture -> {
|
||||
scheduleMap.remove(key);
|
||||
scheduledFuture.cancel(true);
|
||||
});
|
||||
|
||||
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
|
||||
scheduleMap.remove(key);
|
||||
handlerMap.remove(deviceId + channelId);
|
||||
}, 500, TimeUnit.MILLISECONDS);
|
||||
scheduleMap.put(key,schedule);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,8 +31,8 @@ import javax.sip.SipException;
|
||||
import javax.sip.message.Response;
|
||||
import java.text.ParseException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
|
||||
@ -66,6 +66,14 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
|
||||
private Long recordInfoTtl = 1800L;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
private final static Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<String, AtomicInteger> atomicMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<String, List<RecordItem>> recordListMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
responseMessageHandler.addHandler(cmdType, this);
|
||||
@ -132,29 +140,47 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
|
||||
record.setRecorderId(getText(itemRecord, "RecorderID"));
|
||||
recordList.add(record);
|
||||
}
|
||||
Map<String, String> map = recordList.stream()
|
||||
.filter(record -> record.getDeviceId() != null)
|
||||
.collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
|
||||
// Map<String, String> map = recordList.stream()
|
||||
// .filter(record -> record.getDeviceId() != null)
|
||||
// .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson));
|
||||
// 获取任务结果数据
|
||||
String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn;
|
||||
redisTemplate.opsForHash().putAll(resKey, map);
|
||||
redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
|
||||
// redisTemplate.opsForHash().putAll(resKey, map);
|
||||
// redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS);
|
||||
String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn;
|
||||
long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
|
||||
redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
|
||||
// long incr = redisTemplate.opsForValue().increment(resCountKey, map.size());
|
||||
// redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS);
|
||||
|
||||
if(!Optional.ofNullable(atomicMap.get(resKey)).isPresent()){
|
||||
atomicMap.put(resKey, new AtomicInteger(0));
|
||||
}
|
||||
AtomicInteger count = atomicMap.get(resKey);
|
||||
count.addAndGet(recordList.size());
|
||||
recordInfo.setRecordList(recordList);
|
||||
recordInfo.setCount(Math.toIntExact(incr));
|
||||
recordInfo.setCount(Math.toIntExact(count.get()));
|
||||
eventPublisher.recordEndEventPush(recordInfo);
|
||||
if (incr < sumNum) {
|
||||
List<RecordItem> recordItems = Optional.ofNullable(recordListMap.get(resKey)).orElse(new ArrayList<>());
|
||||
recordItems.addAll(recordList);
|
||||
recordListMap.put(resKey, recordItems);
|
||||
logger.info("{}/{}", count.get(), sumNum);
|
||||
if (count.get() < sumNum) {
|
||||
return;
|
||||
} else {
|
||||
Optional.ofNullable(scheduleMap.get(resKey)).ifPresent(scheduledFuture -> {
|
||||
scheduleMap.remove(resKey);
|
||||
scheduledFuture.cancel(true);
|
||||
});
|
||||
|
||||
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
|
||||
scheduleMap.remove(resKey);
|
||||
atomicMap.remove(resKey);
|
||||
recordListMap.remove(resKey);
|
||||
recordInfo.setRecordList(recordItems);
|
||||
logger.info("scheduledExecutorService.schedule recordInfo {}",recordInfo);
|
||||
releaseRequest(device.getDeviceId(), sn,recordInfo);
|
||||
}, 500, TimeUnit.MILLISECONDS);
|
||||
scheduleMap.put(resKey,schedule);
|
||||
}
|
||||
// 已接收完成
|
||||
List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList());
|
||||
if (resList.size() < sumNum) {
|
||||
return;
|
||||
}
|
||||
recordInfo.setRecordList(resList);
|
||||
releaseRequest(device.getDeviceId(), sn,recordInfo);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
Loading…
Reference in New Issue
Block a user