This commit is contained in:
shikong 2023-08-23 15:04:19 +08:00
parent 1da36a6a73
commit 4bbf2987e3
2 changed files with 16 additions and 10 deletions

View File

@ -5,7 +5,6 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.SwaggerConfig;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.service.record.RecordService;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -21,17 +20,17 @@ import java.util.List;
@Tag(name="历史录像")
@RestController
@JsonMapping("/record")
@JsonMapping("/device/record")
@RequiredArgsConstructor
public class RecordController {
private final RecordService recordService;
@Bean
public GroupedOpenApi recordApi() {
return SwaggerConfig.api("Record", "/record");
return SwaggerConfig.api("Record", "/device/record");
}
@GetJson("/getInfo")
@GetJson("/getInfoList")
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> getInfo(@ParameterObject @Validated GetInfoDTO dto){
return recordService.requestRecordInfo(dto.getDeviceId(), dto.getTimeout(), dto.getDate());
}

View File

@ -83,8 +83,8 @@ public class RecordService {
subscribe.getRecordInfoSubscribe().addPublisher(key);
sender.send(senderIp, request);
List<RecordInfoItemDTO> list = new ArrayList<>();
AtomicLong sum = new AtomicLong(0);
AtomicLong getNum = new AtomicLong(0);
AtomicLong atomicSum = new AtomicLong(0);
AtomicLong atomicNum = new AtomicLong(0);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<RecordInfoResponseDTO> subscriber = new Flow.Subscriber<>() {
@ -99,11 +99,18 @@ public class RecordService {
@Override
public void onNext(RecordInfoResponseDTO item) {
sum.set(item.getSumNum());
getNum.getAndAdd(item.getRecordList().size());
atomicSum.set(item.getSumNum());
atomicNum.getAndAdd(item.getRecordList().size());
list.addAll(item.getRecordList());
log.info("获取订阅 => {}, {}/{}", key, getNum.get(), sum.get());
if (getNum.get() >= sum.get()) {
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束