wvp 国标级联 视频下载 推流流程修正

This commit is contained in:
zxb 2024-03-05 14:41:55 +08:00
parent fc60e18cfa
commit a3d4bf9899
8 changed files with 180 additions and 68 deletions

12
pom.xml
View File

@ -331,6 +331,18 @@
<version>32.1.3-jre</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -9,10 +9,12 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.function.Function;
/**
* @description:设备能力接口用于定义设备的控制查询能力
@ -120,7 +122,7 @@ public interface ISIPCommander {
* @param downloadSpeed 下载倍速参数
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, ZlmHttpHookSubscribe.Event hookEvent,
String startTime, String endTime, int downloadSpeed, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
/**
@ -128,7 +130,8 @@ public interface ISIPCommander {
*/
void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
/**
* 回放暂停

View File

@ -41,6 +41,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.util.List;
import java.util.function.Function;
/**
* @description:设备能力接口用于定义设备的控制查询能力
@ -497,8 +498,10 @@ public class SIPCommander implements ISIPCommander {
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed,
Function<String, Void> ignoreCallBack,
Function<SIPResponse, Void> inviteCallBack,
ZlmHttpHookSubscribe.Event hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
String sdpIp;
@ -593,10 +596,15 @@ public class SIPCommander implements ISIPCommander {
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
if(ignoreCallBack != null){
ignoreCallBack.apply(newCallIdHeader.getCallId());
}
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
if(inviteCallBack != null){
inviteCallBack.apply(response);
}
String contentString =new String(response.getRawContent());
String ssrc = SipUtils.getSsrcFromSdp(contentString);
streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);

View File

@ -22,6 +22,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.github.rholder.retry.*;
import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,6 +42,8 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* SIP命令类型 ACK请求
@ -122,20 +125,27 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
return;
}
// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
// try {
// responseAck((SIPRequest) evt.getRequest(), Response.OK);
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error(e.getMessage());
// }
// });
//
// sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), eventResult -> {
// sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
// try {
// responseAck((SIPRequest) evt.getRequest(), Response.OK);
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error(e.getMessage());
// }
// });
SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
if( okSubscribe != null) {
okSubscribe.response(new SipSubscribe.EventResult<>(evt));
SipSubscribe.Event platformAckSubscribe = sipSubscribe.getOkSubscribe("ACK_" + callIdHeader.getCallId());
if(platformAckSubscribe != null) {
platformAckSubscribe.response(null);
}
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_" + platformGbId + "_" + callIdHeader.getCallId());
// if( okSubscribe != null) {
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
// return;
// }
// String channelId = id;
// if(storager.queryVideoDeviceByChannelId(id) != null){
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("DEVICE_INVITE_OK_" + id);
@ -157,10 +167,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// }
// }
okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
if( okSubscribe != null) {
okSubscribe.response(new SipSubscribe.EventResult<>(evt));
}
// okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + callIdHeader.getCallId());
// if( okSubscribe != null) {
// okSubscribe.response(new SipSubscribe.EventResult<>(evt));
// return;
// }
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@ -196,10 +207,24 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader);
});
}else {
JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
logger.debug("sendRtpItem {}", JSONObject.toJSONString(sendRtpItem));
JSONObject startSendRtpStreamResult;
Retryer<JSONObject> retryer = RetryerBuilder.<JSONObject>newBuilder()
.retryIfResult(resp -> resp == null || resp.getInteger("code") != 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3000))
.build();
try {
startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage());
startSendRtpStreamResult = null;
}
startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
}
}
private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform,

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@ -46,10 +47,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
@ -129,6 +130,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private SIPRequestHeaderProvider headerProvider;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@ -438,29 +442,32 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// 回复bye
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}, 60 * 1000);
if(!"Download".equalsIgnoreCase(sessionName)){
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// 回复bye
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
}, 60 * 1000);
sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId());
// sipSubscribe.addOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId(), (eventResult) -> {
// sipSubscribe.removeOkSubscribe("ACK_" + platform.getServerGBId()+"_"+callIdHeader.getCallId());
//
// SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId);
// if(okSubscribe != null) {
// okSubscribe.response(null);
// //sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId);
// }
// // responseAck(streamInfo.getTransactionInfo())
// });
responseSdpAck(request, content.toString(), platform);
}
SipSubscribe.Event okSubscribe = sipSubscribe.getOkSubscribe("ACK_DEVICE_" + channelId);
if(okSubscribe != null) {
okSubscribe.response(null);
//sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + channelId);
}
// responseAck(streamInfo.getTransactionInfo())
});
responseSdpAck(request, content.toString(), platform);
// sipSubscribe.addOkSubscribe("ACK_" + channelId, (eventResult) -> {});
// tcp主动模式回复sdp后开启监听
@ -567,8 +574,46 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// throw new RuntimeException(e);
// }
// });
playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),
DateUtil.formatter.format(end), NumberUtils.createDouble(downloadSpeed).intValue(),(String deviceCallId)->{
// 忽略自动回复 ACK
sipSubscribe.addOkSubscribe("ACK_IGNORE_" + deviceCallId, (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_IGNORE_" + deviceCallId);
});
return null;
},(SIPResponse response)->{
// 先订阅再发起
// 订阅上级平台的 ACK 请求
sipSubscribe.addOkSubscribe("ACK_" + callIdHeader.getCallId(), (eventResult) -> {
sipSubscribe.removeOkSubscribe("ACK_" + callIdHeader.getCallId());
try {
String deviceContent = new String(response.getRawContent());
Gb28181Sdp deviceGb28181Sdp = SipUtils.parseSDP(deviceContent);
SessionDescription deviceSdp = deviceGb28181Sdp.getBaseSdb();
SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(deviceSdp.getOrigin().getUsername(), response.getRemoteAddress().getHostAddress() + ":" + response.getRemotePort());
// 收到上级的 ACK , 向设备转发 ACK 并开启 ZLM RTP 收流 + RTP 转发
Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response);
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
} catch (ParseException | SdpParseException | InvalidArgumentException | SipException e) {
throw new RuntimeException(e);
}
});
try {
// 向上级平台回复 INVITE OK
responseSdpAck(request, new String(request.getRawContent()), platform);
} catch (SipException | InvalidArgumentException |
ParseException e) {
throw new RuntimeException(e);
}
return null;
},
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
hookEvent.run(code, msg, data);

View File

@ -134,25 +134,34 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
//
// }
ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
try {
sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
} catch (SdpParseException | ParseException | SipException e) {
throw new RuntimeException(e);
}
}, 3, TimeUnit.SECONDS);
sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{
schedule.cancel(true);
sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
try {
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
} catch (SdpParseException | ParseException | SipException e) {
throw new RuntimeException(e);
}
});
SipSubscribe.Event ignore = sipSubscribe.getOkSubscribe("ACK_IGNORE_" + callId);
if( ignore != null){
ignore.response(null);
return;
}
logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// ScheduledFuture<?> schedule = scheduledExecutorService.schedule(() -> {
// try {
// sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
// sipSender.transmitRequest(response.getLocalAddress().getHostAddress(), reqAck);
// } catch (SdpParseException | ParseException | SipException e) {
// throw new RuntimeException(e);
// }
// }, 1500, TimeUnit.MILLISECONDS);
// sipSubscribe.addOkSubscribe("ACK_DEVICE_" + id, (eventResult)->{
// schedule.cancel(true);
// sipSubscribe.removeOkSubscribe("ACK_DEVICE_" + id);
// try {
// logger.info("[回复ack] {}-> {}:{} ", sdp.getOrigin().getUsername(), event.getRemoteIpAddress(), event.getRemotePort());
// sipSender.transmitRequest( response.getLocalAddress().getHostAddress(), reqAck);
// } catch (SdpParseException | ParseException | SipException e) {
// throw new RuntimeException(e);
// }
// });
}
} catch (InvalidArgumentException | ParseException | SipException | SdpParseException e) {
logger.info("[点播回复ACK],异常:", e );

View File

@ -6,10 +6,12 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.function.Function;
/**
* 点播处理
@ -29,6 +31,8 @@ public interface IPlayService {
void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback);
void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
void zlmServerOnline(String mediaServerId);

View File

@ -52,6 +52,7 @@ import java.text.ParseException;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
import java.util.function.Function;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Service
@ -670,9 +671,13 @@ public class PlayServiceImpl implements IPlayService {
download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback);
}
@Override
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<Object> callback) {
download(mediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, null,null, callback);
}
@Override
public void download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, Function<String, Void> ignoreCallBack, Function<SIPResponse, Void> inviteCallBack, ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@ -735,8 +740,9 @@ public class PlayServiceImpl implements IPlayService {
logger.debug("streamInfo => {}",streamInfo);
};
try {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed,
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, ignoreCallBack, inviteCallBack,
hookEvent, errorEvent, eventResult ->{
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId,
downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD);