Java多线程最佳实践

2023年 10月 2日 29.1k 0

前言: 这篇文章根据场景分类总结实践的最优做法以及注意事项, 不详细解释基本概念; 更灵活, 性能更好往往意味着代码复杂度的增加, 根据实际业务选择, 有时最简单的写法反而更合适

可见性

  • volatile: 对于一个多线程共享的变量, 每次访问变量时,总是获取主内存的最新值, 且当某个线程在其本地内存副本中修改了该变量的值, 立刻回写到主内存

image.png

线程同步

原子操作

赋值不需要同步

  • 基本类型(longdouble除外)赋值,例如:int n = m, longdouble是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把longdouble的赋值作为原子操作实现的。
  • 引用类型赋值,例如:List list = anotherList
  • Atomic 原子类

    把单个变量引用或者值的变化封装为原子操作, 可分为4类

    基本类型

    使用原子的方式更新基本类型

    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean:布尔型原子类

    数组类型

    使用原子的方式更新数组里的某个元素

    • AtomicIntegerArray:整型数组原子类
    • AtomicLongArray:长整型数组原子类
    • AtomicReferenceArray:引用类型数组原子类

    引用类型

    • AtomicReference:引用类型原子类
    • AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来。
    • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

    ABA 问题不是必须解决的, 如果业务关注变量的值而不在意值变化的过程, 那就不需要处理。

    对象的属性修改类型

    • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器
    • AtomicReferenceFieldUpdater:原子更新引用类型里的字段

    一般场景

    使用synchronizedReentrantLock

  • ReentrantLock性能更好, 提供了tryLock()方法限制了最大阻塞时间
  • ReentrantLock更灵活, 临界区可跨多个代码块
  • ReentrantLock更适用于顺序敏感的场景, synchronizedReentrantLock默认都是非公平锁, 但ReentrantLock可以在构造时new ReentrantLock(true)设置为公平锁
  • 读多写少

    使用ReadWriteLockStampedLock, 将读锁和写锁分离, 提高读并发性能

  • ReadWriteLock: 悲观读锁, 把读写操作分别用读锁和写锁来加锁, 允许多个线程同时读(当有一个线程持有读锁, 其他线程也可以获取读锁, 这样就大大提高了并发读的执行效率), 但它只允许一个线程写入(当有一个线程持有写锁, 其他线程读锁和写锁都获取不到)
  • StampedLock: 乐观读锁, 它和ReadWriteLock相比,不同之处在于,读的过程中也允许获取写锁,这样一来,我们读的数据就可能不一致,但需要一点额外的代码来判断读的过程中是否有写入
  • public class Point {
        private final StampedLock stampedLock = new StampedLock();
    
        private double x;
        private double y;
    
        public void move(double deltaX, double deltaY) {
            long stamp = stampedLock.writeLock(); // 获取写锁
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                stampedLock.unlockWrite(stamp); // 释放写锁
            }
        }
    
        public double distanceFromOrigin() {
            long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
            // 注意下面两行代码不是原子操作
            // 假设x,y = (100,200)
            double currentX = x;
            // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
            double currentY = y;
            // 此处已读取到y,如果没有写入,读取是正确的(100,200)
            // 如果有写入,读取是错误的(100,400)
            if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
                stamp = stampedLock.readLock(); // 获取一个悲观读锁
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    stampedLock.unlockRead(stamp); // 释放悲观读锁
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    }
    

    死锁

    各线程获取可重入锁的顺序一定要相同

    public void add(int m) {
        synchronized(lockA) { // 获得lockA的锁
            this.value += m;
            synchronized(lockB) { // 获得lockB的锁
                this.another += m;
            } // 释放lockB的锁
        } // 释放lockA的锁
    }
    
    public void dec(int m) {
        synchronized(lockB) { // 获得lockB的锁
            this.another -= m;
            synchronized(lockA) { // 获得lockA的锁
                this.value -= m;
            } // 释放lockA的锁
        } // 释放lockB的锁
    }
    

    对于上述代码,线程1和线程2如果分别执行add()dec()方法时, 执行到内层的synchronized时就会永远等待下去, 造成死锁

    线程协同

    等待/唤醒

    使用synchronized+wait/notify, 或者ReentrantLock+Condition可以做到最细粒度的控制, 而且进入等待状态会释放锁, 不会阻塞其他线程, 但代码过于繁琐, 实战中尽量用其他并发容器替代, 没有必要自己实现

    class TaskQueue {
        Queue queue = new LinkedList();
    
        public synchronized void addTask(String s) {
            this.queue.add(s);
            this.notifyAll();
        }
    
        public synchronized String getTask() throws InterruptedException {
            while (queue.isEmpty()) {
                this.wait();
            }
            return queue.remove();
        }
    }
    

    CountDownLatch

    • CountDownLatch阻塞主线程, 等待指定数量的线程完成后, 执行指定的逻辑, 基本上可以被CompletableFuture的静态方法allOf取代, 实践中不考虑用它

    CyclicBarrier

    • CyclicBarrier阻塞子线程, 作为一道屏障拦截在多个线程上, 屏障本身包含一段逻辑, 线程经过屏障时会等待, 所有线程都通过屏障时, 执行屏障逻辑, 各子线程也继续往下执行, 实践中还是很有用的

    Semaphore

    • Semaphore用于限制同一时间并发访问的线程数量

    CompletableFuture

    CompletableFuture在实践中最为常用, 下面详细说明

    1. 实例方法

    实例方法较多, 建议使用Aync版本的方法, 防止阻塞主线程, 以及避免不确定性

    当单个异步任务完成后

    • thenApply: 对其结果执行Function, 返回一个新CompletionStage
    • thenAccept: 对其结果执行Consumer
    • thenRun: 执行一个Runnable
    • thenCompose: 类似thenApply, 他们的回参类型都是CompletionStage, 但thenCompose执行的是不是一个普通的Function, 而是Function, 当现有的方法返回已经是一个CompletionStage时, 相比thenApply, thenCompose不会嵌套, 因此thenApply适合用来编写新的异步逻辑, 而thenCompose更适合用来串接多个已有的CompletableFuture
    // 回调是普通方法
    CompletableFuture futureApply = CompletableFuture
                                         .supplyAsync(() -> 1)
                                         .thenApply(x -> x+1);
                                       
    CompletableFuture futureCompose = CompletableFuture
                                  .supplyAsync(() -> 1)
                                  .thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
    
    // 回调是已有的异步方法, thenApply会嵌套一层而thenCompose不会
    public CompletableFuture getUserInfo(userId)
    public CompletableFuture getUserRating(UserInfo)
    
    CompletableFuture f =
        userInfo.thenApply(this::getUserRating);
    
    CompletableFuture relevanceFuture =
        userInfo.thenCompose(this::getUserRating);
    

    当两个异步任务都完成后

    • thenCombine: 对它们的结果执行BiFunction, 返回一个新结果
    • thenAcceptBoth: 对它们的结果执行BiConsumer
    • runAfterBoth: 执行一个Runnable
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> 10);
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 20);
    
    CompletableFuture combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
    
    int combinedResult = combinedFuture.join(); // 或者使用 get() 方法获取结果
    
    System.out.println(combinedResult); // 输出:30,因为 future1 返回 10,future2 返回 20,合并结果为 10 + 20 = 30
    

    当两个异步任务中的任意一个完成后

    • applyToEither: 对其结果后执行Function, 返回一个新结果, 不需要等待两个任务都完成
    • acceptEither: 对其结果执行Consumer
    • runAfterEither: 执行一个Runnable
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000); // 模拟任务1耗时2秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 10;
    });
    
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000); // 模拟任务2耗时1秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 20;
    });
    
    CompletableFuture resultFuture = future1.applyToEither(future2, result -> result * 2);
    
    int result = resultFuture.join(); // 或者使用 get() 方法获取结果
    
    System.out.println(result); // 输出:40,因为 future2 先完成,结果为 20,应用 fn 函数得到 20 * 2 = 40
    

    异常处理相关

    • exceptionally: 入参是一个Function, 当exceptionally前的异步操作抛出异常时,可以对这个异常进行处理,并返回一个新的CompletionStage
    • handle: 和exceptionally类似, 但入参是一个BiFunction, 因此异常和正常的情况可以处理, ,并返回一个新的CompletionStage, 传递给后面
    • whenComplete: 和handle类似, , 可以同时处理正常和异常的情况, 但入参是一个BiConsumer, Consumer是没有回参的, 所以whenComplete不产生新的异步结果
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        // Simulating an exception
        throw new RuntimeException("Oops, something went wrong!");
    }).exceptionally(ex -> {
        // Handling the exception
        System.out.println("Caught exception: " + ex.getMessage());
        return 0; // Providing a default value
    });
    
    future.thenAccept(result -> {
        System.out.println("Final result: " + result);
    });
    
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        return 10 / 2;
    });
    
    CompletableFuture handledFuture = future.handle((result, ex) -> {
        if (ex != null) {
            return "Error: " + ex.getMessage();
        } else {
            return "Result: " + result;
        }
    });
    
    handledFuture.thenAccept(result -> {
        System.out.println(result);
    });
    
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        return 10 / 2;
    });
    
    future.whenComplete((result, ex) -> {
        if (ex != null) {
            System.out.println("Exception occurred: " + ex.getMessage());
        } else {
            System.out.println("Result: " + result);
        }
    });
    

    其它实例方法

    Future接口下

    • get():调用方线程阻塞, 等待异步任务完成后获取结果
    • get(long timeout, TimeUnit unit):同get(),但只等待指定的时间;
    • cancel(boolean mayInterruptIfRunning):取消当前任务;
    • isDone():判断任务是否已完成。
    • join(): 调用方线程阻塞, 等待异步任务完成

    CompletableFuture类下

    • getNow(T valueIfAbsent): 不会阻塞调用方线程, 如果任务已完成则返回结果, 否则返回给定的缺省值
    • complete(T value)/completeExceptionally(Throwable ex): 直接手动完成异步任务, 返回给定的正常或异常结果, 如果在调用该方法之前已经有一个结果(包括正常结果或异常),则该方法不会生效, 这个方法可以用于模拟异步任务的完成,并将结果传递给等待该任务的其他部分。
    • obtrudeValue(T value)/obtrudeException(Throwable ex): 类似complete(T value)/completeExceptionally(Throwable ex), 但会无视之前的结果, 强制替换为给定的值

    备注

    • Future接口下的方法get()CompletableFuture类下的join()方法的区别在于get()会抛出checked exception, 需要try...catch...手动处理异常, 而join()不需要, 发生异常时join()会抛出一个checked CompletionException, CompletionException中包裹着真正的异常信息
    • 对于异常的处理, 更好的方式还是在get()或者join()之前就调用exceptionally

    2. 静态方法

    • runAync/supplyAsync: 执行异步任务, 可指定提交的线程池, 默认使用ForkJoinPool.commonPool()
    • anyOf/allOf: applyToEither/applyToBoth,acceptEither/acceptBoth,runAfterEither/runAfterBoth的强化版, 可以组合2个以上的CompletableFuture
    • completedFuture: 创建一个已完成的任务, 并指定它的返回值, 可用于在异步调用链中返回常量

    3. 属性

    • isCompletedExceptionally:如果异步任务异常结束时返回true, 未完成或已正常完成返回false

    大任务分割

    ForkJoinPool线程池可以把一个大任务递归地分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction,但代码较为繁琐, 普通场景推荐使用进一步封装的parallelStream()

    parallelStream()的性能优于parallel()

    常用并发容器

    // TODO

    线程池

    // TODO

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论