针对某些不按规范的设备 的 历史媒体记录查询

This commit is contained in:
shikong 2023-08-23 10:25:20 +08:00
parent 6ea57afc10
commit 81e20fee9a
2 changed files with 23 additions and 11 deletions

View File

@ -4,10 +4,10 @@ import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.util.concurrent.SubmissionPublisher;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@ -72,8 +72,9 @@ public class MessageRequestProcessor implements MessageProcessor {
response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = CacheUtil.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
SubmissionPublisher<RecordInfoResponseDTO> publisher = subscribe.getRecordInfoSubscribe().getPublisher(key);
publisher.submit(dto);
Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)).ifPresentOrElse(publisher->{
publisher.submit(dto);
},()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto));
} else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
}

View File

@ -26,7 +26,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@ -40,7 +40,8 @@ public class RecordService {
@SneakyThrows
public DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> requestRecordInfo(String deviceId) {
DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> result = new DeferredResult<>(30 * 1000L);
log.info("TimeUnit.SECONDS.toMillis(30) {}",TimeUnit.SECONDS.toMillis(30));
DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(30));
DockingDevice device = deviceService.getDevice(deviceId);
if (device == null) {
@ -73,7 +74,9 @@ public class RecordService {
List<RecordInfoItemDTO> list = new ArrayList<>();
AtomicLong sum = new AtomicLong(0);
AtomicLong getNum = new AtomicLong(0);
subscribe.getRecordInfoSubscribe().addSubscribe(key, new Flow.Subscriber<>() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<RecordInfoResponseDTO> subscriber = new Flow.Subscriber<>() {
Flow.Subscription subscription;
@Override
@ -90,10 +93,15 @@ public class RecordService {
list.addAll(item.getRecordList());
log.info("获取订阅 => {}, {}/{}", key, getNum.get(), sum.get());
if (getNum.get() >= sum.get()) {
onComplete();
} else {
subscription.request(1);
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
if(schedule[0] != null){
schedule[0].cancel(true);
}
schedule[0] = scheduledExecutorService.schedule(this::onComplete, 500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
@ -103,11 +111,14 @@ public class RecordService {
@Override
public void onComplete() {
schedule[0].cancel(true);
subscribe.getRecordInfoSubscribe().delPublisher(key);
result.setResult(JsonResponse.success(list));
log.debug("订阅结束 => {}", key);
}
});
};
subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber);
result.onTimeout(()->{
result.setResult(JsonResponse.success(list,"查询超时, 结果可能不完整"));