play recordPlay key 调整(准备重构)

This commit is contained in:
shikong 2023-10-04 03:10:32 +08:00
parent 5b8db7322d
commit 61cdec8a74
3 changed files with 20 additions and 10 deletions

View File

@ -1,8 +1,10 @@
package cn.skcks.docking.gb28181.core.sip.listener; package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentMap;
@Component @Component
@Slf4j @Slf4j
public class SipListenerImpl implements SipListener { public class SipListenerImpl implements SipListener {
private final SipSubscribe sipSubscribe; private final SipSubscribe subscribe;
private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>(); private final ConcurrentMap<String, MessageProcessor> requestProcessor = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>(); private final ConcurrentMap<String, MessageProcessor> responseProcessor = new ConcurrentHashMap<>();
@ -65,6 +67,11 @@ public class SipListenerImpl implements SipListener {
// 增加其它无需回复的响应如101180等 // 增加其它无需回复的响应如101180等
} else { } else {
log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase()); log.warn("接收到失败的response响应status" + status + ",message:" + response.getReasonPhrase());
SIPResponse sipResponse = (SIPResponse) response;
String callId = sipResponse.getCallIdHeader().getCallId();
Optional.ofNullable(subscribe.getSipResponseSubscribe()
.getPublisher(GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId)))
.ifPresent(publisher->publisher.submit(sipResponse));
if (responseEvent.getDialog() != null) { if (responseEvent.getDialog() != null) {
responseEvent.getDialog().delete(); responseEvent.getDialog().delete();
} }

View File

@ -53,13 +53,13 @@ public class InviteResponseProcessor implements MessageProcessor {
// trying不会回复 // trying不会回复
if (statusCode == Response.TRYING) { if (statusCode == Response.TRYING) {
subscribe.getInviteSubscribe().getPublisher(subscribeKey).submit(response); subscribe.getSipResponseSubscribe().getPublisher(subscribeKey).submit(response);
return; return;
} }
// 成功响应 // 成功响应
// 下发ack // 下发ack
if (statusCode == Response.OK) { if (statusCode == Response.OK) {
Optional.ofNullable(subscribe.getInviteSubscribe().getPublisher(subscribeKey)) Optional.ofNullable(subscribe.getSipResponseSubscribe().getPublisher(subscribeKey))
.ifPresentOrElse(publisher-> publisher.submit(response), .ifPresentOrElse(publisher-> publisher.submit(response),
()-> log.warn("对应订阅 {} 已结束",callId.getCallId())); ()-> log.warn("对应订阅 {} 已结束",callId.getCallId()));

View File

@ -1,5 +1,7 @@
package cn.skcks.docking.gb28181.service.play; package cn.skcks.docking.gb28181.service.play;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil; import cn.skcks.docking.gb28181.common.redis.RedisUtil;
@ -147,7 +149,7 @@ public class PlayService {
CallIdHeader callId = provider.getNewCallId(); CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() { Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription; private Flow.Subscription subscription;
@ -186,13 +188,13 @@ public class PlayService {
@Override @Override
public void onComplete() { public void onComplete() {
subscribe.getInviteSubscribe().delPublisher(subscribeKey); subscribe.getSipResponseSubscribe().delPublisher(subscribeKey);
} }
}; };
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request); sender.send(senderIp, request);
result.onTimeout(() -> { result.onTimeout(() -> {
subscribe.getInviteSubscribe().delPublisher(subscribeKey); subscribe.getSipResponseSubscribe().delPublisher(subscribeKey);
result.setResult(JsonResponse.error("点播超时")); result.setResult(JsonResponse.error("点播超时"));
}); });
return result; return result;
@ -245,7 +247,7 @@ public class PlayService {
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
subscribe.getInviteSubscribe().addPublisher(subscribeKey); subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() { Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription; private Flow.Subscription subscription;
@ -266,6 +268,7 @@ public class PlayService {
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) { } else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc))); RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc)));
RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS);
result.setResult(JsonResponse.success(videoUrl(streamId))); result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete(); onComplete();
} else { } else {
@ -287,10 +290,10 @@ public class PlayService {
subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey);
} }
}; };
subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request); sender.send(senderIp, request);
result.onTimeout(() -> { result.onTimeout(() -> {
subscribe.getInviteSubscribe().delPublisher(subscribeKey); subscribe.getSipResponseSubscribe().delPublisher(subscribeKey);
result.setResult(JsonResponse.error("点播超时")); result.setResult(JsonResponse.error("点播超时"));
}); });
return result; return result;