修复流地址返回错误

This commit is contained in:
648540858 2022-07-27 14:48:21 +08:00
parent 4f2282f125
commit 881fb113e1
5 changed files with 100 additions and 62 deletions

View File

@ -92,39 +92,36 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Override @Override
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
try { try {
taskQueue.offer(new HandlerCatchData(evt, null, null)); taskQueue.offer(new HandlerCatchData(evt, null, null));
responseAck(evt, Response.OK); responseAck(evt, Response.OK);
if (!taskQueueHandlerRun) { if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true; taskQueueHandlerRun = true;
taskExecutor.execute(()-> { taskExecutor.execute(()-> {
while (!taskQueue.isEmpty()) { while (!taskQueue.isEmpty()) {
try { try {
HandlerCatchData take = taskQueue.poll(); HandlerCatchData take = taskQueue.poll();
Element rootElement = getRootElement(take.getEvt()); Element rootElement = getRootElement(take.getEvt());
String cmd = XmlUtil.getText(rootElement, "CmdType"); String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.CATALOG.equals(cmd)) { if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知"); logger.info("接收到Catalog通知");
processNotifyCatalogList(take.getEvt()); processNotifyCatalogList(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) { } else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知"); logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt()); processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) { } else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知"); logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt()); processNotifyMobilePosition(take.getEvt());
} else { } else {
logger.info("接收到消息:" + cmd); logger.info("接收到消息:" + cmd);
}
} catch (DocumentException e) {
throw new RuntimeException(e);
}
} }
taskQueueHandlerRun = false; } catch (DocumentException e) {
}); throw new RuntimeException(e);
}
}
taskQueueHandlerRun = false;
});
} }
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -174,7 +171,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
} else { } else {
mobilePosition.setAltitude(0.0); mobilePosition.setAltitude(0.0);
} }
logger.info("[收到 移动位置订阅]{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), logger.info("[收到移动位置订阅通知]{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.getLongitude(), mobilePosition.getLatitude());
mobilePosition.setReportSource("Mobile Position"); mobilePosition.setReportSource("Mobile Position");

View File

@ -67,9 +67,9 @@ public class MediaServiceImpl implements IMediaService {
JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class); JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
JSONArray tracks = mediaJSON.getJSONArray("tracks"); JSONArray tracks = mediaJSON.getJSONArray("tracks");
if (authority) { if (authority) {
streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId()); streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId());
}else { }else {
streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null); streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null);
} }
} }

View File

@ -1,16 +1,21 @@
package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
* 接收来自redis的GPS更新通知 * 接收来自redis的GPS更新通知
* @author lin * @author lin
@ -20,13 +25,31 @@ public class RedisGpsMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
private boolean taskQueueHandlerRun = false;
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void onMessage(@NotNull Message message, byte[] bytes) { public void onMessage(@NotNull Message message, byte[] bytes) {
// TODO 加消息队列 taskQueue.offer(message);
GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); if (!taskQueueHandlerRun) {
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); taskQueueHandlerRun = true;
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
}
taskQueueHandlerRun = false;
});
}
} }
} }

View File

@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -21,14 +22,17 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
@ -40,46 +44,60 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
private boolean taskQueueHandlerRun = false;
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private IStreamPushService streamPushService; private IStreamPushService streamPushService;
@Autowired
private EventPublisher eventPublisher;
@Autowired
private UserSetting userSetting;
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
// TODO 增加队列 // TODO 增加队列
logger.warn("[REDIS 消息-推流设备状态变化] {}", new String(message.getBody())); logger.warn("[REDIS 消息-推流设备状态变化] {}", new String(message.getBody()));
// taskQueue.offer(message);
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream == null) { if (!taskQueueHandlerRun) {
logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); taskQueueHandlerRun = true;
return; taskExecutor.execute(() -> {
} while (!taskQueue.isEmpty()) {
// 取消定时任务 Message msg = taskQueue.poll();
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream.isSetAllOffline()) { if (statusChangeFromPushStream == null) {
// 所有设备离线 logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
streamPushService.allStreamOffline(); return;
} }
if (statusChangeFromPushStream.getOfflineStreams() != null // 取消定时任务
&& statusChangeFromPushStream.getOfflineStreams().size() > 0) { dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
// 更新部分设备离线 if (statusChangeFromPushStream.isSetAllOffline()) {
streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); // 所有设备离线
} streamPushService.allStreamOffline();
if (statusChangeFromPushStream.getOnlineStreams() != null && }
statusChangeFromPushStream.getOnlineStreams().size() > 0) { if (statusChangeFromPushStream.getOfflineStreams() != null
// 更新部分设备上线 && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); // 更新部分设备离线
streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
}
if (statusChangeFromPushStream.getOnlineStreams() != null &&
statusChangeFromPushStream.getOnlineStreams().size() > 0) {
// 更新部分设备上线
streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
}
}
taskQueueHandlerRun = false;
});
} }
} }

View File

@ -688,21 +688,21 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendMobilePositionMsg(JSONObject jsonObject) { public void sendMobilePositionMsg(JSONObject jsonObject) {
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
logger.info("[redis 移动位置订阅通知] {}: {}", key, jsonObject.toString()); logger.info("[redis发送通知]移动位置 {}: {}", key, jsonObject.toString());
redis.convertAndSend(key, jsonObject); redis.convertAndSend(key, jsonObject);
} }
@Override @Override
public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream()); logger.info("[redis发送通知]推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
} }
@Override @Override
public void sendAlarmMsg(AlarmChannelMessage msg) { public void sendAlarmMsg(AlarmChannelMessage msg) {
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM; String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
logger.info("[redis 报警通知] {}: {}", key, JSON.toJSON(msg)); logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg));
redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
} }
@ -715,7 +715,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendStreamPushRequestedMsgForStatus() { public void sendStreamPushRequestedMsgForStatus() {
String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED; String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
logger.info("[redis 通知]获取所有推流设备的状态"); logger.info("[redis通知]获取所有推流设备的状态");
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put(key, key); jsonObject.put(key, key);
redis.convertAndSend(key, jsonObject); redis.convertAndSend(key, jsonObject);