Disruptor 试水

This commit is contained in:
shikong 2024-05-03 23:13:03 +08:00
parent ee3c9bd0ab
commit 1eb055c662

View File

@ -1,15 +1,19 @@
package cn.skcks.study.disruptor; package cn.skcks.study.disruptor;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder;
import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sip.ListeningPoint;
import javax.sip.message.Request;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@Slf4j @Slf4j
@ -35,42 +39,41 @@ public class DisruptorTest {
*/ */
public static final String THREAD_NAME_PREFIX = "custom-disruptor-executor"; public static final String THREAD_NAME_PREFIX = "custom-disruptor-executor";
public static class LongEvent @Data
{ public static class RequestEvent {
private long value; private Request request;
public void set(long value)
{
this.value = value;
}
@Override
public String toString()
{
return "LongEvent{" + "value=" + value + '}';
}
} }
public static class LongEventFactory implements EventFactory<LongEvent> public static class SIPRequestHandler implements EventHandler<RequestEvent>
{ {
@Override @Override
public LongEvent newInstance() public void onEvent(RequestEvent event, long sequence, boolean endOfBatch)
{ {
return new LongEvent(); log.info("Request: \n{}, sequence: {}, endOfBatch: {}", event.getRequest(), sequence, endOfBatch);
}
}
public static class LongEventHandler implements EventHandler<LongEvent>
{
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
log.info("Event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch);
} }
} }
public static void main(String[] args) throws Exception public static void main(String[] args) throws Exception
{ {
String localIp = "127.0.0.1";
int localPort = 5060;
String localId = "44050100000000000001";
String serverId = "44050100002000000003";
String serverIp = "10.10.10.20";
int serverPort = 5060;
RegisterRequestBuilder requestBuilder = RegisterRequestBuilder.builder()
.localId(localId)
.localIp(localIp)
.localPort(localPort)
.targetId(serverId)
.targetIp(serverIp)
.targetPort(serverPort)
.transport(ListeningPoint.UDP)
.build();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CPU_NUM); executor.setCorePoolSize(CPU_NUM);
executor.setMaxPoolSize(MAX_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE);
@ -93,24 +96,28 @@ public class DisruptorTest {
// Disruptor<LongEvent> disruptor = // Disruptor<LongEvent> disruptor =
// new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); // new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy());
Disruptor<LongEvent> disruptor = Disruptor<RequestEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); new Disruptor<>(RequestEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
// disruptor.handleEventsWith((event, sequence, endOfBatch) -> // disruptor.handleEventsWith((event, sequence, endOfBatch) ->
// System.out.println("Event: " + event)); // System.out.println("Event: " + event));
disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler(),new LongEventHandler()); disruptor.handleEventsWith(new SIPRequestHandler());
disruptor.start(); disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(()->{ new Thread(()->{
for (long l = 0; l<256; l++) for (long l = 0; l<256; l++)
{ {
// bb.putLong(0, l); // bb.putLong(0, l);
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
//
// long finalL = l;
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL));
long finalL = l; String callId = IdUtil.fastSimpleUUID();
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600);
ringBuffer.publishEvent((event, sequence, buffer) -> event.setRequest(request));
} }
}).start(); }).start();
@ -119,9 +126,12 @@ public class DisruptorTest {
{ {
// bb.putLong(0, l); // bb.putLong(0, l);
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
// ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL));
String callId = IdUtil.fastSimpleUUID();
Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600);
ringBuffer.publishEvent((event, sequence, buffer) -> event.setRequest(request));
long finalL = l;
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL));
Thread.sleep(1); Thread.sleep(1);
} }