设备 媒体记录查询
This commit is contained in:
parent
18b8c78b30
commit
bfa056d2ca
@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
|
||||
import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.config.SwaggerConfig;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
|
||||
import cn.skcks.docking.gb28181.service.record.RecordService;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -15,6 +16,8 @@ import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Tag(name="历史录像")
|
||||
@RestController
|
||||
@JsonMapping("/record")
|
||||
@ -28,10 +31,7 @@ public class RecordController {
|
||||
}
|
||||
|
||||
@GetJson("/getInfo")
|
||||
public DeferredResult<JsonResponse<Void>> getInfo(@ParameterObject @Validated GetInfoDTO dto){
|
||||
recordService.requestRecordInfo(dto.getDeviceId());
|
||||
DeferredResult<JsonResponse<Void>> result = new DeferredResult<>();
|
||||
result.setResult(JsonResponse.success(null));
|
||||
return result;
|
||||
public DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> getInfo(@ParameterObject @Validated GetInfoDTO dto){
|
||||
return recordService.requestRecordInfo(dto.getDeviceId());
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +78,10 @@
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -2,12 +2,15 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request;
|
||||
|
||||
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
|
||||
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
|
||||
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
|
||||
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
|
||||
@ -22,6 +25,7 @@ import org.springframework.stereotype.Component;
|
||||
import javax.sip.RequestEvent;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.message.Response;
|
||||
import java.util.concurrent.SubmissionPublisher;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@ -30,6 +34,7 @@ public class MessageRequestProcessor implements MessageProcessor {
|
||||
private final SipListener sipListener;
|
||||
private final DockingDeviceService deviceService;
|
||||
private final SipMessageSender sender;
|
||||
private final SipSubscribe subscribe;
|
||||
|
||||
@PostConstruct
|
||||
@Override
|
||||
@ -43,7 +48,8 @@ public class MessageRequestProcessor implements MessageProcessor {
|
||||
String deviceId = SipUtil.getUserIdFromFromHeader(request);
|
||||
CallIdHeader callIdHeader = request.getCallIdHeader();
|
||||
|
||||
MessageDTO messageDto = XmlUtils.parse(request.getRawContent(), MessageDTO.class, GB28181Constant.CHARSET);
|
||||
byte[] content = request.getRawContent();
|
||||
MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET);
|
||||
log.debug("接收到的消息 => {}", messageDto);
|
||||
|
||||
DockingDevice device = deviceService.getDevice(deviceId);
|
||||
@ -56,15 +62,22 @@ public class MessageRequestProcessor implements MessageProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
Response ok = response(request, Response.OK, "OK");
|
||||
Response response;
|
||||
if(messageDto.getCmdType().equalsIgnoreCase(CmdType.KEEPALIVE)){
|
||||
response = response(request, Response.OK, "OK");
|
||||
response = ok;
|
||||
// 更新设备在线状态
|
||||
deviceService.online(device, response);
|
||||
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.RECORD_INFO)){
|
||||
response = ok;
|
||||
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
|
||||
String key = CacheUtil.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
|
||||
SubmissionPublisher<RecordInfoResponseDTO> publisher = subscribe.getRecordInfoSubscribe().getPublisher(key);
|
||||
publisher.submit(dto);
|
||||
} else {
|
||||
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
|
||||
}
|
||||
sender.send(senderIp,response);
|
||||
sender.send(senderIp, response);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
|
@ -15,6 +15,7 @@ public interface GenericSubscribe<T> {
|
||||
SubmissionPublisher<T> getPublisher(String key);
|
||||
|
||||
void addSubscribe(String key,Flow.Subscriber<T> subscribe);
|
||||
void delPublisher(String key);
|
||||
|
||||
class Helper {
|
||||
public static <T> void close(Map<String,SubmissionPublisher<T>> publishers){
|
||||
@ -22,6 +23,11 @@ public interface GenericSubscribe<T> {
|
||||
publishers.clear();
|
||||
}
|
||||
|
||||
public static <T> void delPublisher(Map<String, SubmissionPublisher<T>> publishers, String key){
|
||||
SubmissionPublisher<T> publisher = publishers.remove(key);
|
||||
publisher.close();
|
||||
}
|
||||
|
||||
public static <T> void addPublisher(Executor executor, Map<String, SubmissionPublisher<T>> publishers, String key){
|
||||
SubmissionPublisher<T> publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
|
||||
publishers.put(key, publisher);
|
||||
|
@ -29,4 +29,9 @@ public class RecordInfoSubscribe implements GenericSubscribe<RecordInfoResponseD
|
||||
public void addSubscribe(String key, Flow.Subscriber<RecordInfoResponseDTO> subscribe) {
|
||||
Helper.addSubscribe(publishers, key, subscribe);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delPublisher(String key) {
|
||||
Helper.delPublisher(publishers, key);
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,16 @@
|
||||
package cn.skcks.docking.gb28181.service.record;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.skcks.docking.gb28181.common.json.JsonResponse;
|
||||
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
|
||||
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
|
||||
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
|
||||
import cn.skcks.docking.gb28181.core.sip.service.SipService;
|
||||
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
|
||||
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
|
||||
@ -13,10 +19,15 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
|
||||
import javax.sip.SipProvider;
|
||||
import javax.sip.header.CallIdHeader;
|
||||
import javax.sip.message.Request;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@ -25,24 +36,29 @@ public class RecordService {
|
||||
private final DockingDeviceService deviceService;
|
||||
private final SipService sipService;
|
||||
private final SipMessageSender sender;
|
||||
private final SipSubscribe subscribe;
|
||||
|
||||
@SneakyThrows
|
||||
public void requestRecordInfo(String deviceId){
|
||||
public DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> requestRecordInfo(String deviceId) {
|
||||
DeferredResult<JsonResponse<List<RecordInfoItemDTO>>> result = new DeferredResult<>(30 * 1000L);
|
||||
|
||||
DockingDevice device = deviceService.getDevice(deviceId);
|
||||
if (device == null) {
|
||||
log.info("未能找到 编码为 => {} 的设备", deviceId);
|
||||
return;
|
||||
result.setResult(JsonResponse.error(null, "未找到设备"));
|
||||
return result;
|
||||
}
|
||||
|
||||
String transport = device.getTransport();
|
||||
String senderIp = device.getLocalIp();
|
||||
SipProvider provider = sipService.getProvider(transport, senderIp);
|
||||
CallIdHeader callId = provider.getNewCallId();
|
||||
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
|
||||
RecordInfoRequestDTO dto = RecordInfoRequestDTO.builder()
|
||||
.deviceId(deviceId)
|
||||
.startTime(DateUtil.beginOfDay(DateUtil.date()))
|
||||
.endTime(DateUtil.endOfDay(DateUtil.date()))
|
||||
.sn(String.valueOf((int)(Math.random() * 9 + 1) * 100000))
|
||||
.sn(sn)
|
||||
.build();
|
||||
Request request = SipRequestBuilder.createMessageRequest(device,
|
||||
XmlUtils.toXml(dto),
|
||||
@ -50,6 +66,55 @@ public class RecordService {
|
||||
SipUtil.generateFromTag(),
|
||||
null,
|
||||
callId);
|
||||
|
||||
String key = CacheUtil.getKey(CmdType.RECORD_INFO, deviceId, sn);
|
||||
subscribe.getRecordInfoSubscribe().addPublisher(key);
|
||||
sender.send(senderIp, request);
|
||||
List<RecordInfoItemDTO> list = new ArrayList<>();
|
||||
AtomicLong sum = new AtomicLong(0);
|
||||
AtomicLong getNum = new AtomicLong(0);
|
||||
subscribe.getRecordInfoSubscribe().addSubscribe(key, new Flow.Subscriber<>() {
|
||||
Flow.Subscription subscription;
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
this.subscription = subscription;
|
||||
log.debug("建立订阅 => {}", key);
|
||||
subscription.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(RecordInfoResponseDTO item) {
|
||||
sum.set(item.getSumNum());
|
||||
getNum.getAndAdd(item.getRecordList().size());
|
||||
log.info("{}", item);
|
||||
list.addAll(item.getRecordList());
|
||||
log.info("{}/{}", getNum.get(), sum.get());
|
||||
if (getNum.get() >= sum.get()) {
|
||||
onComplete();
|
||||
} else {
|
||||
subscription.request(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
subscribe.getRecordInfoSubscribe().delPublisher(key);
|
||||
result.setResult(JsonResponse.success(list));
|
||||
log.debug("订阅结束 => {}", key);
|
||||
}
|
||||
});
|
||||
|
||||
result.onTimeout(()->{
|
||||
result.setResult(JsonResponse.success(list,"查询超时, 结果可能不完整"));
|
||||
subscribe.getRecordInfoSubscribe().delPublisher(key);
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user