subscribe key 改为 使用 GenericSubscribe.Helper.getKey

This commit is contained in:
shikong 2023-08-23 10:37:22 +08:00
parent fd8aa3dbc9
commit c6a7bcdcff
3 changed files with 14 additions and 8 deletions

View File

@ -2,7 +2,6 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
@ -10,6 +9,7 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
@ -71,10 +71,10 @@ public class MessageRequestProcessor implements MessageProcessor {
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.RECORD_INFO)){
response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = CacheUtil.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)).ifPresentOrElse(publisher->{
publisher.submit(dto);
},()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto));
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key))
.ifPresentOrElse(publisher-> publisher.submit(dto),
()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto));
} else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
}

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.concurrent.Executor;
@ -18,6 +19,11 @@ public interface GenericSubscribe<T> {
void delPublisher(String key);
class Helper {
public final static String SEPARATOR = ":";
public static String getKey(String prefix,String... ids){
return StringUtils.joinWith(SEPARATOR, (Object[]) ArrayUtils.addFirst(ids,prefix));
}
public static <T> void close(Map<String,SubmissionPublisher<T>> publishers){
publishers.values().forEach(SubmissionPublisher::close);
publishers.clear();

View File

@ -3,13 +3,13 @@ package cn.skcks.docking.gb28181.service.record;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
@ -69,7 +69,7 @@ public class RecordService {
null,
callId);
String key = CacheUtil.getKey(CmdType.RECORD_INFO, deviceId, sn);
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, deviceId, sn);
subscribe.getRecordInfoSubscribe().addPublisher(key);
sender.send(senderIp, request);
List<RecordInfoItemDTO> list = new ArrayList<>();