10.共享模型之无锁

共享模型之无锁

问题

场景:多线程操作成员变量,导致结果不对

public interface Account {

    Integer getBalance();

    void withdraw(Integer amount);

    static void demo(Account account){
        List<Thread> threadList = new ArrayList<>();

        long startTime = System.nanoTime();

        for (int i = 0; i < 1000; i++) {
            threadList.add(new Thread(()->{
                account.withdraw(10);
            }));
        }

        threadList.forEach(Thread::start);
        threadList.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        long endTime = System.nanoTime();
        System.out.println(account.getBalance() + " cost: " + (endTime - startTime) / 1000_000 + " ms");
    }
}

现成不安全实现类

public class AccountUnsafe implements Account {

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return this.balance;
    }

    @Override
    public void withdraw(Integer amount) {
        this.balance -= amount;
    }
}

测试:

public class TestAccount {

    public static void main(String[] args) {
        Account.demo(new AccountUnsafe(10000));
    }
}

结果:

90 cost: 112 ms

Process finished with exit code 0

期望:

0 cost: 112 ms

Process finished with exit code 0

分析

因为:this.balance -= amount; 包含了共享变量的读写操作,是临界区代码

解决方案一:synchronized锁

@Override
public synchronized void withdraw(Integer amount) {
    this.balance -= amount;
}

解决方案二:无锁

public class AccountSafe implements Account {

    private final AtomicInteger balance;

    public AccountSafe(AtomicInteger balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true){
            int oldValue = balance.get();
            int newValue = oldValue - amount;
            if(balance.compareAndSet(oldValue, newValue)){
                break;
            }
        }

        // 可以调用以下方法替换上面代码
        // balance.addAndGet(-1 * amount);

    }
}

测试

public class TestAccount {

    public static void main(String[] args) {
        Account.demo(new AccountUnsafe(10000));

        Account.demo(new AccountSafe(new AtomicInteger(10000)));
    }
}

结果:

590 cost: 109 ms
0 cost: 73 ms

Process finished with exit code 0

CAS 与 volatile

分析 无锁方法 withdraw

@Override
    public void withdraw(Integer amount) {
        // 需要不断尝试,直到成功为止
        while (true){
            // 比如拿到了旧值 1000
            int oldValue = balance.get();
            // 在这个基础上 1000 - 10 = 990
            int newValue = oldValue - amount;
            /*
              compareAndSet 正是这个检查,在 set 前,先比较 prev 与当前值
              - 不一致了,next 作废,返回 false 表示失败
            比如,别的线程已经做了减法,当前值已经被减成了 990
            那么本线程的这次 990 就作废了,进入 while 下次循环重试
            - 一致,以 next 设置为新值,返回 true 表示成功
            */
            if(balance.compareAndSet(oldValue, newValue)){
                break;
            }
        }
    }

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作

sequenceDiagram
	participant t1 as 线程1
	participant account as Account 对象
	participant t2 as 线程2
	participant t3 as 线程3
	participant tn as 线程n
	
	t1 ->> account: 获取金额1000
	t1 ->> t1: 减 10 = 990
	t2 ->> account: 获取金额1000
	t2 ->> t2: 减 10 = 990
	t2 ->> account: cas(1000, 990) 比对成功
	t2 -->> account: 修改account值为 990
	t2 -->> t2: 退出循环
	t1 ->> account: cas(1000, 990) 比对失败
	t1 ->> account: 获取金额990
	t3 ->> account: 获取金额990
	t1 ->> t1: 减 10 = 980
	t3 ->> t3: 减 10 = 980
	t3 ->> account: cas(990, 980) 比对成功
	t3 -->> account: 修改account值为 980
	t3 -->> t3: 退出循环
	t1 ->> account: cas(990, 980) 比对失败
	t1 ->> account: 获取金额980
	t1 ->> t1: 减 10 = 970
	t1 ->> account: cas(980, 970) 比对成功
	t1 -->> account: 修改account值为 970
	t1 -->> t1: 退出循环
	
	

其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。

  • 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

查看 AtomicInteger 源码:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

   ......

    private volatile int value;
  
  ......
  
}

可以看见value字段被 volatile 关键字修饰,这里~ 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性) ;CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

CAS 和 synchronized

synchronizedcas 没有绝对的谁效率高,要看所处的场景

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻 > - 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
    • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。

  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。

  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思

    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一

    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

0%