4.wait Notify
目录
wait-notify
为什么需要wait
- 由于条件不满足,Thread-0不能继续进行计算
- 但Thread-0如果一直占用着锁,其它线程就得一直阻塞,效率太低
-
于是jvm单开了一个集合(调用 wait 方法),让Thread-0到WaitSet等着去了,但这时锁释放开,其它线程可以由jvm随机安排
-
直到某个线程,通知Thread-0 (调用 notify 方法),这是Thread-0将离开waitset进入EntryList中重新竞争
相关api
obj.wait()
让进入 object 监视器的线程到 waitSet 等待wait(long n)
有时限的等待, 到 n 毫秒后结束等待obj.notify()
在 object 上正在 waitSet 等待的线程中挑一个唤醒obj.notifyAll()
让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。==必须获得此对象的锁,才能调用这几个方法==
@Slf4j(topic = "c.testWait")
public class TestWaitNotify {
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
log.debug("t1任务开始");
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("其他任务");
}, "t1").start();
new Thread(()->{
log.debug("t2任务开始");
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("其他任务");
}, "t2").start();
Thread.sleep(2000);
log.debug("唤醒 obj 上其它线程");
synchronized (lock){
lock.notify();
// lock.notifyAll();
}
}
}
唤醒其中一个线程
15:41:09.576 c.testWait [t1] - t1任务开始
15:41:09.576 c.testWait [t2] - t2任务开始
15:41:11.577 c.testWait [main] - 唤醒 obj 上其它线程
15:41:11.578 c.testWait [t1] - 其他任务
唤醒所有线程
15:41:54.935 c.testWait [t2] - t2任务开始
15:41:54.935 c.testWait [t1] - t1任务开始
15:41:56.938 c.testWait [main] - 唤醒 obj 上其它线程
15:41:56.939 c.testWait [t2] - 其他任务
15:41:56.939 c.testWait [t1] - 其他任务
Process finished with exit code 0
wait vs sleep
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
- sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
- 它们状态 TIMED_WAITING
wait notify 的正确姿势
sleep会阻碍其它线程执行
@Slf4j(topic = "c.TestWaitNotifyStep1")
public class TestWaitNotifyStep1 {
static final Object lock = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (lock){
log.debug("有烟没有[{}]", hasCigarette);
if(!hasCigarette){
log.debug("没有烟先歇一会");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小明").start();
for (int i = 0; i < 5; i++) {
new Thread(()->{
synchronized (lock){
log.debug("to do something");
}
}, "其他人" + i).start();
}
Thread.sleep(1000);
new Thread(()->{
// 这里不能使用 synchronized(lock) ,加了 synchronized (lock) 后,就好比小明在里面反锁了门睡觉,烟根本没法送进门,main 没加 synchronized 就好像 main 线程是翻窗户进来的
hasCigarette = true;
log.debug("烟到了噢!");
}, "小红").start();
}
}
输出:
17:24:37.919 c.TestWaitNotifyStep1 [小明] - 有烟没有[false]
17:24:37.923 c.TestWaitNotifyStep1 [小明] - 没有烟先歇一会
17:24:38.924 c.TestWaitNotifyStep1 [小红] - 烟到了噢!
17:24:39.925 c.TestWaitNotifyStep1 [小明] - 有烟没?[true]
17:24:39.925 c.TestWaitNotifyStep1 [小明] - 可以开始干活了
17:24:39.925 c.TestWaitNotifyStep1 [其他人4] - to do something
17:24:39.925 c.TestWaitNotifyStep1 [其他人3] - to do something
17:24:39.926 c.TestWaitNotifyStep1 [其他人2] - to do something
17:24:39.926 c.TestWaitNotifyStep1 [其他人1] - to do something
17:24:39.926 c.TestWaitNotifyStep1 [其他人0] - to do something
Process finished with exit code 0
结论:
- 其它干活的线程,都要一直阻塞,效率太低
- 小明线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
- sleep妨碍其它人干活 解决方法,使用 wait - notify
wait替代sleep
@Slf4j(topic = "c.TestWaitNotifyStep1")
public class TestWaitNotifyStep1 {
static final Object lock = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (lock){
log.debug("有烟没有[{}]", hasCigarette);
if(!hasCigarette){
log.debug("没有烟先歇一会");
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小明").start();
for (int i = 0; i < 5; i++) {
new Thread(()->{
synchronized (lock){
log.debug("to do something");
}
}, "其他人" + i).start();
}
Thread.sleep(1000);
new Thread(()->{
synchronized (lock){
hasCigarette = true;
log.debug("烟到了噢!");
lock.notify();
}
}, "小红").start();
}
}
输出
17:35:30.851 c.TestWaitNotifyStep1 [小明] - 有烟没有[false]
17:35:30.855 c.TestWaitNotifyStep1 [小明] - 没有烟先歇一会
17:35:30.856 c.TestWaitNotifyStep1 [其他人4] - to do something
17:35:30.856 c.TestWaitNotifyStep1 [其他人3] - to do something
17:35:30.856 c.TestWaitNotifyStep1 [其他人2] - to do something
17:35:30.856 c.TestWaitNotifyStep1 [其他人1] - to do something
17:35:30.856 c.TestWaitNotifyStep1 [其他人0] - to do something
17:35:31.852 c.TestWaitNotifyStep1 [小红] - 烟到了噢!
17:35:31.853 c.TestWaitNotifyStep1 [小明] - 有烟没?[true]
17:35:31.853 c.TestWaitNotifyStep1 [小明] - 可以开始干活了
结论:
- 解决了其它干活的线程阻塞的问题
- ==目前只是一个线程等待,如果是多个线程等待就不一定能唤醒了==
虚假唤醒
@Slf4j(topic = "c.TestWaitNotifyStep1")
public class TestWaitNotifyStep1 {
static final Object lock = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (lock){
log.debug("有烟没有[{}]", hasCigarette);
if(!hasCigarette){
log.debug("没有烟先歇一会");
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小明").start();
new Thread(()->{
synchronized (lock){
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小丽").start();
Thread.sleep(1000);
new Thread(() -> {
synchronized (lock) {
hasTakeout = true;
log.debug("外卖到了噢!");
lock.notify();
}
}, "送外卖的").start();
}
}
输出:
20:01:11.179 c.TestWaitNotifyStep1 [小明] - 有烟没有[false]
20:01:11.182 c.TestWaitNotifyStep1 [小明] - 没有烟先歇一会
20:01:11.182 c.TestWaitNotifyStep1 [小丽] - 外卖送到没?[false]
20:01:11.182 c.TestWaitNotifyStep1 [小丽] - 没外卖,先歇会!
20:01:12.181 c.TestWaitNotifyStep1 [送外卖的] - 外卖到了噢!
20:01:12.181 c.TestWaitNotifyStep1 [小明] - 有烟没?[false]
结论:
- notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】
- 发生虚假唤醒: 解决方法,改为 notifyAll
采用 while + wait
@Slf4j(topic = "c.TestWaitNotifyStep1")
public class TestWaitNotifyStep1 {
static final Object lock = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (lock){
log.debug("有烟没有[{}]", hasCigarette);
while(!hasCigarette){
log.debug("没有烟先歇一会");
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
}
}
}, "小明").start();
new Thread(()->{
synchronized (lock){
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小丽").start();
Thread.sleep(1000);
new Thread(() -> {
synchronized (lock) {
hasTakeout = true;
log.debug("外卖到了噢!");
lock.notifyAll();
}
}, "送外卖的").start();
}
}
输出:
20:06:57.998 c.TestWaitNotifyStep1 [小明] - 有烟没有[false]
20:06:58.001 c.TestWaitNotifyStep1 [小明] - 没有烟先歇一会
20:06:58.001 c.TestWaitNotifyStep1 [小丽] - 外卖送到没?[false]
20:06:58.002 c.TestWaitNotifyStep1 [小丽] - 没外卖,先歇会!
20:06:59.003 c.TestWaitNotifyStep1 [送外卖的] - 外卖到了噢!
20:06:59.003 c.TestWaitNotifyStep1 [小丽] - 外卖送到没?[true]
20:06:59.003 c.TestWaitNotifyStep1 [小丽] - 可以开始干活了
20:06:59.003 c.TestWaitNotifyStep1 [小明] - 没有烟先歇一会
结论:
使用wait + notify正确方式:
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
// 干活
}
//另一个线程
synchronized(lock) {
lock.notifyAll();
}
wait + notify实现-设计模式-保护性暂停(同步)
场景: 保护性暂停(Guarded Suspension),用在一个线程等待另一个线程的执行结果
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
没有超时时长的方法
创建一个对象:GuardedObject
public class GuardedObject {
/**
* 存储结果载体
*/
private Object result;
/**
* 创建一个锁对象
*/
private final Object lock = new Object();
public Object get(){
synchronized (lock){
// 如果 结果==null 则表示有可能是虚假唤醒,继续等待
while(result == null){
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return this.result;
}
}
public void complete(Object obj){
synchronized (lock){
this.result = obj;
lock.notifyAll();
}
}
}
创建测试类
@Slf4j(topic = "c.TestGuarded")
public class TestGuarded {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
log.debug("准备获取结果");
Object object = guardedObject.get();
log.debug("结果是:{}", object);
}, "t1").start();
new Thread(()->{
log.debug("开始处理任务");
// 通过睡眠来模拟执行的任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("任务执行结束");
guardedObject.complete("success");
}, "t1").start();
}
}
结果:
09:27:08.200 c.TestGuarded [t1] - 开始处理任务
09:27:08.200 c.TestGuarded [t1] - 准备获取结果
09:27:13.204 c.TestGuarded [t1] - 任务执行结束
09:27:13.204 c.TestGuarded [t1] - 结果是:success
Process finished with exit code 0
增加超时等待的方法
新增加方法
/**
*
* @param timeout 超时时长
* @return
*/
public Object get(long timeout){
// 0. 假设 timeout 是 2000 = 2s
synchronized (lock){
// 1. 记录开始时间
long startTime = System.currentTimeMillis();
// 2. 已经过的时长: 初始值是:0
long passTime = 0;
while(result == null){
// 4. 此时线程还需要等待 (2 - 已等待的时间)= 1s
long waitTime = timeout - passTime;
// 5. 如果,现成已经等待了2s或则等多,那么这是应该立刻退出循环
if(waitTime <= 0){
break;
}
try {
// 2.5 已等待了1s
lock.wait(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 3. 假设等待1s后,线程被虚假唤醒了,此时经过的时长为
passTime = System.currentTimeMillis() - startTime;
}
return this.result;
}
}
测试代码:
@Slf4j(topic = "c.TestGuarded")
public class TestGuarded {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
log.debug("准备获取结果");
Object object = guardedObject.get(3000);
log.debug("结果是:{}", object);
}, "t1").start();
new Thread(()->{
log.debug("开始处理任务");
// 通过睡眠来模拟执行的任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("任务执行结束");
guardedObject.complete("success");
}, "t1").start();
}
}
结果:
09:46:46.355 c.TestGuarded [t1] - 开始处理任务
09:46:46.355 c.TestGuarded [t1] - 准备获取结果
09:46:49.361 c.TestGuarded [t1] - 结果是:null
09:46:51.361 c.TestGuarded [t1] - 任务执行结束
Process finished with exit code 0
远程调用框架中的运用
在RPC框架中,每个请求都需要等待结果返回,且需要一一对应。下面模拟这个场景
升级 GuardedObject 对象
新增加 id 字段,用于映射关系
public class GuardedObject {
private Long id;
/**
* 存储结果载体
*/
private Object result;
/**
* 创建一个锁对象
*/
private final Object lock = new Object();
public GuardedObject(Long id) {
this.id = id;
}
public Long getId() {
return id;
}
public Object get(){
synchronized (lock){
// 如果 结果==null 则表示有可能是虚假唤醒,继续等待
while(result == null){
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return this.result;
}
}
/**
*
* @param timeout 超时时长
* @return
*/
public Object get(long timeout){
// 0. 假设 timeout 是 2000 = 2s
synchronized (lock){
// 1. 记录开始时间
long startTime = System.currentTimeMillis();
// 2. 已经过的时长: 初始值是:0
long passTime = 0;
while(result == null){
// 4. 此时线程还需要等待 (2 - 已等待的时间)= 1s
long waitTime = timeout - passTime;
// 5. 如果,现成已经等待了2s或则等多,那么这是应该立刻退出循环
if(waitTime <= 0){
break;
}
try {
// 2.5 已等待了1s
lock.wait(waitTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 3. 假设等待1s后,线程被虚假唤醒了,此时经过的时长为
passTime = System.currentTimeMillis() - startTime;
}
return this.result;
}
}
public void complete(Object obj){
synchronized (lock){
this.result = obj;
lock.notifyAll();
}
}
}
创建一个管理类管理GuardedObject
public class GuardedCollections {
private static Map<Long, GuardedObject> map = new Hashtable<>();
private static Long id = 1L;
/**
* 要保证ID生成的原子性
* @return
*/
private synchronized static Long generatorId(){
return id++;
}
public static GuardedObject createGuardedObject(){
GuardedObject guardedObject = new GuardedObject(generatorId());
map.put(guardedObject.getId(), guardedObject);
return guardedObject;
}
public static Set<Long> getIds(){
return map.keySet();
}
public static GuardedObject getGuardedObject(Long id){
return map.get(id);
}
}
创建RpcClient
@Slf4j(topic = "c.RpcClient")
public class RpcClient extends Thread{
@Override
public void run() {
// 创建远程调用请求体
GuardedObject guardedObject = GuardedCollections.createGuardedObject();
log.debug("请求前{}", guardedObject.getId());
// rpc 远程调用超时等待
Object object = guardedObject.get(5000);
log.debug("请求后{}, context: {}", guardedObject.getId(), object);
}
}
创建RpcServer
@Slf4j(topic = "c.postman")
public class RpcServer extends Thread{
/**
* 通过ID可以绑定一个 RpcClient 对象: 请求的一一对应
*/
private Long id;
private Object context;
public RpcServer(Long id, Object context) {
this.id = id;
this.context = context;
}
@Override
public void run() {
// 响应
GuardedObject guardedObject = GuardedCollections.getGuardedObject(id);
log.debug("开始处理请求:{}, 响应内容:{}", id, context);
guardedObject.complete(context);
}
}
测试
public class TestGuardedCollection {
public static void main(String[] args) throws InterruptedException {
// 模拟三个 RPC 请求
for (int i = 0; i < 3; i++) {
new RpcClient().start();
}
Thread.sleep(2000);
// 获取请求的所有id
Set<Long> ids = GuardedCollections.getIds();
for(Long id: ids){
// 创建远程代理对象执行 Service,
new RpcServer(id, String.format("{\"success\":true,\"data\":%s,\"code\":\"200\",\"message\":\"\"}", id)).start();
}
}
}
结果:
17:03:53.136 c.RpcClient [Thread-0] - 请求前3
17:03:53.136 c.RpcClient [Thread-1] - 请求前1
17:03:53.136 c.RpcClient [Thread-2] - 请求前2
17:03:55.139 c.postman [Thread-3] - 开始处理请求:3, 响应内容:{"success":true,"data":3,"code":"200","message":""}
17:03:55.139 c.postman [Thread-4] - 开始处理请求:2, 响应内容:{"success":true,"data":2,"code":"200","message":""}
17:03:55.139 c.RpcClient [Thread-0] - 请求后3, context: {"success":true,"data":3,"code":"200","message":""}
17:03:55.139 c.postman [Thread-5] - 开始处理请求:1, 响应内容:{"success":true,"data":1,"code":"200","message":""}
17:03:55.140 c.RpcClient [Thread-2] - 请求后2, context: {"success":true,"data":2,"code":"200","message":""}
17:03:55.140 c.RpcClient [Thread-1] - 请求后1, context: {"success":true,"data":1,"code":"200","message":""}
Process finished with exit code 0
wait + notify实现-设计模式-生产-消费者模式(异步)
创建一个消息对象(Message)
@Getter
public final class Message {
private Integer id;
private Object value;
public Message(Integer id, Object value) {
this.id = id;
this.value = value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
创建消息队列(MessageQueue)
@Slf4j(topic = "c.MessageQueue")
public class MessageQueue {
/**
* 消息队列集合
*/
private LinkedList<Message> queue = new LinkedList<>();
/**
* 消息队列容量
*/
private Integer capacity;
private final Object lock = new Object();
public MessageQueue(Integer capacity) {
this.capacity = capacity;
}
public Message take(){
synchronized (lock){
// 检查队列是否为空
while(this.queue.isEmpty()){
// 线程等待
try {
log.debug("队列为空消费者等待");
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 从头部获取消息并返回
Message message = this.queue.removeFirst();
log.debug("已消费消息");
// 唤醒 put 中等待的线程
lock.notifyAll();
return message;
}
}
public void put(Message message){
synchronized (lock){
// 检查队列是否已满,需要让put线程等待
while (queue.size() == capacity){
try {
log.debug("队列已满生产者等待");
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 加入数据到队列尾部
queue.addLast(message);
log.debug("已生产消息");
// 需要唤醒 take中等待的线程
lock.notifyAll();
}
}
}
测试
@Slf4j(topic = "c.TestConsumer")
public class TestConsumer {
public static void main(String[] args) {
// 创建队列
MessageQueue messageQueue = new MessageQueue(2);
// 创建三个生产者线程
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(()->{
messageQueue.put(new Message(id, "i am message body: " + id));
}, "生产者" + i).start();
}
// 定义一个消费者
new Thread(()->{
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Message message = messageQueue.take();
log.debug("消费消息:{}", message.toString());
}
}, "消费者").start();
}
}
结果:
17:52:54.755 c.MessageQueue [生产者0] - 已生产消息
17:52:54.757 c.MessageQueue [生产者1] - 已生产消息
17:52:54.757 c.MessageQueue [生产者2] - 队列已满生产者等待
17:52:55.755 c.MessageQueue [消费者] - 已消费消息
17:52:55.755 c.MessageQueue [生产者2] - 已生产消息
17:52:55.755 c.TestConsumer [消费者] - 消费消息:Message{id=0, value=i am message body: 0}
17:52:56.762 c.MessageQueue [消费者] - 已消费消息
17:52:56.762 c.TestConsumer [消费者] - 消费消息:Message{id=1, value=i am message body: 1}
17:52:57.765 c.MessageQueue [消费者] - 已消费消息
17:52:57.765 c.TestConsumer [消费者] - 消费消息:Message{id=2, value=i am message body: 2}
17:52:58.770 c.MessageQueue [消费者] - 队列为空消费者等待
Process finished with exit code 130 (interrupted by signal 2:SIGINT)