手写阻塞队列实现生产消费者模式
目录
手写阻塞队列实现生产消费者模式
阻塞容器实现思路:
使用的就是 Lock 锁的多条件(Condition)阻塞控制
Condition:Condition 将 Object 的通信方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待集(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通信方法的使用。
在阻塞容器中我们需要定义两个条件:一个是队列不满的时候,一个是队列不空的时候
创建阻塞容器类:
package com.example.spring.boot.test.chainresponsibility.lock;
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, blocking FIFO container backed by a circular array.
 *
 * <p>All mutable state is guarded by a single {@link ReentrantLock}. Two conditions created
 * from that lock park the callers: {@link #put(Object)} waits on "not full" and
 * {@link #take()} waits on "not empty".
 *
 * @author wuhuaming
 */
public class Container {
    /** Circular buffer holding the elements; a free slot contains {@code null}. */
    private final Object[] elements;

    /** Maximum number of elements the container can hold. */
    private final int capacity;

    /** Lock guarding {@code elements}, {@code count}, {@code putIndex} and {@code takeIndex}. */
    private final Lock lock = new ReentrantLock();

    /** Signalled after a take, i.e. when a slot has become free. */
    private final Condition notFullCondition = lock.newCondition();

    /** Signalled after a put, i.e. when an element has become available. */
    private final Condition notEmptyCondition = lock.newCondition();

    /** Number of elements currently stored. */
    private int count;

    /** Index of the slot the next put writes to. */
    private int putIndex;

    /** Index of the slot the next take reads from. */
    private int takeIndex;

    /**
     * Creates an empty container.
     *
     * @param capacity maximum number of elements; must be a positive integer
     * @throws IllegalArgumentException if {@code capacity} is {@code null}, zero or negative
     */
    public Container(Integer capacity) {
        // A zero-capacity container would make every put and every take block forever.
        if (capacity == null || capacity <= 0) {
            throw new IllegalArgumentException(
                    "capacity must be a positive integer, got: " + capacity);
        }
        this.capacity = capacity;
        this.elements = new Object[capacity];
    }

    /**
     * Appends an element, blocking while the container is full.
     *
     * @param o the element to store; stored as-is, no null check is performed
     * @throws InterruptedException if the thread is interrupted while waiting for free space
     */
    public void put(Object o) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (count == capacity) {
                notFullCondition.await();
            }
            elements[putIndex] = o;
            // Wrap around to the start of the circular buffer.
            if (++putIndex == capacity) {
                putIndex = 0;
            }
            count++;
            System.out.println("after put:" + Arrays.asList(elements));
            // Wake one thread blocked in take(): there is now at least one element.
            notEmptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the container is empty.
     *
     * @return the element that was stored earliest among those still present
     * @throws InterruptedException if the thread is interrupted while waiting for an element
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (count == 0) {
                notEmptyCondition.await();
            }
            Object element = elements[takeIndex];
            // Clear the slot so the buffer does not keep the element reachable.
            elements[takeIndex] = null;
            // Wrap around to the start of the circular buffer.
            if (++takeIndex == capacity) {
                takeIndex = 0;
            }
            count--;
            System.out.println("after take:" + Arrays.asList(elements));
            // Wake one thread blocked in put(): there is now at least one free slot.
            notFullCondition.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently stored.
     *
     * <p>The value is read under the same lock that {@link #put(Object)} and {@link #take()}
     * use, so it reflects a consistent state of the container.
     *
     * @return the current element count
     */
    public Integer getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
创建生产者
package com.example.spring.boot.test.utils;
import com.example.spring.boot.test.chainresponsibility.lock.Container;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Component that owns the shared {@link Container} and puts elements into it.
 *
 * @author wuhuaming
 */
@Component
public class Producer {
    /** Capacity of the container created by this producer. */
    private static final int CONTAINER_CAPACITY = 10;

    private final Container container;

    /** Creates the producer together with its backing container. */
    public Producer() {
        container = new Container(CONTAINER_CAPACITY);
    }

    /**
     * Returns the container this producer writes to, so that consumers can take from it.
     *
     * @return the shared container instance
     */
    public Container getContainer() {
        return container;
    }

    /**
     * Puts an element into the container, blocking while the container is full.
     *
     * @param o the element to produce
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *     is interrupted while waiting; the thread's interrupt flag is restored first
     */
    public void put(Object o) {
        try {
            container.put(o);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
创建消费者
package com.example.spring.boot.test.runner;
import com.example.spring.boot.test.utils.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Starts the consumer threads from the {@link ApplicationRunner} callback.
 *
 * <p>Each consumer repeatedly takes one element from the producer's container, pauses, and
 * then prints what it consumed.
 */
@Component
@RequiredArgsConstructor
public class ConsumerRunner implements ApplicationRunner {
    /** Number of consumer threads to start. */
    private static final int CONSUMER_COUNT = 5;

    /** Pause after each take, in milliseconds. */
    private static final long CONSUME_DELAY_MILLIS = 3000L;

    private final Producer producer;

    /**
     * Starts {@code CONSUMER_COUNT} consumer threads and returns immediately.
     *
     * @param args application arguments; not used
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(this::consumeUntilInterrupted).start();
        }
    }

    /** Takes and reports elements in a loop until the current thread is interrupted. */
    private void consumeUntilInterrupted() {
        while (true) {
            Object taken;
            try {
                taken = producer.getContainer().take();
                Thread.sleep(CONSUME_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Interruption is a stop request: restore the flag and end this consumer
                // quietly instead of killing the thread with an uncaught exception.
                // An element taken just before an interrupted sleep is not reported.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.printf("consumer[%s]开始消费[%s]:%n", Thread.currentThread().getName(), taken);
        }
    }
}
创建生产者控制器
package com.example.spring.boot.test.controller;
import com.example.spring.boot.test.chainresponsibility.lock.Container;
import com.example.spring.boot.test.utils.Producer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.locks.Lock;
/**
 * REST controller exposing a single endpoint that hands a value to the {@link Producer}.
 */
@RestController
@RequiredArgsConstructor
public class ConditionController {
    /** Response body returned once the value has been handed to the producer. */
    private static final String SUCCESS_RESPONSE = "success";

    private final Producer producer;

    /**
     * Passes the {@code key} request parameter to {@link Producer#put(Object)}.
     *
     * @param key value of the {@code key} request parameter
     * @return the literal {@code "success"} after the producer call returns
     */
    @GetMapping(path = "put")
    public String put(@RequestParam(name = "key") String key) {
        producer.put(key);
        return SUCCESS_RESPONSE;
    }
}
测试
http://localhost:8080/put?key=7
结果
2023-08-24 11:12:59.768 INFO 3705 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2023-08-24 11:12:59.769 INFO 3705 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
after put:[7, null, null, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-50]开始消费[7]:
after put:[null, 7, null, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-51]开始消费[7]:
after put:[null, null, 7, null, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-52]开始消费[7]:
after put:[null, null, null, 7, null, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-53]开始消费[7]:
after put:[null, null, null, null, 7, null, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-54]开始消费[7]:
after put:[null, null, null, null, null, 7, null, null, null, null]
after take:[null, null, null, null, null, null, null, null, null, null]
consumer[Thread-50]开始消费[7]: