diff --git a/pom.xml b/pom.xml index 2ac45ec2..ae4d6666 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,13 @@ 8.0.22 + + + org.xerial + sqlite-jdbc + 3.32.3.2 + + org.mybatis diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 92ba204a..e171297c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -138,16 +138,25 @@ public class SipLayer implements SipListener { // TODO Auto-generated catch block e.printStackTrace(); } + if (evt.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + subscribe.response(evt); + } + } + } // } else if (status == Response.TRYING) { // trying不会回复 } else if ((status >= 100) && (status < 200)) { // 增加其它无需回复的响应,如101、180等 } else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); - if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) { + if (evt.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (subscribe != null) { subscribe.response(evt); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index de52ac69..12b8a007 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -1,11 +1,17 @@ package com.genersoft.iot.vmp.gb28181.bean; +import java.util.Date; import java.util.List; import java.util.Map; public class Device { + /** + * 数据库存储ID + */ + private int id; + /** * 设备Id */ @@ -55,14 +61,24 @@ public class Device { */ private int online; - /** - * 通道列表 - */ -// private Map channelMap; + /** + * 注册时间 + */ + private Long registerTimeMillis; + + /** + * 通道个数 + */ private int channelCount; - private List channelList; + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } public String getDeviceId() { return deviceId; @@ -144,11 +160,11 @@ public class Device { this.channelCount = channelCount; } - public List getChannelList() { - return channelList; + public Long getRegisterTimeMillis() { + return registerTimeMillis; } - public void setChannelList(List channelList) { - this.channelList = channelList; + public void setRegisterTimeMillis(Long registerTimeMillis) { + this.registerTimeMillis = registerTimeMillis; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 19e9eda9..ca6ef60f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -2,10 +2,17 @@ package com.genersoft.iot.vmp.gb28181.bean; public class DeviceChannel { + + /** * 通道id */ private String channelId; + + /** + * 设备id + */ + private String deviceId; /** * 通道名 @@ -146,13 +153,15 @@ public class DeviceChannel { /** * 是否含有音频 */ - private boolean hasAudio; + private boolean hasAudio; - /** - * 是否正在播放 - */ - private boolean play; + public String getDeviceId() { + return deviceId; + } + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } public void setPTZType(int PTZType) { this.PTZType = PTZType; @@ -387,14 +396,6 @@ public class DeviceChannel { this.hasAudio = hasAudio; } - public boolean isPlay() { - return play; - } - - public void setPlay(boolean play) { - this.play = play; - } - public String getStreamId() { return streamId; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index 1f78df44..176a435e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -17,21 +17,34 @@ public class SipSubscribe { private final static Logger logger = LoggerFactory.getLogger(SipSubscribe.class); - private Map allSubscribes = new ConcurrentHashMap<>(); + private Map errorSubscribes = new ConcurrentHashMap<>(); + + private Map okSubscribes = new ConcurrentHashMap<>(); public interface Event { void response(ResponseEvent event); } - public void addSubscribe(String key, SipSubscribe.Event event) { - allSubscribes.put(key, event); + public void addErrorSubscribe(String key, SipSubscribe.Event event) { + errorSubscribes.put(key, event); } - public SipSubscribe.Event getSubscribe(String key) { - return allSubscribes.get(key); + public void addOkSubscribe(String key, SipSubscribe.Event event) { + okSubscribes.put(key, event); } - public int getSize(){ - return allSubscribes.size(); + public SipSubscribe.Event getErrorSubscribe(String key) { + return errorSubscribes.get(key); + } + + public SipSubscribe.Event getOkSubscribe(String key) { + return okSubscribes.get(key); + } + + public int getErrorSubscribesSize(){ + return errorSubscribes.size(); + } + public int getOkSubscribesSize(){ + return okSubscribes.size(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index d37259ac..b50cc957 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -4,13 +4,10 @@ import javax.sip.RequestEvent; import javax.sip.ResponseEvent; import javax.sip.SipProvider; import javax.sip.header.CSeqHeader; -import javax.sip.header.CallIdHeader; -import javax.sip.header.Header; import javax.sip.message.Request; import javax.sip.message.Response; -import com.alibaba.fastjson.JSON; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -59,6 +56,9 @@ public class SIPProcessorFactory { @Autowired private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; @Autowired private EventPublisher publisher; @@ -143,6 +143,7 @@ public class SIPProcessorFactory { processor.setOffLineDetector(offLineDetector); processor.setCmder(cmder); processor.setStorager(storager); + processor.setRedisCatchStorage(redisCatchStorage); return processor; } else { return new OtherRequestProcessor(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index 692e31e5..5fd8cbc9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -25,6 +25,8 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_PlAY = "CALLBACK_PLAY"; + public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; + private Map map = new ConcurrentHashMap(); public void put(String key, DeferredResult result) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 732b2cdc..67fd9967 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -101,8 +101,9 @@ public interface ISIPCommander { * * @param ssrc ssrc */ + void streamByeCmd(String ssrc, SipSubscribe.Event okEvent); void streamByeCmd(String ssrc); - + /** * 语音广播 * diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 3f0adfd9..af9030b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import java.text.ParseException; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -12,11 +13,13 @@ import javax.sip.header.ViaHeader; import javax.sip.message.Request; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,9 @@ public class SIPCommander implements ISIPCommander { @Autowired private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; @Autowired @Qualifier(value="tcpSipProvider") @@ -229,7 +235,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); - transmitRequest(device, request, null); + transmitRequest(device, request); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -264,7 +270,7 @@ public class SIPCommander implements ISIPCommander { ptzXml.append("\r\n"); Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtzTag", "ToPtzTag"); - transmitRequest(device, request, null); + transmitRequest(device, request); return true; } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -291,7 +297,7 @@ public class SIPCommander implements ISIPCommander { streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); } String streamMode = device.getStreamMode().toUpperCase(); - MediaServerConfig mediaInfo = storager.getMediaInfo(); + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); if (mediaInfo == null) { logger.warn("点播时发现ZLM尚未连接..."); return; @@ -344,6 +350,9 @@ public class SIPCommander implements ISIPCommander { } content.append("y="+ssrc+"\r\n");//ssrc +// String fromTag = UUID.randomUUID().toString(); +// Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, fromTag, null, ssrc); + Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "live", null, ssrc); ClientTransaction transaction = transmitRequest(device, request, errorEvent); @@ -372,7 +381,7 @@ public class SIPCommander implements ISIPCommander { public void playbackStreamCmd(Device device, String channelId, String startTime, String endTime, ZLMHttpHookSubscribe.Event event , SipSubscribe.Event errorEvent) { try { - MediaServerConfig mediaInfo = storager.getMediaInfo(); + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String ssrc = streamSession.createPlayBackSsrc(); String streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); // 添加订阅 @@ -457,17 +466,28 @@ public class SIPCommander implements ISIPCommander { e.printStackTrace(); } } - + + + /** * 视频流停止 * */ @Override - public void streamByeCmd(String streamId) { + public void streamByeCmd(String ssrc) { + streamByeCmd(ssrc, null); + } + @Override + public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) { try { ClientTransaction transaction = streamSession.get(streamId); + // 服务重启后 if (transaction == null) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); + if (streamInfo != null) { + + } return; } @@ -475,6 +495,9 @@ public class SIPCommander implements ISIPCommander { if (dialog == null) { return; } + + + Request byeRequest = dialog.createRequest(Request.BYE); SipURI byeURI = (SipURI) byeRequest.getRequestURI(); String vh = transaction.getRequest().getHeader(ViaHeader.NAME).toString(); @@ -491,7 +514,14 @@ public class SIPCommander implements ISIPCommander { } else if("UDP".equals(protocol)) { clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); } + + CallIdHeader callIdHeader = (CallIdHeader) byeRequest.getHeader(CallIdHeader.NAME); + if (okEvent != null) { + sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); + } + dialog.sendRequest(clientTransaction); + streamSession.remove(streamId); zlmrtpServerFactory.closeRTPServer(streamId); } catch (TransactionDoesNotExistException e) { @@ -612,7 +642,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDeviceInfoTag", "ToDeviceInfoTag"); - transmitRequest(device, request, null); + transmitRequest(device, request); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -676,7 +706,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "FromRecordInfoTag", null); - transmitRequest(device, request, null); + transmitRequest(device, request); } catch (SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); return false; @@ -727,8 +757,16 @@ public class SIPCommander implements ISIPCommander { // TODO Auto-generated method stub return false; } - + + private ClientTransaction transmitRequest(Device device, Request request) throws SipException { + return transmitRequest(device, request, null, null); + } + private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent) throws SipException { + return transmitRequest(device, request, errorEvent, null); + } + + private ClientTransaction transmitRequest(Device device, Request request, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException { ClientTransaction clientTransaction = null; if("TCP".equals(device.getTransport())) { clientTransaction = tcpSipProvider.getNewClientTransaction(request); @@ -736,10 +774,14 @@ public class SIPCommander implements ISIPCommander { clientTransaction = udpSipProvider.getNewClientTransaction(request); } - // 添加订阅 + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); + // 添加错误订阅 if (errorEvent != null) { - CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - sipSubscribe.addSubscribe(callIdHeader.getCallId(), errorEvent); + sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), errorEvent); + } + // 添加订阅 + if (okEvent != null) { + sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent); } clientTransaction.sendRequest(); @@ -747,6 +789,8 @@ public class SIPCommander implements ISIPCommander { } + + @Override public void closeRTPServer(Device device, String channelId) { if (rtpEnable) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index 7b793c09..cab4a9bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -10,6 +10,7 @@ import javax.sip.SipException; import javax.sip.message.Request; import javax.sip.message.Response; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -48,6 +49,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { private IVideoManagerStorager storager; + private IRedisCatchStorage redisCatchStorage; + private EventPublisher publisher; private RedisUtil redis; @@ -451,9 +454,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { String NotifyType =XmlUtil.getText(rootElement, "NotifyType"); if (NotifyType.equals("121")){ logger.info("媒体播放完毕,通知关流"); - StreamInfo streamInfo = storager.queryPlaybackByDevice(deviceId, "*"); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*"); if (streamInfo != null) { - storager.stopPlayback(streamInfo); + redisCatchStorage.stopPlayback(streamInfo); cmder.streamByeCmd(streamInfo.getStreamId()); } } @@ -507,4 +510,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { this.offLineDetector = offLineDetector; } + public IRedisCatchStorage getRedisCatchStorage() { + return redisCatchStorage; + } + + public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { + this.redisCatchStorage = redisCatchStorage; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java index be076bd8..bcd44825 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/RegisterRequestProcessor.java @@ -141,9 +141,15 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor { // 下发catelog查询目录 if (registerFlag == 1 && device != null) { logger.info("注册成功! deviceId:" + device.getDeviceId()); + boolean exists = storager.exists(device.getDeviceId()); + device.setRegisterTimeMillis(System.currentTimeMillis()); storager.updateDevice(device); publisher.onlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_ONLINE_REGISTER); - handler.onRegister(device); + + // 只有第一次注册才更新通道 + if (!exists) { + handler.onRegister(device); + } } else if (registerFlag == 2) { logger.info("注销成功! deviceId:" + device.getDeviceId()); publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHTTPProxyController.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHTTPProxyController.java index f76cdd90..9daef230 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHTTPProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHTTPProxyController.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,9 @@ public class ZLMHTTPProxyController { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Value("${media.port}") private int mediaHttpPort; @@ -36,10 +40,10 @@ public class ZLMHTTPProxyController { @RequestMapping(value = "/**/**/**", produces = "application/json;charset=UTF-8") public Object proxy(HttpServletRequest request, HttpServletResponse response){ - if (storager.getMediaInfo() == null) { + if (redisCatchStorage.getMediaInfo() == null) { return "未接入流媒体"; } - MediaServerConfig mediaInfo = storager.getMediaInfo(); + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String requestURI = String.format("http://%s:%s%s?%s&%s", mediaInfo.getLocalIP(), mediaHttpPort, diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 1116ae5a..cb8ad055 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -11,6 +11,7 @@ import com.alibaba.fastjson.JSONArray; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.utils.IpUtil; import com.genersoft.iot.vmp.vmanager.service.IPlayService; @@ -52,6 +53,9 @@ public class ZLMHttpHookListener { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -249,13 +253,13 @@ public class ZLMHttpHookListener { String app = json.getString("app"); String streamId = json.getString("stream"); boolean regist = json.getBoolean("regist"); - StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if ("rtp".equals(app) && !regist ) { if (streamInfo!=null){ - storager.stopPlay(streamInfo); + redisCatchStorage.stopPlay(streamInfo); }else{ - streamInfo = storager.queryPlaybackByStreamId(streamId); - storager.stopPlayback(streamInfo); + streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + redisCatchStorage.stopPlayback(streamInfo); } } @@ -281,12 +285,12 @@ public class ZLMHttpHookListener { String streamId = json.getString("stream"); cmder.streamByeCmd(streamId); - StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo!=null){ - storager.stopPlay(streamInfo); + redisCatchStorage.stopPlay(streamInfo); }else{ - streamInfo = storager.queryPlaybackByStreamId(streamId); - storager.stopPlayback(streamInfo); + streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + redisCatchStorage.stopPlayback(streamInfo); } JSONObject ret = new JSONObject(); @@ -311,7 +315,7 @@ public class ZLMHttpHookListener { if (autoApplyPlay) { String app = json.getString("app"); String streamId = json.getString("stream"); - StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if ("rtp".equals(app) && streamId.indexOf("gb_play") > -1 && streamInfo == null) { String[] s = streamId.split("_"); if (s.length == 4) { @@ -355,7 +359,7 @@ public class ZLMHttpHookListener { // MediaServerConfig mediaServerConfig = mediaServerConfigs.get(0); MediaServerConfig mediaServerConfig = JSON.toJavaObject(json, MediaServerConfig.class); mediaServerConfig.setLocalIP(mediaIp); - storager.updateMediaInfo(mediaServerConfig); + redisCatchStorage.updateMediaInfo(mediaServerConfig); // TODO Auto-generated method stub JSONObject ret = new JSONObject(); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 3f88b2a3..282699f9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import okhttp3.*; import org.slf4j.Logger; @@ -30,6 +31,9 @@ public class ZLMRunner implements CommandLineRunner { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Value("${media.ip}") private String mediaIp; @@ -69,7 +73,7 @@ public class ZLMRunner implements CommandLineRunner { logger.info("zlm接入成功..."); if (autoConfig) saveZLMConfig(); mediaServerConfig = getMediaServerConfig(); - storager.updateMediaInfo(mediaServerConfig); + redisCatchStorage.updateMediaInfo(mediaServerConfig); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java new file mode 100644 index 00000000..8bc78b93 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -0,0 +1,58 @@ +package com.genersoft.iot.vmp.storager; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.MediaServerConfig; + +import java.util.Map; + +public interface IRedisCatchStorage { + + /** + * 开始播放时将流存入 + * + * @param stream 流信息 + * @return + */ + boolean startPlay(StreamInfo stream); + + + /** + * 停止播放时删除 + * + * @return + */ + boolean stopPlay(StreamInfo streamInfo); + + /** + * 查询播放列表 + * @return + */ + StreamInfo queryPlay(StreamInfo streamInfo); + + StreamInfo queryPlayByStreamId(String steamId); + + StreamInfo queryPlaybackByStreamId(String steamId); + + StreamInfo queryPlayByDevice(String deviceId, String code); + + /** + * 更新流媒体信息 + * @param mediaServerConfig + * @return + */ + boolean updateMediaInfo(MediaServerConfig mediaServerConfig); + + /** + * 获取流媒体信息 + * @return + */ + MediaServerConfig getMediaInfo(); + + Map queryPlayByDeviceId(String deviceId); + + boolean startPlayback(StreamInfo stream); + + boolean stopPlayback(StreamInfo streamInfo); + + StreamInfo queryPlaybackByDevice(String deviceId, String code); +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index e85d1ff8..4174507b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -17,19 +17,6 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; */ public interface IVideoManagerStorager { - /** - * 更新流媒体信息 - * @param mediaServerConfig - * @return - */ - public boolean updateMediaInfo(MediaServerConfig mediaServerConfig); - - /** - * 获取流媒体信息 - * @return - */ - public MediaServerConfig getMediaInfo(); - /** * 根据设备ID判断设备是否存在 * @@ -106,10 +93,9 @@ public interface IVideoManagerStorager { /** * 获取多个设备 * - * @param deviceIds 设备ID数组 * @return List 设备对象数组 */ - public List queryVideoDeviceList(String[] deviceIds); + public List queryVideoDeviceList(); /** * 删除设备 @@ -135,27 +121,6 @@ public interface IVideoManagerStorager { */ public boolean outline(String deviceId); - /** - * 开始播放时将流存入 - * - * @param stream 流信息 - * @return - */ - public boolean startPlay(StreamInfo stream); - - /** - * 停止播放时删除 - * - * @return - */ - public boolean stopPlay(StreamInfo streamInfo); - - /** - * 查找视频流 - * - * @return - */ - public StreamInfo queryPlay(StreamInfo streamInfo); /** * 查询子设备 @@ -168,10 +133,6 @@ public interface IVideoManagerStorager { */ PageResult querySubChannels(String deviceId, String channelId, String query, Boolean hasSubChannel, String online, int page, int count); - /** - * 更新缓存 - */ - public void updateCatch(); /** * 清空通道 @@ -179,17 +140,4 @@ public interface IVideoManagerStorager { */ void cleanChannelsForDevice(String deviceId); - StreamInfo queryPlayByStreamId(String streamId); - - StreamInfo queryPlayByDevice(String deviceId, String code); - - Map queryPlayByDeviceId(String deviceId); - - boolean startPlayback(StreamInfo streamInfo); - - boolean stopPlayback(StreamInfo streamInfo); - - StreamInfo queryPlaybackByDevice(String deviceId, String channelId); - - StreamInfo queryPlaybackByStreamId(String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerFactory.java b/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerFactory.java deleted file mode 100644 index 70bdad76..00000000 --- a/src/main/java/com/genersoft/iot/vmp/storager/VideoManagerStoragerFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.genersoft.iot.vmp.storager; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.conf.VManagerConfig; - -/** - * @Description:视频设备数据存储工厂,根据存储策略,返回对应的存储器 - * @author: swwheihei - * @date: 2020年5月6日 下午2:15:16 - */ -@Component -public class VideoManagerStoragerFactory { - - @Autowired - private VManagerConfig vmConfig; - - @Autowired - private IVideoManagerStorager jdbcStorager; - - @Autowired - private IVideoManagerStorager redisStorager; - - @Bean("storager") - public IVideoManagerStorager getStorager() { - if ("redis".equals(vmConfig.getDatabase().toLowerCase())) { - return redisStorager; - } else if ("jdbc".equals(vmConfig.getDatabase().toLowerCase())) { - return jdbcStorager; - } - return redisStorager; - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/VodeoMannagerTask.java b/src/main/java/com/genersoft/iot/vmp/storager/VodeoMannagerTask.java index c96e4bb0..c2074843 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/VodeoMannagerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/VodeoMannagerTask.java @@ -8,10 +8,10 @@ import org.springframework.stereotype.Component; public class VodeoMannagerTask implements CommandLineRunner { @Autowired - private IVideoManagerStorager storager; + private IVideoManagerStorager redisStorager; @Override public void run(String... strings) throws Exception { - storager.updateCatch(); + redisStorager.updateCatch(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java new file mode 100644 index 00000000..bf67095f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -0,0 +1,20 @@ +package com.genersoft.iot.vmp.storager.dao; + +import com.genersoft.iot.vmp.common.PageResult; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import org.apache.ibatis.annotations.Mapper; + +import java.util.List; + +@Mapper +public interface DeviceChannelMapper { + int update(DeviceChannel channel); + + List queryChannelsByDeviceId(String deviceId); + + List queryChannelsByDeviceId(String deviceId, String parentChannelId); + + DeviceChannel queryChannel(String deviceId, String channelId); + + int cleanChannelsByDeviceId(String deviceId); +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java new file mode 100644 index 00000000..da455fbe --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.storager.dao; + +import com.genersoft.iot.vmp.gb28181.bean.Device; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +@Mapper +public interface DeviceMapper { + + @Select("SELECT * FROM device WHERE deviceId = #{deviceId}") + Device getDeviceByDeviceId(String deviceId); + + @Insert("SELECT * FROM device WHERE deviceId = #{deviceId}") + int add(Device device); + + int update(Device device); + + List getDevices(); + + int del(String deviceId); +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java new file mode 100644 index 00000000..8eaaf68a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -0,0 +1,172 @@ +package com.genersoft.iot.vmp.storager.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMapper; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +@Component +public class RedisCatchStorageImpl implements IRedisCatchStorage { + + @Autowired + private RedisUtil redis; + + @Autowired + private DeviceMapper deviceMapper; + + @Autowired + private DeviceChannelMapper deviceChannelMapper; + + + /** + * 开始播放时将流存入redis + * + * @return + */ + @Override + public boolean startPlay(StreamInfo stream) { + return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, stream.getStreamId(),stream.getDeviceID(), stream.getCahnnelId()), + stream); + } + + /** + * 停止播放时从redis删除 + * + * @return + */ + @Override + public boolean stopPlay(StreamInfo streamInfo) { + if (streamInfo == null) return false; + DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getCahnnelId()); + if (deviceChannel != null) { + deviceChannel.setStreamId(null); + deviceChannel.setPlay(false); + deviceChannel.setDeviceId(streamInfo.getDeviceID()); + deviceChannelMapper.update(deviceChannel); + } + return redis.del(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, + streamInfo.getStreamId(), + streamInfo.getDeviceID(), + streamInfo.getCahnnelId())); + } + + /** + * 查询播放列表 + * @return + */ + @Override + public StreamInfo queryPlay(StreamInfo streamInfo) { + return (StreamInfo)redis.get(String.format("%S_%s_%s_%s", + VideoManagerConstants.PLAYER_PREFIX, + streamInfo.getStreamId(), + streamInfo.getDeviceID(), + streamInfo.getCahnnelId())); + } + @Override + public StreamInfo queryPlayByStreamId(String steamId) { + List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAYER_PREFIX, steamId)); + if (playLeys == null || playLeys.size() == 0) return null; + return (StreamInfo)redis.get(playLeys.get(0).toString()); + } + + @Override + public StreamInfo queryPlaybackByStreamId(String steamId) { + List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, steamId)); + if (playLeys == null || playLeys.size() == 0) return null; + return (StreamInfo)redis.get(playLeys.get(0).toString()); + } + + @Override + public StreamInfo queryPlayByDevice(String deviceId, String code) { +// List playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, + List playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, + deviceId, + code)); + if (playLeys == null || playLeys.size() == 0) return null; + return (StreamInfo)redis.get(playLeys.get(0).toString()); + } + + /** + * 更新流媒体信息 + * @param mediaServerConfig + * @return + */ + @Override + public boolean updateMediaInfo(MediaServerConfig mediaServerConfig) { + return redis.set(VideoManagerConstants.MEDIA_SERVER_PREFIX,mediaServerConfig); + } + + /** + * 获取流媒体信息 + * @return + */ + @Override + public MediaServerConfig getMediaInfo() { + return (MediaServerConfig)redis.get(VideoManagerConstants.MEDIA_SERVER_PREFIX); + } + + @Override + public Map queryPlayByDeviceId(String deviceId) { + Map streamInfos = new HashMap<>(); +// List playLeys = redis.keys(String.format("%S_*_%S_*", VideoManagerConstants.PLAYER_PREFIX, deviceId)); + List players = redis.scan(String.format("%S_*_%S_*", VideoManagerConstants.PLAYER_PREFIX, deviceId)); + if (players.size() == 0) return streamInfos; + for (int i = 0; i < players.size(); i++) { + String key = (String) players.get(i); + StreamInfo streamInfo = (StreamInfo)redis.get(key); + streamInfos.put(streamInfo.getDeviceID() + "_" + streamInfo.getCahnnelId(), streamInfo); + } + return streamInfos; + } + + + @Override + public boolean startPlayback(StreamInfo stream) { + return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, stream.getStreamId(),stream.getDeviceID(), stream.getCahnnelId()), + stream); + } + + + @Override + public boolean stopPlayback(StreamInfo streamInfo) { + if (streamInfo == null) return false; + DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(streamInfo.getDeviceID(), streamInfo.getCahnnelId()); + if (deviceChannel != null) { + deviceChannel.setStreamId(null); + deviceChannel.setPlay(false); + deviceChannel.setDeviceId(streamInfo.getDeviceID()); + deviceChannelMapper.update(deviceChannel); + } + return redis.del(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + streamInfo.getStreamId(), + streamInfo.getDeviceID(), + streamInfo.getCahnnelId())); + } + + @Override + public StreamInfo queryPlaybackByDevice(String deviceId, String code) { + String format = String.format("%S_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + deviceId, + code); + List playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + deviceId, + code)); + if (playLeys == null || playLeys.size() == 0) { + playLeys = redis.scan(String.format("%S_*_*_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + deviceId)); + } + if (playLeys == null || playLeys.size() == 0) return null; + return (StreamInfo)redis.get(playLeys.get(0).toString()); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java new file mode 100644 index 00000000..1288efc1 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -0,0 +1,401 @@ +package com.genersoft.iot.vmp.storager.impl; + +import java.util.*; + +import com.genersoft.iot.vmp.common.PageResult; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import org.springframework.util.StringUtils; + +/** + * @Description:视频设备数据存储-jdbc实现 + * @author: swwheihei + * @date: 2020年5月6日 下午2:31:42 + */ +@Component +public class VideoManagerStoragerImpl implements IVideoManagerStorager { + + @Autowired + private DeviceMapper deviceMapper; + + @Autowired + private DeviceChannelMapper deviceChannelMapper; + + + /** + * 根据设备ID判断设备是否存在 + * + * @param deviceId 设备ID + * @return true:存在 false:不存在 + */ + @Override + public boolean exists(String deviceId) { + return deviceMapper.getDeviceByDeviceId(deviceId) != null; + } + + /** + * 视频设备创建 + * + * @param device 设备对象 + * @return true:创建成功 false:创建失败 + */ + @Override + public boolean create(Device device) { + return deviceMapper.add(device) > 0; + } + + + + /** + * 视频设备更新 + * + * @param device 设备对象 + * @return true:更新成功 false:更新失败 + */ + @Override + public boolean updateDevice(Device device) { +// if (deviceMap.get(device.getDeviceId()) == null) { +// deviceMap.put(device.getDeviceId(), new HashMap>()); +// } + // 更新device中的通道数量 +// device.setChannelCount(deviceMap.get(device.getDeviceId()).size()); + int result = deviceMapper.update(device); + // 存储device + return result > 0; + + + } + + @Override + public void updateChannel(String deviceId, DeviceChannel channel) { + String channelId = channel.getChannelId(); + channel.setDeviceId(deviceId); + deviceChannelMapper.update(channel); + +// HashMap> channelMap = deviceMap.get(deviceId); +// if (channelMap == null) return; +// // 作为父设备, 确定自己的子节点数 +// if (channelMap.get(channelId) == null) { +// channelMap.put(channelId, new HashSet()); +// }else if (channelMap.get(channelId).size() > 0) { +// channel.setSubCount(channelMap.get(channelId).size()); +// } +// +// // 存储通道 +// redis.set(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// "_" + channel.getChannelId() + +// "_" + (channel.getStatus() == 1 ? "on":"off") + +// "_" + (channelMap.get(channelId).size() > 0)+ +// "_" + (StringUtils.isEmpty(channel.getParentId())?null:channel.getParentId()), +// channel); +// // 更新device中的通道数量 +// Device device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); +// device.setChannelCount(deviceMap.get(deviceId).size()); +// redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); +// +// +// // 如果有父设备,更新父设备内子节点数 +// String parentId = channel.getParentId(); +// if (!StringUtils.isEmpty(parentId) && !parentId.equals(deviceId)) { +// +// if (channelMap.get(parentId) == null) { +// channelMap.put(parentId, new HashSet()); +// } +// channelMap.get(parentId).add(channelId); +// +// DeviceChannel deviceChannel = queryChannel(deviceId, parentId); +// if (deviceChannel != null) { +// deviceChannel.setSubCount(channelMap.get(parentId).size()); +// redis.set(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// "_" + deviceChannel.getChannelId() + +// "_" + (deviceChannel.getStatus() == 1 ? "on":"off") + +// "_" + (channelMap.get(deviceChannel.getChannelId()).size() > 0)+ +// "_" + (StringUtils.isEmpty(deviceChannel.getParentId())?null:deviceChannel.getParentId()), +// deviceChannel); +// +// } +// } + + } + + /** + * 获取设备 + * + * @param deviceId 设备ID + * @return Device 设备对象 + */ + @Override + public Device queryVideoDevice(String deviceId) { + return deviceMapper.getDeviceByDeviceId(deviceId); + } + + @Override + public PageResult queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, String online, int page, int count) { + // 获取到所有正在播放的流 + List result = new ArrayList<>(); + PageResult pageResult = new PageResult(); + + deviceChannelMapper.queryChannelsByDeviceId(deviceId); +// String queryContent = "*"; +// if (!StringUtils.isEmpty(query)) queryContent = String.format("*%S*",query); +// String queryHasSubChannel = "*"; +// if (hasSubChannel != null) queryHasSubChannel = hasSubChannel?"true":"false"; +// String queryOnline = "*"; +// if (!StringUtils.isEmpty(online)) queryOnline = online; +// String queryStr = VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// "_" + queryContent + // 搜索编号和名称 +// "_" + queryOnline + // 搜索是否在线 +// "_" + queryHasSubChannel + // 搜索是否含有子节点 +// "_" + "*"; +// List deviceChannelList = redis.scan(queryStr); +// //对查询结果排序,避免出现通道排列顺序乱序的情况 +// Collections.sort(deviceChannelList,new Comparator(){ +// @Override +// public int compare(Object o1, Object o2) { +// return o1.toString().compareToIgnoreCase(o2.toString()); +// } +// }); +// pageResult.setPage(page); +// pageResult.setCount(count); +// pageResult.setTotal(deviceChannelList.size()); +// int maxCount = (page + 1 ) * count; +// if (deviceChannelList != null && deviceChannelList.size() > 0 ) { +// for (int i = page * count; i < (pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() ); i++) { +// DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); +// StreamInfo streamInfo = stringStreamInfoMap.get(deviceId + "_" + deviceChannel.getChannelId()); +// deviceChannel.setPlay(streamInfo != null); +// if (streamInfo != null) deviceChannel.setStreamId(streamInfo.getStreamId()); +// result.add(deviceChannel); +// } +// pageResult.setData(result); +// } + + return pageResult; + } + + + + @Override + public List queryChannelsByDeviceId(String deviceId) { +// List result = new ArrayList<>(); +//// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); +// List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); +// +// if (deviceChannelList != null && deviceChannelList.size() > 0 ) { +// for (int i = 0; i < deviceChannelList.size(); i++) { +// result.add((DeviceChannel)redis.get((String) deviceChannelList.get(i))); +// } +// } + return deviceChannelMapper.queryChannelsByDeviceId(deviceId); + } + + @Override + public PageResult querySubChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, String online, int page, int count) { + + deviceChannelMapper.queryChannelsByDeviceId(deviceId, parentChannelId); + +// List allDeviceChannels = new ArrayList<>(); +// String queryContent = "*"; +// if (!StringUtils.isEmpty(query)) queryContent = String.format("*%S*",query); +// String queryHasSubChannel = "*"; +// if (hasSubChannel != null) queryHasSubChannel = hasSubChannel?"true":"false"; +// String queryOnline = "*"; +// if (!StringUtils.isEmpty(online)) queryOnline = online; +// String queryStr = VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// "_" + queryContent + // 搜索编号和名称 +// "_" + queryOnline + // 搜索是否在线 +// "_" + queryHasSubChannel + // 搜索是否含有子节点 +// "_" + parentChannelId; +// +//// List deviceChannelList = redis.keys(queryStr); +// List deviceChannelList = redis.scan(queryStr); +// +// if (deviceChannelList != null && deviceChannelList.size() > 0 ) { +// for (int i = 0; i < deviceChannelList.size(); i++) { +// DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); +// if (deviceChannel.getParentId() != null && deviceChannel.getParentId().equals(parentChannelId)) { +// allDeviceChannels.add(deviceChannel); +// } +// } +// } +// int maxCount = (page + 1 ) * count; + PageResult pageResult = new PageResult(); +// pageResult.setPage(page); +// pageResult.setCount(count); +// pageResult.setTotal(allDeviceChannels.size()); +// +// if (allDeviceChannels.size() > 0) { +// pageResult.setData(allDeviceChannels.subList( +// page * count, pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() +// )); +// } + return pageResult; + } + + public List querySubChannels(String deviceId, String parentChannelId) { + List allDeviceChannels = new ArrayList<>(); +// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); +// List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); +// +// if (deviceChannelList != null && deviceChannelList.size() > 0 ) { +// for (int i = 0; i < deviceChannelList.size(); i++) { +// DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); +// if (deviceChannel.getParentId() != null && deviceChannel.getParentId().equals(parentChannelId)) { +// allDeviceChannels.add(deviceChannel); +// } +// } +// } + + return allDeviceChannels; + } + + @Override + public DeviceChannel queryChannel(String deviceId, String channelId) { + DeviceChannel deviceChannel = null; + return deviceChannelMapper.queryChannel(deviceId, channelId); +//// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + +// "_" + channelId + "*"); +// if (deviceChannelList != null && deviceChannelList.size() > 0 ) { +// deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(0)); +// } +// return deviceChannel; + } + + + /** + * 获取多个设备 + * + * @param deviceIds 设备ID数组 + * @return List 设备对象数组 + */ + @Override + public PageResult queryVideoDeviceList(String[] deviceIds, int page, int count) { + List devices = new ArrayList<>(); + PageResult pageResult = new PageResult(); +// pageResult.setPage(page); +// pageResult.setCount(count); +// Device device = null; +// +// if (deviceIds == null || deviceIds.length == 0) { +// +//// List deviceIdList = redis.keys(VideoManagerConstants.DEVICE_PREFIX+"*"); +// List deviceIdList = redis.scan(VideoManagerConstants.DEVICE_PREFIX+"*"); +// pageResult.setTotal(deviceIdList.size()); +// int maxCount = (page + 1)* count; +// for (int i = page * count; i < (pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() ); i++) { +// // devices.add((Device)redis.get((String)deviceIdList.get(i))); +// device =(Device)redis.get((String)deviceIdList.get(i)); +// if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ +// // outline(device.getDeviceId()); +// } +// devices.add(device); +// } +// } else { +// for (int i = 0; i < deviceIds.length; i++) { +// // devices.add((Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i])); +// device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i]); +// if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ +// // outline(device.getDeviceId()); +// } +// devices.add(device); +// } +// } +// pageResult.setData(devices); + return pageResult; + } + + /** + * 获取多个设备 + * + * @return List 设备对象数组 + */ + @Override + public List queryVideoDeviceList() { + +// if (deviceIds == null || deviceIds.length == 0) { +//// List deviceIdList = redis.keys(VideoManagerConstants.DEVICE_PREFIX+"*"); +// List deviceIdList = redis.scan(VideoManagerConstants.DEVICE_PREFIX+"*"); +// for (int i = 0; i < deviceIdList.size(); i++) { +// device =(Device)redis.get((String)deviceIdList.get(i)); +// if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ +// outline(device.getDeviceId()); +// } +// devices.add(device); +// } +// } else { +// for (int i = 0; i < deviceIds.length; i++) { +// device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i]); +// if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ +// outline(device.getDeviceId()); +// } +// devices.add(device); +// } +// } + + List deviceList = deviceMapper.getDevices(); + return deviceList; + } + + /** + * 删除设备 + * + * @param deviceId 设备ID + * @return true:删除成功 false:删除失败 + */ + @Override + public boolean delete(String deviceId) { + int result = deviceMapper.del(deviceId); + + return result > 0; + } + + /** + * 更新设备在线 + * + * @param deviceId 设备ID + * @return true:更新成功 false:更新失败 + */ + @Override + public boolean online(String deviceId) { + Device device = deviceMapper.getDeviceByDeviceId(deviceId); + device.setOnline(1); + return deviceMapper.update(device) > 0; + } + + /** + * 更新设备离线 + * + * @param deviceId 设备ID + * @return true:更新成功 false:更新失败 + */ + @Override + public boolean outline(String deviceId) { +// Device device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); +// if (device == null) return false; +// device.setOnline(0); +// return redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); + + Device device = deviceMapper.getDeviceByDeviceId(deviceId); + device.setOnline(0); + return deviceMapper.update(device) > 0; + } + + + @Override + public void cleanChannelsForDevice(String deviceId) { + int result = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/jdbc/VideoManagerJdbcStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/jdbc/VideoManagerJdbcStoragerImpl.java deleted file mode 100644 index 3302aa35..00000000 --- a/src/main/java/com/genersoft/iot/vmp/storager/jdbc/VideoManagerJdbcStoragerImpl.java +++ /dev/null @@ -1,217 +0,0 @@ -package com.genersoft.iot.vmp.storager.jdbc; - -import java.util.List; -import java.util.Map; - -import com.genersoft.iot.vmp.common.PageResult; -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.MediaServerConfig; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.storager.IVideoManagerStorager; - -/** - * @Description:视频设备数据存储-jdbc实现 - * @author: swwheihei - * @date: 2020年5月6日 下午2:28:12 - */ -@Component("jdbcStorager") -public class VideoManagerJdbcStoragerImpl implements IVideoManagerStorager { - - @Override - public boolean updateMediaInfo(MediaServerConfig mediaServerConfig) { - return false; - } - - @Override - public MediaServerConfig getMediaInfo() { - return null; - } - - /** - * 根据设备ID判断设备是否存在 - * - * @param deviceId 设备ID - * @return true:存在 false:不存在 - */ - @Override - public boolean exists(String deviceId) { - // TODO Auto-generated method stub - return false; - } - - /** - * 视频设备创建 - * - * @param device 设备对象 - * @return true:创建成功 false:创建失败 - */ - @Override - public boolean create(Device device) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean updateDevice(Device device) { - return false; - } - - @Override - public void updateChannel(String deviceId, DeviceChannel channel) { - - } - - - /** - * 获取设备 - * - * @param deviceId 设备ID - * @return Device 设备对象 - */ - @Override - public Device queryVideoDevice(String deviceId) { - // TODO Auto-generated method stub - return null; - } - - @Override - public PageResult queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, String online, int page, int count) { - return null; - } - - - @Override - public List queryChannelsByDeviceId(String deviceId) { - return null; - } - - @Override - public DeviceChannel queryChannel(String deviceId, String channelId) { - return null; - } - - @Override - public PageResult queryVideoDeviceList(String[] deviceIds, int page, int count) { - return null; - } - - /** - * 获取多个设备 - * - * @param deviceIds 设备ID数组 - * @return List 设备对象数组 - */ - @Override - public List queryVideoDeviceList(String[] deviceIds) { - // TODO Auto-generated method stub - return null; - } - - /** - * 删除设备 - * - * @param deviceId 设备ID - * @return true:删除成功 false:删除失败 - */ - @Override - public boolean delete(String deviceId) { - // TODO Auto-generated method stub - return false; - } - - /** - * 更新设备在线 - * - * @param deviceId 设备ID - * @return true:更新成功 false:更新失败 - */ - @Override - public boolean online(String deviceId) { - // TODO Auto-generated method stub - return false; - } - - /** - * 更新设备离线 - * - * @param deviceId 设备ID - * @return true:更新成功 false:更新失败 - */ - @Override - public boolean outline(String deviceId) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean stopPlay(StreamInfo streamInfo) { - return false; - } - - @Override - public StreamInfo queryPlay(StreamInfo streamInfo) { - return null; - } - - @Override - public PageResult querySubChannels(String deviceId, String channelId, String query, Boolean hasSubChannel, String online, int page, int count) { - return null; - } - - @Override - public void updateCatch() { - System.out.println("##################"); - } - - @Override - public void cleanChannelsForDevice(String deviceId) { - - } - - @Override - public boolean startPlay(StreamInfo stream) { - return false; - } - - - @Override - public StreamInfo queryPlayByDevice(String deviceId, String code) { - return null; - } - - @Override - public Map queryPlayByDeviceId(String deviceId) { - - return null; - } - - @Override - public boolean startPlayback(StreamInfo streamInfo) { - return false; - } - - @Override - public boolean stopPlayback(StreamInfo streamInfo) { - return false; - } - - @Override - public StreamInfo queryPlaybackByDevice(String deviceId, String channelId) { - return null; - } - - @Override - public StreamInfo queryPlayByStreamId(String streamId) { - return null; - } - - @Override - public StreamInfo queryPlaybackByStreamId(String streamId) { - return null; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/redis/VideoManagerRedisStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/redis/VideoManagerRedisStoragerImpl.java deleted file mode 100644 index 00d41fc4..00000000 --- a/src/main/java/com/genersoft/iot/vmp/storager/redis/VideoManagerRedisStoragerImpl.java +++ /dev/null @@ -1,561 +0,0 @@ -package com.genersoft.iot.vmp.storager.redis; - -import java.util.*; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.common.PageResult; -import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.MediaServerConfig; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.storager.IVideoManagerStorager; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import org.springframework.util.StringUtils; - -/** - * @Description:视频设备数据存储-redis实现 - * @author: swwheihei - * @date: 2020年5月6日 下午2:31:42 - */ -@Component("redisStorager") -public class VideoManagerRedisStoragerImpl implements IVideoManagerStorager { - - @Autowired - private RedisUtil redis; - - private HashMap>> deviceMap = new HashMap<>(); - - - /** - * 根据设备ID判断设备是否存在 - * - * @param deviceId 设备ID - * @return true:存在 false:不存在 - */ - @Override - public boolean exists(String deviceId) { - return redis.hasKey(VideoManagerConstants.DEVICE_PREFIX+deviceId); - } - - /** - * 视频设备创建 - * - * @param device 设备对象 - * @return true:创建成功 false:创建失败 - */ - @Override - public boolean create(Device device) { - return redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); - } - - - - /** - * 视频设备更新 - * - * @param device 设备对象 - * @return true:更新成功 false:更新失败 - */ - @Override - public boolean updateDevice(Device device) { - if (deviceMap.get(device.getDeviceId()) == null) { - deviceMap.put(device.getDeviceId(), new HashMap>()); - } - // 更新device中的通道数量 - device.setChannelCount(deviceMap.get(device.getDeviceId()).size()); - // 存储device - return redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); - - - } - - @Override - public void updateChannel(String deviceId, DeviceChannel channel) { - String channelId = channel.getChannelId(); - HashMap> channelMap = deviceMap.get(deviceId); - if (channelMap == null) return; - // 作为父设备, 确定自己的子节点数 - if (channelMap.get(channelId) == null) { - channelMap.put(channelId, new HashSet()); - }else if (channelMap.get(channelId).size() > 0) { - channel.setSubCount(channelMap.get(channelId).size()); - } - - // 存储通道 - redis.set(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - "_" + channel.getChannelId() + - "_" + (channel.getStatus() == 1 ? "on":"off") + - "_" + (channelMap.get(channelId).size() > 0)+ - "_" + (StringUtils.isEmpty(channel.getParentId())?null:channel.getParentId()), - channel); - // 更新device中的通道数量 - Device device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); - device.setChannelCount(deviceMap.get(deviceId).size()); - redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); - - - // 如果有父设备,更新父设备内子节点数 - String parentId = channel.getParentId(); - if (!StringUtils.isEmpty(parentId) && !parentId.equals(deviceId)) { - - if (channelMap.get(parentId) == null) { - channelMap.put(parentId, new HashSet()); - } - channelMap.get(parentId).add(channelId); - - DeviceChannel deviceChannel = queryChannel(deviceId, parentId); - if (deviceChannel != null) { - deviceChannel.setSubCount(channelMap.get(parentId).size()); - redis.set(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - "_" + deviceChannel.getChannelId() + - "_" + (deviceChannel.getStatus() == 1 ? "on":"off") + - "_" + (channelMap.get(deviceChannel.getChannelId()).size() > 0)+ - "_" + (StringUtils.isEmpty(deviceChannel.getParentId())?null:deviceChannel.getParentId()), - deviceChannel); - - } - } - - } - - /** - * 获取设备 - * - * @param deviceId 设备ID - * @return Device 设备对象 - */ - @Override - public Device queryVideoDevice(String deviceId) { - return (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); - } - - @Override - public PageResult queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, String online, int page, int count) { - // 获取到所有正在播放的流 - Map stringStreamInfoMap = queryPlayByDeviceId(deviceId); - List result = new ArrayList<>(); - PageResult pageResult = new PageResult(); - String queryContent = "*"; - if (!StringUtils.isEmpty(query)) queryContent = String.format("*%S*",query); - String queryHasSubChannel = "*"; - if (hasSubChannel != null) queryHasSubChannel = hasSubChannel?"true":"false"; - String queryOnline = "*"; - if (!StringUtils.isEmpty(online)) queryOnline = online; - String queryStr = VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - "_" + queryContent + // 搜索编号和名称 - "_" + queryOnline + // 搜索是否在线 - "_" + queryHasSubChannel + // 搜索是否含有子节点 - "_" + "*"; - List deviceChannelList = redis.scan(queryStr); - //对查询结果排序,避免出现通道排列顺序乱序的情况 - Collections.sort(deviceChannelList,new Comparator(){ - @Override - public int compare(Object o1, Object o2) { - return o1.toString().compareToIgnoreCase(o2.toString()); - } - }); - pageResult.setPage(page); - pageResult.setCount(count); - pageResult.setTotal(deviceChannelList.size()); - int maxCount = (page + 1 ) * count; - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = page * count; i < (pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() ); i++) { - DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); - StreamInfo streamInfo = stringStreamInfoMap.get(deviceId + "_" + deviceChannel.getChannelId()); - deviceChannel.setPlay(streamInfo != null); - if (streamInfo != null) deviceChannel.setStreamId(streamInfo.getStreamId()); - result.add(deviceChannel); - } - pageResult.setData(result); - } - - return pageResult; - } - - - - @Override - public List queryChannelsByDeviceId(String deviceId) { - List result = new ArrayList<>(); -// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = 0; i < deviceChannelList.size(); i++) { - result.add((DeviceChannel)redis.get((String) deviceChannelList.get(i))); - } - } - return result; - } - - @Override - public PageResult querySubChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, String online, int page, int count) { - List allDeviceChannels = new ArrayList<>(); - String queryContent = "*"; - if (!StringUtils.isEmpty(query)) queryContent = String.format("*%S*",query); - String queryHasSubChannel = "*"; - if (hasSubChannel != null) queryHasSubChannel = hasSubChannel?"true":"false"; - String queryOnline = "*"; - if (!StringUtils.isEmpty(online)) queryOnline = online; - String queryStr = VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - "_" + queryContent + // 搜索编号和名称 - "_" + queryOnline + // 搜索是否在线 - "_" + queryHasSubChannel + // 搜索是否含有子节点 - "_" + parentChannelId; - -// List deviceChannelList = redis.keys(queryStr); - List deviceChannelList = redis.scan(queryStr); - - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = 0; i < deviceChannelList.size(); i++) { - DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); - if (deviceChannel.getParentId() != null && deviceChannel.getParentId().equals(parentChannelId)) { - allDeviceChannels.add(deviceChannel); - } - } - } - int maxCount = (page + 1 ) * count; - PageResult pageResult = new PageResult(); - pageResult.setPage(page); - pageResult.setCount(count); - pageResult.setTotal(allDeviceChannels.size()); - - if (allDeviceChannels.size() > 0) { - pageResult.setData(allDeviceChannels.subList( - page * count, pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() - )); - } - return pageResult; - } - - public List querySubChannels(String deviceId, String parentChannelId) { - List allDeviceChannels = new ArrayList<>(); -// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = 0; i < deviceChannelList.size(); i++) { - DeviceChannel deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(i)); - if (deviceChannel.getParentId() != null && deviceChannel.getParentId().equals(parentChannelId)) { - allDeviceChannels.add(deviceChannel); - } - } - } - - return allDeviceChannels; - } - - @Override - public DeviceChannel queryChannel(String deviceId, String channelId) { - DeviceChannel deviceChannel = null; -// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + - "_" + channelId + "*"); - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - deviceChannel = (DeviceChannel)redis.get((String)deviceChannelList.get(0)); - } - return deviceChannel; - } - - - /** - * 获取多个设备 - * - * @param deviceIds 设备ID数组 - * @return List 设备对象数组 - */ - @Override - public PageResult queryVideoDeviceList(String[] deviceIds, int page, int count) { - List devices = new ArrayList<>(); - PageResult pageResult = new PageResult(); - pageResult.setPage(page); - pageResult.setCount(count); - Device device = null; - - if (deviceIds == null || deviceIds.length == 0) { - -// List deviceIdList = redis.keys(VideoManagerConstants.DEVICE_PREFIX+"*"); - List deviceIdList = redis.scan(VideoManagerConstants.DEVICE_PREFIX+"*"); - pageResult.setTotal(deviceIdList.size()); - int maxCount = (page + 1)* count; - for (int i = page * count; i < (pageResult.getTotal() > maxCount ? maxCount : pageResult.getTotal() ); i++) { - // devices.add((Device)redis.get((String)deviceIdList.get(i))); - device =(Device)redis.get((String)deviceIdList.get(i)); - if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ - // outline(device.getDeviceId()); - } - devices.add(device); - } - } else { - for (int i = 0; i < deviceIds.length; i++) { - // devices.add((Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i])); - device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i]); - if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ - // outline(device.getDeviceId()); - } - devices.add(device); - } - } - pageResult.setData(devices); - return pageResult; - } - - /** - * 获取多个设备 - * - * @param deviceIds 设备ID数组 - * @return List 设备对象数组 - */ - @Override - public List queryVideoDeviceList(String[] deviceIds) { - List devices = new ArrayList<>(); - Device device = null; - - if (deviceIds == null || deviceIds.length == 0) { -// List deviceIdList = redis.keys(VideoManagerConstants.DEVICE_PREFIX+"*"); - List deviceIdList = redis.scan(VideoManagerConstants.DEVICE_PREFIX+"*"); - for (int i = 0; i < deviceIdList.size(); i++) { - device =(Device)redis.get((String)deviceIdList.get(i)); - if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ - outline(device.getDeviceId()); - } - devices.add(device); - } - } else { - for (int i = 0; i < deviceIds.length; i++) { - device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceIds[i]); - if (redis.scan(VideoManagerConstants.KEEPLIVEKEY_PREFIX+device.getDeviceId()).size() == 0){ - outline(device.getDeviceId()); - } - devices.add(device); - } - } - return devices; - } - - /** - * 删除设备 - * - * @param deviceId 设备ID - * @return true:删除成功 false:删除失败 - */ - @Override - public boolean delete(String deviceId) { - return redis.del(VideoManagerConstants.DEVICE_PREFIX+deviceId); - } - - /** - * 更新设备在线 - * - * @param deviceId 设备ID - * @return true:更新成功 false:更新失败 - */ - @Override - public boolean online(String deviceId) { - Device device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); - device.setOnline(1); - return redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); - } - - /** - * 更新设备离线 - * - * @param deviceId 设备ID - * @return true:更新成功 false:更新失败 - */ - @Override - public boolean outline(String deviceId) { - Device device = (Device)redis.get(VideoManagerConstants.DEVICE_PREFIX+deviceId); - if (device == null) return false; - device.setOnline(0); - return redis.set(VideoManagerConstants.DEVICE_PREFIX+device.getDeviceId(), device); - } - - /** - * 开始播放时将流存入redis - * - * @return - */ - @Override - public boolean startPlay(StreamInfo stream) { - return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, stream.getStreamId(),stream.getDeviceID(), stream.getCahnnelId()), - stream); - } - - /** - * 停止播放时从redis删除 - * - * @return - */ - @Override - public boolean stopPlay(StreamInfo streamInfo) { - if (streamInfo == null) return false; - DeviceChannel deviceChannel = queryChannel(streamInfo.getDeviceID(), streamInfo.getCahnnelId()); - if (deviceChannel != null) { - deviceChannel.setStreamId(null); - deviceChannel.setPlay(false); - updateChannel(streamInfo.getDeviceID(), deviceChannel); - } - return redis.del(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, - streamInfo.getStreamId(), - streamInfo.getDeviceID(), - streamInfo.getCahnnelId())); - } - - /** - * 查询播放列表 - * @return - */ - @Override - public StreamInfo queryPlay(StreamInfo streamInfo) { - return (StreamInfo)redis.get(String.format("%S_%s_%s_%s", - VideoManagerConstants.PLAYER_PREFIX, - streamInfo.getStreamId(), - streamInfo.getDeviceID(), - streamInfo.getCahnnelId())); - } - @Override - public StreamInfo queryPlayByStreamId(String steamId) { - List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAYER_PREFIX, steamId)); - if (playLeys == null || playLeys.size() == 0) return null; - return (StreamInfo)redis.get(playLeys.get(0).toString()); - } - - @Override - public StreamInfo queryPlaybackByStreamId(String steamId) { - List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, steamId)); - if (playLeys == null || playLeys.size() == 0) return null; - return (StreamInfo)redis.get(playLeys.get(0).toString()); - } - - @Override - public StreamInfo queryPlayByDevice(String deviceId, String code) { -// List playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, - List playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, - deviceId, - code)); - if (playLeys == null || playLeys.size() == 0) return null; - return (StreamInfo)redis.get(playLeys.get(0).toString()); - } - - /** - * 更新流媒体信息 - * @param mediaServerConfig - * @return - */ - @Override - public boolean updateMediaInfo(MediaServerConfig mediaServerConfig) { - return redis.set(VideoManagerConstants.MEDIA_SERVER_PREFIX,mediaServerConfig); - } - - /** - * 获取流媒体信息 - * @return - */ - @Override - public MediaServerConfig getMediaInfo() { - return (MediaServerConfig)redis.get(VideoManagerConstants.MEDIA_SERVER_PREFIX); - } - - @Override - public void updateCatch() { - deviceMap = new HashMap<>(); - // 更新设备 - List devices = queryVideoDeviceList(null); - if (devices == null && devices.size() == 0) return; - for (Device device : devices) { - // 更新设备下的通道 - HashMap> channelMap = new HashMap>(); - List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + - device.getDeviceId() + "_" + "*"); - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = 0; i < deviceChannelList.size(); i++) { - String key = (String)deviceChannelList.get(i); - String[] s = key.split("_"); - String channelId = s[3]; - HashSet subChannel = channelMap.get(channelId); - if (subChannel == null) { - subChannel = new HashSet<>(); - } - System.out.println(key); - if (s.length == 6 && !"null".equals(s[5])) { - subChannel.add(s[5]); - } - channelMap.put(channelId, subChannel); - } - } - deviceMap.put(device.getDeviceId(),channelMap); - } - System.out.println(); - } - - @Override - public void cleanChannelsForDevice(String deviceId) { - List result = new ArrayList<>(); -// List deviceChannelList = redis.keys(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - List deviceChannelList = redis.scan(VideoManagerConstants.CACHEKEY_PREFIX + deviceId + "_" + "*"); - if (deviceChannelList != null && deviceChannelList.size() > 0 ) { - for (int i = 0; i < deviceChannelList.size(); i++) { - redis.del((String)deviceChannelList.get(i)); - } - } - } - - @Override - public Map queryPlayByDeviceId(String deviceId) { - Map streamInfos = new HashMap<>(); -// List playLeys = redis.keys(String.format("%S_*_%S_*", VideoManagerConstants.PLAYER_PREFIX, deviceId)); - List playLeys = redis.scan(String.format("%S_*_%S_*", VideoManagerConstants.PLAYER_PREFIX, deviceId)); - if (playLeys.size() == 0) return streamInfos; - for (int i = 0; i < playLeys.size(); i++) { - String key = (String) playLeys.get(i); - StreamInfo streamInfo = (StreamInfo)redis.get(key); - streamInfos.put(streamInfo.getDeviceID() + "_" + streamInfo.getCahnnelId(), streamInfo); - } - return streamInfos; - } - - - @Override - public boolean startPlayback(StreamInfo stream) { - return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, stream.getStreamId(),stream.getDeviceID(), stream.getCahnnelId()), - stream); - } - - - @Override - public boolean stopPlayback(StreamInfo streamInfo) { - if (streamInfo == null) return false; - DeviceChannel deviceChannel = queryChannel(streamInfo.getDeviceID(), streamInfo.getCahnnelId()); - if (deviceChannel != null) { - deviceChannel.setStreamId(null); - deviceChannel.setPlay(false); - updateChannel(streamInfo.getDeviceID(), deviceChannel); - } - return redis.del(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, - streamInfo.getStreamId(), - streamInfo.getDeviceID(), - streamInfo.getCahnnelId())); - } - - @Override - public StreamInfo queryPlaybackByDevice(String deviceId, String code) { - String format = String.format("%S_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, - deviceId, - code); - List playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, - deviceId, - code)); - if (playLeys == null || playLeys.size() == 0) { - playLeys = redis.scan(String.format("%S_*_*_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, - deviceId)); - } - if (playLeys == null || playLeys.size() == 0) return null; - return (StreamInfo)redis.get(playLeys.get(0).toString()); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java index eba40bb9..59667d09 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,9 @@ public class PlayController { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -60,7 +64,7 @@ public class PlayController { Device device = storager.queryVideoDevice(deviceId); - StreamInfo streamInfo = storager.queryPlayByDevice(deviceId, channelId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); UUID uuid = UUID.randomUUID(); DeferredResult> result = new DeferredResult>(); @@ -89,7 +93,7 @@ public class PlayController { msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { - storager.stopPlay(streamInfo); + redisCatchStorage.stopPlay(streamInfo); cmder.playStreamCmd(device, channelId, (JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); @@ -117,25 +121,59 @@ public class PlayController { } @PostMapping("/play/{streamId}/stop") - public ResponseEntity playStop(@PathVariable String streamId) { + public DeferredResult> playStop(@PathVariable String streamId) { + + logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId)); + + UUID uuid = UUID.randomUUID(); + DeferredResult> result = new DeferredResult>(); + + // 录像查询以channelId作为deviceId查询 + resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result); + + cmder.streamByeCmd(streamId, event -> { + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); + if (streamInfo == null) { + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + msg.setData("streamId not found"); + resultHolder.invokeResult(msg); + redisCatchStorage.stopPlay(streamInfo); + } + + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); + Response response = event.getResponse(); + msg.setData(String.format("success")); + resultHolder.invokeResult(msg); + }); + - cmder.streamByeCmd(streamId); - StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); - if (streamInfo == null) - return new ResponseEntity("streamId not found", HttpStatus.OK); - storager.stopPlay(streamInfo); - if (logger.isDebugEnabled()) { - logger.debug(String.format("设备预览停止API调用,streamId:%s", streamId)); - } if (streamId != null) { JSONObject json = new JSONObject(); json.put("streamId", streamId); - return new ResponseEntity(json.toString(), HttpStatus.OK); + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + msg.setData(json.toString()); + resultHolder.invokeResult(msg); } else { - logger.warn("设备预览停止API调用失败!"); - return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); + logger.warn("设备预览/回放停止API调用失败!"); + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + msg.setData("streamId null"); + resultHolder.invokeResult(msg); } + + // 超时处理 + result.onTimeout(()->{ + logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId)); + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); + msg.setData("Timeout"); + resultHolder.invokeResult(msg); + }); + return result; } /** @@ -145,7 +183,7 @@ public class PlayController { */ @PostMapping("/play/{streamId}/convert") public ResponseEntity playConvert(@PathVariable String streamId) { - StreamInfo streamInfo = storager.queryPlayByStreamId(streamId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo == null) { logger.warn("视频转码API调用失败!, 视频流已经停止!"); return new ResponseEntity("未找到视频流信息, 视频流可能已经停止", HttpStatus.OK); @@ -155,7 +193,7 @@ public class PlayController { logger.warn("视频转码API调用失败!, 视频流已停止推流!"); return new ResponseEntity("推流信息在流媒体中不存在, 视频流可能已停止推流", HttpStatus.OK); } else { - MediaServerConfig mediaInfo = storager.getMediaInfo(); + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String dstUrl = String.format("rtmp://%s:%s/convert/%s", "127.0.0.1", mediaInfo.getRtmpPort(), streamId ); String srcUrl = String.format("rtsp://%s:%s/rtp/%s", "127.0.0.1", mediaInfo.getRtspPort(), streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java index 2800882a..6ce0868f 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; import com.genersoft.iot.vmp.vmanager.service.IPlayService; @@ -24,6 +25,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private DeferredResultHolder resultHolder; @@ -33,7 +37,7 @@ public class PlayServiceImpl implements IPlayService { msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid); if (streamInfo != null) { - storager.startPlay(streamInfo); + redisCatchStorage.startPlay(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -49,7 +53,7 @@ public class PlayServiceImpl implements IPlayService { msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid); if (streamInfo != null) { - storager.startPlayback(streamInfo); + redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); } else { @@ -65,7 +69,7 @@ public class PlayServiceImpl implements IPlayService { streamInfo.setStreamId(streamId); streamInfo.setDeviceID(deviceId); streamInfo.setCahnnelId(channelId); - MediaServerConfig mediaServerConfig = storager.getMediaInfo(); + MediaServerConfig mediaServerConfig = redisCatchStorage.getMediaInfo(); streamInfo.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); streamInfo.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); diff --git a/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java index fdb67781..def98d80 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java @@ -65,7 +65,7 @@ public class ApiDeviceController { JSONObject result = new JSONObject(); List devices; if (start == null || limit ==null) { - devices = storager.queryVideoDeviceList(null); + devices = storager.queryVideoDeviceList(); result.put("DeviceCount", devices.size()); }else { PageResult deviceList = storager.queryVideoDeviceList(null, start/limit, limit); diff --git a/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java index 6ef3a5f2..5ce51e13 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; import org.slf4j.Logger; @@ -35,6 +36,9 @@ public class ApiStreamController { @Autowired private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + private boolean closeWaitRTPInfo = false; @@ -158,14 +162,14 @@ public class ApiStreamController { ){ - StreamInfo streamInfo = storager.queryPlayByDevice(serial, code); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code); if (streamInfo == null) { JSONObject result = new JSONObject(); result.put("error","未找到流信息"); return result; } cmder.streamByeCmd(streamInfo.getStreamId()); - storager.stopPlay(streamInfo); + redisCatchStorage.stopPlay(streamInfo); return null; } diff --git a/src/main/resources/wvp.sqlite b/src/main/resources/wvp.sqlite new file mode 100644 index 00000000..e781bbcb Binary files /dev/null and b/src/main/resources/wvp.sqlite differ diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue index 738012d2..b46f1d96 100644 --- a/web_src/src/components/channelList.vue +++ b/web_src/src/components/channelList.vue @@ -104,7 +104,7 @@ export default { mounted() { this.initData(); - this.updateLooper = setInterval(this.initData, 10000); + this.updateLooper = setInterval(this.initData, 1000); }, destroyed() { this.$destroy('videojs');