前言: 这篇文章根据场景分类总结实践的最优做法以及注意事项, 不详细解释基本概念; 更灵活, 性能更好往往意味着代码复杂度的增加, 根据实际业务选择, 有时最简单的写法反而更合适
可见性
volatile
: 对于一个多线程共享的变量, 每次访问变量时,总是获取主内存的最新值, 且当某个线程在其本地内存副本中修改了该变量的值, 立刻回写到主内存
线程同步
原子操作
赋值不需要同步
long
和double
除外)赋值,例如:int n = m
, long
和double
是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long
和double
的赋值作为原子操作实现的。List list = anotherList
Atomic 原子类
把单个变量引用或者值的变化封装为原子操作, 可分为4类
基本类型
使用原子的方式更新基本类型
AtomicInteger
:整型原子类AtomicLong
:长整型原子类AtomicBoolean
:布尔型原子类
数组类型
使用原子的方式更新数组里的某个元素
AtomicIntegerArray
:整型数组原子类AtomicLongArray
:长整型数组原子类AtomicReferenceArray
:引用类型数组原子类
引用类型
AtomicReference
:引用类型原子类AtomicMarkableReference
:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来。AtomicStampedReference
:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用CAS
进行原子更新时可能出现的ABA
问题。
ABA
问题不是必须解决的, 如果业务关注变量的值而不在意值变化的过程, 那就不需要处理。
对象的属性修改类型
AtomicIntegerFieldUpdater
:原子更新整型字段的更新器AtomicLongFieldUpdater
:原子更新长整型字段的更新器AtomicReferenceFieldUpdater
:原子更新引用类型里的字段
一般场景
使用synchronized
或ReentrantLock
ReentrantLock
性能更好, 提供了tryLock()
方法限制了最大阻塞时间ReentrantLock
更灵活, 临界区可跨多个代码块ReentrantLock
更适用于顺序敏感的场景, synchronized
和ReentrantLock
默认都是非公平锁, 但ReentrantLock
可以在构造时new ReentrantLock(true)
设置为公平锁读多写少
使用ReadWriteLock
或StampedLock
, 将读锁和写锁分离, 提高读并发性能
ReadWriteLock
: 悲观读锁, 把读写操作分别用读锁和写锁来加锁, 允许多个线程同时读(当有一个线程持有读锁, 其他线程也可以获取读锁, 这样就大大提高了并发读的执行效率), 但它只允许一个线程写入(当有一个线程持有写锁, 其他线程读锁和写锁都获取不到)StampedLock
: 乐观读锁, 它和ReadWriteLock
相比,不同之处在于,读的过程中也允许获取写锁,这样一来,我们读的数据就可能不一致,但需要一点额外的代码来判断读的过程中是否有写入public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
死锁
各线程获取可重入锁的顺序一定要相同
public void add(int m) {
synchronized(lockA) { // 获得lockA的锁
this.value += m;
synchronized(lockB) { // 获得lockB的锁
this.another += m;
} // 释放lockB的锁
} // 释放lockA的锁
}
public void dec(int m) {
synchronized(lockB) { // 获得lockB的锁
this.another -= m;
synchronized(lockA) { // 获得lockA的锁
this.value -= m;
} // 释放lockA的锁
} // 释放lockB的锁
}
对于上述代码,线程1和线程2如果分别执行add()
和dec()
方法时, 执行到内层的synchronized
时就会永远等待下去, 造成死锁
线程协同
等待/唤醒
使用synchronized
+wait/notify
, 或者ReentrantLock
+Condition
可以做到最细粒度的控制, 而且进入等待状态会释放锁, 不会阻塞其他线程, 但代码过于繁琐, 实战中尽量用其他并发容器替代, 没有必要自己实现
class TaskQueue {
Queue queue = new LinkedList();
public synchronized void addTask(String s) {
this.queue.add(s);
this.notifyAll();
}
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
}
CountDownLatch
CountDownLatch
阻塞主线程, 等待指定数量的线程完成后, 执行指定的逻辑, 基本上可以被CompletableFuture
的静态方法allOf
取代, 实践中不考虑用它
CyclicBarrier
CyclicBarrier
阻塞子线程, 作为一道屏障拦截在多个线程上, 屏障本身包含一段逻辑, 线程经过屏障时会等待, 所有线程都通过屏障时, 执行屏障逻辑, 各子线程也继续往下执行, 实践中还是很有用的
Semaphore
Semaphore
用于限制同一时间并发访问的线程数量
CompletableFuture
CompletableFuture在实践中最为常用, 下面详细说明
1. 实例方法
实例方法较多, 建议使用Aync
版本的方法, 防止阻塞主线程, 以及避免不确定性
当单个异步任务完成后
thenApply
: 对其结果执行Function
, 返回一个新CompletionStage
thenAccept
: 对其结果执行Consumer
thenRun
: 执行一个Runnable
thenCompose
: 类似thenApply, 他们的回参类型都是CompletionStage
, 但thenCompose
执行的是不是一个普通的Function
, 而是Function
, 当现有的方法返回已经是一个CompletionStage
时, 相比thenApply
,thenCompose
不会嵌套, 因此thenApply
适合用来编写新的异步逻辑, 而thenCompose
更适合用来串接多个已有的CompletableFuture
// 回调是普通方法
CompletableFuture futureApply = CompletableFuture
.supplyAsync(() -> 1)
.thenApply(x -> x+1);
CompletableFuture futureCompose = CompletableFuture
.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
// 回调是已有的异步方法, thenApply会嵌套一层而thenCompose不会
public CompletableFuture getUserInfo(userId)
public CompletableFuture getUserRating(UserInfo)
CompletableFuture f =
userInfo.thenApply(this::getUserRating);
CompletableFuture relevanceFuture =
userInfo.thenCompose(this::getUserRating);
当两个异步任务都完成后
thenCombine
: 对它们的结果执行BiFunction
, 返回一个新结果thenAcceptBoth
: 对它们的结果执行BiConsumer
runAfterBoth
: 执行一个Runnable
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
int combinedResult = combinedFuture.join(); // 或者使用 get() 方法获取结果
System.out.println(combinedResult); // 输出:30,因为 future1 返回 10,future2 返回 20,合并结果为 10 + 20 = 30
当两个异步任务中的任意一个完成后
applyToEither
: 对其结果后执行Function
, 返回一个新结果, 不需要等待两个任务都完成acceptEither
: 对其结果执行Consumer
runAfterEither
: 执行一个Runnable
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000); // 模拟任务1耗时2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟任务2耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture resultFuture = future1.applyToEither(future2, result -> result * 2);
int result = resultFuture.join(); // 或者使用 get() 方法获取结果
System.out.println(result); // 输出:40,因为 future2 先完成,结果为 20,应用 fn 函数得到 20 * 2 = 40
异常处理相关
exceptionally
: 入参是一个Function
, 当exceptionally
前的异步操作抛出异常时,可以对这个异常进行处理,并返回一个新的CompletionStage
handle
: 和exceptionally
类似, 但入参是一个BiFunction
, 因此异常和正常的情况可以处理, ,并返回一个新的CompletionStage
, 传递给后面whenComplete
: 和handle
类似, , 可以同时处理正常和异常的情况, 但入参是一个BiConsumer
,Consumer
是没有回参的, 所以whenComplete
不产生新的异步结果
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Simulating an exception
throw new RuntimeException("Oops, something went wrong!");
}).exceptionally(ex -> {
// Handling the exception
System.out.println("Caught exception: " + ex.getMessage());
return 0; // Providing a default value
});
future.thenAccept(result -> {
System.out.println("Final result: " + result);
});
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return 10 / 2;
});
CompletableFuture handledFuture = future.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
} else {
return "Result: " + result;
}
});
handledFuture.thenAccept(result -> {
System.out.println(result);
});
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return 10 / 2;
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Exception occurred: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
其它实例方法
Future
接口下
get()
:调用方线程阻塞, 等待异步任务完成后获取结果get(long timeout, TimeUnit unit)
:同get()
,但只等待指定的时间;cancel(boolean mayInterruptIfRunning)
:取消当前任务;isDone()
:判断任务是否已完成。join()
: 调用方线程阻塞, 等待异步任务完成
CompletableFuture
类下
getNow(T valueIfAbsent)
: 不会阻塞调用方线程, 如果任务已完成则返回结果, 否则返回给定的缺省值complete(T value)/completeExceptionally(Throwable ex)
: 直接手动完成异步任务, 返回给定的正常或异常结果, 如果在调用该方法之前已经有一个结果(包括正常结果或异常),则该方法不会生效, 这个方法可以用于模拟异步任务的完成,并将结果传递给等待该任务的其他部分。obtrudeValue(T value)/obtrudeException(Throwable ex)
: 类似complete(T value)/completeExceptionally(Throwable ex)
, 但会无视之前的结果, 强制替换为给定的值
备注
Future接口
下的方法get()
和CompletableFuture
类下的join()
方法的区别在于get()
会抛出checked exception
, 需要try...catch...
手动处理异常, 而join()
不需要, 发生异常时join()
会抛出一个checked CompletionException
,CompletionException
中包裹着真正的异常信息- 对于异常的处理, 更好的方式还是在
get()
或者join()
之前就调用exceptionally
2. 静态方法
runAync/supplyAsync
: 执行异步任务, 可指定提交的线程池, 默认使用ForkJoinPool.commonPool()
anyOf
/allOf
:applyToEither
/applyToBoth
,acceptEither
/acceptBoth
,runAfterEither
/runAfterBoth
的强化版, 可以组合2个以上的CompletableFuture
completedFuture
: 创建一个已完成的任务, 并指定它的返回值, 可用于在异步调用链中返回常量
3. 属性
isCompletedExceptionally
:如果异步任务异常结束时返回true
, 未完成或已正常完成返回false
大任务分割
ForkJoinPool
线程池可以把一个大任务递归地分拆成小任务并行执行,任务类必须继承自RecursiveTask
或RecursiveAction
,但代码较为繁琐, 普通场景推荐使用进一步封装的parallelStream()
parallelStream()
的性能优于parallel()
常用并发容器
// TODO
线程池
// TODO