This commit is contained in:
shikong 2023-08-24 12:21:22 +08:00
parent ad1c20277a
commit 0d5e134943
3 changed files with 48 additions and 1 deletions

View File

@ -0,0 +1,37 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class InviteSubscribe implements GenericSubscribe<SipTransactionInfo> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<SipTransactionInfo>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<SipTransactionInfo> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<SipTransactionInfo> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
@Override
public void delPublisher(String key) {
Helper.delPublisher(publishers, key);
}
}

View File

@ -1,5 +1,6 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import jakarta.annotation.PostConstruct;
@ -20,14 +21,17 @@ public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private GenericSubscribe<RecordInfoResponseDTO> recordInfoSubscribe;
private GenericSubscribe<SipTransactionInfo> inviteSubscribe;
@PostConstruct
private void init() {
recordInfoSubscribe = new RecordInfoSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
}
@PreDestroy
private void destroy() {
inviteSubscribe.close();
recordInfoSubscribe.close();
}
}

View File

@ -6,8 +6,11 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
@ -45,9 +48,10 @@ public class PlayService {
private final SsrcService ssrcService;
private final SipService sipService;
private final SipMessageSender sender;
private final SipSubscribe subscribe;
/**
*
* 实时视频点播
* @param deviceId 设备id
* @param channelId 通道id
*/
@ -99,6 +103,8 @@ public class PlayService {
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
sender.send(senderIp, request);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, deviceId, streamId);
// subscribe.getInviteSubscribe().addPublisher(subscribeKey);
result.setResult(JsonResponse.success(StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv")));
return result;
// zlmMediaService.getRtpInfo();