diff --git a/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java b/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java index 06ae690..7cafefe 100644 --- a/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java +++ b/starter/src/test/java/cn/skcks/study/disruptor/DisruptorTest.java @@ -1,15 +1,19 @@ package cn.skcks.study.disruptor; +import cn.hutool.core.util.IdUtil; +import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder; import com.lmax.disruptor.BusySpinWaitStrategy; -import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import javax.sip.ListeningPoint; +import javax.sip.message.Request; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @@ -35,42 +39,41 @@ public class DisruptorTest { */ public static final String THREAD_NAME_PREFIX = "custom-disruptor-executor"; - public static class LongEvent - { - private long value; - - public void set(long value) - { - this.value = value; - } - - @Override - public String toString() - { - return "LongEvent{" + "value=" + value + '}'; - } + @Data + public static class RequestEvent { + private Request request; } - public static class LongEventFactory implements EventFactory + public static class SIPRequestHandler implements EventHandler { @Override - public LongEvent newInstance() + public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) { - return new LongEvent(); - } - } - - public static class LongEventHandler implements EventHandler - { - @Override - public void onEvent(LongEvent event, long sequence, boolean endOfBatch) - { - log.info("Event: {}, sequence: {}, endOfBatch: {}", event, sequence, endOfBatch); + log.info("Request: \n{}, sequence: {}, endOfBatch: {}", event.getRequest(), sequence, endOfBatch); } } public static void main(String[] args) throws Exception { + String localIp = "127.0.0.1"; + int localPort = 5060; + String localId = "44050100000000000001"; + + String serverId = "44050100002000000003"; + String serverIp = "10.10.10.20"; + int serverPort = 5060; + + RegisterRequestBuilder requestBuilder = RegisterRequestBuilder.builder() + .localId(localId) + .localIp(localIp) + .localPort(localPort) + .targetId(serverId) + .targetIp(serverIp) + .targetPort(serverPort) + + .transport(ListeningPoint.UDP) + .build(); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CPU_NUM); executor.setMaxPoolSize(MAX_POOL_SIZE); @@ -93,24 +96,28 @@ public class DisruptorTest { // Disruptor disruptor = // new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); - Disruptor disruptor = - new Disruptor<>(LongEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); + Disruptor disruptor = + new Disruptor<>(RequestEvent::new, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); // disruptor.handleEventsWith((event, sequence, endOfBatch) -> // System.out.println("Event: " + event)); - disruptor.handleEventsWith(new LongEventHandler(),new LongEventHandler(),new LongEventHandler()); + disruptor.handleEventsWith(new SIPRequestHandler()); disruptor.start(); - RingBuffer ringBuffer = disruptor.getRingBuffer(); + RingBuffer ringBuffer = disruptor.getRingBuffer(); new Thread(()->{ for (long l = 0; l<256; l++) { // bb.putLong(0, l); // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); + // + // long finalL = l; + // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); - long finalL = l; - ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); + String callId = IdUtil.fastSimpleUUID(); + Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600); + ringBuffer.publishEvent((event, sequence, buffer) -> event.setRequest(request)); } }).start(); @@ -119,9 +126,12 @@ public class DisruptorTest { { // bb.putLong(0, l); // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); + // ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); + + String callId = IdUtil.fastSimpleUUID(); + Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600); + ringBuffer.publishEvent((event, sequence, buffer) -> event.setRequest(request)); - long finalL = l; - ringBuffer.publishEvent((event, sequence, buffer) -> event.set(finalL)); Thread.sleep(1); }