添加 推流鉴权事件订阅 + ack 事件关联订阅

This commit is contained in:
zxb 2024-03-14 03:13:04 +08:00
parent 55977b4a5e
commit b20e48c911
3 changed files with 86 additions and 20 deletions

View File

@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ZlmPublishHookService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@ -43,6 +44,8 @@ import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@ -99,6 +102,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private ZlmPublishHookService zlmPublishHookService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
/**
* 处理 ACK请求
*
@ -207,24 +215,33 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
});
}else {
logger.debug("sendRtpItem {}", JSONObject.toJSONString(sendRtpItem));
JSONObject startSendRtpStreamResult;
Retryer<JSONObject> retryer = RetryerBuilder.<JSONObject>newBuilder()
.retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(5 * 1000))
.build();
try {
startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage());
startSendRtpStreamResult = null;
}
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param));
zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{
scheduledExecutorService.submit(()->{
JSONObject startSendRtpStreamResult;
Retryer<JSONObject> retryer = RetryerBuilder.<JSONObject>newBuilder()
.retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(5 * 1000))
.build();
try {
startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage());
startSendRtpStreamResult = null;
}
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
});
});
scheduledExecutorService.schedule(()->{
zlmPublishHookService.getHandler(sendRtpItem.getApp()).remove(sendRtpItem.getStreamId());
}, 1, TimeUnit.MINUTES);
}
}
private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,

View File

@ -129,6 +129,9 @@ public class ZLMHttpHookListener {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ZlmPublishHookService zlmPublishHookService;
/**
* 服务器定时上报时间上报间隔可配置默认10s上报一次
*/
@ -253,6 +256,9 @@ public class ZLMHttpHookListener {
}
});
zlmPublishHookService.processEvent(param);
// 是否录像
if ("rtp".equals(param.getApp())) {
result.setEnable_mp4(userSetting.getRecordSip());

View File

@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Slf4j
@Data
@Service
@RequiredArgsConstructor
public class ZlmPublishHookService {
public interface ZlmPublishHookHandler {
void handler();
}
@Getter(AccessLevel.PRIVATE)
private ConcurrentMap<String, ConcurrentMap<String, ZlmPublishHookHandler>> handler = new ConcurrentHashMap<>();
public ConcurrentMap<String, ZlmPublishHookHandler> getHandler(String app) {
this.handler.putIfAbsent(app, new ConcurrentHashMap<>());
return this.handler.get(app);
}
public void processEvent(OnPublishHookParam dto) {
String app = dto.getApp();
String streamId = dto.getStream();
String ip = dto.getIp();
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
Optional.ofNullable(handlers.remove(streamId)).ifPresent(ZlmPublishHookHandler::handler);
}
}