测试 拉流结束发送bye

This commit is contained in:
shikong 2023-09-25 14:00:48 +08:00
parent 2ec9dad3c8
commit 510319cd65
3 changed files with 162 additions and 18 deletions

View File

@ -42,8 +42,7 @@ import gov.nist.javax.sip.message.SIPResponse;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
@ -80,6 +79,15 @@ public class Gb28181DownloadService {
private final WvpProxyConfig wvpProxyConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class VideoInfo {
private String url;
private String callId;
private WvpProxyDevice device;
}
public void header(HttpServletResponse response) {
response.setContentType("video/mp4");
response.setHeader("Accept-Ranges", "none");
@ -131,16 +139,16 @@ public class Gb28181DownloadService {
asyncContext.start(()->{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{
download(deviceCode, startTime,endTime).whenComplete((url, e)->{
download(deviceCode, startTime,endTime).whenComplete((videoInfo, e)->{
writeFileHeader(response,deviceCode,startTime,endTime,fileHeader);
if(e != null){
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} else if(StringUtils.isBlank(url)){
} else if(videoInfo == null){
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
} else if(wvpProxyConfig.getUseFfmpeg()){
videoService.ffmpegRecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
} else {
videoService.javaCVrecord(asyncResponse, url, DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
}
asyncContext.complete();
});
@ -163,7 +171,7 @@ public class Gb28181DownloadService {
}
@SneakyThrows
public CompletableFuture<String> download(String deviceCode, Date startTime, Date endTime) {
public CompletableFuture<VideoInfo> download(String deviceCode, Date startTime, Date endTime) {
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) {
String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode);
@ -176,19 +184,19 @@ public class Gb28181DownloadService {
}
@SneakyThrows
public CompletableFuture<String> download(String gbDeviceId, String channel, Date startTime, Date endTime){
CompletableFuture<String> result = new CompletableFuture<>();
public CompletableFuture<VideoInfo> download(String gbDeviceId, String channel, Date startTime, Date endTime){
CompletableFuture<VideoInfo> result = new CompletableFuture<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
if(deviceByGbDeviceId.isEmpty()){
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
result.complete("");
result.complete(null);
return result;
}
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
if (deviceByGbDeviceIdAndChannel.isEmpty()) {
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
result.complete("");
result.complete(null);
return result;
}
@ -204,7 +212,7 @@ public class Gb28181DownloadService {
int port = openRtpServer(streamId, streamMode);
if(port <= 0){
log.error("zlm 暂无可用端口");
result.complete("");
result.complete(null);
return result;
}
String ssrc = ssrcService.getPlaySsrc();
@ -224,7 +232,7 @@ public class Gb28181DownloadService {
return result;
}
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture<String> result, long time) {
public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture<VideoInfo> result, long time) {
return (provider, ip, port) -> {
CallIdHeader callId = provider.getNewCallId();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
@ -235,7 +243,7 @@ public class Gb28181DownloadService {
};
}
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<String> result, long time, TimeUnit unit){
public Flow.Subscriber<SIPResponse> inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture<VideoInfo> result, long time, TimeUnit unit){
ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -257,20 +265,20 @@ public class Gb28181DownloadService {
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("收到响应状态 {}", statusCode);
String callId = item.getCallId().getCallId();
sender.sendRequest(((provider, ip, port) -> {
String fromTag = item.getFromTag();
String toTag = item.getToTag();
String callId = item.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,streamId, time, unit));
return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId);
}));
result.complete(videoUrl(streamId));
result.complete(new VideoInfo(videoUrl(streamId), callId, device));
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));
result.complete("");
result.complete(null);
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
onComplete();
}

View File

@ -3,14 +3,23 @@ package cn.skcks.docking.gb28181.wvp.service.video;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.ffmpeg.FfmpegSupportService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
@ -22,7 +31,12 @@ import org.bytedeco.javacv.FFmpegFrameRecorder;
import org.bytedeco.javacv.FrameGrabber;
import org.springframework.stereotype.Service;
import java.io.*;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -35,6 +49,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class VideoService {
private final FfmpegSupportService ffmpegSupportService;
private final WvpProxyConfig wvpProxyConfig;
private final ProxySipConfig proxySipConfig;
private final DockingService dockingService;
private final SipSender sender;
/**
* 写入 flv 响应头信息
* @param response HttpServletResponse 响应
@ -83,6 +100,62 @@ public class VideoService {
}
}
@RequiredArgsConstructor
public class FfmpegExecuteResultHandler extends DefaultExecuteResultHandler implements ExecuteResultHandler {
private final static long SLEEP_TIME_MS = 50;
@Setter(AccessLevel.PRIVATE)
private boolean hasResult = false;
private final WvpProxyDevice device;
private final String callId;
private final SipSender sender;
private void mediaStatus(){
String deviceId = device.getGbDeviceId();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(deviceId);
if(deviceByGbDeviceId.isEmpty()){
return;
}
WvpProxyDocking wvpProxyDocking = deviceByGbDeviceId.get();
String ip = wvpProxyDocking.getIp();
int port = Integer.parseInt(wvpProxyDocking.getPort());
String transport = proxySipConfig.getTransport();
SipProvider provider = sender.getProvider(transport, ip);
Request byeRequest = SipRequestBuilder.createByeRequest(ip, port, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, callId);
try{
provider.sendRequest(byeRequest);
}catch (Exception e){
log.error("bye 请求发送失败 {}",e.getMessage());
}
}
public boolean hasResult() {
return hasResult;
}
@SneakyThrows
public void waitFor() {
while (!hasResult()) {
Thread.sleep(SLEEP_TIME_MS);
}
}
@Override
public void onProcessComplete(int exitValue) {
hasResult = true;
mediaStatus();
}
@Override
public void onProcessFailed(ExecuteException e) {
hasResult = true;
mediaStatus();
}
}
public FfmpegExecuteResultHandler mediaStatus(WvpProxyDevice device, String key){
return new FfmpegExecuteResultHandler(device,key,sender);
}
/**
* 录制视频 并写入 异步响应
* @param response AsyncContext.getResponse 异步响应
@ -152,6 +225,33 @@ public class VideoService {
}
}
/**
* 录制视频 并写入 异步响应
* @param response AsyncContext.getResponse 异步响应
* @param url 要录制的视频地址
* @param time 录制时长 (单位: )
*/
@SneakyThrows
public void ffmpegRecord(ServletResponse response, String url, long time, WvpProxyDevice device,String callId){
ServletOutputStream outputStream = response.getOutputStream();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, errorStream);
DefaultExecuteResultHandler executeResultHandler = mediaStatus(device,callId);
DateTime startTime = DateUtil.date();
Executor executor = ffmpegSupportService.downloadToStream(url, time, TimeUnit.SECONDS,streamHandler,executeResultHandler);
log.info("开始录制 {}", url);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
log.info("到达结束时间, 结束录制 {}", url);
executor.getWatchdog().destroyProcess();
log.info("结束录制 {}", url);
}, time, TimeUnit.SECONDS);
executeResultHandler.waitFor();
schedule.cancel(true);
DateTime endTime = DateUtil.date();
log.info("录制进程结束 {}, 录制耗时: {}", url, DateUtil.between(startTime,endTime, DateUnit.SECOND));
}
/**
* 录制视频 并写入 异步响应
* @param response AsyncContext.getResponse 异步响应

View File

@ -235,6 +235,42 @@ public class SipRequestBuilder implements ApplicationContextAware {
return request;
}
@SneakyThrows
public static Request createByeRequest(String ip, int port, String targetId, String fromTag, String toTag, String callId) {
Request request;
// 请求行
String target = StringUtils.joinWith(":", ip, port);
SipURI requestLine = MessageHelper.createSipURI(targetId, target);
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = getSipFactory().createHeaderFactory().createViaHeader(ip, port, sipConfig.getTransport(), SipUtil.generateViaTag());
viaHeaders.add(viaHeader);
// from
SipURI fromSipURI = MessageHelper.createSipURI(sipConfig.getId(), sipConfig.getDomain());
Address fromAddress = MessageHelper.createAddress(fromSipURI);
FromHeader fromHeader = MessageHelper.createFromHeader(fromAddress, fromTag);
// to
SipURI toSipURI = MessageHelper.createSipURI(targetId, target);
Address toAddress = MessageHelper.createAddress(toSipURI);
ToHeader toHeader = MessageHelper.createToHeader(toAddress, toTag);
// Forwards
MaxForwardsHeader maxForwards = getSipFactory().createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = getSipFactory().createHeaderFactory().createCSeqHeader(getCSeq(), Request.BYE);
CallIdHeader callIdHeader = getSipFactory().createHeaderFactory().createCallIdHeader(callId);
request = getSipFactory().createMessageFactory().createRequest(requestLine, Request.BYE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtil.createUserAgentHeader());
Address concatAddress = MessageHelper.createAddress(MessageHelper.createSipURI(sipConfig.getId(), ip + ":" + port));
request.addHeader(getSipFactory().createHeaderFactory().createContactHeader(concatAddress));
request.addHeader(SipUtil.createUserAgentHeader());
return request;
}
public static long getCSeq() {
String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId());