添加对bye请求的处理

This commit is contained in:
shikong 2023-10-05 03:40:46 +08:00
parent e89656122d
commit de4726ffba
2 changed files with 113 additions and 11 deletions

View File

@ -0,0 +1,59 @@
package cn.skcks.docking.gb28181.core.sip.message.processor.bye.request;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder;
import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import java.util.EventObject;
import java.util.Optional;
@Slf4j
@RequiredArgsConstructor
@Component
public class ByeRequestProcessor implements MessageProcessor {
private final SipListener sipListener;
private final SipSubscribe subscribe;
private final SipService sipService;
@PostConstruct
@Override
public void init() {
sipListener.addRequestProcessor(Request.BYE, this);
}
@Override
public void process(EventObject eventObject) {
RequestEvent requestEvent = (RequestEvent) eventObject;
SIPRequest request = (SIPRequest) requestEvent.getRequest();
String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
log.info("key {}", key);
String ip = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
SipProvider provider= sipService.getProvider(transport, ip);
Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key))
.ifPresentOrElse(
publisher -> publisher.submit(request),
() -> {
try {
provider.sendResponse(InviteResponseBuilder.builder().build().createTryingInviteResponse(request));
} catch (SipException e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -7,6 +7,7 @@ import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
import cn.skcks.docking.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
@ -14,7 +15,6 @@ import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.media.dto.rtp.CloseRtpServer;
import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp;
@ -27,6 +27,9 @@ import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import cn.skcks.docking.gb28181.sip.method.invite.response.InviteResponseBuilder;
import cn.skcks.docking.gb28181.sip.utils.SipUtil;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -148,7 +151,7 @@ public class PlayService {
SipProvider provider = sipService.getProvider(transport, senderIp);
CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -156,7 +159,7 @@ public class PlayService {
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {}", Request.INVITE, subscribeKey);
subscription.request(1);
}
@ -165,15 +168,15 @@ public class PlayService {
int statusCode = item.getStatusCode();
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
if (statusCode == Response.TRYING) {
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 尝试连接流媒体服务", Request.INVITE, subscribeKey);
subscription.request(1);
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc)));
result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", Request.INVITE, subscribeKey);
RedisUtil.KeyOps.delete(key);
result.setResult(JsonResponse.error("连接流媒体服务失败"));
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
@ -191,6 +194,9 @@ public class PlayService {
subscribe.getSipResponseSubscribe().delPublisher(subscribeKey);
}
};
byeSubscribe(callId.getCallId(),3600,()->{
RedisUtil.KeyOps.delete(key);
});
subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request);
result.onTimeout(() -> {
@ -246,7 +252,7 @@ public class PlayService {
CallIdHeader callId = provider.getNewCallId();
Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId);
String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId());
String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId());
subscribe.getSipResponseSubscribe().addPublisher(subscribeKey);
Flow.Subscriber<SIPResponse> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@ -254,7 +260,7 @@ public class PlayService {
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("订阅 {} {}", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {}", Request.INVITE, subscribeKey);
subscription.request(1);
}
@ -263,16 +269,16 @@ public class PlayService {
int statusCode = item.getStatusCode();
log.debug("{} 收到订阅消息 {}", subscribeKey, item);
if (statusCode == Response.TRYING) {
log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 尝试连接流媒体服务", Request.INVITE, subscribeKey);
subscription.request(1);
} else if (statusCode >= Response.OK && statusCode < Response.MULTIPLE_CHOICES) {
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", Request.INVITE, subscribeKey);
RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item, ssrc)));
RedisUtil.KeyOps.expire(key, DateUtil.between(startTime, endTime, DateUnit.SECOND), TimeUnit.SECONDS);
result.setResult(JsonResponse.success(videoUrl(streamId)));
onComplete();
} else {
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey);
log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", Request.INVITE, subscribeKey);
RedisUtil.KeyOps.delete(key);
result.setResult(JsonResponse.error("连接流媒体服务失败"));
ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc);
@ -290,6 +296,9 @@ public class PlayService {
subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey);
}
};
byeSubscribe(callId.getCallId(),DateUtil.between(startTime,endTime,DateUnit.SECOND),()->{
RedisUtil.KeyOps.delete(key);
});
subscribe.getSipResponseSubscribe().addSubscribe(subscribeKey, subscriber);
sender.send(senderIp, request);
result.onTimeout(() -> {
@ -299,6 +308,40 @@ public class PlayService {
return result;
}
public void byeSubscribe(String callId, long seconds, Runnable cb){
GenericTimeoutSubscribe<SIPRequest> sipRequestSubscribe = subscribe.getSipRequestSubscribe();
String subscribeKey = GenericSubscribe.Helper.getKey(Request.BYE, callId);
sipRequestSubscribe.addPublisher(subscribeKey,seconds + 30,TimeUnit.SECONDS);
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>(){
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override
@SneakyThrows
public void onNext(SIPRequest item) {
subscribe.getRecordInfoSubscribe().delPublisher(GenericSubscribe.Helper.getKey(Request.INVITE, callId));
String transport = item.getTopmostViaHeader().getTransport();
String hostAddress = item.getLocalAddress().getHostAddress();
Response byeResponse = InviteResponseBuilder.builder().build().createByeResponse(item, SipUtil.nanoId());
sipService.getProvider(transport,hostAddress).sendResponse(byeResponse);
cb.run();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey);
}
};
sipRequestSubscribe.addSubscribe(subscribeKey,subscriber);
}
@SneakyThrows
public JsonResponse<Void> recordStop(String deviceId, String channelId, Date startTime, Date endTime) {
DockingDevice device = deviceService.getDevice(deviceId);