处理 sip 信令后续请求

This commit is contained in:
shikong 2023-09-21 12:10:38 +08:00
parent 6fdc9c5c3b
commit b372fd8d8e
10 changed files with 231 additions and 40 deletions

View File

@ -45,7 +45,7 @@ public class DeviceService {
public Optional<WvpProxyDevice> getDeviceByGbDeviceIdAndChannel(String gbDeviceId,String channel){
return deviceMapper.selectOne(s->
s.where(WvpProxyDeviceDynamicSqlSupport.gbDeviceId,isEqualTo(gbDeviceId))
.and(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId,isEqualTo(channel)));
.and(WvpProxyDeviceDynamicSqlSupport.gbDeviceChannelId,isEqualTo(channel)).limit(1));
}
@Transactional

View File

@ -2,10 +2,6 @@ package cn.skcks.docking.gb28181.wvp.service.gb28181;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode;
@ -28,7 +24,6 @@ import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sdp.fields.AttributeField;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
import gov.nist.javax.sip.message.SIPResponse;
@ -149,16 +144,16 @@ public class Gb28181DownloadService {
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture<String> result, long time) {
return (provider, ip, port) -> {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, action.getAction(), callId.getCallId());
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(subscribeKey, ssrc, streamId, result, time);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
};
}
public Flow.Subscriber<SIPResponse> inviteSubscriber(String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result, long time){
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result, long time){
return new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -178,8 +173,14 @@ public class Gb28181DownloadService {
subscription.request(1);
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("收到响应状态 {}", statusCode);
sender.sendRequest(((provider, ip, port) -> {
String fromTag = item.getFromTag();
String toTag = item.getToTag();
String callId = item.getCallId().getCallId();
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
}));
result.complete(videoUrl(streamId));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
result.complete("");
@ -190,7 +191,7 @@ public class Gb28181DownloadService {
@Override
public void onError(Throwable throwable) {
onComplete();
}
@Override

View File

@ -3,7 +3,6 @@ package cn.skcks.docking.gb28181.wvp.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
@ -22,7 +21,6 @@ import java.util.concurrent.ConcurrentMap;
@Component
@Slf4j
public class SipListenerImpl implements SipListener {
private final SipSubscribe sipSubscribe;
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
@ -56,6 +54,7 @@ public class SipListenerImpl implements SipListener {
String method = cseqHeader.getMethod();
log.debug("{} {}", method, response);
// Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
log.debug("传入响应 method => {}", method);
@ -64,6 +63,9 @@ public class SipListenerImpl implements SipListener {
});
} else if ((status >= Response.TRYING) && (status < Response.OK)) {
// 增加其它无需回复的响应如101180等
Optional.ofNullable(responseProcessor.get(method)).ifPresent(processor -> {
processor.process(responseEvent);
});
} else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
if (responseEvent.getDialog() != null) {

View File

@ -0,0 +1,51 @@
package cn.skcks.docking.gb28181.wvp.sip.message.bye.request.request;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class ByeRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipSubscribe subscribe;
private final SipSender sender;
@PostConstruct
@Override
public void init() {
sipListener.addRequestProcessor(Request.BYE, this);
}
@Override
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest) requestEvent.getRequest();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
String ip = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key))
.ifPresentOrElse(
publisher -> publisher.submit(request),
() -> sender.sendResponse(ip, transport, ((provider, ip1, port) ->
SipResponseBuilder.response(request, Response.OK, "OK"))));
}
}

View File

@ -0,0 +1,41 @@
package cn.skcks.docking.gb28181.wvp.sip.message.invite.response;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
import javax.sip.message.Request;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class InviteResponseProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipSubscribe subscribe;
@PostConstruct
@Override
public void init() {
sipListener.addResponseProcessor(Request.INVITE, this);
}
@Override
public void process(EventObject eventObject) {
ResponseEvent requestEvent = (ResponseEvent) eventObject;
SIPResponse response = (SIPResponse) requestEvent.getResponse();
String callId = response.getCallId().getCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId);
log.info("收到 INVITE 响应, key {}", subscribeKey);
Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey))
.ifPresent(publisher->publisher.submit(response));
}
}

View File

@ -24,6 +24,7 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.EventObject;
import java.util.Optional;
@ -40,7 +41,7 @@ public class MessageRequestProcessor implements MessageProcessor {
@PostConstruct
@Override
public void init() {
sipListener.addRequestProcessor(Method.MESSAGE,this);
sipListener.addRequestProcessor(Request.MESSAGE,this);
}
@Override

View File

@ -1,31 +1,15 @@
package cn.skcks.docking.gb28181.wvp.sip.message.register.request;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.dto.RemoteInfo;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sip.GbSipDate;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.auth.DigestServerAuthenticationHelper;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDockingMapper;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Authorization;
import gov.nist.javax.sip.header.SIPDateHeader;
@ -36,20 +20,17 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.ListeningPoint;
import javax.sip.RequestEvent;
import javax.sip.SipProvider;
import javax.sip.address.Address;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Calendar;
import java.util.EventObject;
import java.util.Locale;
@Slf4j
@RequiredArgsConstructor

View File

@ -5,7 +5,6 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import gov.nist.javax.sip.message.MessageFactoryImpl;
@ -17,17 +16,16 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@SuppressWarnings("Duplicates")
@DependsOn("proxySipConfig")
@Component
public class SipRequestBuilder implements ApplicationContextAware {
@ -158,11 +156,85 @@ public class SipRequestBuilder implements ApplicationContextAware {
// Subject
SubjectHeader subjectHeader = getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0));
request.addHeader(subjectHeader);
request.addHeader(SipUtil.createUserAgentHeader());
ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
request.setContent(content, contentTypeHeader);
return request;
}
@SneakyThrows
public static Request createAckRequest(int status,String ip, int port, WvpProxyDocking device, String channelId, String fromTag, String toTag, String callId) {
Request request;
// 请求行
String target = StringUtils.joinWith(":", device.getIp(), device.getPort());
SipURI requestLine = MessageHelper.createSipURI(channelId, target);
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag());
viaHeaders.add(viaHeader);
// from
SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain());
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(channelId, target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag);
// Forwards
MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.ACK);
CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId);
request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.ACK, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port));
request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress));
request.addHeader(SipUtil.createUserAgentHeader());
return request;
}
@SneakyThrows
public static Request createByeRequest(String ip, int port, WvpProxyDocking device, String channelId, String fromTag, String toTag, String callId) {
Request request;
// 请求行
String target = StringUtils.joinWith(":", device.getIp(), device.getPort());
SipURI requestLine = MessageHelper.createSipURI(channelId, target);
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag());
viaHeaders.add(viaHeader);
// from
SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain());
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(channelId, target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag);
// Forwards
MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.BYE);
CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId);
request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port));
request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress));
request.addHeader(SipUtil.createUserAgentHeader());
return request;
}
public static long getCSeq() {
String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId());

View File

@ -0,0 +1,39 @@
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class ByeSubscribe implements GenericSubscribe<SIPRequest> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPRequest> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPRequest> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
Helper.delPublisher(publishers, key);
}
}

View File

@ -24,16 +24,19 @@ public class SipSubscribe {
private final Executor executor;
private GenericSubscribe<SIPRequest> catalogSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
private GenericSubscribe<SIPRequest> byeSubscribe;
@PostConstruct
private void init() {
catalogSubscribe = new CatalogSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
byeSubscribe = new ByeSubscribe(executor);
}
@PreDestroy
private void destroy() {
catalogSubscribe.close();
inviteSubscribe.close();
byeSubscribe.close();
}
}