sip 下载信令 构造 (未完)

待实现 notify=MediaStatus, BYE 事件
This commit is contained in:
shikong 2023-09-21 02:48:13 +08:00
parent 5d7e30eec9
commit a3a23db8df
5 changed files with 151 additions and 4 deletions

View File

@ -35,6 +35,15 @@ public class DockingService {
return getDeviceByDeviceCode(deviceCode).orElse(null) != null; return getDeviceByDeviceCode(deviceCode).orElse(null) != null;
} }
public Optional<WvpProxyDocking> getDeviceByGbDeviceId(String gbDeviceId){
return wvpProxyDockingMapper.selectOne(s->
s.where(WvpProxyDockingDynamicSqlSupport.gbDeviceId, isEqualTo(gbDeviceId)));
}
public Boolean hasDeviceByGbDeviceId(String deviceCode){
return getDeviceByGbDeviceId(deviceCode).orElse(null) != null;
}
/** /**
* 添加设备 * 添加设备
* @param device 设备 * @param device 设备

View File

@ -1,8 +1,17 @@
package cn.skcks.docking.gb28181.wvp.service.gb28181; package cn.skcks.docking.gb28181.wvp.service.gb28181;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp;
import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer; import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer;
@ -12,8 +21,16 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService; import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sdp.fields.AttributeField;
import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -24,10 +41,14 @@ import javax.sdp.Connection;
import javax.sdp.SdpFactory; import javax.sdp.SdpFactory;
import javax.sdp.TimeDescription; import javax.sdp.TimeDescription;
import javax.sip.ListeningPoint; import javax.sip.ListeningPoint;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.*;
@Slf4j @Slf4j
@Service @Service
@ -37,7 +58,11 @@ public class Gb28181DownloadService {
private final ZlmMediaConfig zlmMediaConfig; private final ZlmMediaConfig zlmMediaConfig;
private final SsrcService ssrcService; private final SsrcService ssrcService;
private final DeviceService deviceService; private final DeviceService deviceService;
private final DockingService dockingService;
private final ProxySipConfig proxySipConfig; private final ProxySipConfig proxySipConfig;
private final SipSender sender;
private final SipSubscribe subscribe;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private String videoUrl(String streamId) { private String videoUrl(String streamId) {
return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv"); return StringUtils.joinWith("/", zlmMediaConfig.getUrl(), "rtp", streamId + ".live.flv");
@ -74,19 +99,35 @@ public class Gb28181DownloadService {
} }
@SneakyThrows @SneakyThrows
public void download(String gbDeviceId, String channel,Date startTime, Date endTime){ public CompletableFuture<String> download(String gbDeviceId, String channel, Date startTime, Date endTime){
CompletableFuture<String> result = new CompletableFuture<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
if(deviceByGbDeviceId.isEmpty()){
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
result.complete("");
return result;
}
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel); Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
if (deviceByGbDeviceIdAndChannel.isEmpty()) { if (deviceByGbDeviceIdAndChannel.isEmpty()) {
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel); log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
return ; result.complete("");
return result;
} }
WvpProxyDevice device = deviceByGbDeviceIdAndChannel.get();
WvpProxyDocking docking = deviceByGbDeviceId.get();
long start = startTime.toInstant().getEpochSecond(); long start = startTime.toInstant().getEpochSecond();
long end = endTime.toInstant().getEpochSecond(); long end = endTime.toInstant().getEpochSecond();
String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, String.valueOf(start), String.valueOf(end)); String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, String.valueOf(start), String.valueOf(end));
int streamMode = proxySipConfig.getTransport().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; int streamMode = proxySipConfig.getTransport().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1;
String ip = zlmMediaConfig.getIp(); String ip = zlmMediaConfig.getIp();
int port = openRtpServer(streamId, streamMode); int port = openRtpServer(streamId, streamMode);
if(port <= 0){
log.error("zlm 暂无可用端口");
result.complete("");
return result;
}
String ssrc = ssrcService.getPlaySsrc(); String ssrc = ssrcService.getPlaySsrc();
TimeField timeField = new TimeField(); TimeField timeField = new TimeField();
timeField.setStartTime(start); timeField.setStartTime(start);
@ -95,7 +136,63 @@ public class Gb28181DownloadService {
GB28181Description gb28181Description = MediaSdpHelper.playback(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, StreamMode.of(ListeningPoint.UDP), startTime, endTime); GB28181Description gb28181Description = MediaSdpHelper.playback(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, StreamMode.of(ListeningPoint.UDP), startTime, endTime);
gb28181Description.setSessionName(SdpFactory.getInstance().createSessionName(MediaSdpHelper.Action.DOWNLOAD.getAction())); gb28181Description.setSessionName(SdpFactory.getInstance().createSessionName(MediaSdpHelper.Action.DOWNLOAD.getAction()));
gb28181Description.setTimeDescriptions(new Vector<>(){{add(timeDescription);}}); gb28181Description.setTimeDescriptions(new Vector<>(){{add(timeDescription);}});
log.debug("{}", gb28181Description); MediaDescriptionImpl media = (MediaDescriptionImpl) gb28181Description.getMediaDescriptions(true).get(0);
media.setAttribute("downloadspeed", String.valueOf(4));
sender.sendRequest(inviteRequest(docking, device, gb28181Description, MediaSdpHelper.Action.DOWNLOAD, ssrc, streamId, result, time));
return result;
} }
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture<String> result, long time) {
return (provider, ip, port) -> {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = inviteSubscriber(subscribeKey, ssrc, streamId, result, time);
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
};
}
public Flow.Subscriber<SIPResponse> inviteSubscriber(String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result, long time){
return new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey);
subscription.request(1);
}
@Override
public void onNext(SIPResponse item) {
int statusCode = item.getStatusCode();
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
if (statusCode == Response.TRYING) {
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey);
subscription.request(1);
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
result.complete(videoUrl(streamId));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
result.complete("");
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
onComplete();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getInviteSubscribe().delPublisher(subscribeKey);
}
};
}
} }

View File

@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.MessageFactoryImpl;
@ -16,11 +17,14 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory; import javax.sip.SipFactory;
import javax.sip.address.Address; import javax.sip.address.Address;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.*; import javax.sip.header.*;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.text.ParseException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -126,6 +130,39 @@ public class SipRequestBuilder implements ApplicationContextAware {
return request; return request;
} }
@SneakyThrows
public static Request createInviteRequest(String ip, int port, WvpProxyDocking device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader){
Request request;
String target = StringUtils.joinWith(":", device.getIp(), device.getPort());
SipURI requestLine = MessageHelper.createSipURI(channelId, target);
// via
List<ViaHeader> viaHeaders = getViaHeaders(ip, port, sipConfig.getTransport(), viaTag );
// from
SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain());
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(channelId, target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, null);
// Forwards
MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70);
// cSeq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.INVITE);
request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port));
request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress));
// Subject
SubjectHeader subjectHeader = getSipFactory().createHeaderFactory().createSubjectHeader(String.format("%s:%s,%s:%s", channelId, ssrc, sipConfig.getId(), 0));
request.addHeader(subjectHeader);
ContentTypeHeader contentTypeHeader = getSipFactory().createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
request.setContent(content, contentTypeHeader);
return request;
}
public static long getCSeq() { public static long getCSeq() {
String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId()); String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId());

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
@ -22,14 +23,17 @@ public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor; private final Executor executor;
private GenericSubscribe<SIPRequest> catalogSubscribe; private GenericSubscribe<SIPRequest> catalogSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
@PostConstruct @PostConstruct
private void init() { private void init() {
catalogSubscribe = new CatalogSubscribe(executor); catalogSubscribe = new CatalogSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
} }
@PreDestroy @PreDestroy
private void destroy() { private void destroy() {
catalogSubscribe.close(); catalogSubscribe.close();
inviteSubscribe.close();
} }
} }

View File

@ -1,5 +1,5 @@
server: server:
port: 18186 port: 18183
project: project:
version: @project.version@ version: @project.version@