记录生产环境Rabbitmq消息积压的问题
目录
记录生产环境Rabbitmq消息积压的问题
场景:下游所有的子系统需要上上游中心推送消息,1秒至少有10条消息推送过来,由于之前的开发人员在处理这个地方的逻辑的时候,消息量比较少,所以采用的单线程消费。导致消息处理不及时造成了消息积压
看一下处理前日志截图和Rabbitmq的队列消息数量
优化思路
-
由于
springboot
中rabbitmq
默认是单线程监听,所以需要我们加以配置,将rabbitmq
单线程消费改为多线程消费 -
将每个线程接收的
rabbitmq
消息放入环形队列中,异步处理,这里采用的是disruptor
框架
改在Rabbitmq为多线程方式
配置rabbitmq
多线程监听,提供SimpleRabbitListenerContainerFactory
类的bean,为数据量大的队列指定这个bean。
设置多线程批量处理。在任意配置类中,提供这个bean。(注意配置类中的Listener会覆盖配置文件中的配置)
@Bean("batchQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置线程数
factory.setConnectionFactory(connectionFactory);
//最大线程数
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(10);
return factory;
}
在消费的地方运用 batchQueueRabbitListenerContainerFactory bean
@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
// todo
}
应用disruptor框架处理业务
引入依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
编写业务BO类
@Data
public class RabbitMqContextBO {
// 定义上下文字段
private String filed;
}
创建RabbitMqContextBO类创建工厂类
public class RabbitMqContextBOFactory implements EventFactory<RabbitMqContextBO> {
@Override
public RabbitMqContextBO newInstance() {
return new RabbitMqContextBO();
}
}
创建一个消息消费者
@Slf4j
@component
public class RabbitMqEventProducer {
private final RingBuffer<RabbitMqContextBO> ringBuffer;
public RabbitMqEventProducer(@Qualifier("ringBuffer") RingBuffer<RabbitMqContextBO> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void commit(String message){
long sequence = ringBuffer.next();
try{
// 转换数据对象
RabbitMqContextBO rabbitMqContextBO = ringBuffer.get(sequence);
rabbitMqContextBO.setFiled(message);
}catch (Exception e){
log.error("消息处理失败:", e);
}finally {
ringBuffer.publish(sequence);
}
}
}
创建业务处理handler,这里使用业务1,2,3代替
RabbitMqEventHandle1
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle1 implements EventHandler<RabbitMqContextBO> {
@Override
public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
// TODO 具体业务逻辑
log.info("RabbitMqEventHandle1");
}
}
RabbitMqEventHandle2
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle2 implements EventHandler<RabbitMqContextBO> {
@Override
public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
// TODO 具体业务逻辑
log.info("RabbitMqEventHandle2");
}
}
RabbitMqEventHandle3
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle3 implements EventHandler<RabbitMqContextBO> {
@Override
public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
// TODO 具体业务逻辑
log.info("RabbitMqEventHandle3");
}
}
创建RingBuffer
并加入spring
容器,并启动消费者
@Bean("ringBuffer")
public RingBuffer<RabbitMqContextBO> ringBuffer(){
RabbitMqContextBOFactory factory = new RabbitMqContextBOFactory();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
SleepingWaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<RabbitMqContextBO> disruptor = new Disruptor<>(factory, 1024 * 4, threadFactory, ProducerType.SINGLE, waitStrategy);
// 业务编排
disruptor
.handleEventsWith(new RabbitMqEventHandle1())
.handleEventsWith(new RabbitMqEventHandle2())
.handleEventsWith(new RabbitMqEventHandle3());
// 启动消费者
disruptor.start();
return disruptor.getRingBuffer();
}
在Rabbitmq消费消息的时候将具体的业务逻辑交给handler处理即可
@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
// todo
alarmEventProducer.commit(message);
}
重新查看改造后的rabbitmq web manager
查看日志线程名称:
上面的代码是改造过后的逻辑代码,和图片中的类名不相同