完善
This commit is contained in:
parent
39a1a84961
commit
abe3194c5f
@ -24,11 +24,12 @@ import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
|
|||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
import com.github.rholder.retry.*;
|
import com.github.rholder.retry.*;
|
||||||
import gov.nist.javax.sip.message.SIPRequest;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.sip.InvalidArgumentException;
|
import javax.sip.InvalidArgumentException;
|
||||||
@ -39,7 +40,6 @@ import javax.sip.header.CallIdHeader;
|
|||||||
import javax.sip.header.FromHeader;
|
import javax.sip.header.FromHeader;
|
||||||
import javax.sip.header.HeaderAddress;
|
import javax.sip.header.HeaderAddress;
|
||||||
import javax.sip.header.ToHeader;
|
import javax.sip.header.ToHeader;
|
||||||
import javax.sip.message.Response;
|
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -107,6 +107,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
|
|||||||
|
|
||||||
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
|
@Qualifier("taskExecutor")
|
||||||
|
@Autowired
|
||||||
|
private ThreadPoolTaskExecutor taskExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理 ACK请求
|
* 处理 ACK请求
|
||||||
*
|
*
|
||||||
@ -133,55 +137,22 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
taskExecutor.execute(()->{
|
||||||
// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
|
long retry = 50;
|
||||||
// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
|
while (retry > 0){
|
||||||
// try {
|
SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
|
||||||
// responseAck((SIPRequest) evt.getRequest(), Response.OK);
|
if(platformAckSubscribe != null) {
|
||||||
// } catch (SipException | InvalidArgumentException | ParseException e) {
|
platformAckSubscribe.response(null);
|
||||||
// logger.error(e.getMessage());
|
break;
|
||||||
// }
|
}
|
||||||
// });
|
retry -=1;
|
||||||
|
|
||||||
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
|
try {
|
||||||
// if( okSubscribe != null) {
|
TimeUnit.MILLISECONDS.sleep(100);
|
||||||
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
|
} catch (InterruptedException ignored) {
|
||||||
// return;
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
// String channelId = id;
|
|
||||||
// if(storager.queryVideoDeviceByChannelId(id) != null){
|
|
||||||
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id);
|
|
||||||
// if(okSubscribe!=null){
|
|
||||||
// okSubscribe.response(null);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// Device device = storager.queryVideoDevice(id);
|
|
||||||
// if(device != null){
|
|
||||||
// List<DeviceChannel> deviceChannels = storager.queryChannelWithCatalog(device.getDeviceId());
|
|
||||||
// deviceChannels.forEach(channel -> {
|
|
||||||
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + channel.getChannelId());
|
|
||||||
// if(okSubscribe!=null){
|
|
||||||
// okSubscribe.response(null);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
// } else {
|
|
||||||
// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
|
|
||||||
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
|
|
||||||
// if( okSubscribe != null) {
|
|
||||||
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
|
|
||||||
scheduledExecutorService.schedule(()->{
|
|
||||||
SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
|
|
||||||
if(platformAckSubscribe != null) {
|
|
||||||
platformAckSubscribe.response(null);
|
|
||||||
}
|
}
|
||||||
}, 100, TimeUnit.MILLISECONDS);
|
});
|
||||||
|
|
||||||
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
|
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
|
||||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||||
@ -220,7 +191,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
|
|||||||
logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param));
|
logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param));
|
||||||
|
|
||||||
zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{
|
zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{
|
||||||
scheduledExecutorService.submit(()->{
|
taskExecutor.submit(()->{
|
||||||
JSONObject startSendRtpStreamResult;
|
JSONObject startSendRtpStreamResult;
|
||||||
Retryer<JSONObject> retryer = RetryerBuilder.<JSONObject>newBuilder()
|
Retryer<JSONObject> retryer = RetryerBuilder.<JSONObject>newBuilder()
|
||||||
.retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
|
.retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
|
||||||
@ -229,12 +200,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
|
|||||||
// 重试间隔
|
// 重试间隔
|
||||||
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
|
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
|
||||||
// 重试次数
|
// 重试次数
|
||||||
.withStopStrategy(StopStrategies.stopAfterAttempt(15 * 1000))
|
.withStopStrategy(StopStrategies.stopAfterAttempt(20 * 1000))
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
|
startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
|
||||||
} catch (ExecutionException | RetryException e) {
|
} catch (ExecutionException | RetryException e) {
|
||||||
logger.error(e.getMessage());
|
logger.error("rtp转推失败 {}",e.getMessage());
|
||||||
startSendRtpStreamResult = null;
|
startSendRtpStreamResult = null;
|
||||||
}
|
}
|
||||||
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
|
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
|
||||||
|
@ -552,6 +552,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|||||||
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
|
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
|
||||||
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
|
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
|
||||||
|
|
||||||
|
logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername());
|
||||||
// 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
|
// 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
|
||||||
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
|
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
|
||||||
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
|
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
|
||||||
@ -618,6 +619,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|||||||
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
|
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
|
||||||
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
|
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
|
||||||
|
|
||||||
|
logger.info("收到上级 callId => {} 的 ACK 请求, 向 下级 {} 转发 ACK", callIdHeader.getCallId(), deviceSdp.getOrigin().getUsername());
|
||||||
// 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
|
// 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
|
||||||
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
|
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
|
||||||
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
|
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
|
||||||
|
@ -6,6 +6,9 @@ import lombok.Data;
|
|||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -17,6 +20,9 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
@Service
|
@Service
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class ZlmPublishHookService {
|
public class ZlmPublishHookService {
|
||||||
|
@Qualifier("taskExecutor")
|
||||||
|
@Autowired
|
||||||
|
private ThreadPoolTaskExecutor taskExecutor;
|
||||||
|
|
||||||
public interface ZlmPublishHookHandler {
|
public interface ZlmPublishHookHandler {
|
||||||
void handler();
|
void handler();
|
||||||
@ -38,6 +44,8 @@ public class ZlmPublishHookService {
|
|||||||
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
|
log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip);
|
||||||
|
|
||||||
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
|
ConcurrentMap<String, ZlmPublishHookHandler> handlers = getHandler(app);
|
||||||
Optional.ofNullable(handlers.remove(streamId)).ifPresent(ZlmPublishHookHandler::handler);
|
Optional.ofNullable(handlers.remove(streamId)).ifPresent(handler -> {
|
||||||
|
taskExecutor.execute(handler::handler);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user