diff --git a/pom.xml b/pom.xml index aa16e74..1a1d06c 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,7 @@ 3.2.5 3.1.8 2.5.0 + 4.0.0 0.1.0 @@ -40,6 +41,12 @@ pom + + com.lmax + disruptor + ${disruptor.version} + + io.reactivex.rxjava3 rxjava diff --git a/starter/pom.xml b/starter/pom.xml index 61884c6..0edba32 100644 --- a/starter/pom.xml +++ b/starter/pom.xml @@ -30,6 +30,11 @@ springdoc-openapi-starter-webmvc-api + + com.lmax + disruptor + + io.reactivex.rxjava3 rxjava diff --git a/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java b/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java new file mode 100644 index 0000000..06ae690 --- /dev/null +++ b/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java @@ -0,0 +1,131 @@ +package cn.skcks.study.disruptor; + + +import com.lmax.disruptor.BusySpinWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Slf4j +public class DisruptorTest { + /** + * cpu 核心数 + */ + public static final int CPU_NUM = Runtime.getRuntime().availableProcessors(); + /** + * 最大线程数 + */ + public static final int MAX_POOL_SIZE = CPU_NUM * 2; + /** + * 允许线程空闲时间(单位:默认为秒) + */ + private static final int KEEP_ALIVE_TIME = 20; + /** + * 队列长度 + */ + public static final int TASK_NUM = 5; + /** + * 线程名称(前缀) + */ + public static final String THREAD_NAME_PREFIX = "custom-disruptor-executor"; + + public static class LongEvent + { + private long value; + + public void set(long value) + { + this.value = value; + } + + @Override + public String toString() + { + return "LongEvent{" + "value=" + value + '}'; + } + } + + public static class LongEventFactory implements EventFactory + { + @Override + public LongEvent newInstance() + { + return new LongEvent(); + } + } + + public static class LongEventHandler implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + log.info("Event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch); + } + } + + public static void main(String[] args) throws Exception + { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(CPU_NUM); + executor.setMaxPoolSize(MAX_POOL_SIZE); + executor.setQueueCapacity(TASK_NUM); + executor.setKeepAliveSeconds(KEEP_ALIVE_TIME); + executor.setThreadNamePrefix(THREAD_NAME_PREFIX); + // 线程池对拒绝任务的处理策略 + // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 初始化 + executor.initialize(); + + // ExecutorService executor = Executors.newCachedThreadPool(); + int bufferSize = 64; + + // LongEventFactory factory = new LongEventFactory(); + // Disruptor disruptor = + // new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); + + // Disruptor disruptor = + // new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); + + Disruptor disruptor = + new Disruptor<>(LongEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); + + // disruptor.handleEventsWith((event, sequence, endOfBatch) -> + // System.out.println("Event: " + event)); + + disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler(),new LongEventHandler()); + disruptor.start(); + + RingBuffer ringBuffer = disruptor.getRingBuffer(); + new Thread(()->{ + for (long l = 0; l<256; l++) + { + // bb.putLong(0, l); + // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); + + long finalL = l; + ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); + } + }).start(); + + // ByteBuffer bb = ByteBuffer.allocate(8); + for (long l = 0; l<128; l++) + { + // bb.putLong(0, l); + // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); + + long finalL = l; + ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); + Thread.sleep(1); + } + + disruptor.shutdown(); //关闭 disruptor 阻塞直至所有事件都得到处理 + executor.shutdown(); + } +} diff --git a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java index 850c1a1..979283a 100644 --- a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java +++ b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.schedulers.Schedulers; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -16,6 +17,7 @@ import org.springframework.boot.test.context.SpringBootTest; import javax.sip.ListeningPoint; import javax.sip.message.Request; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; @Slf4j @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) @@ -70,16 +72,9 @@ public class RxJavaStudyStarterTest { take.subscribe(System.out::println); } + @SneakyThrows @Test void gb28181SipTest(){ - Consumer serverConsumer = request -> { - SIPRequest sipRequest = (SIPRequest) request; - String method = request.getMethod(); - log.info("收到SIP请求: {}", method); - Thread.sleep(RandomUtil.randomLong(10,500)); - log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId()); - }; - String localIp = "127.0.0.1"; int localPort = 5060; String localId = "44050100000000000001"; @@ -97,24 +92,43 @@ public class RxJavaStudyStarterTest { .targetId(serverId) .targetIp(serverIp) .targetPort(serverPort) + .transport(ListeningPoint.UDP) .build(); - Flowable requestObservable = Flowable.intervalRange(1,100, 0,10 ,TimeUnit.MILLISECONDS) - .onBackpressureBuffer() + AtomicLong produce = new AtomicLong(0); + AtomicLong consume = new AtomicLong(0); + + Flowable requestObservable = Flowable.intervalRange(0,10,0,100,TimeUnit.MILLISECONDS) + .onBackpressureDrop() .map(i -> { String callId = IdUtil.fastSimpleUUID(); + produce.getAndAdd(1); return requestBuilder.createNoAuthorizationRequest(callId, 3600); }) + .cache() + .share() + .distinct(request -> ((SIPRequest) request).getCallId().getCallId()) .observeOn(Schedulers.computation()) .subscribeOn(Schedulers.io()); - requestObservable.subscribe(serverConsumer); - requestObservable.subscribe(serverConsumer); - requestObservable.subscribe(serverConsumer); - requestObservable.subscribe(serverConsumer); - requestObservable.subscribe(serverConsumer); + Flowable requestFlowable = requestObservable.publish().refCount(); - requestObservable.blockingSubscribe(); + Consumer serverConsumer = request -> { + SIPRequest sipRequest = (SIPRequest) request; + String method = request.getMethod(); + log.info("收到SIP请求: {}", method); + Thread.sleep(RandomUtil.randomLong(10,100)); + log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId()); + consume.getAndAdd(1); + }; + + requestFlowable.subscribe(serverConsumer); + requestFlowable.subscribe(serverConsumer); + // requestObservable.subscribe(serverConsumer); + // requestObservable.blockingSubscribe(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + log.info("生产数量: {}, 消费数量: {}", produce.get(), consume.get()); } }