推流方式调整

改为 先使用 rtmp 推 flv 流到 zlm
再使用 zlm 转推 rtp 流(带ssrc) 到目标服务器端口
This commit is contained in:
shikong 2023-10-04 23:13:30 +08:00
parent 902f6cc72b
commit 3cd911f5a0
6 changed files with 85 additions and 52 deletions

View File

@ -0,0 +1,12 @@
package cn.skcks.docking.gb28181.mocking.config.sip;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "media.rtmp")
public class ZlmRtmpConfig {
int port = 1935;
}

View File

@ -8,12 +8,16 @@ import cn.hutool.core.net.URLEncodeUtil;
import cn.hutool.core.util.URLUtil; import cn.hutool.core.util.URLUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.*; import cn.skcks.docking.gb28181.media.dto.rtp.*;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParserFactory;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
@ -32,6 +36,7 @@ import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sdp.MediaDescription;
import javax.sip.SipProvider; import javax.sip.SipProvider;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
@ -39,6 +44,7 @@ import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -66,6 +72,7 @@ public class DeviceProxyService {
private final ZlmMediaService zlmMediaService; private final ZlmMediaService zlmMediaService;
private final ZlmMediaConfig zlmMediaConfig; private final ZlmMediaConfig zlmMediaConfig;
private final ZlmStreamChangeHookService zlmStreamChangeHookService; private final ZlmStreamChangeHookService zlmStreamChangeHookService;
private final ZlmRtmpConfig zlmRtmpConfig;
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public interface TaskProcessor { public interface TaskProcessor {
@ -81,27 +88,31 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet(); int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
zlmStreamChangeHookService.handlerMap.put(key,()->{ try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.handlerMap.put(callId,()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("rtp"); startSendRtp.setApp("live");
startSendRtp.setStream(key); startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc); startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr); startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort); startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(true); startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp); log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp); log.info("startSendRtpResp {}",startSendRtpResp);
}); });
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(new OpenRtpServer(0, 0, key)); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
log.info("openRtpServerResp {}",openRtpServerResp);
Integer port = openRtpServerResp.getPort();
String zlmRtpUrl = "rtp://" + zlmMediaConfig.getIp() + ":" + port;
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor); callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}; };
} }
@ -114,26 +125,31 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet(); int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
zlmStreamChangeHookService.handlerMap.put(key,()->{ try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.handlerMap.put(callId,()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("rtp"); startSendRtp.setApp("live");
startSendRtp.setStream(key); startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc); startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr); startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort); startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(true); startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp); log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp); log.info("startSendRtpResp {}",startSendRtpResp);
}); });
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(new OpenRtpServer(0, 0, key)); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
Integer port = openRtpServerResp.getPort();
String zlmRtpUrl = "rtp://" + zlmMediaConfig.getIp() + ":" + port;
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}; };
} }

View File

@ -1,12 +1,14 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook; package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@Slf4j
@Data @Data
@Service @Service
public class ZlmStreamChangeHookService { public class ZlmStreamChangeHookService {
@ -17,13 +19,11 @@ public class ZlmStreamChangeHookService {
public ConcurrentMap<String, ZlmStreamChangeHookHandler> handlerMap = new ConcurrentHashMap<>(); public ConcurrentMap<String, ZlmStreamChangeHookHandler> handlerMap = new ConcurrentHashMap<>();
public void processEvent(String streamId, Boolean regist){ public void processEvent(String streamId, Boolean regist){
log.debug("stream {}, regist {}", streamId, regist);
if(!regist){ if(!regist){
return; return;
} }
Optional.ofNullable(handlerMap.remove(streamId)).ifPresent((handler)->{ Optional.ofNullable(handlerMap.remove(streamId)).ifPresent(ZlmStreamChangeHookHandler::handler);
handlerMap.remove(streamId);
handler.handler();
});
} }
} }

View File

@ -75,7 +75,7 @@ ffmpeg-support:
ffprobe: /usr/bin/ffmpeg/ffprobe ffprobe: /usr/bin/ffmpeg/ffprobe
rtp: rtp:
input: -re -i input: -re -i
output: -vcodec h264 -acodec aac -f rtp_mpegts output: -vcodec h264 -acodec aac -f flv
debug: debug:
download: false download: false
input: false input: false

View File

@ -53,15 +53,16 @@ gb28181:
password: 123456 password: 123456
expire: 3600 expire: 3600
transport: "UDP" transport: "UDP"
keep-alive: 60
server: server:
ip: 10.10.10.20 ip: 10.10.10.20
# ip: 192.168.10.32 # ip: 192.168.10.32
# ip: 192.168.3.12 # ip: 192.168.3.12
# port: 5060 # port: 5060
port: 5061 port: 5060
password: 123456 password: 123456
domain: 4405010000 domain: 4405010000
id: 44050100002000000010 id: 44050100002000000003
media: media:
local: local:
@ -72,6 +73,8 @@ media:
id: amrWMKmbKqoBjRQ9 id: amrWMKmbKqoBjRQ9
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc # secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
rtmp:
port: 1936
proxy: proxy:
device: device:
url: http://10.10.10.20:18186 url: http://10.10.10.20:18186
@ -82,11 +85,11 @@ ffmpeg-support:
ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
rtp: rtp:
input: -re -i http://10.10.10.200:5080/live/test.live.flv input: -thread_queue_size 128 -re -i http://10.10.10.200:5081/live/test.live.flv
# input: -re -i # input: -re -i
output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtp_mpegts output: -tune zerolatency -vcodec libx264 -acodec aac -preset ultrafast -vf scale=640:-1 -f flv #rtp_mpegts
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp # output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
download: -i http://10.10.10.200:5080/live/test.live.flv download: -i http://10.10.10.200:5081/live/test.live.flv
debug: debug:
download: true download: true
input: true input: true

View File

@ -70,6 +70,8 @@ media:
id: amrWMKmbKqoBjRQ9 id: amrWMKmbKqoBjRQ9
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc # secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333 secret: 4155cca6-2f9f-11ee-85e6-8de4ce2e7333
rtmp:
port: 1935
proxy: proxy:
device: device:
url: http://192.168.2.3:18183 url: http://192.168.2.3:18183
@ -82,7 +84,7 @@ ffmpeg-support:
rtp: rtp:
download: -i download: -i
input: -re -i input: -re -i
output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtp_mpegts output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f flv
debug: debug:
download: false download: false
input: false input: false