Merge pull request #893 from sxh-netizen/wvp-28181-2.0

新增设备主子码流开关选择,默认为不开启
This commit is contained in:
648540858 2023-06-20 14:16:46 +08:00 committed by GitHub
commit fa62ab9a01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 846 additions and 205 deletions

View File

@ -7,6 +7,7 @@ alter table parent_platform
alter table device
add mediaServerId varchar(50) default null;
ALTER TABLE device
ADD COLUMN `switchPrimarySubStream` bit(1) NOT NULL DEFAULT b'0' COMMENT '开启主子码流切换的开关0-不开启1-开启)现在已知支持设备为 大华、TP——LINK全系设备' AFTER `keepalive_interval_time`

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.common;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* 记录每次发送invite消息的状态
@ -123,4 +124,40 @@ public class InviteInfo {
public void setStreamMode(String streamMode) {
this.streamMode = streamMode;
}
/*=========================设备主子码流逻辑START====================*/
@Schema(description = "是否为子码流(true-是false-主码流)")
private boolean subStream;
public boolean isSubStream() {
return subStream;
}
public void setSubStream(boolean subStream) {
this.subStream = subStream;
}
public static InviteInfo getInviteInfo(String deviceId, String channelId,Boolean isSubStream, String stream, SSRCInfo ssrcInfo,
String receiveIp, Integer receivePort, String streamMode,
InviteSessionType type, InviteSessionStatus status) {
InviteInfo inviteInfo = new InviteInfo();
inviteInfo.setDeviceId(deviceId);
inviteInfo.setChannelId(channelId);
inviteInfo.setStream(stream);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setReceiveIp(receiveIp);
inviteInfo.setReceivePort(receivePort);
inviteInfo.setStreamMode(streamMode);
inviteInfo.setType(type);
inviteInfo.setStatus(status);
if(isSubStream != null){
inviteInfo.setSubStream(isSubStream);
}
return inviteInfo;
}
/*=========================设备主子码流逻辑END====================*/
}

View File

@ -528,4 +528,31 @@ public class StreamInfo implements Serializable, Cloneable{
}
return instance;
}
/*=========================设备主子码流逻辑START====================*/
@Schema(description = "是否为子码流(true-是false-主码流)")
private boolean subStream;
public boolean isSubStream() {
return subStream;
}
public void setSubStream(boolean subStream) {
this.subStream = subStream;
}
public static String getPlayStream(String deviceId,String channelId,boolean isSubStream){
String streamId;
if(isSubStream){
streamId = String.format("%s_%s_%s","sub",deviceId, channelId);
}else {
streamId = String.format("%s_%s_%s","main", deviceId, channelId);
}
return streamId;
}
/*=========================设备主子码流逻辑END====================*/
}

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.conf;
import io.swagger.v3.oas.annotations.media.Schema;
import org.springframework.core.annotation.Order;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ -25,11 +26,11 @@ public class UserSetting {
private int platformPlayTimeout = 60000;
private Boolean interfaceAuthentication = Boolean.TRUE;
private Boolean interfaceAuthentication = Boolean.FALSE;
private Boolean recordPushLive = Boolean.TRUE;
private Boolean recordPushLive = Boolean.FALSE;
private Boolean recordSip = Boolean.TRUE;
private Boolean recordSip = Boolean.FALSE;
private Boolean logInDatebase = Boolean.TRUE;

View File

@ -189,6 +189,8 @@ public class Device {
private SipTransactionInfo sipTransactionInfo;
public String getDeviceId() {
return deviceId;
}
@ -447,4 +449,20 @@ public class Device {
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo;
}
/*======================设备主子码流逻辑START=========================*/
@Schema(description = "开启主子码流切换的开关false-不开启true-开启)")
private boolean switchPrimarySubStream;
public boolean isSwitchPrimarySubStream() {
return switchPrimarySubStream;
}
public void setSwitchPrimarySubStream(boolean switchPrimarySubStream) {
this.switchPrimarySubStream = switchPrimarySubStream;
}
/*======================设备主子码流逻辑END=========================*/
}

View File

@ -155,4 +155,30 @@ public class DeferredResultHolder {
map.remove(msg.getKey());
}
}
/*============================设备主子码流逻辑START========================*/
public static String getPlayKey(String deviceId,String channelId,boolean deviceSwitchSubStream,boolean isSubStream){
String key = null;
if(deviceSwitchSubStream){
key = CALLBACK_CMD_PLAY + isSubStream + deviceId + channelId;
}else {
key = CALLBACK_CMD_PLAY +deviceId + channelId;
}
return key;
}
public static String getSnapKey(String deviceId,String channelId,boolean deviceSwitchSubStream,boolean isSubStream){
String key = null;
if(deviceSwitchSubStream){
key = CALLBACK_CMD_SNAP + isSubStream + deviceId + channelId;
}else {
key = CALLBACK_CMD_SNAP +deviceId + channelId;
}
return key;
}
/*============================设备主子码流逻辑END========================*/
}

View File

@ -98,7 +98,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 请求回放视频流

View File

@ -266,7 +266,7 @@ public class SIPCommander implements ISIPCommander {
* @param errorEvent sip错误订阅
*/
@Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = ssrcInfo.getStream();
@ -341,6 +341,22 @@ public class SIPCommander implements ISIPCommander {
}
}
if( device.isSwitchPrimarySubStream() ){
if("TP-LINK".equals(device.getManufacturer())){
if (isSubStream){
content.append("a=streamMode:sub\r\n");
}else {
content.append("a=streamMode:main\r\n");
}
}else {
if (isSubStream){
content.append("a=streamprofile:1\r\n");
}else {
content.append("a=streamprofile:0\r\n");
}
}
}
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
// content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备
@ -356,7 +372,11 @@ public class SIPCommander implements ISIPCommander {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY);
if(device.isSwitchPrimarySubStream()){
streamSession.put(device.getDeviceId(), channelId, "switch-play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY);
}else {
streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY);
}
okEvent.response(e);
});
}

View File

@ -142,8 +142,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
// 可能是设备主动停止
Device device = storager.queryVideoDeviceByChannelId(platformGbId);
if (device != null) {
storager.stopPlay(device.getDeviceId(), channelId);
SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
SsrcTransaction ssrcTransactionForPlay = null;
if (device.isSwitchPrimarySubStream() ) {
ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "switch-play", null);
} else {
storager.stopPlay(device.getDeviceId(), channelId);
ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
}
if (ssrcTransactionForPlay != null){
if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
// 释放ssrc
@ -153,10 +158,17 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
if (inviteInfo != null) {
InviteInfo inviteInfo = null;
if (device.isSwitchPrimarySubStream() ) {
String streamType = ssrcTransactionForPlay.getStream().split("_")[0];
boolean isSubStream = "sub".equals(streamType);
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
inviteStreamService.removeInviteInfo(inviteInfo.getType(),inviteInfo.getDeviceId(),inviteInfo.getChannelId(),isSubStream,inviteInfo.getStream());
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfo(inviteInfo);
}
if (inviteInfo != null) {
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
}

View File

@ -489,7 +489,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
sendRtpItem.setStreamId(streamId);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
playService.play(mediaServerItem, device.getDeviceId(), channelId,false, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()){
hookEvent.run(code, msg, data);
}else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){

View File

@ -289,6 +289,7 @@ public class ZLMHttpHookListener {
@ResponseBody
@PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
if (param.isRegist()) {
logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
} else {
@ -310,11 +311,13 @@ public class ZLMHttpHookListener {
List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
// TODO 重构此处逻辑
boolean isPush = false;
if (param.isRegist()) {
// 处理流注册的鉴权信息
if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
isPush = true;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream());
if (streamAuthorityInfo == null) {
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
@ -328,7 +331,7 @@ public class ZLMHttpHookListener {
redisCatchStorage.removeStreamAuthorityInfo(param.getApp(), param.getStream());
}
if ("rtmp".equals(param.getSchema())) {
if ("rtsp".equals(param.getSchema())) {
// 更新流媒体负载信息
if (param.isRegist()) {
mediaServerService.addCount(param.getMediaServerId());
@ -342,10 +345,19 @@ public class ZLMHttpHookListener {
}
if ("rtp".equals(param.getApp()) && !param.isRegist()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
inviteStreamService.removeInviteInfo(inviteInfo);
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
if(param.getStream().split("_").length == 3){
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]);
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream(), isSubStream);
if(inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY )){
inviteStreamService.removeInviteInfo(inviteInfo.getType(),inviteInfo.getDeviceId(),
inviteInfo.getChannelId(),inviteInfo.isSubStream(),inviteInfo.getStream());
}
}else {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
inviteStreamService.removeInviteInfo(inviteInfo);
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
}
}
} else {
if (!"rtp".equals(param.getApp())) {
@ -360,8 +372,6 @@ public class ZLMHttpHookListener {
StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaInfo,
param.getApp(), param.getStream(), tracks, callId);
param.setStreamInfo(new StreamContent(streamInfoByAppAndStream));
// 如果是拉流代理产生的不需要写入推流
redisCatchStorage.addStream(mediaInfo, type, param.getApp(), param.getStream(), param);
if (param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
@ -450,6 +460,11 @@ public class ZLMHttpHookListener {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
// 点播
if (inviteInfo != null) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
ret.put("close", false);
return ret;
}
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
@ -467,27 +482,33 @@ public class ZLMHttpHookListener {
}
}
}
Device device = deviceService.getDevice(inviteInfo.getDeviceId());
if (device != null) {
try {
InviteInfo info = null;
if(device.isSwitchPrimarySubStream()){
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]);
info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(),isSubStream, inviteInfo.getStream());
}else {
info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (userSetting.getStreamOnDemand()) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
ret.put("close", false);
return ret;
}
Device device = deviceService.getDevice(inviteInfo.getDeviceId());
if (device != null) {
try {
if (inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()) != null) {
cmder.streamByeCmd(device, inviteInfo.getChannelId(),
inviteInfo.getStream(), null);
}
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
}
}
if (info != null) {
cmder.streamByeCmd(device, inviteInfo.getChannelId(),
inviteInfo.getStream(), null);
}
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
}
}
if(device.isSwitchPrimarySubStream()){
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]);
inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(),isSubStream, inviteInfo.getStream());
}else {
inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
@ -499,7 +520,7 @@ public class ZLMHttpHookListener {
// 拉流代理
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
if (streamProxyItem.isEnableRemoveNoneReader()) {
if (streamProxyItem.isEnableDisableNoneReader()) {
// 无人观看自动移除
ret.put("close", true);
streamProxyService.del(param.getApp(), param.getStream());
@ -544,12 +565,26 @@ public class ZLMHttpHookListener {
if ("rtp".equals(param.getApp())) {
String[] s = param.getStream().split("_");
if (!mediaInfo.isRtpEnable() || s.length != 2) {
if (!mediaInfo.isRtpEnable() ) {
defaultResult.setResult(HookResult.SUCCESS());
return defaultResult;
}else if(s.length != 2 && s.length != 3 ){
defaultResult.setResult(HookResult.SUCCESS());
return defaultResult;
}
String deviceId = s[0];
String channelId = s[1];
String deviceId = null;
String channelId = null;
boolean isSubStream = false;
if (s[0].length() < 20) {
if ("sub".equals(s[0])) {
isSubStream = true;
}
deviceId = s[1];
channelId = s[2];
} else {
deviceId = s[0];
channelId = s[1];
}
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
@ -563,7 +598,7 @@ public class ZLMHttpHookListener {
logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
String key = DeferredResultHolder.getPlayKey(deviceId, channelId, device.isSwitchPrimarySubStream(), isSubStream);
boolean exist = resultHolder.exist(key, null);
msg.setKey(key);
String uuid = UUID.randomUUID().toString();
@ -581,7 +616,7 @@ public class ZLMHttpHookListener {
resultHolder.put(key, uuid, result);
if (!exist) {
playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> {
playService.play(mediaInfo, deviceId, channelId,isSubStream, (code, message, data) -> {
msg.setData(new HookResult(code, message));
resultHolder.invokeResult(msg);
});

View File

@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import java.util.List;
/**
* 记录国标点播的状态包括实时预览下载录像回放
*/
@ -70,4 +72,50 @@ public interface IInviteStreamService {
* 统计同一个zlm下的国标收流个数
*/
int getStreamInfoCount(String mediaServerId);
/*======================设备主子码流逻辑START=========================*/
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type,
String deviceId,
String channelId,boolean isSubStream);
void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId,boolean isSubStream);
InviteInfo getInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
boolean isSubStream,
String stream);
void removeInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
boolean isSubStream,
String stream);
void once(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream, ErrorCallback<Object> callback);
void call(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream, int code, String msg, Object data);
void updateInviteInfoSub(InviteInfo inviteInfo);
/**
* 获取点播的状态信息
*/
InviteInfo getInviteInfoByStream(InviteSessionType type, String stream,boolean isSubStream);
/**
* 获取点播的状态信息
*/
List<Object> getInviteInfos(InviteSessionType type,
String deviceId,
String channelId,
String stream);
/*======================设备主子码流逻辑END=========================*/
}

View File

@ -16,9 +16,9 @@ import java.text.ParseException;
*/
public interface IPlayService {
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream,
ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId,boolean isSubStream, ErrorCallback<Object> callback);
MediaServerItem getNewMediaServerItem(Device device);
@ -43,5 +43,5 @@ public interface IPlayService {
void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
void getSnap(String deviceId, String channelId, String fileName,boolean isSubStream, ErrorCallback errorCallback);
}

View File

@ -1,14 +1,17 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
@ -47,6 +50,8 @@ public class DeviceServiceImpl implements IDeviceService {
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
@Autowired
private SIPCommander cmder;
@Autowired
private DynamicTask dynamicTask;
@ -131,6 +136,10 @@ public class DeviceServiceImpl implements IDeviceService {
}
sync(device);
}else {
if (deviceInDb != null) {
device.setSwitchPrimarySubStream(deviceInDb.isSwitchPrimarySubStream());
}
if(!device.isOnLine()){
device.setOnLine(true);
device.setCreateTime(now);
@ -460,6 +469,22 @@ public class DeviceServiceImpl implements IDeviceService {
logger.warn("更新设备时未找到设备信息");
return;
}
if(deviceInStore.isSwitchPrimarySubStream() != device.isSwitchPrimarySubStream()){
//当修改设备的主子码流开关时需要校验是否存在流如果存在流则直接关闭
List<SsrcTransaction> ssrcTransactionForAll = streamSession.getSsrcTransactionForAll(device.getDeviceId(), null, null, null);
if(ssrcTransactionForAll != null){
for (SsrcTransaction ssrcTransaction: ssrcTransactionForAll) {
try {
cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null, null);
} catch (InvalidArgumentException | SsrcTransactionNotFoundException | ParseException | SipException e) {
throw new RuntimeException(e);
}
}
}
deviceChannelMapper.clearPlay(device.getDeviceId());
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
if (!ObjectUtils.isEmpty(device.getName())) {
deviceInStore.setName(device.getName());
}

View File

@ -198,4 +198,164 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
}
return count;
}
/*======================设备主子码流逻辑START=========================*/
@Override
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId, boolean isSubStream) {
return getInviteInfo(type, deviceId, channelId,isSubStream, null);
}
@Override
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId, boolean isSubStream) {
removeInviteInfo(inviteSessionType, deviceId, channelId,isSubStream, null);
}
@Override
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (isSubStream ? "sub" : "main") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() != 1) {
return null;
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
}
@Override
public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
String scanKey = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (isSubStream ? "sub" : "main") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) {
for (Object keyObj : scanResult) {
String key = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
if (inviteInfo == null) {
continue;
}
redisTemplate.delete(key);
inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
}
}
}
@Override
public void once(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, ErrorCallback<Object> callback) {
String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
callbacks = new CopyOnWriteArrayList<>();
inviteErrorCallbackMap.put(key, callbacks);
}
callbacks.add(callback);
}
@Override
public void call(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, int code, String msg, Object data) {
String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
return;
}
for (ErrorCallback<Object> callback : callbacks) {
callback.run(code, msg, data);
}
inviteErrorCallbackMap.remove(key);
}
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
String key = type + "_" + (isSubStream ? "sub":"main") + "_" + deviceId + "_" + channelId;
// 如果ssrc为null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) {
key += ("_" + stream);
}
return key;
}
@Override
public void updateInviteInfoSub(InviteInfo inviteInfo) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
return;
}
InviteInfo inviteInfoForUpdate = null;
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
if (inviteInfo.getDeviceId() == null
|| inviteInfo.getChannelId() == null
|| inviteInfo.getType() == null
|| inviteInfo.getStream() == null
) {
return;
}
inviteInfoForUpdate = inviteInfo;
} else {
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(),inviteInfo.isSubStream(), inviteInfo.getStream());
if (inviteInfoInRedis == null) {
logger.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
return;
}
if (inviteInfo.getStreamInfo() != null) {
inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
}
if (inviteInfo.getSsrcInfo() != null) {
inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
}
if (inviteInfo.getStreamMode() != null) {
inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
}
if (inviteInfo.getReceiveIp() != null) {
inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
}
if (inviteInfo.getReceivePort() != null) {
inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
}
if (inviteInfo.getStatus() != null) {
inviteInfoInRedis.setStatus(inviteInfo.getStatus());
}
inviteInfoForUpdate = inviteInfoInRedis;
}
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + inviteInfoForUpdate.getType() +
"_" + (inviteInfoForUpdate.isSubStream() ? "sub":"main") +
"_" + inviteInfoForUpdate.getDeviceId() +
"_" + inviteInfoForUpdate.getChannelId() +
"_" + inviteInfoForUpdate.getStream();
redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
}
@Override
public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream, boolean isSubStream) {
return getInviteInfo(type, null, null,isSubStream, stream);
}
@Override
public List<Object> getInviteInfos(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
return scanResult;
}
/*======================设备主子码流逻辑END=========================*/
}

View File

@ -18,7 +18,6 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
@ -116,28 +115,43 @@ public class PlayServiceImpl implements IPlayService {
@Override
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback) {
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId,boolean isSubStream, ErrorCallback<Object> callback) {
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
Device device = redisCatchStorage.getDevice(deviceId);
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
InviteInfo inviteInfo;
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo != null ) {
if (inviteInfo.getStreamInfo() == null) {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
}else {
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
}
return inviteInfo.getSsrcInfo();
}else {
StreamInfo streamInfo = inviteInfo.getStreamInfo();
String streamId = streamInfo.getStream();
if (streamId == null) {
callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
"点播失败, redis缓存streamId等于null",
null);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
"点播失败, redis缓存streamId等于null",
null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
"点播失败, redis缓存streamId等于null",
null);
}
return inviteInfo.getSsrcInfo();
}
String mediaServerId = streamInfo.getMediaServerId();
@ -146,41 +160,64 @@ public class PlayServiceImpl implements IPlayService {
Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
if (ready != null && ready) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
return inviteInfo.getSsrcInfo();
}else {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if(device.isSwitchPrimarySubStream()) {
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}else {
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}
}
}
}
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
if(device.isSwitchPrimarySubStream()){
streamId = StreamInfo.getPlayStream(deviceId, channelId, isSubStream);
}else {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
if (ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
null);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
null);
}
return null;
}
// TODO 记录点播的状态
play(mediaServerItem, ssrcInfo, device, channelId, callback);
play(mediaServerItem, ssrcInfo, device, channelId,isSubStream, callback);
return ssrcInfo;
}
@Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream,
ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
@ -189,21 +226,11 @@ public class PlayServiceImpl implements IPlayService {
null);
return;
}
logger.info("\r\n" +
" [点播开始] \r\n" +
"deviceId : {}, \r\n" +
"channelId : {},\r\n" +
"收流端口 : {}, \r\n" +
"收流模式 : {}, \r\n" +
"SSRC : {}, \r\n" +
"SSRC校验 {}",
device.getDeviceId(),
channelId,
ssrcInfo.getPort(),
device.getStreamMode(),
ssrcInfo.getSsrc(),
device.isSsrcCheck());
if( device.isSwitchPrimarySubStream() ){
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
}else {
logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
}
//端口获取失败的ssrcInfo 没有必要发送点播指令
if (ssrcInfo.getPort() <= 0) {
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
@ -212,23 +239,50 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
}
return;
}
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
InviteInfo inviteInfo;
if(device.isSwitchPrimarySubStream()){
// 初始化redis中的invite消息状态
inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId,isSubStream, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
// 初始化redis中的invite消息状态
inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
}
// 超时处理
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功成功了则不执行超时任务防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
InviteInfo inviteInfoForTimeOut;
if(device.isSwitchPrimarySubStream()){
// 初始化redis中的invite消息状态
inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
}else {
// 初始化redis中的invite消息状态
inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
if( device.isSwitchPrimarySubStream()){
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc());
}else {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
}
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
// if (inviteInfoForTimeout == null) {
@ -240,10 +294,16 @@ public class PlayServiceImpl implements IPlayService {
// // TODO 发送cancel
// }
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
@ -261,25 +321,42 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout());
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId);
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId,isSubStream);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
if( device.isSwitchPrimarySubStream() ){
logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流");
}else {
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
}
String streamUrl;
if (mediaServerItemInuse.getRtspPort() != 0) {
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
@ -298,16 +375,17 @@ public class PlayServiceImpl implements IPlayService {
ResponseEvent responseEvent = (ResponseEvent) event.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcInResponse != null) {
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
String substring = contentString.substring(0, contentString.indexOf("y="));
try {
Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
@ -334,21 +412,24 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
}
}
return;
}
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
// 单端口模式streamId也有变化重新设置监听即可
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
@ -361,21 +442,34 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId);
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId,isSubStream);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
});
return;
}
@ -391,14 +485,22 @@ public class PlayServiceImpl implements IPlayService {
}
dynamicTask.stop(timeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}
}else {
ssrcInfo.setSsrc(ssrcInResponse);
@ -409,7 +511,11 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
}
}
inviteStreamService.updateInviteInfo(inviteInfo);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
inviteStreamService.updateInviteInfo(inviteInfo);
}
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@ -420,11 +526,19 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@ -438,27 +552,51 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
if( device.isSwitchPrimarySubStream()){
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
}
}
private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) {
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId,boolean isSubStream) {
StreamInfo streamInfo = null;
Device device = redisCatchStorage.getDevice(deviceId);
if( device.isSwitchPrimarySubStream() ){
streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId,isSubStream);
}else {
streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
}
if (streamInfo != null) {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(streamInfo.getStream());
storager.startPlay(deviceId, channelId, streamInfo.getStream());
InviteInfo inviteInfo;
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(streamInfo.getStream());
storager.startPlay(deviceId, channelId, streamInfo.getStream());
}
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null) {
inviteInfo.setStatus(InviteSessionStatus.ok);
inviteInfo.setStreamInfo(streamInfo);
inviteStreamService.updateInviteInfo(inviteInfo);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
inviteStreamService.updateInviteInfo(inviteInfo);
}
}
}
return streamInfo;
@ -607,16 +745,17 @@ public class PlayServiceImpl implements IPlayService {
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcInResponse != null) {
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
String substring = contentString.substring(0, contentString.indexOf("y="));
try {
Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
@ -684,6 +823,8 @@ public class PlayServiceImpl implements IPlayService {
}
dynamicTask.stop(playBackTimeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@ -799,15 +940,17 @@ public class PlayServiceImpl implements IPlayService {
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcInResponse != null) {
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
String substring = contentString.substring(0, contentString.indexOf("y="));
try {
Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
int port = -1;
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
@ -872,6 +1015,8 @@ public class PlayServiceImpl implements IPlayService {
}
dynamicTask.stop(downLoadTimeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@ -972,6 +1117,7 @@ public class PlayServiceImpl implements IPlayService {
return streamInfo;
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 处理正在向上推流的上级平台
@ -1108,14 +1254,18 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
public void getSnap(String deviceId, String channelId, String fileName,boolean isSubStream, ErrorCallback errorCallback) {
Device device = deviceService.getDevice(deviceId);
if (device == null) {
errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
return;
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
InviteInfo inviteInfo;
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo != null) {
if (inviteInfo.getStreamInfo() != null) {
// 已存在线直接截图
@ -1130,10 +1280,9 @@ public class PlayServiceImpl implements IPlayService {
// 请求截图
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
String filePath = path + File.separator + fileName;
File snapFile = new File(path + File.separator + fileName);
if (snapFile.exists()) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), filePath);
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
@ -1142,11 +1291,11 @@ public class PlayServiceImpl implements IPlayService {
}
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{
play(newMediaServerItem, deviceId, channelId,isSubStream, (code, msg, data)->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
getSnap(deviceId, channelId, fileName, errorCallback);
getSnap(deviceId, channelId, fileName,isSubStream, errorCallback);
}else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
}
@ -1156,4 +1305,17 @@ public class PlayServiceImpl implements IPlayService {
});
}
/*======================设备主子码流逻辑START=========================*/
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId,boolean isSubStream) {
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
streamInfo.setSubStream(isSubStream);
return streamInfo;
}
/*======================设备主子码流逻辑END=========================*/
}

View File

@ -451,6 +451,10 @@ public interface DeviceChannelMapper {
@Select("select count(1) from wvp_device_channel")
int getAllChannelCount();
// 设备主子码流逻辑START
@Update(value = {"UPDATE wvp_device_channel SET stream_id=null WHERE device_id=#{deviceId}"})
void clearPlay(String deviceId);
// 设备主子码流逻辑END
@Select(value = {" <script>" +
"select * " +
"from device_channel " +
@ -460,4 +464,5 @@ public interface DeviceChannelMapper {
" <if test='onlyCatalog == true '> and parental = 1 </if>" +
" </script>"})
List<DeviceChannel> getSubChannelsByDeviceId(String deviceId, String parentId, boolean onlyCatalog);
}

View File

@ -42,6 +42,7 @@ public interface DeviceMapper {
"geo_coord_sys," +
"on_line," +
"media_server_id," +
"switch_primary_sub_stream," +
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+
" FROM wvp_device WHERE device_id = #{deviceId}")
Device getDeviceByDeviceId(String deviceId);
@ -157,6 +158,7 @@ public interface DeviceMapper {
"geo_coord_sys,"+
"on_line,"+
"media_server_id,"+
"switch_primary_sub_stream switchPrimarySubStream,"+
"(SELECT count(0) FROM wvp_device_channel WHERE device_id=de.device_id) as channel_count " +
"FROM wvp_device de" +
"<if test=\"onLine != null\"> where on_line=${onLine}</if>"+
@ -246,6 +248,7 @@ public interface DeviceMapper {
"<if test=\"ssrcCheck != null\">, ssrc_check=#{ssrcCheck}</if>" +
"<if test=\"asMessageChannel != null\">, as_message_channel=#{asMessageChannel}</if>" +
"<if test=\"geoCoordSys != null\">, geo_coord_sys=#{geoCoordSys}</if>" +
"<if test=\"switchPrimarySubStream != null\">, switch_primary_sub_stream=#{switchPrimarySubStream}</if>" +
"<if test=\"mediaServerId != null\">, media_server_id=#{mediaServerId}</if>" +
"WHERE device_id=#{deviceId}"+
" </script>"})
@ -263,7 +266,8 @@ public interface DeviceMapper {
"as_message_channel,"+
"geo_coord_sys,"+
"on_line,"+
"media_server_id"+
"media_server_id,"+
"switch_primary_sub_stream"+
") VALUES (" +
"#{deviceId}," +
"#{name}," +
@ -276,7 +280,8 @@ public interface DeviceMapper {
"#{asMessageChannel}," +
"#{geoCoordSys}," +
"#{onLine}," +
"#{mediaServerId}" +
"#{mediaServerId}," +
"#{switchPrimarySubStream}" +
")")
void addCustomDevice(Device device);

View File

@ -26,7 +26,6 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.SnapPath;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
@ -41,7 +40,6 @@ import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.io.File;
import java.text.ParseException;
import java.util.List;
import java.util.UUID;
@ -90,16 +88,17 @@ public class PlayController {
@Operation(summary = "开始点播")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "isSubStream", description = "是否子码流true-子码流false-主码流默认为false", required = true)
@GetMapping("/start/{deviceId}/{channelId}")
public DeferredResult<WVPResult<StreamContent>> play(HttpServletRequest request, @PathVariable String deviceId,
@PathVariable String channelId) {
@PathVariable String channelId,boolean isSubStream) {
// 获取可用的zlm
Device device = storager.queryVideoDevice(deviceId);
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
String key = DeferredResultHolder.getPlayKey(deviceId,channelId,device.isSwitchPrimarySubStream(),isSubStream);
requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid);
@ -118,7 +117,7 @@ public class PlayController {
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
playService.play(newMediaServerItem, deviceId, channelId, (code, msg, data) -> {
playService.play(newMediaServerItem, deviceId, channelId,isSubStream, (code, msg, data) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
@ -144,8 +143,9 @@ public class PlayController {
@Operation(summary = "停止点播")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "isSubStream", description = "是否子码流true-子码流false-主码流默认为false", required = true)
@GetMapping("/stop/{deviceId}/{channelId}")
public JSONObject playStop(@PathVariable String deviceId, @PathVariable String channelId) {
public JSONObject playStop(@PathVariable String deviceId, @PathVariable String channelId,boolean isSubStream) {
logger.debug(String.format("设备预览/回放停止API调用streamId%s_%s", deviceId, channelId ));
@ -158,7 +158,12 @@ public class PlayController {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在");
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
InviteInfo inviteInfo =null;
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
}
@ -171,12 +176,17 @@ public class PlayController {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if(device.isSwitchPrimarySubStream()){
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
storager.stopPlay(deviceId, channelId);
}
storager.stopPlay(deviceId, channelId);
JSONObject json = new JSONObject();
json.put("deviceId", deviceId);
json.put("channelId", channelId);
json.put("isSubStream", isSubStream);
return json;
}
@ -343,30 +353,27 @@ public class PlayController {
@Operation(summary = "获取截图")
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "isSubStream", description = "是否子码流true-子码流false-主码流默认为false", required = true)
@GetMapping("/snap")
public DeferredResult<String> getSnap(HttpServletRequest request, String deviceId, String channelId) {
public DeferredResult<String> getSnap(String deviceId, String channelId,boolean isSubStream) {
if (logger.isDebugEnabled()) {
logger.debug("获取截图: {}/{}", deviceId, channelId);
}
Device device = storager.queryVideoDevice(deviceId);
DeferredResult<String> result = new DeferredResult<>(3 * 1000L);
String key = DeferredResultHolder.CALLBACK_CMD_SNAP + deviceId;
String key = DeferredResultHolder.getSnapKey(deviceId,channelId,device.isSwitchPrimarySubStream(),isSubStream);
String uuid = UUID.randomUUID().toString();
resultHolder.put(key, uuid, result);
RequestMessage message = new RequestMessage();
message.setKey(key);
message.setId(uuid);
String nowForUrl = DateUtil.getNowForUrl();
String fileName = deviceId + "_" + channelId + "_" + nowForUrl + ".jpg";
playService.getSnap(deviceId, channelId, fileName, (code, msg, data) -> {
String fileName = deviceId + "_" + channelId + "_" + DateUtil.getNowForUrl() + "jpg";
playService.getSnap(deviceId, channelId, fileName,isSubStream, (code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
File snapFile = new File((String)data);
String fileNameForUrl = deviceId + "/" + channelId + "?mark=" + nowForUrl;
String uri = request.getRequestURL().toString().replace(request.getRequestURI(), "/api/device/query/snap/" + fileNameForUrl);
SnapPath snapPath = SnapPath.getInstance((String) data, snapFile.getAbsolutePath(), uri);
message.setData(snapPath);
message.setData(data);
}else {
message.setData(WVPResult.fail(code, msg));
}

View File

@ -122,7 +122,7 @@ public class ApiStreamController {
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
playService.play(newMediaServerItem, serial, code, (errorCode, msg, data) -> {
playService.play(newMediaServerItem, serial, code,false, (errorCode, msg, data) -> {
if (errorCode == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {

View File

@ -1,4 +1,6 @@
spring:
thymeleaf:
cache: false
# [可选]上传文件大小限制
servlet:
multipart:
@ -11,18 +13,18 @@ spring:
# [必须修改] 端口号
port: 6379
# [可选] 数据库 DB
database: 6
database: 7
# [可选] 访问密码,若你的redis服务器没有设置密码就不需要用密码去连接
password: face2020
password:
# [可选] 超时时间
timeout: 10000
# mysql数据源
datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/wvp2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true&serverTimezone=PRC&useSSL=false&allowMultiQueries=true
url: jdbc:mysql://127.0.0.1:3306/test_gb-89wulian?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true&serverTimezone=PRC&useSSL=false&allowMultiQueries=true
username: root
password: 123456
password: root
hikari:
connection-timeout: 20000 # 是客户端等待连接池连接的最大毫秒数
initialSize: 10 # 连接池初始化连接数
@ -30,11 +32,19 @@ spring:
minimum-idle: 5 # 连接池最小空闲连接数
idle-timeout: 300000 # 允许连接在连接池中空闲的最长时间(以毫秒为单位)
max-lifetime: 1200000 # 是池中连接关闭后的最长生命周期(以毫秒为单位)
#[可选] WVP监听的HTTP端口, 网页和接口调用都是这个端口
server:
port: 18080
port: 18978
# [可选] HTTPS配置 默认不开启
ssl:
# [可选] 是否开启HTTPS访问
enabled: false
# [可选] 证书文件路径放置在resource/目录下即可修改xxx为文件名
key-store: classpath:test.monitor.89iot.cn.jks
# [可选] 证书密码
key-store-password: gpf64qmw
# [可选] 证书类型, 默认为jks根据实际修改
key-store-type: JKS
# 作为28181服务器的配置
sip:
@ -42,26 +52,36 @@ sip:
# 如果要监听多张网卡可以使用逗号分隔多个IP 例如: 192.168.1.4,10.0.0.4
# 如果不明白就使用0.0.0.0,大部分情况都是可以的
# 请不要使用127.0.0.1任何包括localhost在内的域名都是不可以的。
ip: 192.168.41.16
ip: 192.168.1.18
# [可选] 28181服务监听的端口
port: 5060
port: 8116
# 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007
# 后两位为行业编码定义参照附录D.3
# 3701020049标识山东济南历下区 信息行业接入
# [可选]
domain: 4401020049
domain: 4101050000
# [可选]
id: 44010200492000000001
id: 41010500002000000001
# [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验
password: admin123
password: bajiuwulian1006
# 是否存储alarm信息
alarm: true
#zlm 默认服务器配置
media:
id: FQ3TF8yT83wh5Wvz
id: 89wulian-one
# [必须修改] zlm服务器的内网IP
ip: 192.168.41.16
ip: 192.168.1.18
# [必须修改] zlm服务器的http.port
http-port: 8091
http-port: 80
# [可选] 返回流地址时的ip置空使用 media.ip
stream-ip: 192.168.1.18
# [可选] wvp在国标信令中使用的ip此ip为摄像机可以访问到的ip 置空使用 media.ip
sdp-ip: 192.168.1.18
# [可选] zlm服务器的hook所使用的IP, 默认使用sip.ip
hook-ip: 192.168.1.18
# [可选] zlm服务器的http.sslport, 置空使用zlm配置文件配置
http-ssl-port: 443
# [可选] zlm服务器的hook.admin_params=secret
secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
# 启用多端口模式, 多端口模式使用端口区分每路流,兼容性更好。 单端口使用流的ssrc区分 点播超时建议使用多端口测试
@ -69,11 +89,24 @@ media:
# [可选] 是否启用多端口模式, 开启后会在portRange范围内选择端口用于媒体流传输
enable: true
# [可选] 在此范围内选择端口用于媒体流传输, 必须提前在zlm上配置该属性不然自动配置此属性可能不成功
port-range: 30000,30500 # 端口范围
port-range: 50000,50300 # 端口范围
# [可选] 国标级联在此范围内选择端口发送媒体流,
send-port-range: 30000,30500 # 端口范围
send-port-range: 50000,50300 # 端口范围
# 录像辅助服务, 部署此服务可以实现zlm录像的管理与下载 0 表示不使用
record-assist-port: 18081
# [根据业务需求配置]
user-settings:
# 点播/录像回放 等待超时时间,单位:毫秒
play-timeout: 180000
# [可选] 自动点播, 使用固定流地址进行播放时,如果未点播则自动进行点播, 需要rtp.enable=true
auto-apply-play: true
# 设备/通道状态变化时发送消息
device-status-notify: true
# 跨域配置,配置你访问前端页面的地址即可, 可以配置多个
allowed-origins:
- http://localhost:8080
- http://127.0.0.1:8080
# [可选] 日志配置, 一般不需要改
logging:
config: classpath:logback-spring-local.xml

View File

@ -2,4 +2,4 @@ spring:
application:
name: wvp
profiles:
active: local
active: dev

View File

@ -12,14 +12,14 @@ module.exports = {
assetsPublicPath: '/',
proxyTable: {
'/debug': {
target: 'http://localhost:18080',
target: 'http://localhost:18978',
changeOrigin: true,
pathRewrite: {
'^/debug': '/'
}
},
'/static/snap': {
target: 'http://localhost:18080',
target: 'http://localhost:18978',
changeOrigin: true,
// pathRewrite: {
// '^/static/snap': '/static/snap'

View File

@ -26,6 +26,12 @@
<el-option label="在线" value="true"></el-option>
<el-option label="离线" value="false"></el-option>
</el-select>
清晰度:
<el-select size="mini" style="margin-right: 1rem;" @change="search" v-model="isSubStream" placeholder="请选择"
default-first-option>
<el-option label="原画" :value="false"></el-option>
<el-option label="流畅" :value="true"></el-option>
</el-select>
</div>
<el-button icon="el-icon-refresh-right" circle size="mini" @click="refresh()"></el-button>
<el-button v-if="showTree" icon="iconfont icon-list" circle size="mini" @click="switchList()"></el-button>
@ -146,6 +152,7 @@ export default {
searchSrt: "",
channelType: "",
online: "",
isSubStream: false,
winHeight: window.innerHeight - 200,
currentPage: 1,
count: 15,
@ -237,7 +244,10 @@ export default {
let that = this;
this.$axios({
method: 'get',
url: '/api/play/start/' + deviceId + '/' + channelId
url: '/api/play/start/' + deviceId + '/' + channelId,
params:{
isSubStream: this.isSubStream
}
}).then(function (res) {
console.log(res)
that.isLoging = false;
@ -277,7 +287,10 @@ export default {
var that = this;
this.$axios({
method: 'get',
url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId
url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId,
params:{
isSubStream: this.isSubStream
}
}).then(function (res) {
that.initData();
}).catch(function (error) {

View File

@ -58,6 +58,12 @@
<el-form-item v-if="form.subscribeCycleForMobilePosition > 0" label="移动位置报送间隔" prop="subscribeCycleForCatalog" >
<el-input v-model="form.mobilePositionSubmissionInterval" clearable ></el-input>
</el-form-item>
<el-form-item label="主子码流开关" prop="switchPrimarySubStream" >
<el-select v-model="form.switchPrimarySubStream" style="float: left; width: 100%" >
<el-option key="true" label="开启" :value="true"></el-option>
<el-option key="false" label="关闭" :value="false"></el-option>
</el-select>
</el-form-item>
<el-form-item label="其他选项">
<el-checkbox label="SSRC校验" v-model="form.ssrcCheck" style="float: left"></el-checkbox>
<el-checkbox label="作为消息通道" v-model="form.asMessageChannel" style="float: left"></el-checkbox>