SipSubscribe 定义

This commit is contained in:
shikong 2023-08-23 00:05:10 +08:00
parent f5e65eb274
commit 18b8c78b30
10 changed files with 146 additions and 118 deletions

View File

@ -13,6 +13,7 @@ import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
@Tag(name="历史录像")
@RestController
@ -27,8 +28,10 @@ public class RecordController {
}
@GetJson("/getInfo")
public JsonResponse<Void> getInfo(@ParameterObject @Validated GetInfoDTO dto){
public DeferredResult<JsonResponse<Void>> getInfo(@ParameterObject @Validated GetInfoDTO dto){
recordService.requestRecordInfo(dto.getDeviceId());
return JsonResponse.success(null);
DeferredResult<JsonResponse<Void>> result = new DeferredResult<>();
result.setResult(JsonResponse.success(null));
return result;
}
}

View File

@ -1,7 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.listener;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.event.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,58 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.core.sip.message.event.custom.DeviceNotFoundEvent;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
import javax.sip.DialogTerminatedEvent;
import javax.sip.ResponseEvent;
import javax.sip.TimeoutEvent;
import java.util.EventObject;
@Data
public class SipEventItem {
private int statusCode;
private SipEventType type;
private String msg;
private String callId;
private final EventObject event;
public SipEventItem(EventObject eventObject) {
event = eventObject;
msg = ResponseStatus.UNDEFINED.getMessage();
statusCode = ResponseStatus.UNDEFINED.getCode();
if(eventObject instanceof ResponseEvent responseEvent){
SIPResponse response = (SIPResponse)responseEvent.getResponse();
type = SipEventType.Response;
if (response != null) {
msg = response.getReasonPhrase();
statusCode = response.getStatusCode();
callId = response.getCallIdHeader().getCallId();
}
} else if(eventObject instanceof TimeoutEvent timeoutEvent){
type = SipEventType.TimeOut;
msg = "消息超时未回复";
statusCode = ResponseStatus.REQUEST_TIMEOUT.getCode();
SIPRequest request;
if (timeoutEvent.isServerTransaction()) {
request = ((SIPRequest)timeoutEvent.getServerTransaction().getRequest());
} else {
request = ((SIPRequest)timeoutEvent.getClientTransaction().getRequest());
}
callId = request.getCallIdHeader().getCallId();
} else if(eventObject instanceof DialogTerminatedEvent dialogTerminatedEvent){
type = SipEventType.End;
msg = "会话已结束";
statusCode = ResponseStatus.GONE.getCode();
callId = dialogTerminatedEvent.getDialog().getCallId().getCallId();
} else if(eventObject instanceof DeviceNotFoundEvent deviceNotFoundEvent){
type = SipEventType.DeviceNotFound;
msg = "设备未找到";
statusCode = ResponseStatus.NOT_FOUND.getCode();
callId = deviceNotFoundEvent.getCallId();
}
}
}

View File

@ -1,9 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
public enum SipEventType {
TimeOut,
Response,
End,
DeviceNotFound,
CmdFail
}

View File

@ -1,7 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import java.util.concurrent.Flow;
public interface SipSubscriber extends Flow.Subscriber<SipEventItem> {
}

View File

@ -1,20 +0,0 @@
package cn.skcks.docking.gb28181.core.sip.message.event.custom;
import javax.sip.Dialog;
import java.util.EventObject;
public class DeviceNotFoundEvent extends EventObject {
private String callId;
public DeviceNotFoundEvent(Dialog dialog) {
super(dialog);
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
}

View File

@ -0,0 +1,46 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public interface GenericSubscribe<T> {
void close();
void addPublisher(String key);
SubmissionPublisher<T> getPublisher(String key);
void addSubscribe(String key,Flow.Subscriber<T> subscribe);
class Helper {
public static <T> void close(Map<String,SubmissionPublisher<T>> publishers){
publishers.values().forEach(SubmissionPublisher::close);
publishers.clear();
}
public static <T> void addPublisher(Executor executor, Map<String, SubmissionPublisher<T>> publishers, String key){
SubmissionPublisher<T> publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
publishers.put(key, publisher);
}
public static <T> void addPublisher(Executor executor, Map<String, SubmissionPublisher<T>> publishers, String key, int bufferSize){
SubmissionPublisher<T> publisher = new SubmissionPublisher<>(executor, bufferSize);
publishers.put(key, publisher);
}
public static <T> SubmissionPublisher<T> getPublisher(Map<String, SubmissionPublisher<T>> publishers, String key){
return publishers.get(key);
}
public static <T> void addSubscribe(Map<String, SubmissionPublisher<T>> publishers, String key,Flow.Subscriber<T> subscribe){
SubmissionPublisher<T> publisher = getPublisher(publishers, key);
if(publisher != null){
publisher.subscribe(subscribe);
}
}
}
}

View File

@ -0,0 +1,32 @@
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@RequiredArgsConstructor
public class RecordInfoSubscribe implements GenericSubscribe<RecordInfoResponseDTO> {
private final Executor executor;
private static final Map<String, SubmissionPublisher<RecordInfoResponseDTO>> publishers = new ConcurrentHashMap<>();
public void close() {
Helper.close(publishers);
}
public void addPublisher(String key) {
Helper.addPublisher(executor, publishers, key);
}
public SubmissionPublisher<RecordInfoResponseDTO> getPublisher(String key) {
return Helper.getPublisher(publishers, key);
}
public void addSubscribe(String key, Flow.Subscriber<RecordInfoResponseDTO> subscribe) {
Helper.addSubscribe(publishers, key, subscribe);
}
}

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
package cn.skcks.docking.gb28181.core.sip.message.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
@ -10,8 +11,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Slf4j
@Data
@ -20,16 +19,15 @@ import java.util.concurrent.SubmissionPublisher;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private SubmissionPublisher<SipEventItem> publisher;
private GenericSubscribe<RecordInfoResponseDTO> recordInfoSubscribe;
@PostConstruct
private void init(){
publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
private void init() {
recordInfoSubscribe = new RecordInfoSubscribe(executor);
}
@PreDestroy
private void destroy(){
publisher.close();
private void destroy() {
recordInfoSubscribe.close();
}
}

View File

@ -2,16 +2,16 @@ package cn.skcks.docking.gb28181.core.sip.message.event;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class SipEventTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(512);
CountDownLatch countDownLatch = new CountDownLatch(1);
int threadNum = Runtime.getRuntime().availableProcessors() * 2;
int taskNum = 1000;
ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum,
@ -20,21 +20,20 @@ public class SipEventTest {
new ThreadPoolExecutor.DiscardPolicy());
SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
submissionPublisher.subscribe(new Flow.Subscriber<>() {
List<String> list = new ArrayList<>();
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅");
this.subscription = subscription;
subscription.request(1);
log.info("建立订阅");
subscription.request(5);
}
@Override
public void onNext(String item) {
log.info("接收发送者消息 {}", item);
countDownLatch.countDown();
subscription.request(1);
list.add(item);
subscription.request(5);
}
@Override
@ -44,21 +43,65 @@ public class SipEventTest {
@Override
public void onComplete() {
subscription.cancel();
countDownLatch.countDown();
log.info("订阅结束");
}
});
};
submissionPublisher.subscribe(subscriber);
AtomicInteger finalI = new AtomicInteger(1);
new Thread(()->{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscriber.onComplete();
}).start();
for (int i = 0; i < 128; i++) {
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
Thread.sleep(10);
}
countDownLatch.await();
submissionPublisher.close();
list.parallelStream().forEach(item -> log.info("接收发送者消息 {}", item));
CountDownLatch countDownLatch2 = new CountDownLatch(1);
submissionPublisher.subscribe(new Flow.Subscriber<>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.info("建立订阅");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String item) {
log.info("{}", item);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscription.cancel();
countDownLatch2.countDown();
log.info("订阅结束");
}
});
countDownLatch2.await();
executor.shutdown();
}
}