From f66bf823fa2b03732858819d9fdf086fb9e0b6a2 Mon Sep 17 00:00:00 2001 From: chenjialing <595168663@qq.com> Date: Wed, 20 Jul 2022 16:52:13 +0800 Subject: [PATCH 01/15] =?UTF-8?q?=E4=BC=98=E5=8C=96----=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E4=B8=8B=E7=BA=BF,=E9=80=9A=E9=81=93=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E4=B9=9F=E5=88=87=E6=8D=A2=E8=87=B3=E7=A6=BB=E7=BA=BF=E7=8A=B6?= =?UTF-8?q?=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/RegisterRequestProcessor.java | 1 + .../vmp/service/impl/DeviceServiceImpl.java | 21 +++++++++++++++---- .../vmp/storager/dao/DeviceChannelMapper.java | 3 +++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 622cf073..351b79cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -143,6 +143,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setGeoCoordSys("WGS84"); device.setTreeType("CivilCode"); device.setDeviceId(deviceId); + device.setOnline(0); } device.setIp(received); device.setPort(rPort); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 579184c9..f7198005 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -86,10 +86,10 @@ public class DeviceServiceImpl implements IDeviceService { redisCatchStorage.clearCatchByDeviceId(device.getDeviceId()); } device.setUpdateTime(now); - device.setOnline(1); - // 第一次上线 + // 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询 if (device.getCreateTime() == null) { + device.setOnline(1); device.setCreateTime(now); logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId()); deviceMapper.add(device); @@ -97,8 +97,19 @@ public class DeviceServiceImpl implements IDeviceService { commander.deviceInfoQuery(device); sync(device); }else { - deviceMapper.update(device); - redisCatchStorage.updateDevice(device); + if(device.getOnline() == 0){ + device.setOnline(1); + device.setCreateTime(now); + logger.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId()); + deviceMapper.update(device); + redisCatchStorage.updateDevice(device); + commander.deviceInfoQuery(device); + sync(device); + }else { + deviceMapper.update(device); + redisCatchStorage.updateDevice(device); + } + } // 上线添加订阅 @@ -125,6 +136,8 @@ public class DeviceServiceImpl implements IDeviceService { device.setOnline(0); redisCatchStorage.updateDevice(device); deviceMapper.update(device); + //进行通道离线 + deviceChannelMapper.offlineByDeviceId(deviceId); // 离线释放所有ssrc List ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null); if (ssrcTransactions != null && ssrcTransactions.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 653e39da..bcebb943 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -140,6 +140,9 @@ public interface DeviceChannelMapper { @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"}) void offline(String deviceId, String channelId); + @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"}) + void offlineByDeviceId(String deviceId); + @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"}) void online(String deviceId, String channelId); From e29d94c83f62960bb1d6dacac4c978debc85c5ef Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 22 Jul 2022 16:02:14 +0800 Subject: [PATCH 02/15] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8A=9F=E8=83=BD=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E7=9B=AE=E5=BD=95=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/InviteRequestProcessor.java | 4 +- .../iot/vmp/service/IStreamPushService.java | 4 ++ .../service/impl/StreamPushServiceImpl.java | 32 ++++++++++ .../storager/dao/PlatformCatalogMapper.java | 2 +- .../impl/VideoManagerStorageImpl.java | 5 ++ .../iot/vmp/vmanager/bean/WVPResult.java | 1 + .../gb28181/platform/PlatformController.java | 20 ++++-- .../streamPush/StreamPushController.java | 30 +++++++++ web_src/src/components/PushVideoList.vue | 7 ++- web_src/src/components/channelList.vue | 3 +- web_src/src/components/dialog/catalogEdit.vue | 2 +- .../{addStreamTOGB.vue => pushStreamEdit.vue} | 61 +++++++++++++------ 12 files changed, 139 insertions(+), 32 deletions(-) rename web_src/src/components/dialog/{addStreamTOGB.vue => pushStreamEdit.vue} (77%) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 19908e49..01837483 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -419,7 +419,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if (streamPushItem.isStatus()) { + if (streamPushItem != null && streamPushItem.isStatus()) { // 在线状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); @@ -428,9 +428,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } - } - } } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 5dd45ef9..b95ec486 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -96,4 +96,8 @@ public interface IStreamPushService { */ void online(List onlineStreams); + /** + * 增加推流 + */ + boolean add(StreamPushItem stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 62cf20fa..8fa04094 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -22,7 +22,10 @@ import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; import org.springframework.util.StringUtils; import java.util.*; @@ -69,6 +72,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired private IMediaServerService mediaServerService; + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; + @Override public List handleJSON(String jsonData, MediaServerItem mediaServerItem) { if (jsonData == null) { @@ -463,4 +472,27 @@ public class StreamPushServiceImpl implements IStreamPushService { // 发送通知 eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); } + + @Override + public boolean add(StreamPushItem stream) { + stream.setUpdateTime(DateUtil.getNow()); + stream.setCreateTime(DateUtil.getNow()); + stream.setServerId(userSetting.getServerId()); + + // 放在事务内执行 + boolean result = false; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + int addStreamResult = streamPushMapper.add(stream); + if (!StringUtils.isEmpty(stream.getGbId())) { + gbStreamMapper.add(stream); + } + dataSourceTransactionManager.commit(transactionStatus); + result = true; + }catch (Exception e) { + logger.error("批量移除流与平台的关系时错误", e); + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java index 4ed0f32f..ae16379f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java @@ -50,7 +50,7 @@ public interface PlatformCatalogMapper { @Select("SELECT pc.* FROM platform_catalog pc WHERE pc.id = #{id}") PlatformCatalog selectParentCatalog(String id); - @Select("SELECT pc.id as channelId, pc.name, pc.civilCode, pc.businessGroupId,'0' as parental, pc.parentId " + + @Select("SELECT pc.id as channelId, pc.name, pc.civilCode, pc.businessGroupId,'1' as parental, pc.parentId " + " FROM platform_catalog pc WHERE pc.platformId=#{platformId}") List queryCatalogInPlatform(String platformId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index c18c5d2e..8465d806 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -741,6 +741,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) { // 第一层节点 platformCatalog.setBusinessGroupId(platformCatalog.getId()); + platformCatalog.setParentId(platform.getDeviceGBId()); }else { // 获取顶层的 PlatformCatalog topCatalog = getTopCatalog(platformCatalog.getParentId(), platformCatalog.getPlatformId()); @@ -749,6 +750,10 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { } if (platform.getTreeType().equals(TreeType.CIVIL_CODE)) { platformCatalog.setCivilCode(platformCatalog.getId()); + if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) { + // 第一层节点 + platformCatalog.setParentId(platform.getDeviceGBId()); + } } int result = catalogMapper.add(platformCatalog); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java index 91ed22c2..db937d68 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java @@ -12,6 +12,7 @@ public class WVPResult { this.data = data; } + private int code; private String msg; private T data; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index 8d2278e6..7a226934 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; +import com.genersoft.iot.vmp.gb28181.bean.TreeType; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -463,13 +464,20 @@ public class PlatformController { if (logger.isDebugEnabled()) { logger.debug("查询目录,platformId: {}, parentId: {}", platformId, parentId); } + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + if (platform == null) { + return new ResponseEntity<>(new WVPResult<>(400, "平台未找到", null), HttpStatus.OK); + } + if (platformId.equals(parentId)) { + parentId = platform.getDeviceGBId(); + } List platformCatalogList = storager.getChildrenCatalogByPlatform(platformId, parentId); - // 查询下属的国标通道 -// List catalogsForChannel = storager.queryChannelInParentPlatformAndCatalog(platformId, parentId); - // 查询下属的直播流通道 -// List catalogsForStream = storager.queryStreamInParentPlatformAndCatalog(platformId, parentId); -// platformCatalogList.addAll(catalogsForChannel); -// platformCatalogList.addAll(catalogsForStream); +// if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)) { +// platformCatalogList = storager.getChildrenCatalogByPlatform(platformId, parentId); +// }else { +// +// } + WVPResult> result = new WVPResult<>(); result.setCode(0); result.setMsg("success"); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 300f9521..1f0884ff 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -284,5 +284,35 @@ public class StreamPushController { return result; } + /** + * 获取推流播放地址 + * @param stream 推流信息 + * @return + */ + @ApiOperation("获取推流播放地址") + @ApiImplicitParams({ + @ApiImplicitParam(name = "stream", value = "推流信息", dataTypeClass = StreamPushItem.class), + }) + @PostMapping(value = "/add") + @ResponseBody + public WVPResult add(@RequestBody StreamPushItem stream){ + if (StringUtils.isEmpty(stream.getGbId())) { + return new WVPResult<>(400, "国标ID不可为空", null); + } + if (StringUtils.isEmpty(stream.getApp()) && StringUtils.isEmpty(stream.getStream())) { + return new WVPResult<>(400, "app或stream不可为空", null); + } + stream.setStatus(false); + stream.setPushIng(false); + stream.setAliveSecond(0L); + stream.setTotalReaderCount("0"); + boolean result = streamPushService.add(stream); + + if (result) { + return new WVPResult<>(0, "success", null); + }else { + return new WVPResult<>(-1, "fail", null); + } + } } diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 58ce8a21..6a415f77 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -34,6 +34,8 @@ 批量移除 + 添加通道 + @@ -108,7 +110,7 @@ "}) int update(StreamPushItem streamPushItem); @@ -119,7 +121,7 @@ public interface StreamPushMapper { @Update("UPDATE stream_push " + "SET pushIng=${pushIng} " + "WHERE app=#{app} AND stream=#{stream}") - int updatePushStatus(String app, String stream, boolean status); + int updatePushStatus(String app, String stream, boolean pushIng); @Update("UPDATE stream_push " + "SET status=#{status} " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index b8b97cee..5b2e515f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -479,7 +479,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendStreamChangeMsg(String type, JSONObject jsonObject) { String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type; - logger.debug("[redis 流变化事件] {}: {}", key, jsonObject.toString()); + logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString()); redis.convertAndSend(key, jsonObject); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 8465d806..6cdd1082 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -635,47 +635,11 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return streamProxyMapper.selectOne(app, stream); } - @Override - public void updateMediaList(List streamPushItems) { - if (streamPushItems == null || streamPushItems.size() == 0) { - return; - } - logger.info("updateMediaList: " + streamPushItems.size()); - streamPushMapper.addAll(streamPushItems); - // TODO 待优化 - for (int i = 0; i < streamPushItems.size(); i++) { - int onlineResult = mediaOnline(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream()); - if (onlineResult > 0) { - // 发送上线通知 - eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON); - } - } - } - - - - @Override - public void updateMedia(StreamPushItem streamPushItem) { - streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream()); - streamPushMapper.add(streamPushItem); - mediaOffline(streamPushItem.getApp(), streamPushItem.getStream()); - } - @Override public int removeMedia(String app, String stream) { return streamPushMapper.del(app, stream); } - @Override - public StreamPushItem getMedia(String app, String stream) { - return streamPushMapper.selectOne(app, stream); - } - - @Override - public void clearMediaList() { - streamPushMapper.clear(); - } - @Override public int mediaOffline(String app, String stream) { GbStream gbStream = gbStreamMapper.selectOne(app, stream); @@ -683,7 +647,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { if ("proxy".equals(gbStream.getStreamType())) { result = streamProxyMapper.updateStatus(app, stream, false); }else { - result = streamPushMapper.updateStatus(app, stream, false); + result = streamPushMapper.updatePushStatus(app, stream, false); } return result; } @@ -695,7 +659,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { if ("proxy".equals(gbStream.getStreamType())) { result = streamProxyMapper.updateStatus(app, stream, true); }else { - result = streamPushMapper.updateStatus(app, stream, true); + result = streamPushMapper.updatePushStatus(app, stream, true); } return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 1f0884ff..6c111693 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel; import com.alibaba.excel.ExcelReader; import com.alibaba.excel.read.metadata.ReadSheet; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.security.SecurityUtils; import com.genersoft.iot.vmp.conf.security.dto.LoginUser; import com.genersoft.iot.vmp.gb28181.bean.GbStream; @@ -63,6 +64,9 @@ public class StreamPushController { @Autowired private IMediaService mediaService; + @Autowired + private UserSetting userSetting; + @ApiOperation("推流列表查询") @ApiImplicitParams({ @ApiImplicitParam(name="page", value = "当前页", required = true, dataTypeClass = Integer.class), @@ -269,18 +273,23 @@ public class StreamPushController { if (userInfo!= null) { authority = true; } - - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); - WVPResult result = new WVPResult<>(); + StreamPushItem push = streamPushService.getPush(app, stream); + if (!userSetting.getServerId().equals(push.getServerId()) ) { + result.setCode(-1); + result.setMsg("来自其他平台的推流信息"); + return result; + } + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); if (streamInfo != null){ result.setCode(0); result.setMsg("scccess"); result.setData(streamInfo); }else { result.setCode(-1); - result.setMsg("fail"); + result.setMsg("获取播放地址失败"); } + return result; } diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 6a415f77..866c8084 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -69,7 +69,7 @@ @@ -202,10 +202,15 @@ export default { } }).then(function (res) { that.getListLoading = false; - that.$refs.devicePlayer.openDialog("streamPlay", null, null, { - streamInfo: res.data.data, - hasAudio: true - }); + if (res.data.code === 0 ) { + that.$refs.devicePlayer.openDialog("streamPlay", null, null, { + streamInfo: res.data.data, + hasAudio: true + }); + }else { + that.$message.error(res.data.msg); + } + }).catch(function (error) { console.error(error); that.getListLoading = false; From b957ab61c7d7f54716f81f4cd9474238fd110e1d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 26 Jul 2022 11:43:56 +0800 Subject: [PATCH 06/15] =?UTF-8?q?bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/MobilePositionSubscribeTask.java | 6 +- .../request/impl/InviteRequestProcessor.java | 18 ++--- .../request/impl/NotifyRequestProcessor.java | 12 ++-- .../notify/cmd/AlarmNotifyMessageHandler.java | 2 +- .../cmd/CatalogResponseMessageHandler.java | 4 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 + .../vmp/media/zlm/ZLMMediaListManager.java | 67 +++++++++---------- .../vmp/media/zlm/ZLMRTPServerFactory.java | 6 +- .../service/impl/RedisStreamMsgListener.java | 9 +-- .../streamPush/StreamPushController.java | 9 ++- web_src/src/components/PushVideoList.vue | 2 +- web_src/src/components/dialog/catalogEdit.vue | 5 +- 12 files changed, 71 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index cf1f7cf6..e43e59b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -39,9 +39,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { dynamicTask.stop(taskKey); } sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { -// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { -// dialog = eventResult.dialog; -// } + if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { + dialog = eventResult.dialog; + } ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 成功 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 01837483..fda3bff5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -419,12 +419,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if (streamPushItem != null && streamPushItem.isStatus()) { - // 在线状态 + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 推流状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { - // 不在线 拉起 + // 未推流 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } @@ -451,7 +451,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { // 推流 - if (streamPushItem.getServerId().equals(userSetting.getServerId())) { + if (streamPushItem.isSelf()) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 自平台内容 @@ -500,7 +500,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { if ("proxy".equals(gbStream.getStreamType())) { // TODO 控制启用以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { @@ -508,7 +508,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), @@ -518,7 +518,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException e) { e.printStackTrace(); @@ -533,7 +533,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Boolean finalTcpActive = tcpActive; // 添加在本机上线的通知 - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, @@ -621,7 +621,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 离线 // 查询是否在本机上线了 StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isStatus()) { + if (currentStreamPushItem.isPushIng()) { // 在线状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index c8a221bd..e191578a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -350,17 +350,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements switch (event) { case CatalogEvent.ON: // 上线 - logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); break; case CatalogEvent.OFF : // 离线 - logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); break; case CatalogEvent.VLOST: // 视频丢失 - logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); break; case CatalogEvent.DEFECT: @@ -368,17 +368,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements break; case CatalogEvent.ADD: // 增加 - logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deviceChannelService.updateChannel(deviceId, channel); break; case CatalogEvent.DEL: // 删除 - logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); break; case CatalogEvent.UPDATE: // 更新 - logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deviceChannelService.updateChannel(deviceId, channel); break; default: diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 265694ae..20316e7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -69,7 +69,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - logger.info("收到来自设备[{}]的报警通知", device.getDeviceId()); + logger.info("[收到报警通知]设备:{}", device.getDeviceId()); // 回复200 OK try { responseAck(evt, Response.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index f66d4f8b..ff71a922 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -111,7 +111,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp int sumNum = Integer.parseInt(sumNumElement.getText()); if (sumNum == 0) { - logger.info("收到来自设备【{}】的通道: 0个", take.getDevice().getDeviceId()); + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); // 数据已经完整接收 storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); @@ -133,7 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } int sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); - logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { // 数据已经完整接收 boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 1f462314..11dd8179 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -240,6 +240,8 @@ public class ZLMHttpHookListener { if (mediaInfo != null) { assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); } + }else { + zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId()); } ret.put("code", 0); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 1b1fb9f4..f78ca4a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; @@ -63,7 +64,13 @@ public class ZLMMediaListManager { @Autowired private UserSetting userSetting; - private Map channelOnlineEvents = new ConcurrentHashMap<>(); + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + private Map channelOnPublishEvents = new ConcurrentHashMap<>(); public StreamPushItem addPush(MediaItem mediaItem) { // 查找此直播流是否存在redis预设gbId @@ -79,9 +86,26 @@ public class ZLMMediaListManager { }else { streamPushMapper.update(transform); } + if (transform != null) { + if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null) { + getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId()); + removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); + } + } return transform; } + public void sendStreamEvent(String app, String stream, String mediaServerId) { + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + // 查看推流状态 + if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { + if (getChannelOnlineEventLister(app, stream) != null) { + getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId); + removedChannelOnlineEventLister(app, stream); + } + } + } + public int removeMedia(String app, String streamId) { // 查找是否关联了国标, 关联了不删除, 置为离线 GbStream gbStream = gbStreamMapper.selectOne(app, streamId); @@ -89,48 +113,21 @@ public class ZLMMediaListManager { if (gbStream == null) { result = storager.removeMedia(app, streamId); }else { - // TODO 暂不设置为离线 result =storager.mediaOffline(app, streamId); } return result; } - public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) { - this.channelOnlineEvents.put(key,callback); + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); } - public void removedChannelOnlineEventLister(String key) { - this.channelOnlineEvents.remove(key); + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); } + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } - -// public void clearAllSessions() { -// logger.info("清空所有国标相关的session"); -// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); -// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); -// HashSet allLocalPorts = new HashSet(); -// if (allSessionJSON.getInteger("code") == 0) { -// JSONArray data = allSessionJSON.getJSONArray("data"); -// if (data.size() > 0) { -// for (int i = 0; i < data.size(); i++) { -// JSONObject sessionJOSN = data.getJSONObject(i); -// Integer local_port = sessionJOSN.getInteger("local_port"); -// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ -// allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); -// } -// } -// } -// } -// if (allLocalPorts.size() > 0) { -// List result = new ArrayList<>(allLocalPorts); -// String localPortSStr = String.join(",", result); -// zlmresTfulUtils.kickSessions(localPortSStr); -// } -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index cbef9ce2..1fe00e44 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -273,8 +273,10 @@ public class ZLMRTPServerFactory { * 查询待转推的流是否就绪 */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); - return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); + JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); + return (mediaInfo.getInteger("code") == 0 + && mediaInfo.getJSONArray("data") != null + && mediaInfo.getJSONArray("data").size() > 0); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java index 72b3c4e2..c5592f49 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java @@ -72,9 +72,10 @@ public class RedisStreamMsgListener implements MessageListener { mediaItem.setOriginType(0); mediaItem.setOriginTypeStr("0"); mediaItem.setOriginTypeStr("unknown"); - - zlmMediaListManager.addPush(mediaItem); - - + if (register) { + zlmMediaListManager.addPush(mediaItem); + }else { + zlmMediaListManager.removeMedia(app, stream); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 6c111693..c978ae67 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -264,9 +264,8 @@ public class StreamPushController { }) @GetMapping(value = "/getPlayUrl") @ResponseBody - public WVPResult getPlayUrl(HttpServletRequest request, @RequestParam String app, - @RequestParam String stream, - @RequestParam(required = false) String mediaServerId){ + public WVPResult getPlayUrl(@RequestParam String app,@RequestParam String stream, + @RequestParam(required = false) String mediaServerId){ boolean authority = false; // 是否登陆用户, 登陆用户返回完整信息 LoginUser userInfo = SecurityUtils.getUserInfo(); @@ -275,7 +274,7 @@ public class StreamPushController { } WVPResult result = new WVPResult<>(); StreamPushItem push = streamPushService.getPush(app, stream); - if (!userSetting.getServerId().equals(push.getServerId()) ) { + if (push != null && !push.isSelf()) { result.setCode(-1); result.setMsg("来自其他平台的推流信息"); return result; @@ -283,7 +282,7 @@ public class StreamPushController { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); if (streamInfo != null){ result.setCode(0); - result.setMsg("scccess"); + result.setMsg("success"); result.setData(streamInfo); }else { result.setCode(-1); diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 866c8084..6b4707e3 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -76,7 +76,7 @@