设备目录查询

This commit is contained in:
shikong 2023-10-02 23:29:09 +08:00
parent 2ab7e9dff5
commit 3db949ed26
10 changed files with 379 additions and 10 deletions

View File

@ -0,0 +1,16 @@
package cn.skcks.docking.gb28181.api.gb28181;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.config.SwaggerConfig;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RestController;
@JsonMapping("/api/gb28181")
@RestController
public class GB28181Controller {
@Bean
public GroupedOpenApi gb28181Api() {
return SwaggerConfig.api("GB28181", "/api/gb28181");
}
}

View File

@ -0,0 +1,29 @@
package cn.skcks.docking.gb28181.api.gb28181.catalog;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Tag(name="获取设备目录信息")
@RestController
@JsonMapping("/api/gb28181/catalog")
@RequiredArgsConstructor
public class CatalogController {
private final CatalogService catalogService;
@SneakyThrows
@GetJson
public JsonResponse<List<?>> catalog(String gbDeviceId){
CompletableFuture<List<CatalogItemDTO>> catalog = catalogService.catalog(gbDeviceId);
return JsonResponse.success(catalog.get());
}
}

View File

@ -2,11 +2,10 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
@ -14,6 +13,9 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
@ -24,6 +26,7 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@ -48,10 +51,10 @@ public class MessageRequestProcessor implements MessageProcessor {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest();
String deviceId = SipUtil.getUserIdFromFromHeader(request);
CallIdHeader callIdHeader = request.getCallIdHeader();
String callId = request.getCallIdHeader().getCallId();
byte[] content = request.getRawContent();
MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET);
MessageDTO messageDto = MANSCDPUtils.parse(content, MessageDTO.class);
log.debug("接收到的消息 => {}", messageDto);
DockingDevice device = deviceService.getDevice(deviceId);
@ -77,6 +80,12 @@ public class MessageRequestProcessor implements MessageProcessor {
Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key))
.ifPresentOrElse(publisher -> publisher.submit(dto),
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
}else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class);
Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(catalogResponseDTO.getSn())).ifPresent(publisher->{
publisher.submit(request);
});
response = ok;
} else {
response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage());
}

View File

@ -50,7 +50,7 @@ public class RegisterRequestProcessor implements MessageProcessor {
@PostConstruct
@Override
public void init(){
sipListener.addRequestProcessor(Method.REGISTER,this);
sipListener.addRequestProcessor(Request.REGISTER,this);
}
@SneakyThrows
@ -92,8 +92,6 @@ public class RegisterRequestProcessor implements MessageProcessor {
return;
}
log.debug("认证信息 => {}", authorization);
boolean authPass = StringUtils.isBlank(password) ||
DigestAuthenticationHelper.doAuthenticatePlainTextPassword(request,password);

View File

@ -0,0 +1,65 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
@Slf4j
@RequiredArgsConstructor
public class SipRequestSubscribe implements GenericTimeoutSubscribe<SIPRequest>, Closeable {
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, ScheduledFuture<?>> scheduledFutureManager = new ConcurrentHashMap<>(0);
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPRequest> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPRequest> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);
Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true));
Helper.delPublisher(publishers, key);
}
@Override
public void addPublisher(String key, long time, TimeUnit timeUnit) {
addPublisher(key);
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
scheduledFutureManager.remove(key);
delPublisher(key);
log.debug("清理超时 请求 订阅器 {}", key);
}, time, timeUnit);
scheduledFutureManager.put(key,schedule);
}
@Override
public void refreshPublisher(String key, long time, TimeUnit timeUnit) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);
Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true));
schedule = scheduledExecutorService.schedule(() -> {
scheduledFutureManager.remove(key);
delPublisher(key);
log.debug("清理超时 请求 订阅器 {}", key);
}, time, timeUnit);
scheduledFutureManager.put(key,schedule);
}
}

View File

@ -0,0 +1,65 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
@Slf4j
@RequiredArgsConstructor
public class SipResponseSubscribe implements GenericTimeoutSubscribe<SIPResponse>, Closeable {
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentMap<String, ScheduledFuture<?>> scheduledFutureManager = new ConcurrentHashMap<>(0);
private static final Map<String, SubmissionPublisher<SIPResponse>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPResponse> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPResponse> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);
Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true));
Helper.delPublisher(publishers, key);
}
@Override
public void addPublisher(String key, long time, TimeUnit timeUnit) {
addPublisher(key);
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
scheduledFutureManager.remove(key);
delPublisher(key);
log.debug("清理超时 响应 订阅器 {}", key);
}, time, timeUnit);
scheduledFutureManager.put(key,schedule);
}
@Override
public void refreshPublisher(String key, long time, TimeUnit timeUnit) {
ScheduledFuture<?> schedule = scheduledFutureManager.remove(key);
Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true));
schedule = scheduledExecutorService.schedule(() -> {
scheduledFutureManager.remove(key);
delPublisher(key);
log.debug("清理超时 响应 订阅器 {}", key);
}, time, timeUnit);
scheduledFutureManager.put(key,schedule);
}
}

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@ -12,6 +13,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@Data
@ -20,18 +23,28 @@ import java.util.concurrent.Executor;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private GenericSubscribe<RecordInfoResponseDTO> recordInfoSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
private GenericTimeoutSubscribe<SIPResponse> sipResponseSubscribe;
private GenericTimeoutSubscribe<SIPRequest> sipRequestSubscribe;
@PostConstruct
private void init() {
// TODO 准备废弃
recordInfoSubscribe = new RecordInfoSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
// 通用订阅器
sipResponseSubscribe = new SipResponseSubscribe(executor, scheduledExecutorService);
sipRequestSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService);
}
@PreDestroy
private void destroy() {
inviteSubscribe.close();
recordInfoSubscribe.close();
sipResponseSubscribe.close();
sipRequestSubscribe.close();
}
}

View File

@ -13,8 +13,11 @@ import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import javax.sip.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@Slf4j
@RequiredArgsConstructor
@ -67,6 +70,61 @@ public class SipServiceImpl implements SipService {
}).findFirst().orElse(null);
}
public List<SipProvider> getProviders(String transport) {
return sipConfig.getIp().stream().map(item -> getProvider(transport, item))
.filter(Objects::nonNull)
.toList();
}
public void sendResponse(SipProvider sipProvider, SendResponse response) {
log.info("{}", sipProvider);
ListeningPoint[] listeningPoints = sipProvider.getListeningPoints();
if (listeningPoints == null || listeningPoints.length == 0) {
log.error("发送响应失败, 未找到有效的监听地址");
return;
}
ListeningPoint listeningPoint = listeningPoints[0];
String ip = listeningPoint.getIPAddress();
int port = listeningPoint.getPort();
try {
sipProvider.sendResponse(response.build(sipProvider, ip, port));
} catch (SipException e) {
log.error("向{} {}:{} 发送响应失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage());
}
}
public void sendResponse(String senderIp,String transport, SendResponse response) {
SipProvider sipProvider = getProvider(transport, senderIp);
sendResponse(sipProvider, response);
}
public void sendRequest(String transport, SendRequest request) {
getProviders(transport).parallelStream().forEach(sipProvider -> {
log.info("{}", sipProvider);
ListeningPoint[] listeningPoints = sipProvider.getListeningPoints();
if (listeningPoints == null || listeningPoints.length == 0) {
log.error("发送请求失败, 未找到有效的监听地址");
return;
}
ListeningPoint listeningPoint = listeningPoints[0];
String ip = listeningPoint.getIPAddress();
int port = listeningPoint.getPort();
try {
sipProvider.sendRequest(request.build(sipProvider, ip, port));
} catch (SipException e) {
log.error("向{} {}:{} 发送请求失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage());
}
});
}
public interface SendRequest {
Request build(SipProvider provider, String ip, int port);
}
public interface SendResponse {
Response build(SipProvider provider, String ip, int port);
}
public void listen(String ip, int port){
try{
sipStack = (SipStackImpl)sipFactory.createSipStack(DefaultProperties.getProperties("GB28181_SIP"));

View File

@ -0,0 +1,112 @@
package cn.skcks.docking.gb28181.service.catalog;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.cache.DockingDeviceCacheService;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.query.CatalogQueryDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.method.message.request.MessageRequestBuilder;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import cn.skcks.docking.gb28181.sip.utils.SipUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Service
@RequiredArgsConstructor
public class CatalogService {
private final SipService sipService;
private final DockingDeviceCacheService deviceCacheService;
private final SipConfig sipConfig;
private final SipSubscribe subscribe;
@SneakyThrows
public CompletableFuture<List<CatalogItemDTO>> catalog(String gbDeviceId){
CompletableFuture<List<CatalogItemDTO>> result = new CompletableFuture<>();
result.completeOnTimeout(Collections.emptyList(), 60, TimeUnit.SECONDS);
DockingDevice device = deviceCacheService.getDevice(gbDeviceId);
SipProvider provider = sipService.getProvider(device.getTransport(), device.getLocalIp());
MessageRequestBuilder requestBuilder = MessageRequestBuilder.builder()
.localIp(device.getLocalIp())
.localId(sipConfig.getId())
.localPort(sipConfig.getPort())
.transport(device.getTransport())
.targetId(device.getDeviceId())
.targetIp(device.getIp())
.targetPort(device.getPort())
.build();
String callId = provider.getNewCallId().getCallId();
long cSeq = SipRequestBuilder.getCSeq();
String sn = SipUtil.generateSn();
CatalogQueryDTO catalogQueryDTO = CatalogQueryDTO.builder()
.deviceId(gbDeviceId)
.sn(sn)
.build();
Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO));
subscribe.getSipRequestSubscribe().addPublisher(sn, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(sn, new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
private final List<CatalogItemDTO> data = new ArrayList<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class);
sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum());
long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum());
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成");
onComplete();
} else {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
if(throwable == null){
return;
}
throwable.printStackTrace();
onComplete();
}
@Override
public void onComplete() {
log.info("返回结果 {}",result.complete(data));
subscribe.getSipRequestSubscribe().delPublisher(callId);
}
});
provider.sendRequest(request);
return result;
}
}

View File

@ -47,6 +47,10 @@ public class SipUtil {
return getIdFromFromHeader(fromHeader);
}
public static String generateSn(){
return String.valueOf((int) (Math.random() * 9 + 1) * 100000);
}
/**
* 从subject读取channelId
* */