推流方法调整

This commit is contained in:
shikong 2023-09-22 22:47:24 +08:00
parent ba665ca5dc
commit ffbb2fff54
4 changed files with 72 additions and 41 deletions

View File

@ -19,10 +19,9 @@ import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.Executor;
@ -48,11 +47,11 @@ public class DeviceProxyService {
private final SipSubscribe subscribe;
private final ConcurrentHashMap<String, Executor> callbackTask = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Executor> callbackTask = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Executor> downloadTask = new ConcurrentHashMap<>();
@Getter
private final AtomicInteger taskNum = new AtomicInteger(0);
private static final AtomicInteger taskNum = new AtomicInteger(0);
private final SipSender sender;
@ -72,8 +71,11 @@ public class DeviceProxyService {
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
callbackTask.put(device.getDeviceCode(), pushRtpTask(fromUrl, toUrl, time + 60, mediaStatus(request, device, key)));
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushRtpTask(fromUrl, toUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor();
};
}
@ -85,8 +87,11 @@ public class DeviceProxyService {
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60, mediaStatus(request,device,key)));
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, toUrl, time + 60, executeResultHandler);
downloadTask.put(device.getDeviceCode(), executor);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
executeResultHandler.waitFor();
};
}
@ -152,34 +157,59 @@ public class DeviceProxyService {
return ffmpegSupportService.pushDownload2Rtp(fromUrl, toUrl, time, TimeUnit.SECONDS, resultHandler);
}
public ExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
return new ExecuteResultHandler() {
private void mediaStatus(){
taskNum.getAndDecrement();
CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId();
callbackTask.remove(callId);
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
@RequiredArgsConstructor
public static class FfmpegExecuteResultHandler implements ExecuteResultHandler {
private final static long SLEEP_TIME_MS = 50;
@Setter(AccessLevel.PRIVATE)
private boolean hasResult = false;
String tag = request.getFromHeader().getTag();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
}
private final SIPRequest request;
private final MockingDevice device;
private final String key;
private final SipSender sender;
@Override
public void onProcessComplete(int exitValue) {
mediaStatus();
}
private void mediaStatus(){
taskNum.getAndDecrement();
CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId();
callbackTask.remove(callId);
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
@Override
public void onProcessFailed(ExecuteException e) {
mediaStatus();
String tag = request.getFromHeader().getTag();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
}
public boolean hasResult() {
return hasResult;
}
@SneakyThrows
public void waitFor() {
while (!hasResult()) {
Thread.sleep(SLEEP_TIME_MS);
}
};
}
@Override
public void onProcessComplete(int exitValue) {
hasResult = true;
mediaStatus();
}
@Override
public void onProcessFailed(ExecuteException e) {
hasResult = true;
mediaStatus();
}
}
public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
return new FfmpegExecuteResultHandler(request,device,key,sender);
}
/**

View File

@ -46,11 +46,10 @@ public class FfmpegSupportService {
public Executor ffmpegExecutor(String inputParam,String outputParam, long time, TimeUnit unit,ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam);
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), logLevelParam, inputParam, outputParam);
CommandLine commandLine = CommandLine.parse(command);
Executor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time));
executor.setExitValue(0);
executor.setWatchdog(watchdog);
executor.execute(commandLine, resultHandler);
return executor;

View File

@ -54,18 +54,18 @@ gb28181:
expire: 3600
transport: "UDP"
server:
ip: 10.10.10.20
ip: 10.10.10.200
# ip: 192.168.10.32
# ip: 192.168.3.12
# port: 5060
port: 5061
password: 123456
domain: 4405010000
id: 44050100002000000001
id: 44050100002000000010
media:
ip: 10.10.10.200
url: 'http://10.10.10.200:5080'
url: 'http://10.10.10.200:5081'
# url: 'http://10.10.10.200:12580/anything/'
id: amrWMKmbKqoBjRQ9
# secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
@ -82,9 +82,11 @@ ffmpeg-support:
rtp:
input: -re -i http://10.10.10.200:5080/live/test.live.flv
# input: -re -i
output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtp_mpegts
# output: -vcodec h264 -acodec aac -vf scale=640:-1 -f rtp_mpegts # -rtsp_transport tcp
download: -i http://10.10.10.200:5080/live/test.live.flv
debug:
download: false
download: true
input: true
output: false

View File

@ -24,8 +24,8 @@ spring:
username: root
password: 123456a
url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
# profiles:
# active: local
profiles:
active: local
gb28181:
# 作为28181服务器的配置
@ -80,7 +80,7 @@ ffmpeg-support:
rtp:
download: -i
input: -re -i
output: -vcodec h264 -acodec aac -preset ultrafast -f rtp_mpegts
output: -vcodec h264 -acodec aac -preset ultrafast -vf scale=640:-1 -f rtp_mpegts
debug:
download: false
input: false