Merge branch 'wvp-28181-2.0' into wvp-28181-2.0

This commit is contained in:
mrjackwang 2022-03-07 09:01:14 +08:00 committed by GitHub
commit 21506440d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 14921 additions and 305 deletions

View File

@ -44,7 +44,7 @@ CREATE TABLE `device` (
`charset` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `charset` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `device_deviceId_uindex` (`deviceId`) UNIQUE KEY `device_deviceId_uindex` (`deviceId`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -129,7 +129,7 @@ CREATE TABLE `device_channel` (
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `device_channel_id_uindex` (`id`), UNIQUE KEY `device_channel_id_uindex` (`id`),
UNIQUE KEY `device_channel_pk` (`channelId`,`deviceId`) UNIQUE KEY `device_channel_pk` (`channelId`,`deviceId`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=46 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -198,7 +198,7 @@ CREATE TABLE `gb_stream` (
PRIMARY KEY (`gbStreamId`) USING BTREE, PRIMARY KEY (`gbStreamId`) USING BTREE,
UNIQUE KEY `app` (`app`,`stream`) USING BTREE, UNIQUE KEY `app` (`app`,`stream`) USING BTREE,
UNIQUE KEY `gbId` (`gbId`) USING BTREE UNIQUE KEY `gbId` (`gbId`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=375 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB AUTO_INCREMENT=300766 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -228,7 +228,7 @@ CREATE TABLE `log` (
`username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=313 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB AUTO_INCREMENT=962 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -317,7 +317,7 @@ CREATE TABLE `parent_platform` (
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `parent_platform_id_uindex` (`id`), UNIQUE KEY `parent_platform_id_uindex` (`id`),
UNIQUE KEY `parent_platform_pk` (`serverGBId`) UNIQUE KEY `parent_platform_pk` (`serverGBId`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB AUTO_INCREMENT=23 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -367,7 +367,7 @@ CREATE TABLE `platform_gb_channel` (
`catalogId` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `catalogId` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`deviceChannelId` int NOT NULL, `deviceChannelId` int NOT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -393,7 +393,7 @@ CREATE TABLE `platform_gb_stream` (
`id` int NOT NULL AUTO_INCREMENT, `id` int NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `platform_gb_stream_pk` (`platformId`,`catalogId`,`gbStreamId`) UNIQUE KEY `platform_gb_stream_pk` (`platformId`,`catalogId`,`gbStreamId`)
) ENGINE=InnoDB AUTO_INCREMENT=406 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB AUTO_INCREMENT=301207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -415,7 +415,6 @@ DROP TABLE IF EXISTS `stream_proxy`;
CREATE TABLE `stream_proxy` ( CREATE TABLE `stream_proxy` (
`id` int NOT NULL AUTO_INCREMENT, `id` int NOT NULL AUTO_INCREMENT,
`type` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `type` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
`app` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `app` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
`stream` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `stream` varchar(255) COLLATE utf8mb4_general_ci NOT NULL,
@ -432,9 +431,10 @@ CREATE TABLE `stream_proxy` (
`status` bit(1) NOT NULL, `status` bit(1) NOT NULL,
`enable_remove_none_reader` bit(1) NOT NULL, `enable_remove_none_reader` bit(1) NOT NULL,
`createTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `createTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `stream_proxy_pk` (`app`,`stream`) UNIQUE KEY `stream_proxy_pk` (`app`,`stream`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -465,7 +465,7 @@ CREATE TABLE `stream_push` (
`mediaServerId` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `mediaServerId` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `stream_push_pk` (`app`,`stream`) UNIQUE KEY `stream_push_pk` (`app`,`stream`)
) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=300799 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
-- --
@ -502,7 +502,7 @@ CREATE TABLE `user` (
LOCK TABLES `user` WRITE; LOCK TABLES `user` WRITE;
/*!40000 ALTER TABLE `user` DISABLE KEYS */; /*!40000 ALTER TABLE `user` DISABLE KEYS */;
INSERT INTO `user` VALUES (1,'admin','21232f297a57a5a743894a0e4a801fc3',1,'2021-04-13 14:14:57','2021-04-13 14:14:57'); INSERT INTO `user` VALUES (1,'admin','21232f297a57a5a743894a0e4a801fc3',1,'2021 - 04 - 13 14:14:57','2021 - 04 - 13 14:14:57');
/*!40000 ALTER TABLE `user` ENABLE KEYS */; /*!40000 ALTER TABLE `user` ENABLE KEYS */;
UNLOCK TABLES; UNLOCK TABLES;
@ -542,4 +542,4 @@ UNLOCK TABLES;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2022-02-25 20:32:21 -- Dump completed on 2022-03-07 8:26:30

View File

@ -5,6 +5,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -25,15 +26,38 @@ public class DynamicTask {
return new ThreadPoolTaskScheduler(); return new ThreadPoolTaskScheduler();
} }
/**
* 循环执行的任务
* @param key 任务ID
* @param task 任务
* @param cycleForCatalog 间隔
* @return
*/
public String startCron(String key, Runnable task, int cycleForCatalog) { public String startCron(String key, Runnable task, int cycleForCatalog) {
stopCron(key); stop(key);
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔 // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔
ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
futureMap.put(key, future); futureMap.put(key, future);
return "startCron"; return "startCron";
} }
public void stopCron(String key) { /**
* 延时任务
* @param key 任务ID
* @param task 任务
* @param delay 延时 /
* @return
*/
public String startDelay(String key, Runnable task, int delay) {
stop(key);
Date starTime = new Date(System.currentTimeMillis() + delay * 1000);
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔
ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime);
futureMap.put(key, future);
return "startCron";
}
public void stop(String key) {
if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
futureMap.get(key).cancel(true); futureMap.get(key).cancel(true);
} }

View File

@ -59,8 +59,11 @@ public class SipPlatformRunner implements CommandLineRunner {
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
// 取消订阅 // 取消订阅
sipCommanderForPlatform.unregister(parentPlatform, null, null); sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
Thread.sleep(500); ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId());
sipCommanderForPlatform.register(platform, null, null);
});
// 发送平台未注册消息 // 发送平台未注册消息
publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
} }

View File

@ -19,8 +19,6 @@ public class UserSetup {
private Long playTimeout = 18000L; private Long playTimeout = 18000L;
private Boolean waitTrack = Boolean.FALSE;
private Boolean interfaceAuthentication = Boolean.TRUE; private Boolean interfaceAuthentication = Boolean.TRUE;
private Boolean recordPushLive = Boolean.TRUE; private Boolean recordPushLive = Boolean.TRUE;
@ -57,10 +55,6 @@ public class UserSetup {
return playTimeout; return playTimeout;
} }
public Boolean isWaitTrack() {
return waitTrack;
}
public Boolean isInterfaceAuthentication() { public Boolean isInterfaceAuthentication() {
return interfaceAuthentication; return interfaceAuthentication;
} }
@ -89,10 +83,6 @@ public class UserSetup {
this.playTimeout = playTimeout; this.playTimeout = playTimeout;
} }
public void setWaitTrack(Boolean waitTrack) {
this.waitTrack = waitTrack;
}
public void setInterfaceAuthentication(boolean interfaceAuthentication) { public void setInterfaceAuthentication(boolean interfaceAuthentication) {
this.interfaceAuthentication = interfaceAuthentication; this.interfaceAuthentication = interfaceAuthentication;
} }

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.auth; package com.genersoft.iot.vmp.gb28181.auth;
import com.genersoft.iot.vmp.storager.impl.VideoManagerStoragerImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -20,13 +21,24 @@ public class RegisterLogicHandler {
@Autowired @Autowired
private SIPCommander cmder; private SIPCommander cmder;
@Autowired
private VideoManagerStoragerImpl storager;
public void onRegister(Device device) { public void onRegister(Device device) {
// 只有第一次注册时调用查询设备信息如需更新调用更新API接口 // 只有第一次注册时调用查询设备信息如需更新调用更新API接口
// TODO 此处错误无法获取到通道
Device device1 = storager.queryVideoDevice(device.getDeviceId());
if (device.isFirsRegister()) { if (device.isFirsRegister()) {
logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
cmder.deviceInfoQuery(device); try {
cmder.catalogQuery(device, null); Thread.sleep(100);
cmder.deviceInfoQuery(device);
Thread.sleep(100);
cmder.catalogQuery(device, null);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }
} }

View File

@ -81,6 +81,10 @@ public class SendRtpItem {
*/ */
private boolean isPlay; private boolean isPlay;
private byte[] transaction;
private byte[] dialog;
public String getIp() { public String getIp() {
return ip; return ip;
} }
@ -200,4 +204,20 @@ public class SendRtpItem {
public void setPlay(boolean play) { public void setPlay(boolean play) {
isPlay = play; isPlay = play;
} }
public byte[] getTransaction() {
return transaction;
}
public void setTransaction(byte[] transaction) {
this.transaction = transaction;
}
public byte[] getDialog() {
return dialog;
}
public void setDialog(byte[] dialog) {
this.dialog = dialog;
}
} }

View File

@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.event.offline;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -39,6 +42,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@Autowired
private IVideoManagerStorager storager;
public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
super(listenerContainer, userSetup); super(listenerContainer, userSetup);
} }
@ -61,15 +67,22 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent
String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_"; String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_";
if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
publisher.platformKeepaliveExpireEventPublish(platformGBId); if (platform != null) {
publisher.platformKeepaliveExpireEventPublish(platformGBId);
}
}else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) { }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) {
String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length());
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId);
publisher.platformRegisterCycleEventPublish(platformGBId); if (platform != null) {
publisher.platformRegisterCycleEventPublish(platformGBId);
}
}else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); Device device = storager.queryVideoDevice(deviceId);
if (device != null) {
publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX);
}
}else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) {
String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length()); String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();

View File

@ -2,8 +2,13 @@ package com.genersoft.iot.vmp.gb28181.event.offline;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -32,6 +37,9 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
@Autowired @Autowired
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired @Autowired
private RedisUtil redis; private RedisUtil redis;
@ -42,6 +50,14 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
@Autowired @Autowired
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Override @Override
public void onApplicationEvent(OfflineEvent event) { public void onApplicationEvent(OfflineEvent event) {
@ -73,5 +89,15 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
// TODO 离线取消订阅 // TODO 离线取消订阅
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
if (ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
streamSession.remove(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
}
}
} }
} }

View File

@ -75,7 +75,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener<Platf
stream.append(","); stream.append(",");
} }
stream.append(sendRtpItem.getStreamId()); stream.append(sendRtpItem.getStreamId());
redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId()); redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
@ -84,9 +84,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener<Platf
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
} }
} }
Timer timer = new Timer(); Timer timer = new Timer();
SipSubscribe.Event okEvent = (responseEvent)->{ SipSubscribe.Event okEvent = (responseEvent)->{
timer.cancel(); timer.cancel();

View File

@ -4,8 +4,6 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -46,7 +44,7 @@ public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessage
String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_"; String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
// 取消定时任务 // 取消定时任务
dynamicTask.stopCron(expiredKey); dynamicTask.stop(expiredKey);
} }
} }
} }

View File

@ -86,6 +86,15 @@ public class VideoStreamSessionManager {
return dialog; return dialog;
} }
public SIPDialog getDialogByCallId(String deviceId, String channelId, String callID){
SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callID, null);
if (ssrcTransaction == null) return null;
byte[] dialogByteArray = ssrcTransaction.getDialog();
if (dialogByteArray == null) return null;
SIPDialog dialog = (SIPDialog)SerializeUtils.deSerialize(dialogByteArray);
return dialog;
}
public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){ public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){
if (StringUtils.isEmpty(callId)) callId ="*"; if (StringUtils.isEmpty(callId)) callId ="*";
if (StringUtils.isEmpty(stream)) stream ="*"; if (StringUtils.isEmpty(stream)) stream ="*";
@ -95,6 +104,21 @@ public class VideoStreamSessionManager {
return (SsrcTransaction)redisUtil.get((String) scanResult.get(0)); return (SsrcTransaction)redisUtil.get((String) scanResult.get(0));
} }
public List<SsrcTransaction> getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){
if (StringUtils.isEmpty(deviceId)) deviceId ="*";
if (StringUtils.isEmpty(channelId)) channelId ="*";
if (StringUtils.isEmpty(callId)) callId ="*";
if (StringUtils.isEmpty(stream)) stream ="*";
String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream;
List<Object> scanResult = redisUtil.scan(key);
if (scanResult.size() == 0) return null;
List<SsrcTransaction> result = new ArrayList<>();
for (Object keyObj : scanResult) {
result.add((SsrcTransaction)redisUtil.get((String) keyObj));
}
return result;
}
public String getMediaServerId(String deviceId, String channelId, String stream){ public String getMediaServerId(String deviceId, String channelId, String stream){
SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream);
if (ssrcTransaction == null) return null; if (ssrcTransaction == null) return null;

View File

@ -63,7 +63,5 @@ public class GPSSubscribeTask implements Runnable{
} }
} }
} }
} }
} }

View File

@ -96,4 +96,11 @@ public interface ISIPCommanderForPlatform {
* @param recordInfo 录像信息 * @param recordInfo 录像信息
*/ */
boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo); boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo);
/**
* 向发起点播的上级回复bye
* @param platform 平台信息
* @param callId callId
*/
void streamByeCmd(ParentPlatform platform, String callId);
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -85,6 +86,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private DynamicTask dynamicTask;
/** /**
* 云台方向放控制使用配置文件中的默认镜头移动速度 * 云台方向放控制使用配置文件中的默认镜头移动速度
@ -330,7 +334,8 @@ public class SIPCommander implements ISIPCommander {
* @param errorEvent sip错误订阅 * @param errorEvent sip错误订阅
*/ */
@Override @Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) {
String streamId = ssrcInfo.getStream(); String streamId = ssrcInfo.getStream();
try { try {
if (device == null) return; if (device == null) return;
@ -342,15 +347,13 @@ public class SIPCommander implements ISIPCommander {
subscribeKey.put("app", "rtp"); subscribeKey.put("app", "rtp");
subscribeKey.put("stream", streamId); subscribeKey.put("stream", streamId);
subscribeKey.put("regist", true); subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId()); subscribeKey.put("mediaServerId", mediaServerItem.getId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{ (MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
if (event != null) { if (event != null) {
event.response(mediaServerItemInUse, json); event.response(mediaServerItemInUse, json);
} }
// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
}); });
// //
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
@ -419,7 +422,7 @@ public class SIPCommander implements ISIPCommander {
transmitRequest(device, request, (e -> { transmitRequest(device, request, (e -> {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
errorEvent.response(e); errorEvent.response(e);
}), e ->{ }), e ->{
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
@ -458,8 +461,6 @@ public class SIPCommander implements ISIPCommander {
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{ (MediaServerItem mediaServerItemInUse, JSONObject json)->{
System.out.println(344444);
if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
if (event != null) { if (event != null) {
event.response(mediaServerItemInUse, json); event.response(mediaServerItemInUse, json);
} }
@ -565,7 +566,6 @@ public class SIPCommander implements ISIPCommander {
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{ (MediaServerItem mediaServerItemInUse, JSONObject json)->{
if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return;
event.response(mediaServerItemInUse, json); event.response(mediaServerItemInUse, json);
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
}); });
@ -662,6 +662,7 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) {
try { try {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream);
ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream);
if (transaction == null) { if (transaction == null) {
logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId); logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
@ -715,10 +716,9 @@ public class SIPCommander implements ISIPCommander {
dialog.sendRequest(clientTransaction); dialog.sendRequest(clientTransaction);
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null);
if (ssrcTransaction != null) { if (ssrcTransaction != null) {
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream());
streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream());
} }
@ -1169,8 +1169,6 @@ public class SIPCommander implements ISIPCommander {
*/ */
@Override @Override
public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
// 清空通道
// storager.cleanChannelsForDevice(device.getDeviceId());
try { try {
StringBuffer catalogXml = new StringBuffer(200); StringBuffer catalogXml = new StringBuffer(200);
catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n"); catalogXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");

View File

@ -5,8 +5,16 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -18,10 +26,14 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.header.ViaHeader;
import javax.sip.header.WWWAuthenticateHeader; import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.lang.reflect.Field;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
@ -37,18 +49,24 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private IMediaServerService mediaServerService;
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Lazy @Lazy
@Autowired @Autowired
@Qualifier(value="tcpSipProvider") @Qualifier(value="tcpSipProvider")
private SipProvider tcpSipProvider; private SipProviderImpl tcpSipProvider;
@Lazy @Lazy
@Autowired @Autowired
@Qualifier(value="udpSipProvider") @Qualifier(value="udpSipProvider")
private SipProvider udpSipProvider; private SipProviderImpl udpSipProvider;
@Override @Override
public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
@ -57,13 +75,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override @Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
parentPlatform.setExpires("0");
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
if (parentPlatformCatch != null) { if (parentPlatformCatch != null) {
parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
} }
parentPlatform.setExpires("0");
return register(parentPlatform, null, null, errorEvent, okEvent, false); return register(parentPlatform, null, null, errorEvent, okEvent, false);
} }
@ -543,4 +560,59 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
return true; return true;
} }
@Override
public void streamByeCmd(ParentPlatform platform, String callId) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
if (sendRtpItem != null) {
String mediaServerId = sendRtpItem.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId());
}
byte[] dialogByteArray = sendRtpItem.getDialog();
if (dialogByteArray != null) {
SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray);
SipStack sipStack = udpSipProvider.getSipStack();
SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
if (dialog != sipDialog) {
dialog = sipDialog;
} else {
try {
dialog.setSipProvider(udpSipProvider);
Field sipStackField = SIPDialog.class.getDeclaredField("sipStack");
sipStackField.setAccessible(true);
sipStackField.set(dialog, sipStack);
Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners");
eventListenersField.setAccessible(true);
eventListenersField.set(dialog, new HashSet<>());
byte[] transactionByteArray = sendRtpItem.getTransaction();
ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI();
SIPRequest request = (SIPRequest) clientTransaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostName());
byeURI.setPort(request.getRemotePort());
if ("TCP".equals(platform.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
} else if ("UDP".equals(platform.getTransport())) {
clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest);
}
dialog.sendRequest(clientTransaction);
} catch (SipException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
}
} }

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@ -22,6 +23,7 @@ import javax.sip.Dialog;
import javax.sip.DialogState; import javax.sip.DialogState;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress; import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
@ -60,6 +62,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private ZLMHttpHookSubscribe subscribe; private ZLMHttpHookSubscribe subscribe;
@Autowired
private DynamicTask dynamicTask;
/** /**
* 处理 ACK请求 * 处理 ACK请求
@ -68,13 +73,16 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
*/ */
@Override @Override
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
logger.info("ACK请求 {}", ((System.currentTimeMillis())));
Dialog dialog = evt.getDialog(); Dialog dialog = evt.getDialog();
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
if (dialog == null) return; if (dialog == null) return;
if (dialog.getState()== DialogState.CONFIRMED) { if (dialog.getState()== DialogState.CONFIRMED) {
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
logger.info("ACK请求 platformGbId->{}", platformGbId);
// 取消设置的超时任务
dynamicTask.stop(callIdHeader.getCallId());
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId(); String deviceId = sendRtpItem.getDeviceId();
StreamInfo streamInfo = null; StreamInfo streamInfo = null;
@ -83,15 +91,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
}else { }else {
streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId);
} }
System.out.println(JSON.toJSON(streamInfo));
if (streamInfo == null) { if (streamInfo == null) {
streamInfo = new StreamInfo(); streamInfo = new StreamInfo();
streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setApp(sendRtpItem.getApp());
streamInfo.setStream(sendRtpItem.getStreamId()); streamInfo.setStream(sendRtpItem.getStreamId());
} }
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
logger.info(platformGbId);
logger.info(channelId);
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__"); param.put("vhost","__defaultVhost__");
param.put("app",streamInfo.getApp()); param.put("app",streamInfo.getApp());
@ -100,42 +105,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
param.put("dst_url",sendRtpItem.getIp()); param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort()); param.put("dst_port", sendRtpItem.getPort());
param.put("is_udp", is_Udp); param.put("is_udp", is_Udp);
// 设备推流查询成功后才能转推
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { if (jsonObject.getInteger("code") != 0) {
// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream());
// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 监听流上线
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); // 添加订阅
// } else { JSONObject subscribeKey = new JSONObject();
// // 对hook进行订阅 subscribeKey.put("app", "rtp");
// logger.info("等待设备推流[{}/{}].......", subscribeKey.put("stream", streamInfo.getStream());
// streamInfo.getApp(), streamInfo.getStreamId()); subscribeKey.put("regist", true);
// Timer timer = new Timer(); subscribeKey.put("schema", "rtmp");
// timer.schedule(new TimerTask() { subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId());
// @Override subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
// public void run() { (MediaServerItem mediaServerItemInUse, JSONObject json)->{
// logger.info("设备推流[{}/{}]超时,终止向上级推流", zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); });
// }
// }
// }, 30*1000L);
// // 添加订阅
// JSONObject subscribeKey = new JSONObject();
// subscribeKey.put("app", "rtp");
// subscribeKey.put("stream", streamInfo.getStreamId());
// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId());
// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey,
// (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
// timer.cancel();
// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
// });
// }
} }
} }
} }

View File

@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@ -13,6 +15,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -21,6 +25,7 @@ import org.springframework.stereotype.Component;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress; import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
@ -56,6 +61,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private SIPProcessorObserver sipProcessorObserver;
@Autowired
private VideoStreamSessionManager streamSession;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
@ -71,11 +79,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
try { try {
responseAck(evt, Response.OK); responseAck(evt, Response.OK);
Dialog dialog = evt.getDialog(); Dialog dialog = evt.getDialog();
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
if (dialog == null) return; if (dialog == null) return;
if (dialog.getState().equals(DialogState.TERMINATED)) { if (dialog.getState().equals(DialogState.TERMINATED)) {
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
logger.info("收到bye, [{}/{}]", platformGbId, channelId); logger.info("收到bye, [{}/{}]", platformGbId, channelId);
if (sendRtpItem != null){ if (sendRtpItem != null){
String streamId = sendRtpItem.getStreamId(); String streamId = sendRtpItem.getStreamId();
@ -87,35 +96,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("停止向上级推流:" + streamId); logger.info("停止向上级推流:" + streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount == 0) { if (totalReaderCount <= 0) {
logger.info(streamId + "无其它观看者,通知设备停止推流"); logger.info(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId);
}else if (totalReaderCount == -1){
logger.warn(streamId + " 查找其它观看者失败");
} }
} }
// 可能是设备主动停止 // 可能是设备主动停止
Device device = storager.queryVideoDeviceByChannelId(platformGbId); Device device = storager.queryVideoDeviceByChannelId(platformGbId);
if (device != null) { if (device != null) {
storager.stopPlay(device.getDeviceId(), channelId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
if (sendRtpItem != null) { if (streamInfo != null) {
if (sendRtpItem.isPlay()) { redisCatchStorage.stopPlay(streamInfo);
if (streamInfo != null) {
redisCatchStorage.stopPlay(streamInfo);
}
}else {
if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo);
}
}
storager.stopPlay(device.getDeviceId(), channelId);
mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream());
} }
SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (ssrcTransactionForPlay != null){
SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog());
if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
}
}
SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
if (ssrcTransactionForPlayBack != null) {
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
}
} }
} }
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.TimeField;
@ -68,6 +70,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private DynamicTask dynamicTask;
@Autowired @Autowired
private SIPCommander cmder; private SIPCommander cmder;
@ -257,11 +262,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlay("Play".equals(sessionName)); sendRtpItem.setPlay("Play".equals(sessionName));
byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
Device finalDevice = device;
MediaServerItem finalMediaServerItem = mediaServerItem;
Long finalStartTime = startTime; Long finalStartTime = startTime;
Long finalStopTime = stopTime; Long finalStopTime = stopTime;
ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{
@ -289,7 +296,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
content.append("f=\r\n"); content.append("f=\r\n");
try { try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(callIdHeader.getCallId(), ()->{
logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc);
// 回复bye
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
}, 60);
responseSdpAck(evt, content.toString(), platform); responseSdpAck(evt, content.toString(), platform);
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
@ -320,6 +335,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (result.getEvent() != null) { if (result.getEvent() != null) {
errorEvent.response(result.getEvent()); errorEvent.response(result.getEvent());
} }
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
try { try {
responseAck(evt, Response.REQUEST_TIMEOUT); responseAck(evt, Response.REQUEST_TIMEOUT);
} catch (SipException e) { } catch (SipException e) {
@ -343,7 +359,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId));
} }
sendRtpItem.setPlay(false); sendRtpItem.setPlay(false);
playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
});
}else { }else {
sendRtpItem.setStreamId(streamInfo.getStream()); sendRtpItem.setStreamId(streamInfo.getStream());
hookEvent.response(mediaServerItem, null); hookEvent.response(mediaServerItem, null);
@ -365,6 +383,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 写入redis 超时时回复 // 写入redis 超时时回复
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
sendRtpItem.setCallId(callIdHeader.getCallId());
byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");

View File

@ -158,6 +158,10 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
device.setCharset("gb2312"); device.setCharset("gb2312");
device.setDeviceId(deviceId); device.setDeviceId(deviceId);
device.setFirsRegister(true); device.setFirsRegister(true);
}else {
if (device.getOnline() == 0) {
device.setFirsRegister(true);
}
} }
device.setIp(received); device.setIp(received);
device.setPort(rPort); device.setPort(rPort);
@ -187,7 +191,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
// 注册成功 // 注册成功
// 保存到redis // 保存到redis
// 下发catelog查询目录
if (registerFlag == 1 ) { if (registerFlag == 1 ) {
logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress); logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);

View File

@ -27,9 +27,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.ServerTransaction; import javax.sip.ServerTransaction;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ExpiresHeader; import javax.sip.header.ExpiresHeader;
import javax.sip.header.Header;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
@ -139,19 +137,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
if (subscribeInfo.getExpires() > 0) { if (subscribeInfo.getExpires() > 0) {
if (redisCatchStorage.getSubscribe(key) != null) { if (redisCatchStorage.getSubscribe(key) != null) {
dynamicTask.stopCron(key); dynamicTask.stop(key);
} }
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval));
redisCatchStorage.updateSubscribe(key, subscribeInfo); redisCatchStorage.updateSubscribe(key, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) { }else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stopCron(key); dynamicTask.stop(key);
redisCatchStorage.delSubscribe(key); redisCatchStorage.delSubscribe(key);
} }
try { try {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform);

View File

@ -85,19 +85,18 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
redisCatchStorage.delPlatformRegisterInfo(callId); redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus("注册".equals(action)); parentPlatform.setStatus("注册".equals(action));
// 取回Expires设置避免注销过程中被置为0 // 取回Expires设置避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) {
String expires = parentPlatformTmp.getExpires(); ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
parentPlatform.setExpires(expires); String expires = parentPlatformTmp.getExpires();
parentPlatform.setId(parentPlatformTmp.getId()); parentPlatform.setExpires(expires);
parentPlatform.setId(parentPlatformTmp.getId());
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
} }
} }

View File

@ -504,7 +504,7 @@ public class ZLMHttpHookListener {
} }
String mediaServerId = json.getString("mediaServerId"); String mediaServerId = json.getString("mediaServerId");
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (userSetup.isAutoApplyPlay() && mediaInfo != null) { if (userSetup.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) {
String app = json.getString("app"); String app = json.getString("app");
String streamId = json.getString("stream"); String streamId = json.getString("stream");
if ("rtp".equals(app)) { if ("rtp".equals(app)) {
@ -514,28 +514,16 @@ public class ZLMHttpHookListener {
String channelId = s[1]; String channelId = s[1];
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) { if (device != null) {
UUID uuid = UUID.randomUUID(); playService.play(mediaInfo,deviceId, channelId, null, null, null);
SSRCInfo ssrcInfo;
String streamId2 = null;
if (mediaInfo.isRtpEnable()) {
streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
}
ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2);
cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString());
}, null);
} }
} }
} }
} }
JSONObject ret = new JSONObject(); JSONObject ret = new JSONObject();
ret.put("code", 0); ret.put("code", 0);
ret.put("msg", "success"); ret.put("msg", "success");
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK); return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
} }
/** /**

View File

@ -205,7 +205,7 @@ public class ZLMRTPServerFactory {
/** /**
* 调用zlm RESTful API startSendRtp * 调用zlm RESTful API startSendRtp
*/ */
public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) { public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Map<String, Object>param) {
Boolean result = false; Boolean result = false;
JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param); JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param);
if (jsonObject == null) { if (jsonObject == null) {
@ -216,7 +216,7 @@ public class ZLMRTPServerFactory {
} else { } else {
logger.error("RTP推流失败: " + jsonObject.getString("msg")); logger.error("RTP推流失败: " + jsonObject.getString("msg"));
} }
return result; return jsonObject;
} }
/** /**

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.event; package com.genersoft.iot.vmp.media.zlm.event;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.IStreamPushService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -34,6 +35,9 @@ public class ZLMStatusEventListener {
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private IPlayService playService;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Async @Async
@ -55,6 +59,6 @@ public class ZLMStatusEventListener {
mediaServerService.zlmServerOffline(event.getMediaServerId()); mediaServerService.zlmServerOffline(event.getMediaServerId());
streamProxyService.zlmServerOffline(event.getMediaServerId()); streamProxyService.zlmServerOffline(event.getMediaServerId());
streamPushService.zlmServerOffline(event.getMediaServerId()); streamPushService.zlmServerOffline(event.getMediaServerId());
// TODO 处理对国标的影响 playService.zlmServerOffline(event.getMediaServerId());
} }
} }

View File

@ -58,7 +58,7 @@ public interface IMediaServerService {
void removeCount(String mediaServerId); void removeCount(String mediaServerId);
void releaseSsrc(MediaServerItem mediaServerItem, String ssrc); void releaseSsrc(String mediaServerItemId, String ssrc);
void clearMediaServerForOnline(); void clearMediaServerForOnline();

View File

@ -17,11 +17,13 @@ public interface IPlayService {
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
MediaServerItem getNewMediaServerItem(Device device); MediaServerItem getNewMediaServerItem(Device device);
void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString);
DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack);
void zlmServerOffline(String mediaServerId);
} }

View File

@ -52,11 +52,9 @@ public class DeviceServiceImpl implements IDeviceService {
return false; return false;
} }
logger.info("移除目录订阅: {}", device.getDeviceId()); logger.info("移除目录订阅: {}", device.getDeviceId());
dynamicTask.stopCron(device.getDeviceId()); dynamicTask.stop(device.getDeviceId());
device.setSubscribeCycleForCatalog(0); device.setSubscribeCycleForCatalog(0);
sipCommander.catalogSubscribe(device, null, null); sipCommander.catalogSubscribe(device, null, null);
// 清空cseq计数
return true; return true;
} }
} }

View File

@ -167,13 +167,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (mediaServerItem != null) { if (mediaServerItem != null) {
String streamId = String.format("%s_%s", deviceId, channelId); String streamId = String.format("%s_%s", deviceId, channelId);
zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
releaseSsrc(mediaServerItem, ssrc); releaseSsrc(mediaServerItem.getId(), ssrc);
} }
streamSession.remove(deviceId, channelId, stream); streamSession.remove(deviceId, channelId, stream);
} }
@Override @Override
public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) { public void releaseSsrc(String mediaServerItemId, String ssrc) {
MediaServerItem mediaServerItem = getOne(mediaServerItemId);
if (mediaServerItem == null || ssrc == null) { if (mediaServerItem == null || ssrc == null) {
return; return;
} }

View File

@ -5,13 +5,13 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -37,8 +37,7 @@ import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.Objects; import java.util.*;
import java.util.UUID;
@SuppressWarnings(value = {"rawtypes", "unchecked"}) @SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service @Service
@ -52,6 +51,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired @Autowired
private SIPCommander cmder; private SIPCommander cmder;
@Autowired
private SIPCommanderFroPlatform sipCommanderFroPlatform;
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@ -78,7 +80,9 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) { public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback) {
PlayResult playResult = new PlayResult(); PlayResult playResult = new PlayResult();
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
@ -101,29 +105,10 @@ public class PlayServiceImpl implements IPlayService {
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device); playResult.setDevice(device);
// 超时处理
result.onTimeout(()->{
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream());
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
}else {
wvpResult.setMsg("点播超时,请稍候重试");
}
msg.setData(wvpResult);
// 点播超时回复BYE
cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream());
// 释放rtpserver
mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, streamInfo.getStream());
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
// TODO 释放ssrc
});
result.onCompletion(()->{ result.onCompletion(()->{
// 点播结束时调用截图接口 // 点播结束时调用截图接口
// TODO 应该在上流时调用更好结束也可能是错误结束
try { try {
String classPath = ResourceUtils.getURL("classpath:").getPath(); String classPath = ResourceUtils.getURL("classpath:").getPath();
// 兼容打包为jar的class路径 // 兼容打包为jar的class路径
@ -161,31 +146,60 @@ public class PlayServiceImpl implements IPlayService {
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); streamId = String.format("%s_%s", device.getDeviceId(), channelId);
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
// 超时处理
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
if (timeoutCallback != null) {
timeoutCallback.run();
}
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
}else {
wvpResult.setMsg("点播超时,请稍候重试");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
msg.setData(wvpResult);
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
}
}, userSetup.getPlayTimeout());
// 发送点播消息 // 发送点播消息
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString()); logger.info("收到订阅消息: " + response.toJSONString());
timer.cancel();
onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid); onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid);
if (hookEvent != null) { if (hookEvent != null) {
hookEvent.response(mediaServerItem, response); hookEvent.response(mediaServerItem, response);
} }
}, (event) -> { }, (event) -> {
timer.cancel();
WVPResult wvpResult = new WVPResult(); WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1); wvpResult.setCode(-1);
// 点播返回sip错误 // 点播返回sip错误
mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
// 释放ssrc // 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult); msg.setData(wvpResult);
resultHolder.invokeAllResult(msg); resultHolder.invokeAllResult(msg);
if (errorEvent != null) { if (errorEvent != null) {
errorEvent.response(event); errorEvent.response(event);
} }
}); });
} else { } else {
String streamId = streamInfo.getStream(); String streamId = streamInfo.getStream();
@ -222,13 +236,41 @@ public class PlayServiceImpl implements IPlayService {
streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); streamId2 = String.format("%s_%s", device.getDeviceId(), channelId);
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
// 超时处理
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
if (timeoutCallback != null) {
timeoutCallback.run();
}
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
}else {
wvpResult.setMsg("点播超时,请稍候重试");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
msg.setData(wvpResult);
// 回复之前所有的点播请求
resultHolder.invokeAllResult(msg);
}
}, userSetup.getPlayTimeout());
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString()); logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid);
}, (event) -> { }, (event) -> {
mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream());
// 释放ssrc // 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
WVPResult wvpResult = new WVPResult(); WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1); wvpResult.setCode(-1);
@ -306,14 +348,33 @@ public class PlayServiceImpl implements IPlayService {
msg.setId(uuid); msg.setId(uuid);
msg.setKey(key); msg.setKey(key);
PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>(); PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
result.onTimeout(()->{
msg.setData("回放超时"); Timer timer = new Timer();
playBackResult.setCode(-1); timer.schedule(new TimerTask() {
playBackResult.setData(msg); @Override
callback.call(playBackResult); public void run() {
}); logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId));
playBackResult.setCode(-1);
playBackResult.setData(msg);
callback.call(playBackResult);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
}else {
mediaServerService.releaseSsrc(newMediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream());
// 回复之前所有的点播请求
callback.call(playBackResult);
}
}, userSetup.getPlayTimeout());
cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString()); logger.info("收到订阅消息: " + response.toJSONString());
timer.cancel();
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
if (streamInfo == null) { if (streamInfo == null) {
logger.warn("设备回放API调用失败"); logger.warn("设备回放API调用失败");
@ -331,6 +392,7 @@ public class PlayServiceImpl implements IPlayService {
playBackResult.setResponse(response); playBackResult.setResponse(response);
callback.call(playBackResult); callback.call(playBackResult);
}, event -> { }, event -> {
timer.cancel();
msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
playBackResult.setCode(-1); playBackResult.setCode(-1);
playBackResult.setData(msg); playBackResult.setData(msg);
@ -370,4 +432,26 @@ public class PlayServiceImpl implements IPlayService {
return streamInfo; return streamInfo;
} }
@Override
public void zlmServerOffline(String mediaServerId) {
// 处理正在向上推流的上级平台
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
}
}
}
// 处理正在观看的国标设备
List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
if (allSsrc.size() > 0) {
for (SsrcTransaction ssrcTransaction : allSsrc) {
if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
}
}
}
}
} }

View File

@ -89,7 +89,7 @@ public interface IRedisCatchStorage {
* @param channelId * @param channelId
* @return sendRtpItem * @return sendRtpItem
*/ */
SendRtpItem querySendRTPServer(String platformGbId, String channelId); SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
List<SendRtpItem> querySendRTPServer(String platformGbId); List<SendRtpItem> querySendRTPServer(String platformGbId);
@ -98,7 +98,7 @@ public interface IRedisCatchStorage {
* @param platformGbId * @param platformGbId
* @param channelId * @param channelId
*/ */
void deleteSendRTPServer(String platformGbId, String channelId); void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId);
/** /**
* 查询某个通道是否存在上级点播RTP推送 * 查询某个通道是否存在上级点播RTP推送

View File

@ -135,6 +135,32 @@ public interface DeviceChannelMapper {
"'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " + "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " +
"'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" + "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" +
"</foreach> " + "</foreach> " +
"ON DUPLICATE KEY UPDATE " +
"updateTime=VALUES(updateTime), " +
"name=VALUES(name), " +
"manufacture=VALUES(manufacture), " +
"model=VALUES(model), " +
"owner=VALUES(owner), " +
"civilCode=VALUES(civilCode), " +
"block=VALUES(block), " +
"subCount=VALUES(subCount), " +
"address=VALUES(address), " +
"parental=VALUES(parental), " +
"parentId=VALUES(parentId), " +
"safetyWay=VALUES(safetyWay), " +
"registerWay=VALUES(registerWay), " +
"certNum=VALUES(certNum), " +
"certifiable=VALUES(certifiable), " +
"errCode=VALUES(errCode), " +
"secrecy=VALUES(secrecy), " +
"ipAddress=VALUES(ipAddress), " +
"port=VALUES(port), " +
"password=VALUES(password), " +
"PTZType=VALUES(PTZType), " +
"status=VALUES(status), " +
"streamId=VALUES(streamId), " +
"longitude=VALUES(longitude), " +
"latitude=VALUES(latitude)" +
"</script>") "</script>")
int batchAdd(List<DeviceChannel> addChannels); int batchAdd(List<DeviceChannel> addChannels);
@ -211,4 +237,15 @@ public interface DeviceChannelMapper {
" from device_channel\n" + " from device_channel\n" +
" where deviceId = #{deviceId}") " where deviceId = #{deviceId}")
List<DeviceChannelTree> tree(String deviceId); List<DeviceChannelTree> tree(String deviceId);
@Delete(value = {" <script>" +
"DELETE " +
"from " +
"device_channel " +
"WHERE " +
"deviceId = #{deviceId} " +
" AND channelId NOT IN " +
"<foreach collection='channels' item='item' open='(' separator=',' close=')' > #{item.channelId}</foreach>" +
" </script>"})
int cleanChannelsNotInList(String deviceId, List<DeviceChannel> channels);
} }

View File

@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -276,19 +277,32 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) { public void updateSendRTPSever(SendRtpItem sendRtpItem) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId(); String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_"
+ sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_"
+ sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId();
redis.set(key, sendRtpItem); redis.set(key, sendRtpItem);
} }
@Override @Override
public SendRtpItem querySendRTPServer(String platformGbId, String channelId) { public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; if (platformGbId == null) platformGbId = "*";
return (SendRtpItem)redis.get(key); if (channelId == null) channelId = "*";
if (streamId == null) streamId = "*";
if (callId == null) callId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
+ "_" + channelId + "_" + streamId + "_" + callId;
List<Object> scan = redis.scan(key);
if (scan.size() > 0) {
return (SendRtpItem)redis.get((String)scan.get(0));
}else {
return null;
}
} }
@Override @Override
public List<SendRtpItem> querySendRTPServer(String platformGbId) { public List<SendRtpItem> querySendRTPServer(String platformGbId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*"; if (platformGbId == null) platformGbId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*" + "_*" + "_*";
List<Object> queryResult = redis.scan(key); List<Object> queryResult = redis.scan(key);
List<SendRtpItem> result= new ArrayList<>(); List<SendRtpItem> result= new ArrayList<>();
@ -306,18 +320,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
* @param channelId * @param channelId
*/ */
@Override @Override
public void deleteSendRTPServer(String platformGbId, String channelId) { public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; if (streamId == null) streamId = "*";
redis.del(key); if (callId == null) callId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId
+ "_" + channelId + "_" + streamId + "_" + callId;
List<Object> scan = redis.scan(key);
if (scan.size() > 0) {
for (Object keyStr : scan) {
redis.del((String)keyStr);
}
}
} }
/** /**
* 查询某个通道是否存在上级点播RTP推送 * 查询某个通道是否存在上级点播RTP推送
* @param channelId * @param channelId
*/ */
@Override @Override
public boolean isChannelSendingRTP(String channelId) { public boolean isChannelSendingRTP(String channelId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId; String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId + "*_" + "*_";
List<Object> RtpStreams = redis.scan(key); List<Object> RtpStreams = redis.scan(key);
if (RtpStreams.size() > 0) { if (RtpStreams.size() > 0) {
return true; return true;

View File

@ -284,7 +284,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder); logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder);
} }
try { try {
int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); // int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels);
int limitCount = 300; int limitCount = 300;
boolean result = cleanChannelsResult < 0; boolean result = cleanChannelsResult < 0;
if (!result && channels.size() > 0) { if (!result && channels.size() > 0) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.gb28181.device; package com.genersoft.iot.vmp.vmanager.gb28181.device;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
@ -13,7 +14,6 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree; import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiImplicitParams;
@ -57,6 +57,9 @@ public class DeviceQuery {
@Autowired @Autowired
private IDeviceService deviceService; private IDeviceService deviceService;
@Autowired
private DynamicTask dynamicTask;
/** /**
* 使用ID查询国标设备 * 使用ID查询国标设备
* @param deviceId 国标ID * @param deviceId 国标ID
@ -209,6 +212,8 @@ public class DeviceQuery {
boolean isSuccess = storager.delete(deviceId); boolean isSuccess = storager.delete(deviceId);
if (isSuccess) { if (isSuccess) {
redisCatchStorage.clearCatchByDeviceId(deviceId); redisCatchStorage.clearCatchByDeviceId(deviceId);
// 停止此设备的订阅更新
dynamicTask.stop(deviceId);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("deviceId", deviceId); json.put("deviceId", deviceId);
return new ResponseEntity<>(json.toString(),HttpStatus.OK); return new ResponseEntity<>(json.toString(),HttpStatus.OK);

View File

@ -2,8 +2,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.platform;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.CatalogData; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -39,6 +40,9 @@ public class PlatformController {
private final static Logger logger = LoggerFactory.getLogger(PlatformController.class); private final static Logger logger = LoggerFactory.getLogger(PlatformController.class);
@Autowired
private UserSetup userSetup;
@Autowired @Autowired
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
@ -48,11 +52,15 @@ public class PlatformController {
@Autowired @Autowired
private ISIPCommanderForPlatform commanderForPlatform; private ISIPCommanderForPlatform commanderForPlatform;
@Autowired
private SipConfig sipConfig;
@Autowired @Autowired
private SipConfig sipConfig; private DynamicTask dynamicTask;
/** /**
* 获取国标服务的配置 * 获取国标服务的配置
*
* @return * @return
*/ */
@ApiOperation("获取国标服务的配置") @ApiOperation("获取国标服务的配置")
@ -65,8 +73,10 @@ public class PlatformController {
result.put("password", sipConfig.getPassword()); result.put("password", sipConfig.getPassword());
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
/** /**
* 获取级联服务器信息 * 获取级联服务器信息
*
* @return * @return
*/ */
@ApiOperation("获取国标服务的配置") @ApiOperation("获取国标服务的配置")
@ -78,7 +88,7 @@ public class PlatformController {
wvpResult.setCode(0); wvpResult.setCode(0);
wvpResult.setMsg("success"); wvpResult.setMsg("success");
wvpResult.setData(parentPlatform); wvpResult.setData(parentPlatform);
}else { } else {
wvpResult.setCode(-1); wvpResult.setCode(-1);
wvpResult.setMsg("未查询到此平台"); wvpResult.setMsg("未查询到此平台");
} }
@ -87,7 +97,8 @@ public class PlatformController {
/** /**
* 分页查询级联平台 * 分页查询级联平台
* @param page 当前页 *
* @param page 当前页
* @param count 每页条数 * @param count 每页条数
* @return * @return
*/ */
@ -97,7 +108,7 @@ public class PlatformController {
@ApiImplicitParam(name = "page", value = "当前页", dataTypeClass = Integer.class), @ApiImplicitParam(name = "page", value = "当前页", dataTypeClass = Integer.class),
@ApiImplicitParam(name = "count", value = "每页条数", dataTypeClass = Integer.class), @ApiImplicitParam(name = "count", value = "每页条数", dataTypeClass = Integer.class),
}) })
public PageInfo<ParentPlatform> platforms(@PathVariable int page, @PathVariable int count){ public PageInfo<ParentPlatform> platforms(@PathVariable int page, @PathVariable int count) {
// if (logger.isDebugEnabled()) { // if (logger.isDebugEnabled()) {
// logger.debug("查询所有上级设备API调用"); // logger.debug("查询所有上级设备API调用");
@ -107,6 +118,7 @@ public class PlatformController {
/** /**
* 添加上级平台信息 * 添加上级平台信息
*
* @param parentPlatform * @param parentPlatform
* @return * @return
*/ */
@ -116,28 +128,28 @@ public class PlatformController {
}) })
@PostMapping("/add") @PostMapping("/add")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<String>> addPlatform(@RequestBody ParentPlatform parentPlatform){ public ResponseEntity<WVPResult<String>> addPlatform(@RequestBody ParentPlatform parentPlatform) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("保存上级平台信息API调用"); logger.debug("保存上级平台信息API调用");
} }
WVPResult<String> wvpResult = new WVPResult<>(); WVPResult<String> wvpResult = new WVPResult<>();
if (StringUtils.isEmpty(parentPlatform.getName()) if (StringUtils.isEmpty(parentPlatform.getName())
||StringUtils.isEmpty(parentPlatform.getServerGBId()) || StringUtils.isEmpty(parentPlatform.getServerGBId())
||StringUtils.isEmpty(parentPlatform.getServerGBDomain()) || StringUtils.isEmpty(parentPlatform.getServerGBDomain())
||StringUtils.isEmpty(parentPlatform.getServerIP()) || StringUtils.isEmpty(parentPlatform.getServerIP())
||StringUtils.isEmpty(parentPlatform.getServerPort()) || StringUtils.isEmpty(parentPlatform.getServerPort())
||StringUtils.isEmpty(parentPlatform.getDeviceGBId()) || StringUtils.isEmpty(parentPlatform.getDeviceGBId())
||StringUtils.isEmpty(parentPlatform.getExpires()) || StringUtils.isEmpty(parentPlatform.getExpires())
||StringUtils.isEmpty(parentPlatform.getKeepTimeout()) || StringUtils.isEmpty(parentPlatform.getKeepTimeout())
||StringUtils.isEmpty(parentPlatform.getTransport()) || StringUtils.isEmpty(parentPlatform.getTransport())
||StringUtils.isEmpty(parentPlatform.getCharacterSet()) || StringUtils.isEmpty(parentPlatform.getCharacterSet())
){ ) {
wvpResult.setCode(-1); wvpResult.setCode(-1);
wvpResult.setMsg("missing parameters"); wvpResult.setMsg("missing parameters");
return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST); return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST);
} }
if (parentPlatform.getServerPort()< 0 || parentPlatform.getServerPort() > 65535){ if (parentPlatform.getServerPort() < 0 || parentPlatform.getServerPort() > 65535) {
wvpResult.setCode(-1); wvpResult.setCode(-1);
wvpResult.setMsg("error severPort"); wvpResult.setMsg("error severPort");
return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST); return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST);
@ -146,7 +158,7 @@ public class PlatformController {
ParentPlatform parentPlatformOld = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); ParentPlatform parentPlatformOld = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId());
if (parentPlatformOld != null) { if (parentPlatformOld != null) {
wvpResult.setCode(-1); wvpResult.setCode(-1);
wvpResult.setMsg("平台 "+parentPlatform.getServerGBId()+" 已存在"); wvpResult.setMsg("平台 " + parentPlatform.getServerGBId() + " 已存在");
return new ResponseEntity<>(wvpResult, HttpStatus.OK); return new ResponseEntity<>(wvpResult, HttpStatus.OK);
} }
boolean updateResult = storager.updateParentPlatform(parentPlatform); boolean updateResult = storager.updateParentPlatform(parentPlatform);
@ -154,17 +166,17 @@ public class PlatformController {
if (updateResult) { if (updateResult) {
// 保存时启用就发送注册 // 保存时启用就发送注册
if (parentPlatform.isEnable()) { if (parentPlatform.isEnable()) {
if (parentPlatformOld.isStatus()) { if (parentPlatformOld != null && parentPlatformOld.isStatus()) {
commanderForPlatform.unregister(parentPlatformOld, null, eventResult -> { commanderForPlatform.unregister(parentPlatformOld, null, eventResult -> {
// 只要保存就发送注册 // 只要保存就发送注册
commanderForPlatform.register(parentPlatform, null, null); commanderForPlatform.register(parentPlatform, null, null);
}); });
}else { } else {
// 只要保存就发送注册 // 只要保存就发送注册
commanderForPlatform.register(parentPlatform, null, null); commanderForPlatform.register(parentPlatform, null, null);
} }
} else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销 } else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()) { // 关闭启用时注销
commanderForPlatform.unregister(parentPlatform, null, null); commanderForPlatform.unregister(parentPlatform, null, null);
} }
wvpResult.setCode(0); wvpResult.setCode(0);
@ -179,6 +191,7 @@ public class PlatformController {
/** /**
* 保存上级平台信息 * 保存上级平台信息
*
* @param parentPlatform * @param parentPlatform
* @return * @return
*/ */
@ -188,23 +201,23 @@ public class PlatformController {
}) })
@PostMapping("/save") @PostMapping("/save")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<String>> savePlatform(@RequestBody ParentPlatform parentPlatform){ public ResponseEntity<WVPResult<String>> savePlatform(@RequestBody ParentPlatform parentPlatform) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("保存上级平台信息API调用"); logger.debug("保存上级平台信息API调用");
} }
WVPResult<String> wvpResult = new WVPResult<>(); WVPResult<String> wvpResult = new WVPResult<>();
if (StringUtils.isEmpty(parentPlatform.getName()) if (StringUtils.isEmpty(parentPlatform.getName())
||StringUtils.isEmpty(parentPlatform.getServerGBId()) || StringUtils.isEmpty(parentPlatform.getServerGBId())
||StringUtils.isEmpty(parentPlatform.getServerGBDomain()) || StringUtils.isEmpty(parentPlatform.getServerGBDomain())
||StringUtils.isEmpty(parentPlatform.getServerIP()) || StringUtils.isEmpty(parentPlatform.getServerIP())
||StringUtils.isEmpty(parentPlatform.getServerPort()) || StringUtils.isEmpty(parentPlatform.getServerPort())
||StringUtils.isEmpty(parentPlatform.getDeviceGBId()) || StringUtils.isEmpty(parentPlatform.getDeviceGBId())
||StringUtils.isEmpty(parentPlatform.getExpires()) || StringUtils.isEmpty(parentPlatform.getExpires())
||StringUtils.isEmpty(parentPlatform.getKeepTimeout()) || StringUtils.isEmpty(parentPlatform.getKeepTimeout())
||StringUtils.isEmpty(parentPlatform.getTransport()) || StringUtils.isEmpty(parentPlatform.getTransport())
||StringUtils.isEmpty(parentPlatform.getCharacterSet()) || StringUtils.isEmpty(parentPlatform.getCharacterSet())
){ ) {
wvpResult.setCode(-1); wvpResult.setCode(-1);
wvpResult.setMsg("missing parameters"); wvpResult.setMsg("missing parameters");
return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST); return new ResponseEntity<>(wvpResult, HttpStatus.BAD_REQUEST);
@ -216,7 +229,7 @@ public class PlatformController {
if (updateResult) { if (updateResult) {
// 保存时启用就发送注册 // 保存时启用就发送注册
if (parentPlatform.isEnable()) { if (parentPlatform.isEnable()) {
if (parentPlatformOld.isStatus()) { if (parentPlatformOld != null && parentPlatformOld.isStatus()) {
commanderForPlatform.unregister(parentPlatformOld, null, null); commanderForPlatform.unregister(parentPlatformOld, null, null);
try { try {
Thread.sleep(500); Thread.sleep(500);
@ -225,11 +238,11 @@ public class PlatformController {
} }
// 只要保存就发送注册 // 只要保存就发送注册
commanderForPlatform.register(parentPlatform, null, null); commanderForPlatform.register(parentPlatform, null, null);
}else { } else {
// 只要保存就发送注册 // 只要保存就发送注册
commanderForPlatform.register(parentPlatform, null, null); commanderForPlatform.register(parentPlatform, null, null);
} }
} else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销 } else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()) { // 关闭启用时注销
commanderForPlatform.unregister(parentPlatformOld, null, null); commanderForPlatform.unregister(parentPlatformOld, null, null);
} }
wvpResult.setCode(0); wvpResult.setCode(0);
@ -244,7 +257,8 @@ public class PlatformController {
/** /**
* 删除上级平台 * 删除上级平台
* @param serverGBId 上级平台国标ID *
* @param serverGBId 上级平台国标ID
* @return * @return
*/ */
@ApiOperation("删除上级平台") @ApiOperation("删除上级平台")
@ -253,13 +267,13 @@ public class PlatformController {
}) })
@DeleteMapping("/delete/{serverGBId}") @DeleteMapping("/delete/{serverGBId}")
@ResponseBody @ResponseBody
public ResponseEntity<String> deletePlatform(@PathVariable String serverGBId){ public ResponseEntity<String> deletePlatform(@PathVariable String serverGBId) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("删除上级平台API调用"); logger.debug("删除上级平台API调用");
} }
if (StringUtils.isEmpty(serverGBId) if (StringUtils.isEmpty(serverGBId)
){ ) {
return new ResponseEntity<>("missing parameters", HttpStatus.BAD_REQUEST); return new ResponseEntity<>("missing parameters", HttpStatus.BAD_REQUEST);
} }
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(serverGBId);
@ -280,17 +294,19 @@ public class PlatformController {
boolean deleteResult = storager.deleteParentPlatform(parentPlatform); boolean deleteResult = storager.deleteParentPlatform(parentPlatform);
storager.delCatalogByPlatformId(parentPlatform.getServerGBId()); storager.delCatalogByPlatformId(parentPlatform.getServerGBId());
storager.delRelationByPlatformId(parentPlatform.getServerGBId()); storager.delRelationByPlatformId(parentPlatform.getServerGBId());
// 停止发送位置订阅定时任务
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + parentPlatform.getServerGBId();
dynamicTask.stop(key);
if (deleteResult) { if (deleteResult) {
return new ResponseEntity<>("success", HttpStatus.OK); return new ResponseEntity<>("success", HttpStatus.OK);
}else { } else {
return new ResponseEntity<>("fail", HttpStatus.OK); return new ResponseEntity<>("fail", HttpStatus.OK);
} }
} }
/** /**
* 查询上级平台是否存在 * 查询上级平台是否存在
*
* @param serverGBId 上级平台国标ID * @param serverGBId 上级平台国标ID
* @return * @return
*/ */
@ -300,7 +316,7 @@ public class PlatformController {
}) })
@GetMapping("/exit/{serverGBId}") @GetMapping("/exit/{serverGBId}")
@ResponseBody @ResponseBody
public ResponseEntity<String> exitPlatform(@PathVariable String serverGBId){ public ResponseEntity<String> exitPlatform(@PathVariable String serverGBId) {
// if (logger.isDebugEnabled()) { // if (logger.isDebugEnabled()) {
// logger.debug("查询上级平台是否存在API调用" + serverGBId); // logger.debug("查询上级平台是否存在API调用" + serverGBId);
@ -311,12 +327,13 @@ public class PlatformController {
/** /**
* 分页查询级联平台的所有所有通道 * 分页查询级联平台的所有所有通道
* @param page 当前页 *
* @param count 每页条数 * @param page 当前页
* @param platformId 上级平台ID * @param count 每页条数
* @param query 查询内容 * @param platformId 上级平台ID
* @param online 是否在线 * @param query 查询内容
* @param choosed 是否已选中 * @param online 是否在线
* @param choosed 是否已选中
* @param channelType 通道类型 * @param channelType 通道类型
* @return * @return
*/ */
@ -333,22 +350,22 @@ public class PlatformController {
@GetMapping("/channel_list") @GetMapping("/channel_list")
@ResponseBody @ResponseBody
public PageInfo<ChannelReduce> channelList(int page, int count, public PageInfo<ChannelReduce> channelList(int page, int count,
@RequestParam(required = false) String platformId, @RequestParam(required = false) String platformId,
@RequestParam(required = false) String catalogId, @RequestParam(required = false) String catalogId,
@RequestParam(required = false) String query, @RequestParam(required = false) String query,
@RequestParam(required = false) Boolean online, @RequestParam(required = false) Boolean online,
@RequestParam(required = false) Boolean channelType){ @RequestParam(required = false) Boolean channelType) {
// if (logger.isDebugEnabled()) { // if (logger.isDebugEnabled()) {
// logger.debug("查询所有所有通道API调用"); // logger.debug("查询所有所有通道API调用");
// } // }
if(StringUtils.isEmpty(platformId)) { if (StringUtils.isEmpty(platformId)) {
platformId = null; platformId = null;
} }
if(StringUtils.isEmpty(query)) { if (StringUtils.isEmpty(query)) {
query = null; query = null;
} }
if(StringUtils.isEmpty(platformId) || StringUtils.isEmpty(catalogId)) { if (StringUtils.isEmpty(platformId) || StringUtils.isEmpty(catalogId)) {
catalogId = null; catalogId = null;
} }
PageInfo<ChannelReduce> channelReduces = storager.queryAllChannelList(page, count, query, online, channelType, platformId, catalogId); PageInfo<ChannelReduce> channelReduces = storager.queryAllChannelList(page, count, query, online, channelType, platformId, catalogId);
@ -358,6 +375,7 @@ public class PlatformController {
/** /**
* 向上级平台添加国标通道 * 向上级平台添加国标通道
*
* @param param 通道关联参数 * @param param 通道关联参数
* @return * @return
*/ */
@ -367,7 +385,7 @@ public class PlatformController {
}) })
@PostMapping("/update_channel_for_gb") @PostMapping("/update_channel_for_gb")
@ResponseBody @ResponseBody
public ResponseEntity<String> updateChannelForGB(@RequestBody UpdateChannelParam param){ public ResponseEntity<String> updateChannelForGB(@RequestBody UpdateChannelParam param) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("给上级平台添加国标通道API调用"); logger.debug("给上级平台添加国标通道API调用");
@ -379,6 +397,7 @@ public class PlatformController {
/** /**
* 从上级平台移除国标通道 * 从上级平台移除国标通道
*
* @param param 通道关联参数 * @param param 通道关联参数
* @return * @return
*/ */
@ -388,7 +407,7 @@ public class PlatformController {
}) })
@DeleteMapping("/del_channel_for_gb") @DeleteMapping("/del_channel_for_gb")
@ResponseBody @ResponseBody
public ResponseEntity<String> delChannelForGB(@RequestBody UpdateChannelParam param){ public ResponseEntity<String> delChannelForGB(@RequestBody UpdateChannelParam param) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("给上级平台删除国标通道API调用"); logger.debug("给上级平台删除国标通道API调用");
@ -400,8 +419,9 @@ public class PlatformController {
/** /**
* 获取目录 * 获取目录
*
* @param platformId 平台ID * @param platformId 平台ID
* @param parentId 目录父ID * @param parentId 目录父ID
* @return * @return
*/ */
@ApiOperation("获取目录") @ApiOperation("获取目录")
@ -411,7 +431,7 @@ public class PlatformController {
}) })
@GetMapping("/catalog") @GetMapping("/catalog")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> getCatalogByPlatform(String platformId, String parentId){ public ResponseEntity<WVPResult<List<PlatformCatalog>>> getCatalogByPlatform(String platformId, String parentId) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("查询目录,platformId: {}, parentId: {}", platformId, parentId); logger.debug("查询目录,platformId: {}, parentId: {}", platformId, parentId);
@ -432,6 +452,7 @@ public class PlatformController {
/** /**
* 添加目录 * 添加目录
*
* @param platformCatalog 目录 * @param platformCatalog 目录
* @return * @return
*/ */
@ -441,7 +462,7 @@ public class PlatformController {
}) })
@PostMapping("/catalog/add") @PostMapping("/catalog/add")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> addCatalog(@RequestBody PlatformCatalog platformCatalog){ public ResponseEntity<WVPResult<List<PlatformCatalog>>> addCatalog(@RequestBody PlatformCatalog platformCatalog) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("添加目录,{}", JSON.toJSONString(platformCatalog)); logger.debug("添加目录,{}", JSON.toJSONString(platformCatalog));
@ -452,7 +473,7 @@ public class PlatformController {
if (platformCatalogInStore != null) { if (platformCatalogInStore != null) {
result.setCode(-1); result.setCode(-1);
result.setMsg( platformCatalog.getId() + " already exists"); result.setMsg(platformCatalog.getId() + " already exists");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
int addResult = storager.addCatalog(platformCatalog); int addResult = storager.addCatalog(platformCatalog);
@ -460,7 +481,7 @@ public class PlatformController {
result.setCode(0); result.setCode(0);
result.setMsg("success"); result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
}else { } else {
result.setCode(-500); result.setCode(-500);
result.setMsg("save error"); result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
@ -469,6 +490,7 @@ public class PlatformController {
/** /**
* 编辑目录 * 编辑目录
*
* @param platformCatalog 目录 * @param platformCatalog 目录
* @return * @return
*/ */
@ -478,7 +500,7 @@ public class PlatformController {
}) })
@PostMapping("/catalog/edit") @PostMapping("/catalog/edit")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> editCatalog(@RequestBody PlatformCatalog platformCatalog){ public ResponseEntity<WVPResult<List<PlatformCatalog>>> editCatalog(@RequestBody PlatformCatalog platformCatalog) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("编辑目录,{}", JSON.toJSONString(platformCatalog)); logger.debug("编辑目录,{}", JSON.toJSONString(platformCatalog));
@ -488,14 +510,14 @@ public class PlatformController {
result.setCode(0); result.setCode(0);
if (platformCatalogInStore == null) { if (platformCatalogInStore == null) {
result.setMsg( platformCatalog.getId() + " not exists"); result.setMsg(platformCatalog.getId() + " not exists");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
int addResult = storager.updateCatalog(platformCatalog); int addResult = storager.updateCatalog(platformCatalog);
if (addResult > 0) { if (addResult > 0) {
result.setMsg("success"); result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
}else { } else {
result.setMsg("save error"); result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
@ -503,6 +525,7 @@ public class PlatformController {
/** /**
* 删除目录 * 删除目录
*
* @param id 目录Id * @param id 目录Id
* @return * @return
*/ */
@ -512,7 +535,7 @@ public class PlatformController {
}) })
@DeleteMapping("/catalog/del") @DeleteMapping("/catalog/del")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<String>> delCatalog(String id, String platformId){ public ResponseEntity<WVPResult<String>> delCatalog(String id, String platformId) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("删除目录,{}", id); logger.debug("删除目录,{}", id);
@ -540,7 +563,7 @@ public class PlatformController {
if (delResult > 0) { if (delResult > 0) {
result.setMsg("success"); result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
}else { } else {
result.setMsg("save error"); result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
@ -548,6 +571,7 @@ public class PlatformController {
/** /**
* 删除关联 * 删除关联
*
* @param platformCatalog 关联的信息 * @param platformCatalog 关联的信息
* @return * @return
*/ */
@ -557,7 +581,7 @@ public class PlatformController {
}) })
@DeleteMapping("/catalog/relation/del") @DeleteMapping("/catalog/relation/del")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<List<PlatformCatalog>>> delRelation(@RequestBody PlatformCatalog platformCatalog){ public ResponseEntity<WVPResult<List<PlatformCatalog>>> delRelation(@RequestBody PlatformCatalog platformCatalog) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("删除关联,{}", JSON.toJSONString(platformCatalog)); logger.debug("删除关联,{}", JSON.toJSONString(platformCatalog));
@ -569,7 +593,7 @@ public class PlatformController {
if (delResult > 0) { if (delResult > 0) {
result.setMsg("success"); result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
}else { } else {
result.setMsg("save error"); result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }
@ -578,8 +602,9 @@ public class PlatformController {
/** /**
* 修改默认目录 * 修改默认目录
*
* @param platformId 平台Id * @param platformId 平台Id
* @param catalogId 目录Id * @param catalogId 目录Id
* @return * @return
*/ */
@ApiOperation("修改默认目录") @ApiOperation("修改默认目录")
@ -589,7 +614,7 @@ public class PlatformController {
}) })
@PostMapping("/catalog/default/update") @PostMapping("/catalog/default/update")
@ResponseBody @ResponseBody
public ResponseEntity<WVPResult<String>> setDefaultCatalog(String platformId, String catalogId){ public ResponseEntity<WVPResult<String>> setDefaultCatalog(String platformId, String catalogId) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("修改默认目录,{},{}", platformId, catalogId); logger.debug("修改默认目录,{},{}", platformId, catalogId);
@ -601,7 +626,7 @@ public class PlatformController {
if (updateResult > 0) { if (updateResult > 0) {
result.setMsg("success"); result.setMsg("success");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
}else { } else {
result.setMsg("save error"); result.setMsg("save error");
return new ResponseEntity<>(result, HttpStatus.OK); return new ResponseEntity<>(result, HttpStatus.OK);
} }

View File

@ -88,7 +88,7 @@ public class PlayController {
// 获取可用的zlm // 获取可用的zlm
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null); PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null);
return playResult.getResult(); return playResult.getResult();
} }

View File

@ -150,7 +150,7 @@ public class ApiStreamController {
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
result.put("error", "channel[ " + code + " ] " + eventResult.msg); result.put("error", "channel[ " + code + " ] " + eventResult.msg);
resultDeferredResult.setResult(result); resultDeferredResult.setResult(result);
}); }, null);
return resultDeferredResult; return resultDeferredResult;
} }

View File

@ -170,8 +170,6 @@ user-settings:
save-position-history: false save-position-history: false
# 点播等待超时时间,单位:毫秒 # 点播等待超时时间,单位:毫秒
play-timeout: 3000 play-timeout: 3000
# 等待音视频编码信息再返回, true 可以根据编码选择合适的播放器false 可以更快点播
wait-track: false
# 是否开启接口鉴权 # 是否开启接口鉴权
interface-authentication: true interface-authentication: true
# 自动配置redis 可以过期事件 # 自动配置redis 可以过期事件

View File

@ -13,7 +13,7 @@ spring:
# [可选] 数据库 DB # [可选] 数据库 DB
database: 6 database: 6
# [可选] 访问密码,若你的redis服务器没有设置密码就不需要用密码去连接 # [可选] 访问密码,若你的redis服务器没有设置密码就不需要用密码去连接
password: password: face2020
# [可选] 超时时间 # [可选] 超时时间
timeout: 10000 timeout: 10000
# [可选] jdbc数据库配置, 项目使用sqlite作为数据库一般不需要配置 # [可选] jdbc数据库配置, 项目使用sqlite作为数据库一般不需要配置
@ -23,7 +23,7 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/wvp?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true&serverTimezone=PRC&useSSL=false url: jdbc:mysql://127.0.0.1:3306/wvp?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true&serverTimezone=PRC&useSSL=false
username: root username: root
password: root123 password: 123456
druid: druid:
initialSize: 10 # 连接池初始化连接数 initialSize: 10 # 连接池初始化连接数
maxActive: 200 # 连接池最大连接数 maxActive: 200 # 连接池最大连接数
@ -50,7 +50,7 @@ server:
# 作为28181服务器的配置 # 作为28181服务器的配置
sip: sip:
# [必须修改] 本机的IP # [必须修改] 本机的IP
ip: 192.168.118.70 ip: 192.168.41.16
# [可选] 28181服务监听的端口 # [可选] 28181服务监听的端口
port: 5060 port: 5060
# 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007 # 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007
@ -67,9 +67,9 @@ sip:
media: media:
id: FQ3TF8yT83wh5Wvz id: FQ3TF8yT83wh5Wvz
# [必须修改] zlm服务器的内网IP # [必须修改] zlm服务器的内网IP
ip: 192.168.118.70 ip: 192.168.41.16
# [必须修改] zlm服务器的http.port # [必须修改] zlm服务器的http.port
http-port: 80 http-port: 8091
# [可选] zlm服务器的hook.admin_params=secret # [可选] zlm服务器的hook.admin_params=secret
secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
# 启用多端口模式, 多端口模式使用端口区分每路流,兼容性更好。 单端口使用流的ssrc区分 点播超时建议使用多端口测试 # 启用多端口模式, 多端口模式使用端口区分每路流,兼容性更好。 单端口使用流的ssrc区分 点播超时建议使用多端口测试

14263
web_src/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -184,7 +184,7 @@ export default {
let that = this; let that = this;
this.$axios({ this.$axios({
method: 'get', method: 'get',
url:`/api/platform/query/100/1` url:`/api/platform/query/10000/1`
}).then(function (res) { }).then(function (res) {
that.platformList = res.data.list; that.platformList = res.data.list;
}).catch(function (error) { }).catch(function (error) {