增加上级点播停止后通知设备停止推流功能,并自动与本地播放协同

This commit is contained in:
lawrencehj 2021-03-14 21:20:47 +08:00
parent bf22ca6dbb
commit dc6b76b4db
6 changed files with 93 additions and 19 deletions

View File

@ -156,6 +156,7 @@ public class SIPProcessorFactory {
processor.setRequestEvent(evt);
processor.setRedisCatchStorage(redisCatchStorage);
processor.setZlmrtpServerFactory(zlmrtpServerFactory);
processor.setSIPCommander(cmder);
return processor;
} else if (Request.CANCEL.equals(method)) {
CancelRequestProcessor processor = new CancelRequestProcessor();

View File

@ -1,29 +1,38 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.address.SipURI;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.apache.log4j.Logger;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: BYE请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:32:05
* @author: lawrencehj
* @date: 2021年3月9日
*/
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
private IRedisCatchStorage redisCatchStorage;
private ISIPCommander cmder;
private IRedisCatchStorage redisCatchStorage;
private ZLMRTPServerFactory zlmrtpServerFactory;
@ -38,10 +47,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog.getState().equals(DialogState.TERMINATED)) {
String remoteUri = dialog.getRemoteParty().getURI().toString();
String localUri = dialog.getLocalParty().getURI().toString();
String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
@ -50,6 +57,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
param.put("stream",streamId);
System.out.println("停止向上级推流:" + streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
System.out.println(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(streamId);
}
}
} catch (SipException e) {
e.printStackTrace();
@ -58,8 +70,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
} catch (ParseException e) {
e.printStackTrace();
}
// TODO 优先级99 Bye Request消息实现此消息一般为级联消息上级给下级发送视频停止指令
}
/***
@ -89,4 +99,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
this.zlmrtpServerFactory = zlmrtpServerFactory;
}
public ISIPCommander getSIPCommander() {
return cmder;
}
public void setSIPCommander(ISIPCommander cmder) {
this.cmder = cmder;
}
}

View File

@ -267,20 +267,25 @@ public class ZLMHttpHookListener {
}
String streamId = json.getString("stream");
cmder.streamByeCmd(streamId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo!=null){
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
}else{
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
redisCatchStorage.stopPlayback(streamInfo);
}
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
if (streamInfo != null) {
if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
ret.put("close", false);
} else {
cmder.streamByeCmd(streamId);
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
}
}else{
cmder.streamByeCmd(streamId);
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
redisCatchStorage.stopPlayback(streamInfo);
}
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}

View File

@ -152,6 +152,16 @@ public class ZLMRTPServerFactory {
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
/**
* 查询转推的流是否有其它观看者
* @param streamId
* @return
*/
public int totalReaderCount(String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
return mediaInfo.getInteger("totalReaderCount");
}
/**
* 调用zlm RESTful API stopSendRtp
*/

View File

@ -89,4 +89,17 @@ public interface IRedisCatchStorage {
*/
SendRtpItem querySendRTPServer(String platformGbId, String channelId);
/**
* 删除RTP推送信息缓存
* @param platformGbId
* @param channelId
*/
void deleteSendRTPServer(String platformGbId, String channelId);
/**
* 查询某个通道是否存在上级点播RTP推送
* @param channelId
*/
boolean isChannelSendingRTP(String channelId);
}

View File

@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return (SendRtpItem)redis.get(key);
}
/**
* 删除RTP推送信息缓存
* @param platformGbId
* @param channelId
*/
@Override
public void deleteSendRTPServer(String platformGbId, String channelId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
redis.del(key);
}
/**
* 查询某个通道是否存在上级点播RTP推送
* @param channelId
*/
@Override
public boolean isChannelSendingRTP(String channelId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
List<Object> RtpStreams = redis.scan(key);
if (RtpStreams.size() > 0) {
return true;
} else {
return false;
}
}
}