请求历史媒体下载增加回复事件处理ssrc与下级不一致情况

This commit is contained in:
wangyimeng 2023-03-03 20:59:10 +08:00
parent 6f057b422c
commit 60984c27cd
3 changed files with 152 additions and 92 deletions

View File

@ -122,7 +122,7 @@ public interface ISIPCommander {
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
/**
* 视频流停止

View File

@ -29,7 +29,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
@ -365,7 +364,8 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
String startTime, String endTime,
InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
@ -411,7 +411,8 @@ public class SIPCommander implements ISIPCommander {
content.append("a=setup:active\r\n");
content.append("a=connection:new\r\n");
}
} else {
} else
{
if ("TCP-PASSIVE".equalsIgnoreCase(streamMode)) {
content.append("m=video " + ssrcInfo.getPort() + " TCP/RTP/AVP 96 97 98 99\r\n");
} else if ("TCP-ACTIVE".equalsIgnoreCase(streamMode)) {
@ -471,8 +472,9 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String startTime, String endTime, int downloadSpeed,
InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
String sdpIp;
@ -544,7 +546,8 @@ public class SIPCommander implements ISIPCommander {
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
// 添加订阅
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) ->
{
hookEvent.call(new InviteStreamInfo(mediaServerItem, json,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("regist", false);
@ -567,10 +570,11 @@ public class SIPCommander implements ISIPCommander {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream()));
}
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent -> {
ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
okEvent.response(event);
});
}
@ -802,7 +806,6 @@ public class SIPCommander implements ISIPCommander {
*
* @param device 视频设备
* @param channelId 通道id非通道则是设备本身
* @param frontCmd 上级平台的指令如果存在则直接下发
* @param enabled 看守位使能1 = 开启0 = 关闭
* @param resetTime 自动归位时间间隔开启看守位时使用单位:(s)
* @param presetIndex 调用预置位编号开启看守位时使用取值范围0~255

View File

@ -258,7 +258,8 @@ public class PlayServiceImpl implements IPlayService {
return;
}
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) ->
{
logger.info("收到订阅消息: " + response.toJSONString());
dynamicTask.stop(timeOutTaskKey);
@ -273,7 +274,8 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName);
}, (event) -> {
}, (event) ->
{
ResponseEvent responseEvent = (ResponseEvent) event.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
@ -322,7 +324,8 @@ public class PlayServiceImpl implements IPlayService {
}
}
}, (event) -> {
}, (event) ->
{
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 释放ssrc
@ -513,7 +516,8 @@ public class PlayServiceImpl implements IPlayService {
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
hookEvent, eventResult -> {
hookEvent, eventResult ->
{
if (eventResult.type == SipSubscribe.EventResultType.response) {
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
@ -582,6 +586,7 @@ public class PlayServiceImpl implements IPlayService {
if (device == null) {
return;
}
//获取录像下载的服务配置文件中的record-assist-port不为0
MediaServerItem newMediaServerItem = getNewMediaServerItemHasAssist(device);
if (newMediaServerItem == null) {
PlayBackResult<StreamInfo> downloadResult = new PlayBackResult<>();
@ -651,7 +656,59 @@ public class PlayServiceImpl implements IPlayService {
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
}, errorEvent);
}, errorEvent, eventResult ->
{
if (eventResult.type == SipSubscribe.EventResultType.response) {
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
return;
}
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
return;
}
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(downLoadTimeOutTaskKey);
// hook响应TODO 此处待处理
// onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, playBackCallback);
// hookCallBack.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}
}
}
});
} catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像下载: {}", e.getMessage());