手写阻塞队列实现生产消费者模式

手写阻塞队列实现生产消费者模式

组塞容器实现思路:

使用的就是是lock锁的多条件(condition)阻塞控制

Condition Condition 将 Object的通信方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通信方法的使用。

在阻塞容器中我们需要定义两个条件:一个是队列不满的时候一个是队列不空的时候

创建阻塞容器类:

package com.example.spring.boot.test.chainresponsibility.lock;

import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author wuhuaming
 */
public class Container {

    /**
     * 容器元素
     */
    private Object[] elements;

    /**
     * 容器最大容量
     */
    private Integer capacity;

    /**
     * 容器中的元素个数
     */
    private Integer count;

    /**
     * 定义锁
     */
    private Lock lock = new ReentrantLock();

    /**
     * 容器中元素不满条件
     */
    private Condition notFullCondition = lock.newCondition();

    /**
     * 容器中元素不空条件
     */
    private Condition notEmptyCondition = lock.newCondition();

    /**
     * put元素的下标
     */
    private int putIndex;

    /**
     * 取元素的下标
     */
    private int takeIndex;

    public Container(Integer capacity) {
        this.capacity = capacity;
        elements = new Object[capacity];
        count = 0;
    }

    public void put(Object o) throws InterruptedException {
        lock.lock();
        try{
            // 如果容器已经满了需要阻塞正在put的线程
            while(count.equals(capacity)){
                notFullCondition.await();
            }
            elements[putIndex] = o;
            if(++putIndex == capacity){
                putIndex = 0;
            }
            count++;
            System.out.println("after put:" + Arrays.asList(elements));
            // 唤醒因执行take方法而阻塞的线程
            notEmptyCondition.signal();

        }finally {
            lock.unlock();
        }
    }


    public Object take() throws InterruptedException {
        lock.lock();
        try{
            // 如果容器已经空了需要阻塞正在take的线程
            while(count == 0){
                notEmptyCondition.await();
            }
            Object element = elements[takeIndex];
            elements[takeIndex] = null;
            if(++takeIndex == capacity){
                takeIndex = 0;
            }
            count--;
            System.out.println("after take:" + Arrays.asList(elements));
            // 唤醒因执行put方法而阻塞的程序,此时队列已经有空间了
            notFullCondition.signal();
            return element;
        }finally {
            lock.unlock();
        }
    }

    public synchronized Integer getCount(){
        return count;
    }

}

创建生产则

package com.example.spring.boot.test.utils;

import com.example.spring.boot.test.chainresponsibility.lock.Container;
import org.springframework.stereotype.Component;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author wuhuaming
 */
@Component
public class Producer {

    private final Container container;



    public Producer() {
        container = new Container(10);
    }

    /**
     * 生产则
     * @param o 
     */
    public void put(Object o){
        try {
            container.put(o);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

创建消费则

package com.example.spring.boot.test.runner;

import com.example.spring.boot.test.utils.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ConsumerRunner implements ApplicationRunner {

    private final Producer producer;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        for(int i = 0; i < 5; i++){
            new Thread(()->{
                while (true){
                    Object take;
                    try {
                        take = producer.getContainer().take();
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.printf("consumer[%s]开始消费[%s]:%n", Thread.currentThread().getName(), take);
                }
            }).start();
        }
    }
}

创建生产者控制器

package com.example.spring.boot.test.controller;

import com.example.spring.boot.test.chainresponsibility.lock.Container;
import com.example.spring.boot.test.utils.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.locks.Lock;

@RestController
@RequiredArgsConstructor
public class ConditionController {

    private final Producer producer;

    @GetMapping("put")
    public String put(@RequestParam("key") String key){
        producer.put(key);
        return "success";
    }
}

测试

http://localhost:8080/put?key=7

结果

2023-08-24 11:12:59.768  INFO 3705 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-08-24 11:12:59.769  INFO 3705 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
after put:[7, null, null, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-50]开始消费[7]:
after put:[null, 7, null, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-51]开始消费[7]:
after put:[null, null, 7, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-52]开始消费[7]:
after put:[null, null, null, 7, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-53]开始消费[7]:
after put:[null, null, null, null, 7, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-54]开始消费[7]:
after put:[null, null, null, null, null, 7, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-50]开始消费[7]:

模拟并发场景:

测试结果

0%