历史回放

This commit is contained in:
shikong 2023-09-15 11:49:02 +08:00
parent d40c98dc97
commit 7ecb87ef44
7 changed files with 190 additions and 16 deletions

View File

@ -0,0 +1,40 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.ack.request;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.message.Request;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class AckRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipSubscribe subscribe;
@PostConstruct
@Override
public void init() {
sipListener.addRequestProcessor(Request.ACK, this);
}
@Override
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest) requestEvent.getRequest();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
Optional.ofNullable(subscribe.getAckSubscribe().getPublisher(key))
.ifPresent(publisher -> publisher.submit(request));
}
}

View File

@ -2,15 +2,22 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.reque
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.device.DeviceProxyService;
import cn.skcks.docking.gb28181.mocking.service.device.DeviceService;
import gov.nist.core.Separators;
import gov.nist.javax.sdp.SessionDescriptionImpl;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.AttributeField;
import gov.nist.javax.sdp.fields.ConnectionField;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PostConstruct;
@ -20,16 +27,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.sdp.Media;
import javax.sdp.MediaDescription;
import javax.sdp.SdpParseException;
import javax.sdp.SessionName;
import javax.sdp.*;
import javax.sip.RequestEvent;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.Arrays;
import java.util.Date;
import java.util.EventObject;
import java.util.Vector;
import java.util.concurrent.*;
@Slf4j
@RequiredArgsConstructor
@ -43,6 +49,8 @@ public class InviteRequestProcessor implements MessageProcessor {
private final DeviceService deviceService;
private final SipSubscribe subscribe;
@PostConstruct
@Override
public void init() {
@ -87,9 +95,9 @@ public class InviteRequestProcessor implements MessageProcessor {
if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) {
log.info("点播/回放请求");
if (StringUtils.equalsIgnoreCase(type, "Play")) {
play(device, gb28181Description, (MediaDescription) item);
play(request, device, gb28181Description, (MediaDescription) item);
} else {
playback(device, gb28181Description, (MediaDescription) item);
playback(request, device, gb28181Description, (MediaDescription) item);
}
} else if (StringUtils.equalsIgnoreCase(type, "Download")) {
log.info("下载请求");
@ -123,11 +131,11 @@ public class InviteRequestProcessor implements MessageProcessor {
* @param mediaDescription 媒体描述符
*/
@SneakyThrows
private void play(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
private void play(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeField time = new TimeField();
time.setStart(DateUtil.offsetMinute(DateUtil.date(), -15));
time.setStop(DateUtil.date());
playback(device, gb28181Description, mediaDescription, time);
playback(request, device, gb28181Description, mediaDescription, time);
}
/**
@ -137,14 +145,14 @@ public class InviteRequestProcessor implements MessageProcessor {
* @param mediaDescription 媒体描述符
*/
@SneakyThrows
private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) gb28181Description.getTimeDescriptions(true).get(0);
TimeField time = (TimeField) timeDescription.getTime();
playback(device, gb28181Description, mediaDescription, time);
playback(request, device, gb28181Description, mediaDescription, time);
}
@SneakyThrows
private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) {
private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) {
Date start = new Date(time.getStartTime() * 1000);
Date stop = new Date(time.getStopTime() * 1000);
log.info("{} ~ {}", start, stop);
@ -156,7 +164,68 @@ public class InviteRequestProcessor implements MessageProcessor {
int port = media.getMediaPort();
log.info("目标端口号: {}", port);
deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port);
// TODO 推流 && 关流事件订阅
String senderIp = request.getLocalAddress().getHostAddress();
SdpFactory sdpFactory = SdpFactory.getInstance();
SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl();
sessionDescription.setVersion(sdpFactory.createVersion(0));
// 目前只配置 ipv4
sessionDescription.setOrigin(sdpFactory.createOrigin(channelId, 0, 0, ConnectionField.IN, Connection.IP4, senderIp));
sessionDescription.setSessionName(gb28181Description.getSessionName());
sessionDescription.setConnection(sdpFactory.createConnection(ConnectionField.IN, Connection.IP4, senderIp));
TimeField respTime = new TimeField();
respTime.setZero();
TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(respTime);
sessionDescription.setTimeDescriptions(new Vector<>() {{
add(timeDescription);
}});
String[] mediaTypeCodes = new String[]{"98","96"};
MediaDescription respMediaDescription = SdpFactory.getInstance().createMediaDescription("video", port, 0, SdpConstants.RTP_AVP, mediaTypeCodes);
Arrays.stream(mediaTypeCodes).forEach((k)->{
String v = MediaSdpHelper.RTPMAP.get(k);
mediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute(SdpConstants.RTPMAP, StringUtils.joinWith(Separators.SP,k,v)));
});
respMediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute("sendonly", null));
GB28181Description description = GB28181Description.Convertor.convert(sessionDescription);
description.setSsrcField(gb28181Description.getSsrcField());
String transport = request.getTopmostViaHeader().getTransport();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 ack 订阅 {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port);
onComplete();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getAckSubscribe().delPublisher(key);
schedule[0].cancel(true);
}
};
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
// 推流 ack 事件订阅
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
// 发送 sdp 响应
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description));
}
}

View File

@ -0,0 +1,39 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class AckSubscribe implements GenericSubscribe<SIPRequest> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SIPRequest>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SIPRequest> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SIPRequest> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
Helper.delPublisher(publishers, key);
}
}

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@ -21,14 +22,17 @@ public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private GenericSubscribe<SIPResponse> registerSubscribe;
private GenericSubscribe<SIPRequest> ackSubscribe;
@PostConstruct
private void init() {
registerSubscribe = new RegisterSubscribe(executor);
ackSubscribe = new AckSubscribe(executor);
}
@PreDestroy
private void destroy() {
registerSubscribe.close();
ackSubscribe.close();
}
}

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.mocking.core.sip.response;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import gov.nist.javax.sip.message.MessageFactoryImpl;
@ -8,7 +9,12 @@ import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.message.Response;
@Slf4j
@ -28,4 +34,20 @@ public class SipResponseBuilder {
}
return response;
}
@SneakyThrows
public static Response responseSdp(SIPRequest request, GB28181Description sdp) {
MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory();
// 使用 GB28181 默认编码 否则中文将会乱码
messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET);
SIPResponse response = (SIPResponse)messageFactory.createResponse(Response.OK, request);
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
response.setContent(sdp.toString(), contentTypeHeader);
SipURI sipURI = (SipURI) request.getRequestURI();
SipURI uri = MessageHelper.createSipURI(sipURI.getUser(), StringUtils.joinWith(":", sipURI.getHost() + ":" + sipURI.getPort()));
Address concatAddress = sipFactory.createAddressFactory().createAddress(uri);
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
return response;
}
}

View File

@ -44,7 +44,6 @@ public class DeviceProxyService {
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
rtpAddr = "192.168.1.241";
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
pushRtp(fromUrl, toUrl, time);
@ -56,6 +55,7 @@ public class DeviceProxyService {
// FFmpeg 调试日志
// FFmpegLogCallback.set();
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl);
grabber.setFrameRate(30);
grabber.start();
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels());

View File

@ -53,8 +53,8 @@ gb28181:
expire: 3600
transport: "UDP"
server:
# ip: 192.168.10.32
ip: 192.168.3.12
ip: 192.168.10.32
# ip: 192.168.3.12
port: 5060
password: 123456
domain: 4405010000