hook订阅曾增加过期清除功能,防止内存溢出

This commit is contained in:
648540858 2022-08-31 13:09:45 +08:00
parent e4bd61860d
commit 06dc8da1c4
12 changed files with 73 additions and 77 deletions

View File

@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -93,7 +93,7 @@ public interface ISIPCommander {
* @param device 视频设备 * @param device 视频设备
* @param channelId 预览通道 * @param channelId 预览通道
*/ */
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent); void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
/** /**
* 请求回放视频流 * 请求回放视频流

View File

@ -13,10 +13,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils; import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -34,19 +33,15 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.Address; import javax.sip.address.Address;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.address.URI;
import javax.sip.header.*; import javax.sip.header.*;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
/** /**
* @description:设备能力接口用于定义设备的控制查询能力 * @description:设备能力接口用于定义设备的控制查询能力
@ -89,7 +84,7 @@ public class SIPCommander implements ISIPCommander {
private UserSetting userSetting; private UserSetting userSetting;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@ -352,7 +347,7 @@ public class SIPCommander implements ISIPCommander {
*/ */
@Override @Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
String stream = ssrcInfo.getStream(); String stream = ssrcInfo.getStream();
try { try {
if (device == null) { if (device == null) {

View File

@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -12,7 +9,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -21,7 +18,6 @@ import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.utils.SerializeUtils;
import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -69,7 +65,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;

View File

@ -13,7 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -40,7 +40,6 @@ import org.springframework.stereotype.Component;
import javax.sdp.*; import javax.sdp.*;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
@ -307,7 +306,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Long finalStartTime = startTime; Long finalStartTime = startTime;
Long finalStopTime = stopTime; Long finalStopTime = stopTime;
ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
String app = responseJSON.getString("app"); String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream"); String stream = responseJSON.getString("stream");
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", app, stream); logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP) {}/{}", app, stream);

View File

@ -22,9 +22,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -81,7 +79,7 @@ public class ZLMHttpHookListener {
private ZLMMediaListManager zlmMediaListManager; private ZLMMediaListManager zlmMediaListManager;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@ -109,9 +107,9 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString()); logger.info("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) { if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json); subscribe.response(null, json);
} }
} }
@ -175,7 +173,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_play API调用参数" + JSON.toJSONString(param)); logger.debug("[ ZLM HOOK ]on_play API调用参数" + JSON.toJSONString(param));
} }
String mediaServerId = param.getMediaServerId(); String mediaServerId = param.getMediaServerId();
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -263,7 +261,7 @@ public class ZLMHttpHookListener {
} }
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) { if (subscribe != null) {
if (mediaInfo != null) { if (mediaInfo != null) {
subscribe.response(mediaInfo, json); subscribe.response(mediaInfo, json);
@ -387,7 +385,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_shell_login API调用参数" + json.toString()); logger.debug("[ ZLM HOOK ]on_shell_login API调用参数" + json.toString());
} }
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -413,7 +411,7 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item)); logger.info("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item));
String mediaServerId = item.getMediaServerId(); String mediaServerId = item.getMediaServerId();
JSONObject json = (JSONObject) JSON.toJSON(item); JSONObject json = (JSONObject) JSON.toJSON(item);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null ) { if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) { if (mediaInfo != null) {
@ -635,9 +633,9 @@ public class ZLMHttpHookListener {
} }
String remoteAddr = request.getRemoteAddr(); String remoteAddr = request.getRemoteAddr();
jsonObject.put("ip", remoteAddr); jsonObject.put("ip", remoteAddr);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started); List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) { if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject); subscribe.response(null, jsonObject);
} }
} }

View File

@ -1,30 +1,24 @@
package com.genersoft.iot.vmp.media.zlm; package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* @author lin * @author lin
@ -59,7 +53,7 @@ public class ZLMMediaListManager {
private StreamPushMapper streamPushMapper; private StreamPushMapper streamPushMapper;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;

View File

@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -19,9 +18,7 @@ import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
@Component @Component
@Order(value=1) @Order(value=1)
@ -35,7 +32,7 @@ public class ZLMRunner implements CommandLineRunner {
private ZLMRESTfulUtils zlmresTfulUtils; private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired @Autowired
private ZLMHttpHookSubscribe hookSubscribe; private ZlmHttpHookSubscribe hookSubscribe;
@Autowired @Autowired
private EventPublisher publisher; private EventPublisher publisher;
@ -62,8 +59,6 @@ public class ZLMRunner implements CommandLineRunner {
} }
mediaServerService.syncCatchFromDatabase(); mediaServerService.syncCatchFromDatabase();
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started(); HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
// hookSubscribeForStreamChange.setExpires(expiresInstant);
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统 // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForServerStarted, hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{ (MediaServerItem mediaServerItem, JSONObject response)->{

View File

@ -4,6 +4,9 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -13,21 +16,22 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* @description:针对 ZLMediaServer的hook事件订阅 * ZLMediaServer的hook事件订阅
* @author: pan * @author lin
* @date: 2020年12月2日 21:17:32
*/ */
@Component @Component
public class ZLMHttpHookSubscribe { public class ZlmHttpHookSubscribe {
private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
@FunctionalInterface @FunctionalInterface
public interface Event{ public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response); void response(MediaServerItem mediaServerItem, JSONObject response);
} }
private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>(); private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) { public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
if (hookSubscribe.getExpires() == null) { if (hookSubscribe.getExpires() == null) {
// 默认5分钟过期 // 默认5分钟过期
Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
@ -36,8 +40,8 @@ public class ZLMHttpHookSubscribe {
allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
} }
public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
ZLMHttpHookSubscribe.Event event= null; ZlmHttpHookSubscribe.Event event= null;
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) { if (eventMap == null) {
return null; return null;
@ -69,8 +73,8 @@ public class ZLMHttpHookSubscribe {
Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet(); Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
if (entries.size() > 0) { if (entries.size() > 0) {
List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>(); List<Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) { for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entries) {
JSONObject content = entry.getKey().getContent(); JSONObject content = entry.getKey().getContent();
if (content == null || content.size() == 0) { if (content == null || content.size() == 0) {
entriesToRemove.add(entry); entriesToRemove.add(entry);
@ -87,13 +91,13 @@ public class ZLMHttpHookSubscribe {
result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
} }
} }
if (null != result && result){ if (result){
entriesToRemove.add(entry); entriesToRemove.add(entry);
} }
} }
if (!CollectionUtils.isEmpty(entriesToRemove)) { if (!CollectionUtils.isEmpty(entriesToRemove)) {
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) { for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entriesToRemove) {
entries.remove(entry); entries.remove(entry);
} }
} }
@ -106,12 +110,12 @@ public class ZLMHttpHookSubscribe {
* @param type * @param type
* @return * @return
*/ */
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) { public List<ZlmHttpHookSubscribe.Event> getSubscribes(HookType type) {
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type); Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) { if (eventMap == null) {
return null; return null;
} }
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>(); List<ZlmHttpHookSubscribe.Event> result = new ArrayList<>();
for (IHookSubscribe key : eventMap.keySet()) { for (IHookSubscribe key : eventMap.keySet()) {
result.add(eventMap.get(key)); result.add(eventMap.get(key));
} }
@ -127,5 +131,28 @@ public class ZLMHttpHookSubscribe {
return result; return result;
} }
/**
* 对订阅数据进行过期清理
*/
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
public void execute(){
logger.info("[hook订阅] 清理");
Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
int total = 0;
for (HookType hookType : allSubscribes.keySet()) {
Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
if (hookSubscribeEventMap.size() > 0) {
for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
if (hookSubscribe.getExpires().isBefore(instant)) {
// 过期的
hookSubscribeEventMap.remove(hookSubscribe);
total ++;
}
}
}
}
logger.info("[hook订阅] 清理结束,共清理{}条过期数据", total);
}
} }

View File

@ -6,14 +6,13 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
/** /**
@ -24,9 +23,9 @@ public interface IPlayService {
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback, String uuid); InviteTimeOutCallback timeoutCallback, String uuid);
PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
MediaServerItem getNewMediaServerItem(Device device); MediaServerItem getNewMediaServerItem(Device device);

View File

@ -39,7 +39,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -99,7 +99,7 @@ public class PlayServiceImpl implements IPlayService {
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
@Qualifier("taskExecutor") @Qualifier("taskExecutor")
@ -110,7 +110,7 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback) { Runnable timeoutCallback) {
if (mediaServerItem == null) { if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
@ -231,8 +231,8 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback, String uuid) { InviteTimeOutCallback timeoutCallback, String uuid) {
String streamId = null; String streamId = null;
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {

View File

@ -5,12 +5,11 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.bean.*;
@ -24,9 +23,6 @@ import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -86,7 +82,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
private ZLMMediaListManager mediaListManager; private ZLMMediaListManager mediaListManager;
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZlmHttpHookSubscribe subscribe;
public interface PlayMsgCallback{ public interface PlayMsgCallback{

View File

@ -8,24 +8,21 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.utils.SpringBeanFactory; import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.SipStackImpl;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import org.ehcache.xml.model.ThreadPoolsType;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.sip.ListeningPoint; import javax.sip.ListeningPoint;
@ -42,7 +39,7 @@ import java.util.List;
public class ServerController { public class ServerController {
@Autowired @Autowired
private ZLMHttpHookSubscribe zlmHttpHookSubscribe; private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;