diff --git a/pom.xml b/pom.xml
index 1cef38b6..b85d5b01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -331,6 +331,18 @@
32.1.3-jre
+
+ com.github.rholder
+ guava-retrying
+ 2.0.0
+
+
+ com.google.guava
+ guava
+
+
+
+
org.springframework.boot
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
index a169a317..32758f9b 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -9,10 +9,12 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPRequest;
+import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
+import java.util.function.Function;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
@@ -120,7 +122,7 @@ public interface ISIPCommander {
* @param downloadSpeed 下载倍速参数
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
- String startTime, String endTime, int downloadSpeed, ZlmHttpHookSubscribe.Event hookEvent,
+ String startTime, String endTime, int downloadSpeed, Function ignoreCallBack, Function inviteCallBack, ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
/**
@@ -128,7 +130,8 @@ public interface ISIPCommander {
*/
void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
- void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
+
+ void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
/**
* 回放暂停
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 0f03d761..2581b1f7 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -41,6 +41,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.List;
+import java.util.function.Function;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
@@ -497,8 +498,10 @@ public class SIPCommander implements ISIPCommander {
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed,
+ Function ignoreCallBack,
+ Function inviteCallBack,
ZlmHttpHookSubscribe.Event hookEvent,
- SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
+ SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
String sdpIp;
@@ -593,10 +596,15 @@ public class SIPCommander implements ISIPCommander {
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
-
+ if(ignoreCallBack != null){
+ ignoreCallBack.apply(newCallIdHeader.getCallId());
+ }
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
+ if(inviteCallBack != null){
+ inviteCallBack.apply(response);
+ }
String contentString =new String(response.getRawContent());
String ssrc = SipUtils.getSsrcFromSdp(contentString);
streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 22303d17..e79ccd61 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -22,6 +22,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.github.rholder.retry.*;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +42,8 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* SIP命令类型: ACK请求
@@ -122,20 +125,27 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
return;
}
- // sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
- // sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
- // try {
- // responseAck((SIPRequest) evt.getRequest(), Response.OK);
- // } catch (SipException | InvalidArgumentException | ParseException e) {
- // logger.error(e.getMessage());
- // }
- // });
+ //
+// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
+// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
+// try {
+// responseAck((SIPRequest) evt.getRequest(), Response.OK);
+// } catch (SipException | InvalidArgumentException | ParseException e) {
+// logger.error(e.getMessage());
+// }
+// });
- SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
- if( okSubscribe != null) {
- okSubscribe.response(new SipSubscribe.EventResult<>(evt));
+ SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
+ if(platformAckSubscribe != null) {
+ platformAckSubscribe.response(null);
}
+ // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
+// if( okSubscribe != null) {
+// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
+// return;
+// }
+
// String channelId = id;
// if(storager.queryVideoDeviceByChannelId(id) != null){
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id);
@@ -157,10 +167,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// }
// }
- okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
- if( okSubscribe != null) {
- okSubscribe.response(new SipSubscribe.EventResult<>(evt));
- }
+// okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
+// if( okSubscribe != null) {
+// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
+// return;
+// }
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -196,10 +207,24 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
});
}else {
- JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
- if (startSendRtpStreamResult != null) {
- startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
+ logger.debug("sendRtpItem {}", JSONObject.toJSONString(sendRtpItem));
+ JSONObject startSendRtpStreamResult;
+ Retryer retryer = RetryerBuilder.newBuilder()
+ .retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
+ .retryIfException()
+ .retryIfRuntimeException()
+ // 重试间隔
+ .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
+ // 重试次数
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3000))
+ .build();
+ try {
+ startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
+ } catch (ExecutionException | RetryException e) {
+ logger.error(e.getMessage());
+ startSendRtpStreamResult = null;
}
+ startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
}
}
private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 872ee4e2..5a6453ed 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -46,10 +47,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.sdp.*;
-import javax.sip.InvalidArgumentException;
-import javax.sip.RequestEvent;
-import javax.sip.SipException;
+import javax.sip.*;
+import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
+import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
@@ -129,6 +130,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SipSubscribe sipSubscribe;
+ @Autowired
+ private SIPRequestHeaderProvider headerProvider;
+
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -438,29 +442,32 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
try {
- // 超时未收到Ack应该回复bye,当前等待时间为10秒
- dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
- logger.info("Ack 等待超时");
- mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
- // 回复bye
- try {
- cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
- } catch (SipException | InvalidArgumentException | ParseException e) {
- logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
- }
- }, 60 * 1000);
+ if(!"Download".equalsIgnoreCase(sessionName)){
+ // 超时未收到Ack应该回复bye,当前等待时间为10秒
+ dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
+ logger.info("Ack 等待超时");
+ mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
+ // 回复bye
+ try {
+ cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
+ } catch (SipException | InvalidArgumentException | ParseException e) {
+ logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
+ }
+ }, 60 * 1000);
- sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> {
- sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId());
+ // sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> {
+ // sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId());
+ //
+ // SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId);
+ // if(okSubscribe != null) {
+ // okSubscribe.response(null);
+ // //sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId);
+ // }
+ // // responseAck(streamInfo.getTransactionInfo())
+ // });
+ responseSdpAck(request, content.toString(), platform);
+ }
- SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId);
- if(okSubscribe != null) {
- okSubscribe.response(null);
- //sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId);
- }
- // responseAck(streamInfo.getTransactionInfo())
- });
- responseSdpAck(request, content.toString(), platform);
// sipSubscribe.addOkSubscribe("ACK_" + channelId, (eventResult) -> {});
// tcp主动模式,回复sdp后开启监听
@@ -567,8 +574,46 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// throw new RuntimeException(e);
// }
// });
+
playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
- DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),
+ DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),(String deviceCallId)->{
+ // 忽略自动回复 ACK
+ sipSubscribe.addOkSubscribe("ACK_IGNORE_" + deviceCallId, (eventResult) -> {
+ sipSubscribe.removeOkSubscribe("ACK_IGNORE_" + deviceCallId);
+ });
+
+ return null;
+ },(SIPResponse response)->{
+
+
+ // 先订阅再发起
+ // 订阅上级平台的 ACK 请求
+ sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> {
+ sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
+
+ try {
+ String deviceContent = new String(response.getRawContent());
+ Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent);
+ SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
+ SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
+
+ // 收到上级的 ACK 后, 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
+ Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
+ sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
+ } catch (ParseException | SdpParseException | InvalidArgumentException | SipException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ try {
+ // 向上级平台回复 INVITE OK
+ responseSdpAck(request, new String(request.getRawContent()), platform);
+ } catch (SipException | InvalidArgumentException |
+ ParseException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
hookEvent.run(code, msg, data);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
index 27142ab0..9b69c26f 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -134,25 +134,34 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
//
// }
- ScheduledFuture> schedule = scheduledExecutorService.schedule(() -> {
- try {
- sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
- logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
- sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
- } catch (SdpParseException | ParseException | SipException e) {
- throw new RuntimeException(e);
- }
- }, 3, TimeUnit.SECONDS);
- sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{
- schedule.cancel(true);
- sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
- try {
- logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
- sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
- } catch (SdpParseException | ParseException | SipException e) {
- throw new RuntimeException(e);
- }
- });
+ SipSubscribe.Event ignore = sipSubscribe.getOkSubscribe("ACK_IGNORE_" + callId);
+ if( ignore != null){
+ ignore.response(null);
+ return;
+ }
+
+ logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
+ sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
+
+// ScheduledFuture> schedule = scheduledExecutorService.schedule(() -> {
+// try {
+// sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
+// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
+// sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
+// } catch (SdpParseException | ParseException | SipException e) {
+// throw new RuntimeException(e);
+// }
+// }, 1500, TimeUnit.MILLISECONDS);
+// sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{
+// schedule.cancel(true);
+// sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
+// try {
+// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
+// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
+// } catch (SdpParseException | ParseException | SipException e) {
+// throw new RuntimeException(e);
+// }
+// });
}
} catch (InvalidArgumentException | ParseException | SipException | SdpParseException e) {
logger.info("[点播回复ACK],异常:", e );
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
index ada75fb5..e2ada5d4 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -6,10 +6,12 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
+import java.util.function.Function;
/**
* 点播处理
@@ -29,6 +31,8 @@ public interface IPlayService {
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback