ssrc 码流类型调整

This commit is contained in:
zxb 2023-08-16 15:49:56 +08:00
parent c8a6ad283c
commit 869cb772d9
7 changed files with 256 additions and 73 deletions

View File

@ -21,6 +21,7 @@ public class AssistRESTfulUtils {
private final static Logger logger = LoggerFactory.getLogger(AssistRESTfulUtils.class); private final static Logger logger = LoggerFactory.getLogger(AssistRESTfulUtils.class);
public interface RequestCallback{ public interface RequestCallback{
void run(JSONObject response); void run(JSONObject response);
} }
@ -72,51 +73,51 @@ public class AssistRESTfulUtils {
.get() .get()
.url(url) .url(url)
.build(); .build();
if (callback == null) { if (callback == null) {
try { try {
Response response = client.newCall(request).execute(); Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
} catch (ConnectException e) {
logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认Assist已启动...");
}catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}else {
client.newCall(request).enqueue(new Callback(){
@Override
public void onResponse(@NotNull Call call, @NotNull Response response){
if (response.isSuccessful()) { if (response.isSuccessful()) {
ResponseBody responseBody = response.body(); try {
if (responseBody != null) { String responseStr = Objects.requireNonNull(response.body()).string();
String responseStr = responseBody.string(); callback.run(JSON.parseObject(responseStr));
responseJSON = JSON.parseObject(responseStr); } catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
} }
}else { }else {
response.close(); response.close();
Objects.requireNonNull(response.body()).close(); Objects.requireNonNull(response.body()).close();
} }
} catch (ConnectException e) { }
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage())); logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认Assist已启动..."); logger.info("请检查media配置并确认Assist已启动...");
}catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
} }
}else { });
client.newCall(request).enqueue(new Callback(){ }
@Override
public void onResponse(@NotNull Call call, @NotNull Response response){
if (response.isSuccessful()) {
try {
String responseStr = Objects.requireNonNull(response.body()).string();
callback.run(JSON.parseObject(responseStr));
} catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
}
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认Assist已启动...");
}
});
}
@ -145,4 +146,25 @@ public class AssistRESTfulUtils {
return sendGet(mediaServerItem, "api/record/addStreamCallInfo",param, callback); return sendGet(mediaServerItem, "api/record/addStreamCallInfo",param, callback);
} }
public JSONObject getDateList(MediaServerItem mediaServerItem, String app, String stream, int year, int month) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("year", year);
param.put("month", month);
return sendGet(mediaServerItem, "api/record/date/list", param, null);
}
public JSONObject getFileList(MediaServerItem mediaServerItem, int page, int count, String app, String stream,
String startTime, String endTime) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("page", page);
param.put("count", count);
param.put("startTime", startTime);
param.put("endTime", endTime);
return sendGet(mediaServerItem, "api/record/file/listWithDate", param, null);
}
} }

View File

@ -42,7 +42,7 @@ public class ZLMServerFactory {
* @param tcpMode 0/null udp 模式1 tcp 被动模式, 2 tcp 主动模式 * @param tcpMode 0/null udp 模式1 tcp 被动模式, 2 tcp 主动模式
* @return * @return
*/ */
public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port, Boolean reUsePort, Integer tcpMode) { public int createRTPServer(MediaServerItem mediaServerItem, String streamId, long ssrc, Integer port, Boolean reUsePort, Integer tcpMode) {
int result = -1; int result = -1;
// 查询此rtp server 是否已经存在 // 查询此rtp server 是否已经存在
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId); JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad; import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
import java.util.List; import java.util.List;
@ -93,4 +94,14 @@ public interface IMediaServerService {
* @return * @return
*/ */
MediaServerLoad getLoad(MediaServerItem mediaServerItem); MediaServerLoad getLoad(MediaServerItem mediaServerItem);
/**
* 按时间查找录像文件
*/
List<RecordFile> getRecords(String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems);
/**
* 查找存在录像文件的时间
*/
List<String> getRecordDates(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems);
} }

View File

@ -24,23 +24,30 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.io.File; import java.io.File;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/** /**
* 媒体服务器节点管理 * 媒体服务器节点管理
@ -104,6 +111,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired @Autowired
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
/** /**
* 初始化 * 初始化
@ -149,7 +161,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
if (streamId == null) { if (streamId == null) {
streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
} }
int ssrcCheckParam = 0; int ssrcCheckParam = 0;
if (ssrcCheck && tcpMode > 1) { if (ssrcCheck && tcpMode > 1) {
@ -158,7 +170,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
int rtpServerPort; int rtpServerPort;
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, (ssrcCheck && tcpMode == 0)?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, (ssrcCheck && tcpMode == 0) ? Long.parseLong(ssrc) : 0, port, reUsePort, tcpMode);
} else { } else {
rtpServerPort = mediaServerItem.getRtpProxyPort(); rtpServerPort = mediaServerItem.getRtpProxyPort();
} }
@ -588,7 +600,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("protocol.continue_push_ms", "3000" ); param.put("protocol.continue_push_ms", "3000" );
// 最多等待未初始化的Track时间单位毫秒超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流 // 最多等待未初始化的Track时间单位毫秒超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流
// 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
// param.put("general.wait_track_ready_ms", "3000" ); // param.put("general.wait_track_ready_ms", "3000" );
if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) { if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-")); param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
} }
@ -749,4 +761,89 @@ public class MediaServerServiceImpl implements IMediaServerService {
return result; return result;
} }
@Override
public List<RecordFile> getRecords(String app, String stream, String startTime, String endTime, List<MediaServerItem> mediaServerItems) {
Assert.notNull(app, "app不存在");
Assert.notNull(stream, "stream不存在");
Assert.notNull(startTime, "startTime不存在");
Assert.notNull(endTime, "endTime不存在");
Assert.notEmpty(mediaServerItems, "流媒体列表为空");
CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
for (int i = 0; i < mediaServerItems.size(); i++) {
completableFutures[i] = getRecordFilesForOne(app, stream, startTime, endTime, mediaServerItems.get(i));
}
List<RecordFile> result = new ArrayList<>();
for (int i = 0; i < completableFutures.length; i++) {
try {
List<RecordFile> list = (List<RecordFile>) completableFutures[i].get();
if (!list.isEmpty()) {
for (int g = 0; g < list.size(); g++) {
list.get(g).setMediaServerId(mediaServerItems.get(i).getId());
}
result.addAll(list);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
Comparator<RecordFile> comparator = Comparator.comparing(RecordFile::getFileName);
result.sort(comparator);
return result;
}
@Override
public List<String> getRecordDates(String app, String stream, int year, int month, List<MediaServerItem> mediaServerItems) {
Assert.notNull(app, "app不存在");
Assert.notNull(stream, "stream不存在");
Assert.notEmpty(mediaServerItems, "流媒体列表为空");
CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()];
for (int i = 0; i < mediaServerItems.size(); i++) {
completableFutures[i] = getRecordDatesForOne(app, stream, year, month, mediaServerItems.get(i));
}
List<String> result = new ArrayList<>();
CompletableFuture.allOf(completableFutures).join();
for (CompletableFuture completableFuture : completableFutures) {
try {
List<String> list = (List<String>) completableFuture.get();
result.addAll(list);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
Collections.sort(result);
return result;
}
@Async
public CompletableFuture<List<String>> getRecordDatesForOne(String app, String stream, int year, int month, MediaServerItem mediaServerItem) {
JSONObject fileListJson = assistRESTfulUtils.getDateList(mediaServerItem, app, stream, year, month);
if (fileListJson != null && !fileListJson.isEmpty()) {
if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
JSONArray data = fileListJson.getJSONArray("data");
return CompletableFuture.completedFuture(data.toJavaList(String.class));
}
}
return CompletableFuture.completedFuture(new ArrayList<>());
}
@Async
public CompletableFuture<List<RecordFile>> getRecordFilesForOne(String app, String stream, String startTime, String endTime, MediaServerItem mediaServerItem) {
JSONObject fileListJson = assistRESTfulUtils.getFileList(mediaServerItem, 1, 100000000, app, stream, startTime, endTime);
if (fileListJson != null && !fileListJson.isEmpty()) {
if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) {
JSONObject data = fileListJson.getJSONObject("data");
JSONArray list = data.getJSONArray("list");
if (list != null) {
return CompletableFuture.completedFuture(list.toJavaList(RecordFile.class));
}
}
}
return CompletableFuture.completedFuture(new ArrayList<>());
}
} }

View File

@ -0,0 +1,53 @@
package com.genersoft.iot.vmp.vmanager.bean;
public class RecordFile {
private String app;
private String stream;
private String fileName;
private String mediaServerId;
private String date;
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
}

View File

@ -91,10 +91,10 @@ public class PsController {
if (isSend != null && isSend && callId == null) { if (isSend != null && isSend && callId == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时CallID不能为空"); throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时CallID不能为空");
} }
int ssrcInt = 0; long ssrcInt = 0;
if (ssrc != null) { if (ssrc != null) {
try { try {
ssrcInt = Integer.parseInt(ssrc); ssrcInt = Long.parseLong(ssrc);
}catch (NumberFormatException e) { }catch (NumberFormatException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误"); throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
} }
@ -185,7 +185,7 @@ public class PsController {
String stream, String stream,
String callId, String callId,
Boolean isUdp Boolean isUdp
) { ) {
logger.info("[第三方PS服务对接->发送流] " + logger.info("[第三方PS服务对接->发送流] " +
"ssrc->{}, \r\n" + "ssrc->{}, \r\n" +
"dstIp->{}, \n" + "dstIp->{}, \n" +
@ -193,12 +193,12 @@ public class PsController {
"app->{}, \n" + "app->{}, \n" +
"stream->{}, \n" + "stream->{}, \n" +
"callId->{} \n", "callId->{} \n",
ssrc, ssrc,
dstIp, dstIp,
dstPort, dstPort,
app, app,
stream, stream,
callId); callId);
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId; String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key); OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
@ -307,17 +307,17 @@ public class PsController {
public int getTestPort() { public int getTestPort() {
MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer(); MediaServerItem defaultMediaServer = mediaServerService.getDefaultMediaServer();
// for (int i = 0; i <300; i++) { // for (int i = 0; i <300; i++) {
// new Thread(() -> { // new Thread(() -> {
// int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer); // int nextPort = sendRtpPortManager.getNextPort(defaultMediaServer);
// try { // try {
// Thread.sleep((int)Math.random()*10); // Thread.sleep((int)Math.random()*10);
// } catch (InterruptedException e) { // } catch (InterruptedException e) {
// throw new RuntimeException(e); // throw new RuntimeException(e);
// } // }
// System.out.println(nextPort); // System.out.println(nextPort);
// }).start(); // }).start();
// } // }
return sendRtpPortManager.getNextPort(defaultMediaServer); return sendRtpPortManager.getNextPort(defaultMediaServer);
} }

View File

@ -91,10 +91,10 @@ public class RtpController {
if (isSend != null && isSend && callId == null) { if (isSend != null && isSend && callId == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时CallID不能为空"); throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时CallID不能为空");
} }
int ssrcInt = 0; long ssrcInt = 0;
if (ssrc != null) { if (ssrc != null) {
try { try {
ssrcInt = Integer.parseInt(ssrc); ssrcInt = Long.parseLong(ssrc);
}catch (NumberFormatException e) { }catch (NumberFormatException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误"); throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
} }
@ -198,7 +198,7 @@ public class RtpController {
Boolean isUdp, Boolean isUdp,
@RequestParam(required = false)Integer ptForAudio, @RequestParam(required = false)Integer ptForAudio,
@RequestParam(required = false)Integer ptForVideo @RequestParam(required = false)Integer ptForVideo
) { ) {
logger.info("[第三方服务对接->发送流] " + logger.info("[第三方服务对接->发送流] " +
"ssrc->{}, \r\n" + "ssrc->{}, \r\n" +
"dstIpForAudio->{}, \n" + "dstIpForAudio->{}, \n" +
@ -210,16 +210,16 @@ public class RtpController {
"callId->{}, \n" + "callId->{}, \n" +
"ptForAudio->{}, \n" + "ptForAudio->{}, \n" +
"ptForVideo->{}", "ptForVideo->{}",
ssrc, ssrc,
dstIpForAudio, dstIpForAudio,
dstIpForVideo, dstIpForVideo,
dstPortForAudio, dstPortForAudio,
dstPortForVideo, dstPortForVideo,
app, app,
stream, stream,
callId, callId,
ptForAudio, ptForAudio,
ptForVideo); ptForVideo);
if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数"); throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数");
} }