Disruptor框架使用

disruptor框架使用

使用步骤

  • 建立一个工厂Event类,用于创建Event类实例对象
  • 需要有一个监听事件类,用于处理数据(Event类)
  • 实例化Disruptor,配置参数,编写Disruptor核心组件
  • 编写生产者组件,向Disruptor容器中去投递数据

OrderEvent

  • Event类:相当于传送的消息;
  • 如果需要实现读写,需要实现Serializable接口,Disruptor走纯内存,不实现Serializable接口也可以;
package com.wuhm.disruptor.demo1;

import lombok.Data;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
@Data
public class OrderEvent {

    private long value;

}

OrderEventFactory

  • Event工厂类:用于生产Event对象;
  • 需要实现com.lmax.disruptor.EventFactory接口;
package com.wuhm.disruptor.demo1;

import com.lmax.disruptor.EventFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        // 这是为了返回一个空的Event对象
        return new OrderEvent();
    }
}

OrderEventHandle

  • 相当于消费者;
  • 需要实现com.lmax.disruptor.EventHandler接口;
package com.wuhm.disruptor.demo1;

import com.lmax.disruptor.EventHandler;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
        System.out.println("消费者:" + orderEvent.getValue());
    }
}

Main

  • 实例化disruptor对象;
  • 添加消费者的监听 (构建disruptor与消费者的一个关联关系);
  • 启动disruptor;
package com.wuhm.disruptor.demo1;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class TestDisruptor {
    public static void main(String[] args) {
        OrderEventFactory factory = new OrderEventFactory();
        // 环形数组长度,必须是2的n次幂
        int ringBufferSize = 2;
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<OrderEvent> disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.SINGLE, waitStrategy);
        disruptor.handleEventsWith(new OrderEventHandler());
        // 处理器异常处理器
//        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
//        disruptor.setDefaultExceptionHandler(exceptionHandler);
        disruptor.start();

        // 通过事件转换器(EventTranslator)来指明如何将发布的数据转换到事件对象(Event)中
        // 这里是一个参数的转换器,另外还有两个(EventTranslatorTwoArg)、三个(EventTranslatorThreeArg)
        // 和多个(EventTranslatorVararg)参数的转换器可以使用,参数类型可以不一样
        EventTranslatorOneArg<OrderEvent, Long> eventTranslatorOneArg = (event, sequence, arg0) -> event.setValue(arg0);
        // 发布
        for (int i = 0; i < 100; i++) {
            disruptor.publishEvent(eventTranslatorOneArg, (long) i);
        }
        disruptor.shutdown();

    }
}

Disruptor核心 RingBuffer

  • 初看Disruptor,给人的印象RingBuffer是其核心,生产者向RingBuffer写元素,消费者从RingBuffer消费元素;

组件com.lmax.disruptor.dsl.Disruptor定义

  • 可以理解成辅助类,用于启停,和消费者建立连接,里面包含了RingBuffer,消费者线程池Executor,消费者集合ConsumerRepository等引用;

生产者和消费者之间的平衡

  • RingBuffer是个数组,生产者往里丢元素,消费者从里面消费元素;

  • 生产和消费的速度可能不一样

  • 如果消费者的速度快,数组中没有可消费的元素了,消费者会停下来,生产者向数组中丢了新元素,消费者才会继续消费;

  • 生产者丢到数组末尾的时候,会从数组的开始位置继续丢,如果生产者速度快,追上了消费者,即生产者要丢的位置上,之前的元素还没有被消费,生产者就要等待消费者消费了该元素再继续丢;

Disruptor核心 WaitStrategy

com.lmax.disruptor.WaitStrategy

  • 决定一个消费者如何等待生产者将Event置入Disruptor;
  • 其所有实现都是针对消费者线程的;

主要策略有 com.lmax.disruptor.BlockingWaitStrategy com.lmax.disruptor.SleepingWaitStrategy com.lmax.disruptor.YieldingWaitStrategy

com.lmax.disruptor.BlockingWaitStrategy

  • 最低效的策略,但其对CPU的消耗最小,并且在各种部署环境中能提供更加一致的性能表现;
  • 内部维护了一个重入锁ReentrantLock和Condition;

com.lmax.disruptor.SleepingWaitStrategy

  • 性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
  • 是一种无锁的方式;

com.lmax.disruptor.YieldingWaitStrategy

  • 性能最好,适合用于低延迟的系统;在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性;
  • 也是无锁的实现,只要是无锁的实现,signalAllWhenBlocking()都是空实现;

Disruptor核心 EventProcessor & WorkProcessor

com.lmax.disruptor.EventProcessor

  • 继承自java.lang.Runnable接口;

  • 主要用于事件的循环,处理Disruptor中的Event,拥有消费者的Sequence;

  • 它有一个实现类是com.lmax.disruptor.BatchEventProcessor,包含了event loop的有效实现,并且将回调一个com.lmax.disruptor.EventHandler接口的实现对象;

  • 是Disruptor中最核心的方法,实现了run()方法,不断的轮询,获取数据对象,把数据对象交给消费者处理,具体怎么交给消费者,利用了消费者的等待策略;

  • 其在run()方法中回调com.lmax.disruptor.EventHandler的实现对象,所有的Consumer都实现了com.lmax.disruptor.EventHandler接口;

com.lmax.disruptor.EventHandler

  • 由用户实现并且代表了Disruptor中的一个消费者的接口,也就是消费者逻辑都要写在其中;

com.lmax.disruptor.WorkProcessor

  • 在多生产者多消费者模式下,确保每个sequence只被一个processor消费,在同一个WorkPool中,确保多个WorkProcessor不会消费同样的sequence;

Disruptor 串行&并行消费&菱形消费&六边形消费

定义一个Event类

package com.wuhm.disruptor.demo2;

import lombok.Data;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Disruptor中的 Event
 * @author wuhuaming
 */
@Data
public class Trade {

    private String id;
    
    private String name;
    
    private double price;
    
    private AtomicInteger count = new AtomicInteger(0);
    
}

创建工厂类

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.EventFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class TradeFactory implements EventFactory<Trade> {
    @Override
    public Trade newInstance() {
        return new Trade();
    }
}

创建处理器

Handler1.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade>{

    //EventHandler
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    //WorkHandler
    @Override
    public void onEvent(Trade event) throws Exception {
        System.err.println("handler 1 : SET NAME");
        Thread.sleep(1000);
        event.setName("H1");
    }

}

Handler2.java

package com.wuhm.disruptor.demo2;

import java.util.UUID;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 2 : SET ID");
        Thread.sleep(2000);
        event.setId(UUID.randomUUID().toString());
    }

}

Handler3.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 3 : NAME: " 
                                + event.getName() 
                                + ", ID: " 
                                + event.getId()
                                + ", PRICE: " 
                                + event.getPrice()
                                + " INSTANCE : " + event.toString());
    }

}

Handler4.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.EventHandler;

public class Handler4 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 4 : SET PRICE");
        Thread.sleep(1000);
        event.setPrice(17.0);
    }

}

Handler5.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.EventHandler;

public class Handler5 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 5 : GET PRICE: " +  event.getPrice());
        Thread.sleep(1000);
        event.setPrice(event.getPrice() + 3.0);
    }

}

串行执行

通过disruptor对handleEventsWith(final EventHandler<? super T>… handlers)的链式调用,制定Consumer的消费顺序;

SerialTestDisruptor.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.wuhm.disruptor.demo1.OrderEvent;
import com.wuhm.disruptor.demo1.OrderEventFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class SerialTestDisruptor {

    public static void main(String[] args) throws InterruptedException {
        long begin = System.currentTimeMillis();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<Trade> disruptor = new Disruptor<>(new TradeFactory(), 2, threadFactory, ProducerType.SINGLE, waitStrategy);

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.1 串行操作:
        disruptor
                .handleEventsWith(new Handler1())
                .handleEventsWith(new Handler2())
                .handleEventsWith(new Handler3())
                .handleEventsWith(new Handler4())
                .handleEventsWith(new Handler5());

        disruptor.start();

        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < 1; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);
        }
        disruptor.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

结果:

handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: H1, ID: dae4fa4c-00fe-4d47-a159-62e2fafadfe3, PRICE: 8025.358499754297 INSTANCE : Trade(id=dae4fa4c-00fe-4d47-a159-62e2fafadfe3, name=H1, price=8025.358499754297, count=0)
handler 4 : SET PRICE
handler 5 : GET PRICE: 17.0
总耗时: 5080

并行实现

  • 有2种实现并行消费的代码编写方式
    • 在handleEventsWith方法中添加多个handler实现即可;
    • 依次调用handleEventsWith方法;

ParallelTestDisruptor.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class ParallelTestDisruptor {

    public static void main(String[] args) throws InterruptedException {
        long begin = System.currentTimeMillis();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<Trade> disruptor = new Disruptor<>(new TradeFactory(), 2, threadFactory, ProducerType.SINGLE, waitStrategy);

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.1 并行操作:handleEventsWith方法 添加多个handler实现即可
        disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
        // 2.2 handleEventsWith方法 分别进行调用
        disruptor.handleEventsWith(new Handler4());
        disruptor.handleEventsWith(new Handler5());

        disruptor.start();

        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < 1; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);
        }
        disruptor.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

结果:

handler 1 : SET NAME
handler 2 : SET ID
handler 4 : SET PRICE
handler 5 : GET PRICE: 8605.307484935085
handler 3 : NAME: null, ID: null, PRICE: 8605.307484935085 INSTANCE : Trade(id=null, name=null, price=8605.307484935085, count=0)
总耗时: 2041

菱形消费示例

需求:使用EventHandlerGroup接收并行结果,在用其串行调用;

RhombusTestDisruptor.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class RhombusTestDisruptor {

    public static void main(String[] args) throws InterruptedException {
        long begin = System.currentTimeMillis();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<Trade> disruptor = new Disruptor<>(new TradeFactory(), 2, threadFactory, ProducerType.SINGLE, waitStrategy);

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.1 串行操作:
        EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
        EventHandlerGroup<Trade> handlerGroup1 = handlerGroup.then(new Handler3())
                .handleEventsWith(new Handler4(), new Handler5());
        handlerGroup1.then(new Handler3());

        disruptor.start();

        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < 1; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);
        }
        disruptor.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

结果:

handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: H1, ID: cd4a83d6-c289-49ae-8a42-a8c54bf67959, PRICE: 2889.919839319558 INSTANCE : Trade(id=cd4a83d6-c289-49ae-8a42-a8c54bf67959, name=H1, price=2889.919839319558, count=0)
handler 4 : SET PRICE
handler 5 : GET PRICE: 2889.919839319558
handler 3 : NAME: H1, ID: cd4a83d6-c289-49ae-8a42-a8c54bf67959, PRICE: 20.0 INSTANCE : Trade(id=cd4a83d6-c289-49ae-8a42-a8c54bf67959, name=H1, price=20.0, count=0)
总耗时: 3055

六边形消费

六边形消费示例

  • 生产者作为六边形“最左边”的点,生产消息;
  • h1和h4并行执行,h1和h2串行执行,h4和h5串行执行,h3在h2和h5都执行完了再执行;
  • 先在disruptor中把消费者“编排”好,等生产者把消息(Event)丢到RingBuffer中,“编排”好的消费者就开始工作,串行的串行,并行的并行;

注意1:在单生产者单消费者模式下,有几个消费者,在初始化disruptor的时候,用于初始化disruptor的线程池中就得有几个线程; 注意2:多个消费者拿到的Event,是同一个实例;

HexagonTestDisruptor.java

package com.wuhm.disruptor.demo2;

import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @Description
 * @Author wuhuaming
 * @Date 2023/4/18
 */
public class HexagonTestDisruptor {

    public static void main(String[] args) throws InterruptedException {
        long begin = System.currentTimeMillis();
        // 创建消费者线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<Trade> disruptor = new Disruptor<>(new TradeFactory(), 2, threadFactory, ProducerType.SINGLE, waitStrategy);

        //2 把消费者设置到Disruptor中 handleEventsWith
        //2.4 六边形操作
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h4);
        disruptor.after(h1).handleEventsWith(h2);
        disruptor.after(h4).handleEventsWith(h5);
        disruptor.after(h2, h5).handleEventsWith(h3);

        disruptor.start();

        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < 1; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);
        }
        disruptor.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
    }
}

结果:

handler 4 : SET PRICE
handler 1 : SET NAME
handler 2 : SET ID
handler 5 : GET PRICE: 17.0
handler 3 : NAME: H1, ID: 50b741df-850c-4480-bee2-bc9df4ad126a, PRICE: 20.0 INSTANCE : Trade(id=50b741df-850c-4480-bee2-bc9df4ad126a, name=H1, price=20.0, count=0)
总耗时: 3052
0%