Disruptor 试水

This commit is contained in:
shikong 2024-05-03 17:26:06 +08:00
parent 85b48056f6
commit ee3c9bd0ab
4 changed files with 173 additions and 16 deletions

View File

@ -27,6 +27,7 @@
<springboot.version>3.2.5</springboot.version> <springboot.version>3.2.5</springboot.version>
<rxjava.version>3.1.8</rxjava.version> <rxjava.version>3.1.8</rxjava.version>
<springdoc.version>2.5.0</springdoc.version> <springdoc.version>2.5.0</springdoc.version>
<disruptor.version>4.0.0</disruptor.version>
<gb28181.docking.version>0.1.0</gb28181.docking.version> <gb28181.docking.version>0.1.0</gb28181.docking.version>
</properties> </properties>
@ -40,6 +41,12 @@
<type>pom</type> <type>pom</type>
</dependency> </dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
<dependency> <dependency>
<groupId>io.reactivex.rxjava3</groupId> <groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId> <artifactId>rxjava</artifactId>

View File

@ -30,6 +30,11 @@
<artifactId>springdoc-openapi-starter-webmvc-api</artifactId> <artifactId>springdoc-openapi-starter-webmvc-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.reactivex.rxjava3</groupId> <groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId> <artifactId>rxjava</artifactId>

View File

@ -0,0 +1,131 @@
package cn.skcks.study.disruptor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class DisruptorTest {
/**
* cpu 核心数
*/
public static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
/**
* 最大线程数
*/
public static final int MAX_POOL_SIZE = CPU_NUM * 2;
/**
* 允许线程空闲时间单位默认为秒
*/
private static final int KEEP_ALIVE_TIME = 20;
/**
* 队列长度
*/
public static final int TASK_NUM = 5;
/**
* 线程名称(前缀)
*/
public static final String THREAD_NAME_PREFIX = "custom-disruptor-executor";
public static class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString()
{
return "LongEvent{" + "value=" + value + '}';
}
}
public static class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}
public static class LongEventHandler implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
log.info("Event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch);
}
}
public static void main(String[] args) throws Exception
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CPU_NUM);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(TASK_NUM);
executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy由调用线程提交任务的线程处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
// ExecutorService executor = Executors.newCachedThreadPool();
int bufferSize = 64;
// LongEventFactory factory = new LongEventFactory();
// Disruptor<LongEvent> disruptor =
// new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy());
// Disruptor<LongEvent> disruptor =
// new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy());
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
// disruptor.handleEventsWith((event, sequence, endOfBatch) ->
// System.out.println("Event: " + event));
disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler(),new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(()->{
for (long l = 0; l<256; l++)
{
// bb.putLong(0, l);
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
long finalL = l;
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL));
}
}).start();
// ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; l<128; l++)
{
// bb.putLong(0, l);
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
long finalL = l;
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL));
Thread.sleep(1);
}
disruptor.shutdown(); //关闭 disruptor 阻塞直至所有事件都得到处理
executor.shutdown();
}
}

View File

@ -9,6 +9,7 @@ import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
@ -16,6 +17,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import javax.sip.ListeningPoint; import javax.sip.ListeningPoint;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ -70,16 +72,9 @@ public class RxJavaStudyStarterTest {
take.subscribe(System.out::println); take.subscribe(System.out::println);
} }
@SneakyThrows
@Test @Test
void gb28181SipTest(){ void gb28181SipTest(){
Consumer<Request> serverConsumer = request -> {
SIPRequest sipRequest = (SIPRequest) request;
String method = request.getMethod();
log.info("收到SIP请求: {}", method);
Thread.sleep(RandomUtil.randomLong(10,500));
log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId());
};
String localIp = "127.0.0.1"; String localIp = "127.0.0.1";
int localPort = 5060; int localPort = 5060;
String localId = "44050100000000000001"; String localId = "44050100000000000001";
@ -97,24 +92,43 @@ public class RxJavaStudyStarterTest {
.targetId(serverId) .targetId(serverId)
.targetIp(serverIp) .targetIp(serverIp)
.targetPort(serverPort) .targetPort(serverPort)
.transport(ListeningPoint.UDP) .transport(ListeningPoint.UDP)
.build(); .build();
Flowable<Request> requestObservable = Flowable.intervalRange(1,100, 0,10 ,TimeUnit.MILLISECONDS) AtomicLong produce = new AtomicLong(0);
.onBackpressureBuffer() AtomicLong consume = new AtomicLong(0);
Flowable<Request> requestObservable = Flowable.intervalRange(0,10,0,100,TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(i -> { .map(i -> {
String callId = IdUtil.fastSimpleUUID(); String callId = IdUtil.fastSimpleUUID();
produce.getAndAdd(1);
return requestBuilder.createNoAuthorizationRequest(callId, 3600); return requestBuilder.createNoAuthorizationRequest(callId, 3600);
}) })
.cache()
.share()
.distinct(request -> ((SIPRequest) request).getCallId().getCallId())
.observeOn(Schedulers.computation()) .observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io()); .subscribeOn(Schedulers.io());
requestObservable.subscribe(serverConsumer); Flowable<Request> requestFlowable = requestObservable.publish().refCount();
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.blockingSubscribe(); Consumer<Request> serverConsumer = request -> {
SIPRequest sipRequest = (SIPRequest) request;
String method = request.getMethod();
log.info("收到SIP请求: {}", method);
Thread.sleep(RandomUtil.randomLong(10,100));
log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId());
consume.getAndAdd(1);
};
requestFlowable.subscribe(serverConsumer);
requestFlowable.subscribe(serverConsumer);
// requestObservable.subscribe(serverConsumer);
// requestObservable.blockingSubscribe();
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
log.info("生产数量: {}, 消费数量: {}", produce.get(), consume.get());
} }
} }