改为 使用独立的zlm 服务 并 hook

视频 先 ffmpeg 推流到 zlm
再由zlm携带 ssrc 推流到目标 服务
This commit is contained in:
shikong 2023-09-30 21:20:39 +08:00
parent 73305a4969
commit 995c10e1e8
10 changed files with 183 additions and 18 deletions

View File

@ -0,0 +1,26 @@
package cn.skcks.docking.gb28181.mocking.api.zlm;
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
import cn.skcks.docking.gb28181.mocking.api.zlm.dto.ZlmStreamChangeDTO;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@Tag(name = "zlm Hook")
@RestController
@RequestMapping("/zlm/hook")
@RequiredArgsConstructor
public class ZlmHookApi {
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
@PostJson("/on_stream_changed")
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
zlmStreamChangeHookService.processEvent(dto.getStream(), dto.getRegist());
}
}

View File

@ -0,0 +1,22 @@
package cn.skcks.docking.gb28181.mocking.api.zlm.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class ZlmStreamChangeDTO {
@JsonProperty("mediaServerId")
private String mediaServerId;
@JsonProperty("app")
private String app;
@JsonProperty("regist")
private Boolean regist;
@JsonProperty("schema")
private String schema;
@JsonProperty("stream")
private String stream;
@JsonProperty("vhost")
private String vhost;
}

View File

@ -0,0 +1,12 @@
package cn.skcks.docking.gb28181.mocking.config.sip;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "media.local")
public class ZlmHookConfig {
private String hook;
}

View File

@ -6,6 +6,7 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
@ -217,7 +218,7 @@ public class InviteRequestProcessor implements MessageProcessor {
add(respMediaDescription);
}});
description.setSsrcField(gb28181Description.getSsrcField());
String ssrc = gb28181Description.getSsrcField().getSsrc();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key);
@ -225,9 +226,9 @@ public class InviteRequestProcessor implements MessageProcessor {
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber;
if(!isDownload){
subscriber = placbackSubscriber(request, callId,device,start,stop,address,port,key,schedule);
subscriber = placbackSubscriber(request, callId,device,start,stop,address,port,key,ssrc,schedule);
} else {
subscriber = downloadSubscriber(request, callId,device,start,stop,address,port,key,schedule);
subscriber = downloadSubscriber(request, callId,device,start,stop,address,port,key,ssrc,schedule);
}
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
@ -240,7 +241,7 @@ public class InviteRequestProcessor implements MessageProcessor {
}, 1,TimeUnit.SECONDS);
}
public Flow.Subscriber<SIPRequest> placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
public Flow.Subscriber<SIPRequest> placbackSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key, String ssrc,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
@ -252,7 +253,7 @@ public class InviteRequestProcessor implements MessageProcessor {
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.playbackTask());
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask());
onComplete();
}
@ -269,7 +270,7 @@ public class InviteRequestProcessor implements MessageProcessor {
};
}
public Flow.Subscriber<SIPRequest> downloadSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
public Flow.Subscriber<SIPRequest> downloadSubscriber(SIPRequest request,String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,String ssrc,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
@ -281,7 +282,7 @@ public class InviteRequestProcessor implements MessageProcessor {
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.downloadTask());
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, ssrc,deviceProxyService.downloadTask());
onComplete();
}

View File

@ -4,11 +4,15 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.net.URLEncodeUtil;
import cn.hutool.core.util.URLUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.*;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
@ -17,6 +21,7 @@ import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import cn.skcks.docking.gb28181.mocking.service.zlm.hook.ZlmStreamChangeHookService;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PreDestroy;
import lombok.*;
@ -32,6 +37,7 @@ import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.Date;
@ -57,15 +63,17 @@ public class DeviceProxyService {
private final SipSender sender;
private final FfmpegSupportService ffmpegSupportService;
private final ZlmMediaService zlmMediaService;
private final ZlmMediaConfig zlmMediaConfig;
private final ZlmStreamChangeHookService zlmStreamChangeHookService;
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public interface TaskProcessor {
void process(SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time);
void process(SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
}
public TaskProcessor playbackTask(){
return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> {
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
@ -73,7 +81,23 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushRtpTask(fromUrl, toUrl, time + 60, executeResultHandler);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(new OpenRtpServer(0, 0, key));
log.info("openRtpServerResp {}",openRtpServerResp);
Integer port = openRtpServerResp.getPort();
String zlmRtpUrl = "rtp://" + zlmMediaConfig.getIp() + ":" + port;
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
zlmStreamChangeHookService.handlerMap.put(key,()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("rtp");
startSendRtp.setStream(key);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(true);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
});
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
@ -81,7 +105,7 @@ public class DeviceProxyService {
}
public TaskProcessor downloadTask(){
return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
Optional.ofNullable(downloadTask.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
@ -89,7 +113,22 @@ public class DeviceProxyService {
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, toUrl, time + 60, executeResultHandler);
OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(new OpenRtpServer(0, 0, key));
Integer port = openRtpServerResp.getPort();
String zlmRtpUrl = "rtp://" + zlmMediaConfig.getIp() + ":" + port;
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
zlmStreamChangeHookService.handlerMap.put(key,()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("rtp");
startSendRtp.setStream(key);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(true);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
});
downloadTask.put(device.getDeviceCode(), executor);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
executeResultHandler.waitFor();
@ -130,7 +169,7 @@ public class DeviceProxyService {
};
}
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) {
public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) {
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode();
@ -140,12 +179,11 @@ public class DeviceProxyService {
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
taskProcessor.process(request, callId,fromUrl,toUrl,device,key,time);
taskProcessor.process(request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc);
}
@SneakyThrows

View File

@ -0,0 +1,32 @@
package cn.skcks.docking.gb28181.mocking.service.zlm;
import cn.skcks.docking.gb28181.media.dto.config.HookConfig;
import cn.skcks.docking.gb28181.media.dto.config.ServerConfig;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
@Order(0)
@Slf4j
@RequiredArgsConstructor
@Component
public class ZlmInitService {
private final ZlmMediaService zlmMediaService;
private final ZlmHookConfig zlmHookConfig;
@PostConstruct
public void init(){
ZlmResponse<List<ServerConfig>> serverConfig = zlmMediaService.getServerConfig();
List<ServerConfig> data = serverConfig.getData();
ServerConfig config = data.get(0);
HookConfig hook = config.getHook();
hook.setOnStreamChanged(zlmHookConfig.getHook() + "/on_stream_changed");
zlmMediaService.setServerConfig(config);
}
}

View File

@ -0,0 +1,29 @@
package cn.skcks.docking.gb28181.mocking.service.zlm.hook;
import lombok.Data;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Data
@Service
public class ZlmStreamChangeHookService {
public interface ZlmStreamChangeHookHandler{
void handler();
}
public ConcurrentMap<String, ZlmStreamChangeHookHandler> handlerMap = new ConcurrentHashMap<>();
synchronized public void processEvent(String streamId, Boolean regist){
if(!regist){
return;
}
Optional.ofNullable(handlerMap.remove(streamId)).ifPresent((handler)->{
handlerMap.remove(streamId);
handler.handler();
});
}
}

View File

@ -8,6 +8,7 @@ import org.springframework.context.annotation.ComponentScan;
@EnableFeignClients(basePackages = "cn.skcks.docking.gb28181.media")
@SpringBootApplication
@ComponentScan(basePackages = {
"cn.skcks.docking.gb28181.media",
"cn.skcks.docking.gb28181.annotation",
"cn.skcks.docking.gb28181.common",
"cn.skcks.docking.gb28181.mocking",

View File

@ -54,7 +54,7 @@ gb28181:
expire: 3600
transport: "UDP"
server:
ip: 10.10.10.200
ip: 10.10.10.20
# ip: 192.168.10.32
# ip: 192.168.3.12
# port: 5060
@ -64,6 +64,8 @@ gb28181:
id: 44050100002000000010
media:
local:
hook: http://10.10.10.20:18182/zlm/hook
ip: 10.10.10.200
url: 'http://10.10.10.200:5081'
# url: 'http://10.10.10.200:12580/anything/'

View File

@ -62,7 +62,9 @@ gb28181:
id: 44050100002000000001
media:
ip: 192.168.10.32
local:
hook: http://192.168.3.11:18182/zlm/hook
ip: 192.168.3.12
url: 'http://192.168.3.12:5081'
# url: 'http://10.10.10.200:12580/anything/'
id: amrWMKmbKqoBjRQ9