RTP 推流

This commit is contained in:
shikong 2023-09-15 09:59:21 +08:00
parent afd9e47223
commit d40c98dc97
3 changed files with 121 additions and 33 deletions

View File

@ -54,7 +54,7 @@ public class InviteRequestProcessor implements MessageProcessor {
@Override @Override
public void process(EventObject eventObject) { public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject; RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest)requestEvent.getRequest(); SIPRequest request = (SIPRequest) requestEvent.getRequest();
String senderIp = request.getLocalAddress().getHostAddress(); String senderIp = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport(); String transport = request.getTopmostViaHeader().getTransport();
String content = new String(request.getRawContent()); String content = new String(request.getRawContent());
@ -62,15 +62,15 @@ public class InviteRequestProcessor implements MessageProcessor {
log.info("解析的 sdp信息: \n{}", gb28181Description); log.info("解析的 sdp信息: \n{}", gb28181Description);
String id = gb28181Description.getOrigin().getUsername(); String id = gb28181Description.getOrigin().getUsername();
MockingDevice device = deviceService.getDeviceByGbChannelId(id).orElse(null); MockingDevice device = deviceService.getDeviceByGbChannelId(id).orElse(null);
if(device == null){ if (device == null) {
log.error("未能找到 deviceId: {} 的相关信息", id); log.error("未能找到 deviceId: {} 的相关信息", id);
sender.sendResponse(senderIp, transport, notFound(request)); sender.sendResponse(senderIp, transport, notFound(request));
return; return;
} }
Vector<?> mediaDescriptions = gb28181Description.getMediaDescriptions(true); Vector<?> mediaDescriptions = gb28181Description.getMediaDescriptions(true);
log.info("mediaDescriptions {}",mediaDescriptions); log.info("mediaDescriptions {}", mediaDescriptions);
mediaDescriptions.stream().filter(item->{ mediaDescriptions.stream().filter(item -> {
MediaDescription mediaDescription = (MediaDescription)item; MediaDescription mediaDescription = (MediaDescription) item;
Media media = mediaDescription.getMedia(); Media media = mediaDescription.getMedia();
try { try {
Vector<?> mediaFormats = media.getMediaFormats(false); Vector<?> mediaFormats = media.getMediaFormats(false);
@ -79,19 +79,19 @@ public class InviteRequestProcessor implements MessageProcessor {
log.error("sdp media 解析异常: {}", e.getMessage()); log.error("sdp media 解析异常: {}", e.getMessage());
return false; return false;
} }
}).findFirst().ifPresentOrElse((item)->{ }).findFirst().ifPresentOrElse((item) -> {
SessionName sessionName = gb28181Description.getSessionName(); SessionName sessionName = gb28181Description.getSessionName();
try { try {
String type = sessionName.getValue(); String type = sessionName.getValue();
log.info("type {}", type); log.info("type {}", type);
if(StringUtils.equalsAnyIgnoreCase(type,"Play","PlayBack")){ if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) {
log.info("点播/回放请求"); log.info("点播/回放请求");
if(StringUtils.equalsIgnoreCase(type,"Play")){ if (StringUtils.equalsIgnoreCase(type, "Play")) {
play(device, gb28181Description, (MediaDescription) item); play(device, gb28181Description, (MediaDescription) item);
} else { } else {
playback(device, gb28181Description, (MediaDescription) item); playback(device, gb28181Description, (MediaDescription) item);
} }
} else if(StringUtils.equalsIgnoreCase(type,"Download")){ } else if (StringUtils.equalsIgnoreCase(type, "Download")) {
log.info("下载请求"); log.info("下载请求");
} else { } else {
log.error("未知请求类型: {}", type); log.error("未知请求类型: {}", type);
@ -100,7 +100,7 @@ public class InviteRequestProcessor implements MessageProcessor {
} catch (SdpParseException e) { } catch (SdpParseException e) {
log.error("sdp 解析异常: {}", e.getMessage()); log.error("sdp 解析异常: {}", e.getMessage());
} }
},()->{ }, () -> {
log.info("未找到支持的媒体类型"); log.info("未找到支持的媒体类型");
sender.sendResponse(senderIp, transport, unsupported(request)); sender.sendResponse(senderIp, transport, unsupported(request));
}); });
@ -118,11 +118,12 @@ public class InviteRequestProcessor implements MessageProcessor {
/** /**
* 模拟设备不支持实时 故直接回放 最近15分钟 当前时间录像 * 模拟设备不支持实时 故直接回放 最近15分钟 当前时间录像
*
* @param gb28181Description gb28181 sdp * @param gb28181Description gb28181 sdp
* @param mediaDescription 媒体描述符 * @param mediaDescription 媒体描述符
*/ */
@SneakyThrows @SneakyThrows
private void play(MockingDevice device,GB28181Description gb28181Description, MediaDescription mediaDescription){ private void play(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeField time = new TimeField(); TimeField time = new TimeField();
time.setStart(DateUtil.offsetMinute(DateUtil.date(), -15)); time.setStart(DateUtil.offsetMinute(DateUtil.date(), -15));
time.setStop(DateUtil.date()); time.setStop(DateUtil.date());
@ -131,18 +132,19 @@ public class InviteRequestProcessor implements MessageProcessor {
/** /**
* 模拟设备 录像回放 当前小时至当前时间录像 * 模拟设备 录像回放 当前小时至当前时间录像
*
* @param gb28181Description gb28181 sdp * @param gb28181Description gb28181 sdp
* @param mediaDescription 媒体描述符 * @param mediaDescription 媒体描述符
*/ */
@SneakyThrows @SneakyThrows
private void playback(MockingDevice device,GB28181Description gb28181Description, MediaDescription mediaDescription) { private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) gb28181Description.getTimeDescriptions(true).get(0); TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) gb28181Description.getTimeDescriptions(true).get(0);
TimeField time = (TimeField) timeDescription.getTime(); TimeField time = (TimeField) timeDescription.getTime();
playback(device, gb28181Description, mediaDescription, time); playback(device, gb28181Description, mediaDescription, time);
} }
@SneakyThrows @SneakyThrows
private void playback(MockingDevice device,GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time){ private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) {
Date start = new Date(time.getStartTime() * 1000); Date start = new Date(time.getStartTime() * 1000);
Date stop = new Date(time.getStopTime() * 1000); Date stop = new Date(time.getStopTime() * 1000);
log.info("{} ~ {}", start, stop); log.info("{} ~ {}", start, stop);
@ -154,7 +156,7 @@ public class InviteRequestProcessor implements MessageProcessor {
int port = media.getMediaPort(); int port = media.getMediaPort();
log.info("目标端口号: {}", port); log.info("目标端口号: {}", port);
deviceProxyService.proxyVideo2Rtp(device,start,stop); deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port);
// TODO 推流 && 关流事件订阅 // TODO 推流 && 关流事件订阅
} }
} }

View File

@ -1,5 +1,6 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo; package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.common.xml.XmlUtils;
@ -20,7 +21,7 @@ import org.springframework.stereotype.Component;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.Collections; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -57,6 +58,22 @@ public class RecordInfoRequestProcessor {
DateUtil.format(startTime, DatePattern.NORM_DATETIME_FORMATTER), DateUtil.format(startTime, DatePattern.NORM_DATETIME_FORMATTER),
DateUtil.format(endTime, DatePattern.NORM_DATETIME_FORMATTER)); DateUtil.format(endTime, DatePattern.NORM_DATETIME_FORMATTER));
List<RecordInfoItemDTO> recordInfoItemDTOList = new ArrayList<>();
Date tmpStart = startTime;
Date tmpEnd = DateUtil.offsetMinute(tmpStart,5);
while(DateUtil.compare(tmpStart, endTime) < 0){
RecordInfoItemDTO recordInfoItemDTO = new RecordInfoItemDTO();
recordInfoItemDTO.setName(name);
recordInfoItemDTO.setStartTime(tmpStart);
recordInfoItemDTO.setEndTime(tmpEnd);
recordInfoItemDTO.setSecrecy(recordInfoRequestDTO.getSecrecy());
recordInfoItemDTO.setDeviceId(device.getGbChannelId());
recordInfoItemDTOList.add(recordInfoItemDTO);
tmpStart = tmpEnd;
tmpEnd = DateUtil.offsetMinute(tmpStart,5);
}
RecordInfoItemDTO recordInfoItemDTO = new RecordInfoItemDTO(); RecordInfoItemDTO recordInfoItemDTO = new RecordInfoItemDTO();
recordInfoItemDTO.setName(name); recordInfoItemDTO.setName(name);
recordInfoItemDTO.setStartTime(startTime); recordInfoItemDTO.setStartTime(startTime);
@ -64,19 +81,20 @@ public class RecordInfoRequestProcessor {
recordInfoItemDTO.setSecrecy(recordInfoRequestDTO.getSecrecy()); recordInfoItemDTO.setSecrecy(recordInfoRequestDTO.getSecrecy());
recordInfoItemDTO.setDeviceId(device.getGbChannelId()); recordInfoItemDTO.setDeviceId(device.getGbChannelId());
List<RecordInfoItemDTO> recordInfoItemDTOList = Collections.singletonList(recordInfoItemDTO);
RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn());
recordInfoResponseDTO.setDeviceId(device.getGbChannelId());
recordInfoResponseDTO.setName(device.getName());
recordInfoResponseDTO.setSumNum((long) recordInfoItemDTOList.size());
recordInfoResponseDTO.setRecordList(recordInfoItemDTOList);
FromHeader fromHeader = request.getFromHeader(); FromHeader fromHeader = request.getFromHeader();
sender.sendRequest((provider, ip, port) -> { ListUtil.partition(recordInfoItemDTOList,2).forEach(recordList->{
CallIdHeader callIdHeader = provider.getNewCallId(); RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
return SipRequestBuilder.createMessageRequest(device, recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn());
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader); recordInfoResponseDTO.setDeviceId(device.getGbChannelId());
recordInfoResponseDTO.setName(device.getName());
recordInfoResponseDTO.setSumNum((long) recordInfoItemDTOList.size());
recordInfoResponseDTO.setRecordList(recordList);
sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId();
return SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
});
}); });
} }

View File

@ -1,18 +1,30 @@
package cn.skcks.docking.gb28181.mocking.service.device; package cn.skcks.docking.gb28181.mocking.service.device;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.URLUtil; import cn.hutool.core.util.URLUtil;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.bytedeco.ffmpeg.avcodec.AVPacket;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j @Slf4j
@Service @Service
@ -22,15 +34,71 @@ public class DeviceProxyService {
private final DeviceProxyConfig proxyConfig; private final DeviceProxyConfig proxyConfig;
public void proxyVideo2Rtp(MockingDevice device, Date startTime, Date endTime){ public void proxyVideo2Rtp(MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort){
String url = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3); HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode(); String deviceCode = device.getDeviceCode();
map.put("device_id", deviceCode); map.put("device_id", deviceCode);
map.put("begin_time", DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT)); map.put("begin_time", DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT));
map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT)); map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT));
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8); String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
url = StringUtils.joinWith("?",url,query); fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, url); log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
rtpAddr = "192.168.1.241";
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
pushRtp(fromUrl, toUrl, time);
}
@SneakyThrows
public void pushRtp(String fromUrl, String toUrl, long time) {
log.info("创建推流任务 fromUrl {}, toUrl {}, time: {}", fromUrl, toUrl, time);
// FFmpeg 调试日志
// FFmpegLogCallback.set();
FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl);
grabber.start();
FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels());
recorder.setInterleaved(true);
recorder.setVideoOption("preset", "ultrafast");
recorder.setVideoOption("tune", "zerolatency");
recorder.setVideoOption("crf", "25");
recorder.setFrameRate(grabber.getFrameRate());
recorder.setSampleRate(grabber.getSampleRate());
recorder.setOption("flvflags", "no_duration_filesize");
recorder.setOption("movflags","frag_keyframe+empty_moov");
if (grabber.getAudioChannels() > 0) {
recorder.setAudioChannels(grabber.getAudioChannels());
recorder.setAudioBitrate(grabber.getAudioBitrate());
recorder.setAudioCodec(grabber.getAudioCodec());
}
recorder.setVideoBitrate(grabber.getVideoBitrate());
// recorder.setVideoCodec(grabber.getVideoCodec());
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); // 视频源数据yuv
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); // 设置音频压缩方式
recorder.setFormat("rtp_mpegts");
recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); // 解码线程数
recorder.start(grabber.getFormatContext());
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean record = new AtomicBoolean(true);
scheduledExecutorService.schedule(() -> {
log.info("到达结束时间, 结束推送 fromUrl: {}, toUrl: {}", fromUrl, toUrl);
record.set(false);
}, time, TimeUnit.SECONDS);
try {
AVPacket k;
while (record.get() && (k = grabber.grabPacket()) != null) {
recorder.recordPacket(k);
avcodec.av_packet_unref(k);
}
grabber.close();
recorder.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
log.info("结束推送 fromUrl: {}, toUrl: {}", fromUrl, toUrl);
} }
} }