记录生产环境Rabbitmq消息积压的问题

记录生产环境Rabbitmq消息积压的问题

场景:下游所有的子系统需要上上游中心推送消息,1秒至少有10条消息推送过来,由于之前的开发人员在处理这个地方的逻辑的时候,消息量比较少,所以采用的单线程消费。导致消息处理不及时造成了消息积压

看一下处理前日志截图和Rabbitmq的队列消息数量

优化思路

  • 由于springbootrabbitmq默认是单线程监听,所以需要我们加以配置,将rabbitmq单线程消费改为多线程消费

  • 将每个线程接收的rabbitmq消息放入环形队列中,异步处理,这里采用的是disruptor框架

改在Rabbitmq为多线程方式

配置rabbitmq多线程监听,提供SimpleRabbitListenerContainerFactory类的bean,为数据量大的队列指定这个bean。 设置多线程批量处理。在任意配置类中,提供这个bean。(注意配置类中的Listener会覆盖配置文件中的配置)

@Bean("batchQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //设置线程数
    factory.setConnectionFactory(connectionFactory);
    //最大线程数
    factory.setConcurrentConsumers(10);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

在消费的地方运用 batchQueueRabbitListenerContainerFactory bean

@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
  // todo
}

应用disruptor框架处理业务

引入依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

编写业务BO类

@Data
public class RabbitMqContextBO {
  
  // 定义上下文字段
  private String filed;
  
}

创建RabbitMqContextBO类创建工厂类

public class RabbitMqContextBOFactory implements EventFactory<RabbitMqContextBO> {
  @Override
  public RabbitMqContextBO newInstance() {
    return new RabbitMqContextBO();
  }
}

创建一个消息消费者

@Slf4j
@component
public class RabbitMqEventProducer {
  
  private final RingBuffer<RabbitMqContextBO> ringBuffer;
  
  public RabbitMqEventProducer(@Qualifier("ringBuffer") RingBuffer<RabbitMqContextBO> ringBuffer) {
    this.ringBuffer = ringBuffer;
  } 
  
  public void commit(String message){
    long sequence = ringBuffer.next();
    try{
      // 转换数据对象
      RabbitMqContextBO rabbitMqContextBO = ringBuffer.get(sequence);
      rabbitMqContextBO.setFiled(message);
    }catch (Exception e){
      log.error("消息处理失败:", e);
    }finally {
      ringBuffer.publish(sequence);
    }
  }

}

创建业务处理handler,这里使用业务1,2,3代替

RabbitMqEventHandle1

@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle1 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle1");
  }
  
}

RabbitMqEventHandle2

@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle2 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle2");
  }
  
}

RabbitMqEventHandle3

@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle3 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle3");
  }
  
}

创建RingBuffer并加入spring 容器,并启动消费者

@Bean("ringBuffer")
public RingBuffer<RabbitMqContextBO> ringBuffer(){
  RabbitMqContextBOFactory factory = new RabbitMqContextBOFactory();
  ThreadFactory threadFactory = Executors.defaultThreadFactory();
  // 等待策略
  SleepingWaitStrategy waitStrategy = new SleepingWaitStrategy();
  Disruptor<RabbitMqContextBO> disruptor = new Disruptor<>(factory, 1024 * 4, threadFactory, ProducerType.SINGLE, waitStrategy);
  // 业务编排
  disruptor
    .handleEventsWith(new RabbitMqEventHandle1())
    .handleEventsWith(new RabbitMqEventHandle2())
    .handleEventsWith(new RabbitMqEventHandle3());
  // 启动消费者
  disruptor.start();
  return disruptor.getRingBuffer();
}

在Rabbitmq消费消息的时候将具体的业务逻辑交给handler处理即可

@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
  // todo
  alarmEventProducer.commit(message);
}

重新查看改造后的rabbitmq web manager

查看日志线程名称:

上面的代码是改造过后的逻辑代码,和图片中的类名不相同

0%