TransmittableThreadLocal 线程池内异步线程值传递解决方案

2023年 10月 9日 69.8k 0

1. ThreadLocal 基础回顾

从字面上的意思来理解 ThreadLocal,Thread:线程;Local:本地的,局部的。

ThreadLocal 是 JDK1.2 新增的 API,在 JDK1.5 后支持到了泛型。表示线程局部变量:为 当前线程 绑定一个变量,这样在线程生命周期内的 任何地方 均可取出。

也就是说,ThreadLocal 是线程本地的变量,只要是本线程内都可以使用,线程结束了,那么相应的线程本地变量也就跟随着线程消失了。

1.1. ThreadLocal 基本用法

知道了 ThreadLocal 是什么后,怎么用其实就非常简单了。看如下这个简单示例:

@ToString
@lombok.Data
public class Data {

    private Integer id = 1;

}
public class Test {

    private static final ThreadLocal THREAD_LOCAL = new ThreadLocal();

    public static void main(String[] args) {
        Test test = new Test();
        test.setData(new Data());
        test.getAndPrintData();
    }

    private void setData(Data data) {
        System.out.println("set数据,线程名:" + Thread.currentThread().getName());
        THREAD_LOCAL.set(data);
    }

    private Data getAndPrintData() {
        Data data = THREAD_LOCAL.get();
        System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
        return data;
    }

}

执行后输出:

set数据,线程名:main
get数据,线程名:main,数据为:Data(id=1)

1.2. ThreadLocal 内存泄露

ThreadLocal 仅有如下三个方法:

public void set(T value) { 
     // 设置值:把value和当前线程绑定
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
}

public T get() { 
    // 获取值:获取和当前线程绑定的变量值
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
} 

public void remove() { 
     // 删除值:移除绑定关系
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
}

虽然每个绑定关系都是使用的 WeakReference,但是还是建议你显示的做好 remove() 移除动作,否则容易造成内存泄漏。

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    
    static class ThreadLocalMap {

        static class Entry extends WeakReference>,所以在 Entry 中 ThreadLocal 是弱引用,一旦发生 GC,ThreadLocal 会被 GC 回收掉,但是 value 是强引用,它不会被回收掉。

20200427173151574.webp

当 JVM 发生 GC 后,也就是 key 会变为 null,value 是强引用有值,整个 Entry 也有值,它依然在 ThreadLocalMap 中,并占据着内存。我们获取数据时,使用 ThreadLocal 的 get() 方法,ThreadLocal 并不为 null,所以我们无法通过一个 key 为 null 去访问到该 entry 的 value,这时就造成了内存泄漏。

那 key 不用弱引用不就好了,真的多此一举?

 ThreadLocal threadLocal = new ThreadLocal();
 threadLocal.set(new Object());
 threadLocal = null; // 直接设置为 null

如果是强引用的话,在设置完数据后,直接将 threadLocal 设为 null,这时栈中 ThreadLocal ref 到堆中 threadLocal 断开了,但是 key 到 ThreadLocal 的引用依然存在,GC 依旧没法回收,同样会造成内存泄漏。

那弱引用比强引用好在哪?当 key 为弱引用时,同样是上面代码,当 threadLocal 设为 null 是,栈中 ThreadLocal ref 到堆中 threadLoacl 断开了,key 到 ThreadLoacl 也因为 GC 断开了,这时 threadLocal 就可以被回收了。

此时,ThreadLocal 也可以 根据 key.get() == null 来判断 key 是否已经被回收,因此 ThreadLocal 可以自己清理这些过期的节点来避免内存泄漏。

2. ThreadLocal 局限性

2.1. 子线程无法获取主线程的数据

在当下互联网环境下,经常会用到了异步方式来提高程序运行效率,例子中就创建了一个异步子线程打印变量。

我们把第一章1.1的代码稍微做下改动:

public class Test {

    private static final ThreadLocal THREAD_LOCAL = new ThreadLocal();

    public static void main(String[] args) throws InterruptedException {
        // 主线程设置一个共享变量
        Test test = new Test();
        test.setData(new Data());

        // 子线程1打印共享变量
        Thread subThread1 = new Thread(test::getAndPrintData);
        subThread1.start();
        subThread1.join();

        // 主线程打印共享变量
        test.getAndPrintData();
    }

    private void setData(Data data) {
        System.out.println("set数据,线程名:" + Thread.currentThread().getName());
        THREAD_LOCAL.set(data);
    }

    private Data getAndPrintData() {
        Data data = THREAD_LOCAL.get();
        System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
        return data;
    }

}

执行后输出:

set数据,线程名:main
get数据,线程名:Thread-0,数据为:null
get数据,线程名:main,数据为:Data(id=1)

线程名为 Thread-0 的子线程里并没有获取到数据,只因为它 并不是当前线程,貌似合情合理,这便是 ThreadLocal 的局限性。

2.2. 使用 InheritableThreadLocal

InheritableThreadLocal 继承自 ThreadLocal,在其基础上扩展了能力:不仅可在本线程内获取绑定的变量,在 子线程 内亦可获取到。(请注意:必须是子线程,线程池就可能不行,因为线程池里的线程是实现初始化好的,并不一定是你的子线程)

ThreadLocal 和 InheritableThreadLocal 均是 JDK1.2 新增的 API,在 JDK1.5 后支持到了泛型。表示 线程局部变量:为 当前线程 绑定一个变量,这样在线程生命周期内的 任何地方 均可取出。

上边的代码,只要改一句话:

private static final ThreadLocal THREAD_LOCAL = new InheritableThreadLocal();

执行后输出:

set数据,线程名:main
get数据,线程名:Thread-0,数据为:Data(id=1)
get数据,线程名:main,数据为:Data(id=1)

线程名为 Thread-0 的子线程里获取到数据了,不要高兴的太早。平时我们写代码是使用线程池,而不会直接使用 new Thread(),后文就会解释线程池环境下 InheritableThreadLocal 失效的场景。

2.3. ThreadLocal 不能解决共享变量的线程安全问题

这里有几个肤浅的错误结论:

  • 只要这个变量是共享变量,把它用 ThreadLocal 包起来便可?
  • 别的线程修改其线程绑定的变量,并不影响其它线程里的变量值?
  • 以上结果,如果你的 ThreadLocal 绑定的是 Immutable 不可变变量,如字符串等,那结论尚能成立,但 若绑定的是引用类型的共享变量,那是绝对错误的。

    我们把第一章1.1的代码稍微做下改动:

    public class Test {
    
        private static final ThreadLocal THREAD_LOCAL = new InheritableThreadLocal();
    
        public static void main(String[] args) throws InterruptedException {
            // 主线程设置一个共享变量
            Test test = new Test();
            test.setData(new Data());
    
            // 子线程1把共享变量的id改为2,并打印
            Thread subThread1 = new Thread(() -> {
                Data data = test.getAndPrintData();
                if (data != null) data.setId(2);
                test.getAndPrintData();
            });
            subThread1.start();
            subThread1.join();
    
            // 子线程2打印共享变量
            Thread subThread2 = new Thread(test::getAndPrintData);
            subThread2.start();
            subThread2.join();
    
            // 主线程打印共享变量
            test.getAndPrintData();
        }
    
        private void setData(Data data) {
            System.out.println("set数据,线程名:" + Thread.currentThread().getName());
            THREAD_LOCAL.set(data);
        }
    
        private Data getAndPrintData() {
            Data data = THREAD_LOCAL.get();
            System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
            return data;
        }
    
    }
    

    执行后输出:

    set数据,线程名:main
    get数据,线程名:Thread-0,数据为:Data(id=1)
    get数据,线程名:Thread-0,数据为:Data(id=2)
    get数据,线程名:Thread-1,数据为:Data(id=2)
    get数据,线程名:main,数据为:Data(id=2)
    

    线程 subThread1 把共享变量 data 的值改过之后,其它线程再去获取得到的均是改变后的值,因此此处使用 ThreadLocal 并没有达到解决共享变量线程安全问题的效果。

    为何会出现此现象?是因为这里面所谓的变量副本都是 “引用传递” 的,指向的数据是同一位置,就是个浅拷贝。ThreadLocal 包装根本就不能解决共享变量的多线程安全问题。

    所以说,要想使引用类型的变量线程安全,每个线程内必须手动 new 一个实例:

    • 每个线程独享一份 new 出来的实例 -> 线程安全
    • 多个线程共享一份“引用类型”实例 -> 线程不安全

    ThreadLocal 最大的用处还是来解决实例变量的全局存取方便问题,但并不是专门用来解决线程安全问题。

    3. InheritableThreadLocal 原理和局限性

    3.1. InheritableThreadLocal 原理

    我们先看 InheritableThreadLocal 的源码:

    public class InheritableThreadLocal extends ThreadLocal {
    
        protected T childValue(T parentValue) {
            return parentValue;
        }
    
        ThreadLocalMap getMap(Thread t) {
           return t.inheritableThreadLocals;
        }
    
        void createMap(Thread t, T firstValue) {
            t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
        }
    }
    

    Thread t 专门维护了一个类型是 ThreadLocalMap 的 inheritableThreadLocals 变量,用来进行父子传递:

    public class Thread implements Runnable {
    
        ...
    
        ThreadLocal.ThreadLocalMap threadLocals = null;
    
        ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    
        // inheritThreadLocals是否继承线程的本地变量们(默认是true)
        private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc,
                          boolean inheritThreadLocals) {
            ...
    
            Thread parent = currentThread();
            ...
    
            if (inheritThreadLocals && parent.inheritableThreadLocals != null)
                this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
            ...
      
            tid = nextThreadID();
        }
    
        ... 
    }
    

    Thread 的 init() 方法中的两步:

  • 当前线程作为新创建线程的父线程;
  • 如果父线程绑定了变量 inheritableThreadLocals != null 并且允许继承 inheritThreadLocals = true,那么就会把父线程绑定的变量 Map 拷贝一份到子线程里 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
  • 这里的 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals) :

        static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
            return new ThreadLocalMap(parentMap);
        }
    
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
    
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal key = (ThreadLocal) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
    

    这里边拷贝时候的 Object value = key.childValue(e.value); 这个 childValue(e.value) 方法在 InheritableThreadLocal 里的实现是:

        protected T childValue(T parentValue) {
            return parentValue;
        }
    

    所以 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals) 这里的拷贝是浅拷贝。

    InheritableThreadLocal 支持子线程访问父线程中本地变量的原理是:创建子线程时将父线程中的本地变量值浅拷贝了一份到自己这来,拷贝的时机是子线程创建时。

    3.2. InheritableThreadLocal 局限性

    在实际生产中,线程一般不可能孤立的独立去运行,而是交给线程池去调度处理。所以实际上 几乎没有纯正的父子线程 的关系存在,而若有这种需求大多是线程池与线程池之间的线程联系。

    我们稍微改下第二章2.2的代码,使用线程池来操作:

    public class Test {
    
        private static final ThreadLocal THREAD_LOCAL = new InheritableThreadLocal();
        private static final ExecutorService THREAD_POOL = Executors.newSingleThreadExecutor();
    
        public static void main(String[] args) throws InterruptedException {
    
            Test test = new Test();
    
            Data data1 =  new Data();
            THREAD_LOCAL.set(data1);
    
            THREAD_POOL.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
            Data data2 =  new Data();
            data2.setId(2);
            THREAD_LOCAL.set(data2); // 给线程重新绑定值
    
            THREAD_POOL.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
        }
    
        private void setData(Data data) {
            System.out.println("set数据,线程名:" + Thread.currentThread().getName());
            THREAD_LOCAL.set(data);
        }
    
        private Data getAndPrintData() {
            Data data = THREAD_LOCAL.get();
            System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
            return data;
        }
    
    }
    

    执行后输出:

    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    

    给线程重新绑定值,居然没生效?

    我们把线程池大小扩充下,让他不复用 1 个线程 pool-1-thread-1:

    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(2);
    

    执行后输出:

    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-1-thread-2,数据为:Data(id=2)
    

    因此可以得出结论:线程在 init 初始化的时候,才会去同步一份最新数据过来。

    在实际开发中,多线程就离不开线程池 的使用,因为线程池能够 复用线程,减少线程的频繁创建与销毁。倘若合格时候使用 InheritableThreadLocal 来传递数据,那么线程池中的线程拷贝的数据始终来自于 第一个提交任务的外部线程,这样非常容易造成 线程本地变量混乱,这种错误是致命的。

    4. TransmittableThreadLocal 使用和原理

    4.1. TransmittableThreadLocal 使用

    TransmittableThreadLocal(TTL) 是阿里巴巴开源的专门解决 InheritableThreadLocal 的局限性,实现线程本地变量在线程池的执行过程中,能正常的访问父线程设置的线程变量。

    TTL官网:github.com/alibaba/tra…

    
        com.alibaba
        transmittable-thread-local
        2.14.2
    
    

    我们对第三章3.2的代码稍作改动:

    public class Test {
    
        private static final ThreadLocal THREAD_LOCAL = new TransmittableThreadLocal();
        private static final ExecutorService THREAD_POOL = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());
    
    
        public static void main(String[] args) throws InterruptedException {
    
            Test test = new Test();
    
            Data data1 =  new Data();
            THREAD_LOCAL.set(data1);
    
            THREAD_POOL.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
            Data data2 =  new Data();
            data2.setId(2);
            THREAD_LOCAL.set(data2); // 给线程重新绑定值
    
            THREAD_POOL.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
        }
    
        private void setData(Data data) {
            System.out.println("set数据,线程名:" + Thread.currentThread().getName());
            THREAD_LOCAL.set(data);
        }
    
        private Data getAndPrintData() {
            Data data = THREAD_LOCAL.get();
            System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
            return data;
        }
    
    }
    

    执行后输出:

    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-1-thread-1,数据为:Data(id=2)
    

    复用线程 pool-1-thread-1 并没重新创建,但是数据绑定却生效了。

    如果我们跨线程池使用呢?其实仔细想想跨线程也还是父子线程的关系传递,问题的关键还是在于数据绑定的时机:

    public class Test {
    
        private static final ThreadLocal THREAD_LOCAL = new TransmittableThreadLocal();
        private static final ExecutorService THREAD_POOL_1 = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());
        private static final ExecutorService THREAD_POOL_2 = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());
    
        public static void main(String[] args) throws InterruptedException {
    
            Test test = new Test();
    
            Data data1 =  new Data();
            THREAD_LOCAL.set(data1);
    
            THREAD_POOL_1.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
            Data data2 =  new Data();
            data2.setId(2);
            THREAD_LOCAL.set(data2); // 给线程重新绑定值
    
            THREAD_POOL_2.execute(test::getAndPrintData);
            
            THREAD_POOL_1.execute(test::getAndPrintData);
            TimeUnit.SECONDS.sleep(2);
    
            THREAD_POOL_2.execute(test::getAndPrintData);
        }
    
        private void setData(Data data) {
            System.out.println("set数据,线程名:" + Thread.currentThread().getName());
            THREAD_LOCAL.set(data);
        }
    
        private Data getAndPrintData() {
            Data data = THREAD_LOCAL.get();
            System.out.println("get数据,线程名:" + Thread.currentThread().getName() + ",数据为:" + data);
            return data;
        }
    
    }
    

    执行后输出:

    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-2-thread-1,数据为:Data(id=2)
    get数据,线程名:pool-1-thread-1,数据为:Data(id=2)
    get数据,线程名:pool-2-thread-1,数据为:Data(id=2)
    

    第一次 THREAD_POOL_2.execute(test::getAndPrintData) 的时候,其实存在两个数据绑定机制,pool-1-thread-1 线程是父线程,此时 TTL 生效数据绑定已经让 Data(id=2);接下来 pool-2-thread-1 线程是子线程创建,此时 JDK 的数据绑定依然生效,数据拷贝得到的 Data(id=2)。

    我们把上边阿里的 TTL 改回成 JDK 的:

    private static final ThreadLocal THREAD_LOCAL = new InheritableThreadLocal();
    

    执行后输出:

    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-2-thread-1,数据为:Data(id=2)
    get数据,线程名:pool-1-thread-1,数据为:Data(id=1)
    get数据,线程名:pool-2-thread-1,数据为:Data(id=2)
    

    pool-1-thread-1 自己的数据为 Data(id=1),而拷贝过去的 pool-2-thread-1,数据为 Data(id=2) 这不神奇了?

    THREAD_LOCAL.set(data) 只是对 ThreadLocalMap 进行值重新设置,线程 pool-1-thread-1 本身没有将最新的数据同步过去,而 pool-2-thread-1 因为执行了创建 init,所以把 ThreadLocalMap 浅拷贝过去后还进行了数据绑定。恰恰验证了线程在初始化的过程中才会同步最新的数据副本。

    思考:代码测试到这里,我们不由发起联想。在第三章3.2我们得到结论,线程在 init 初始化的时候,才会去同步一份最新数据过来,那阿里的 TTL 是不是在 run 的运行的时候,也专门去同步一份最新数据过来呢?

    4.2. TransmittableThreadLocal 原理

    TransmittableThreadLocal 继承于 InheritableThreadLocal,并拥有了 InheritableThreadLocal 对子线程传递上下文的特性。

    使用 TtlRunnable 包装了任务的运行,可以理解为对 Runnable 做了一层 “AOP”。被包装的 run() 方法执行异步任务之前,会使用 replay() 进行设置父线程里的本地变量给当前子线程,任务执行完毕,会调用 restore() 恢复该子线程原生的本地变量。

    在 TransmittableThreadLocal(TTL)的实现中,capture、replay 和 restore是三个关键步骤,用于在线程切换或异步任务传递时保存、恢复和设置 ThreadLocal 变量的值。它们的作用如下:

    • Capture(捕获):在任务提交或线程切换之前,TTL 会捕获当前线程的 ThreadLocal 变量值,并将其保存到一个中间结构中。这样做是为了在后续的步骤中能够访问到父线程的 ThreadLocal 值。
    • Replay(重放):在任务开始执行或切换到子线程时,TTL 会将之前捕获的 ThreadLocal 变量值从中间结构中取出,并将其设置到当前线程的 ThreadLocal 副本中。这样,子线程或异步任务就能够获取到父线程中相同的 ThreadLocal 值。
    • Restore(恢复):在任务执行完毕或线程切换回父线程时,TTL 会恢复原始的 ThreadLocal 变量值。它会将之前捕获的 ThreadLocal 值重新设置到当前线程的 ThreadLocal 中,以确保父线程的 ThreadLocal 状态不会受到影响。

    4.3. TransmittableThreadLocal 深入

    一般来说,关于线程池内异步线程上下文传递的问题,我们的思路分为几个步骤:

  • 主线程获取到自己的上下文,然后传递给任务暂存;
  • 异步线程将自己原有的上下文备份,暂时保存;
  • 异步线程通过任务,并设置主线程传递过来的上下文;
  • 异步线程执行异步任务;
  • 异步线程将自己备份的上下文设置回去;
  • TransmittableThreadLocal 主要分为三个部分:任务( TtlCallable )、线程池( ExecutorServiceTtlWrapper )、本地线程 ThreadLocal( TransmittableThreadLocal )。

    但是无论是任务和线程池,本身还是依赖于 TransmittableThreadLocal 对于存储值的管理。

    TtlCallable(载体任务)
    public final class TtlCallable implements Callable, TtlWrapper, TtlEnhanced, TtlAttachments {
    
        // 保存父线程的 ThreadLocal 快照
        private final AtomicReference capturedRef;
        // 实际执行任务
        private final Callable callable;
        // 判断是否执行完,清除任务所保存的 ThreadLocal 快照
        private final boolean releaseTtlValueReferenceAfterCall;
    
        private TtlCallable(@NonNull Callable callable, boolean releaseTtlValueReferenceAfterCall) {
    
            // 1. 创建时,通过 Transmitter 抓取快照
            this.capturedRef = new AtomicReference(capture());
            this.callable = callable;
            this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
        }
    
        @Override
        @SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
        public V call() throws Exception {
            final Object captured = capturedRef.get();
            if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
                throw new IllegalStateException("TTL value reference is released after call!");
            }
    
            // 2. 使用 Transmitter 将快照重做到当前执行线程,并将原来的值取出
            final Object backup = replay(captured);
            try {
                // 3. 执行任务
                return callable.call();
            } finally {
                // 4. Transmitter 重新将原值放回执行线程
                restore(backup);
            }
        }
    
        ...
    }
    

    分析下上边的部分代码,可先参考4.2章节中的原理介绍:

    AtomicReference capturedRef 保存父线程的 ThreadLocal 快照,保证任务误重用下,清除快照动作的多线程安全性。
    Callable callable 是实际执行的任务。
    boolean releaseTtlValueReferenceAfterCall 判断执行完是否清除任务保存的 ThreadLocal 快照,保证了任务执行完,依然被业务代码持有的场景下,避免 ThreadLocal 快照继续持有而造成的内存泄漏。

    任务重用的间隔之间,可能出现 ThreadLocal 值被修改的情况,那么后一次任务执行时,快照实际是不准确的,业务场景应该尽量避免这种情况出现才对。

    ExecutorServiceTtlWrapper(线程池装饰器)
    @SuppressFBWarnings({"EQ_DOESNT_OVERRIDE_EQUALS"})
    class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
       
     private final ExecutorService executorService;
    
        ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService, boolean idempotent) {
            super(executorService, idempotent);
            this.executorService = executorService;
        }
    
        @NonNull
        @Override
        public  Future submit(@NonNull Callable task) {
            // 确保线程池执行的是 TtlCallable 类型的任务
            return executorService.submit(TtlCallable.get(task, false, idempotent));
        }
    
        @NonNull
        @Override
        public  Future submit(@NonNull Runnable task, T result) {
            // 确保线程池执行的是 TtlRunnable 类型的任务
            return executorService.submit(TtlRunnable.get(task, false, idempotent), result);
        }
    
        ...
    }
    

    不难看出,任务其实起到了线程间传递上下文的载体作用,这个类的代码主要是对线程池的装饰器,确保执行的任务是 TtlCallable 类型或者 TtlRunnable 类型的任务。

    TransmittableThreadLocal(TTL 主体类)
    public class TransmittableThreadLocal extends InheritableThreadLocal implements TtlCopier {
    
        // 是否禁用忽略空值语义
        private final boolean disableIgnoreNullValueSemantics;
    
        public TransmittableThreadLocal() {
            this(false);
        }
    
        public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
            this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
        }
    
        ...
    
        private static final InheritableThreadLocal holder = 
                new InheritableThreadLocal() {
                    
                    @Override
                    protected WeakHashMap initialValue() {
                        return new WeakHashMap();
                    }
    
                    @Override
                    protected WeakHashMap childValue(WeakHashMap parentValue) {
                        return new WeakHashMap(parentValue);
                    }
                };
        }
        
        ...
        
        public static class Transmitter {
    
            @NonNull
            public static Object capture() { ... }
    
            @NonNull
            public static Object replay(@NonNull Object captured) { ... }
    
            @NonNull
            public static Object clear()  { ... }
    
            public static void restore(@NonNull Object backup)  { ... }
    
        
            private static class Snapshot {  
                ...
            }
        
            public interface Transmittee {
                ...
            }
        }
    
    }
    

    TransmittableThreadLocal 只继承了 InheritableThreadLocal 和实现了该框架提供的函数接口 TtlCopier。

    因此 TransmittableThreadLocal 自身是一个 InheritableThreadLocal,同样具备了线程创建时传递的特性。本质上只是为了让框架能够进行线程传递,做了一些小动作而已。

    TransmittableThreadLocal 一共两个构造函数,有参构造函数允许设置 “是否禁用忽略空值语义”。默认是开启的,表现行为是如果是 null 值,那么 TransmittableThreadLocal 是不会传递这个值,并且如果 set null,同时执行 remove 操作。这样设计可能是因为一开始设计服务于业务,是希望业务不要通过 NULL 来表达任何含义,同时避免 NPE 和优化 GC。

    TtlCopier

    @FunctionalInterface
    public interface TtlCopier {
    
        T copy(T parentValue);
    }
    

    TransmittableThreadLocal 实现了一个类,TtlCopier。顾名思义,该类定义了线程传递时,值复制的抽象语义。而 TransmittableThreadLocal 的默认实现是与 InheritableThreadLocal 相同的,返回值的引用。

    public T copy(T parentValue) {
        return parentValue;
    }
    

    同时,该接口也为业务方留下了扩展点。开发者可以重写该方法,来定义线程传递时,如何进行值的复制。

    InheritableThreadLocal holder

    private static final InheritableThreadLocal holder = 
            new InheritableThreadLocal() {
    
                @Override
                protected WeakHashMap initialValue() {
                    return new WeakHashMap();
                }
    
                @Override
                protected WeakHashMap childValue(WeakHashMap parentValue) {
                    return new WeakHashMap(parentValue);
                }
            };
        }
    

    TransmittableThreadLocal 内部维护了一个非常关键的属性,用来注册项目中维护的 TransmittableThreadLocal,从而保证 Transmitter 去正确传递 ThreadLocal 的值。

    holder 是一个 InheritableThreadLocal,用来保存所有注册的 TransmittableThreadLocal。父子线程传递时,可以直接将父线程的注册表传递过来。使用 InheritableThreadLocal,主要保证了嵌套线程场景下,注册表的正确传递。

    其次,存储的是 WeakHashMap ,value 都是无意义的 null,并且永远不会被使用。这样一来,保证项目使用 TransmittableThreadLocal 的话,不会引入新的内存泄漏问题。其内存泄漏的可能风险,就只完全来自于 InheritableThreadLocal 本身。

    Transmitter

    public static class Transmitter {
    
        @NonNull
        public static Object capture() { ... }
    
        @NonNull
        public static Object replay(@NonNull Object captured) { ... }
    
        public static void restore(@NonNull Object backup)  { ... }
    
        @NonNull
        public static Object clear()  { ... }
        
    
        private static class Snapshot {  
            ...
        }
    
        public interface Transmittee {
            ...
        }
    }
    

    我们讲到 TransmittableThreadLocal 会将有值的对象,注册到 holder 中,以便 Transmitter 去知道传递哪一些实例的值。但是如果这样,那不是都要修改代码,将项目中的 ThreadLocal 都改掉吗?

    这当然不可能,因此 Transmitter 承担了这个任务,允许业务代码将原有的 ThreadLocal 注册进来,以方便 Transmitter 来识别和传递。capture、replay 和 restore 是三个关键步骤,用于在线程切换或异步任务传递时保存、恢复和设置 ThreadLocal 变量的值。

    1. 抓取当前线程的值快照(capture)

    @NonNull
    public static Object capture() {
        final HashMap transmittee2Value = newHashMap(transmitteeSet.size());
        for (Transmittee transmittee : transmitteeSet) {
            try {
                transmittee2Value.put(transmittee, transmittee.capture());
            } catch (Throwable t) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee +
                            "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
                }
            }
        }
        return new Snapshot(transmittee2Value);
    }
    

    2. 将快照重做到执行线程(replay)

    @NonNull
    public static Object replay(@NonNull Object captured) {
        final Snapshot capturedSnapshot = (Snapshot) captured;
    
        final HashMap transmittee2Value = newHashMap(capturedSnapshot.transmittee2Value.size());
        for (Map.Entry entry : capturedSnapshot.transmittee2Value.entrySet()) {
            Transmittee transmittee = entry.getKey();
            try {
                Object transmitteeCaptured = entry.getValue();
                transmittee2Value.put(transmittee, transmittee.replay(transmitteeCaptured));
            } catch (Throwable t) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "exception when Transmitter.replay for transmittee " + transmittee +
                            "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
                }
            }
        }
        return new Snapshot(transmittee2Value);
    }
    

    3. 恢复备份的原快照(restore)

    public static void restore(@NonNull Object backup) {
        for (Map.Entry entry : ((Snapshot) backup).transmittee2Value.entrySet()) {
            Transmittee transmittee = entry.getKey();
            try {
                Object transmitteeBackup = entry.getValue();
                transmittee.restore(transmitteeBackup);
            } catch (Throwable t) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "exception when Transmitter.restore for transmittee " + transmittee +
                            "(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);
                }
            }
        }
    }
    
    DisableInheritableThreadFactoryWrapper(自定义值传递)
    class DisableInheritableThreadFactoryWrapper implements DisableInheritableThreadFactory {
        
        private final ThreadFactory threadFactory;
    
        DisableInheritableThreadFactoryWrapper(@NonNull ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }
    
        @Override
        public Thread newThread(@NonNull Runnable r) {
       
            // 调用了 Transmitter 的 clear 方法,在创建子线程前,清除当前线程的值并保存下来
            final Object backup = clear();
            try {
                return threadFactory.newThread(r);
            } finally {
                // 创建完,再重新恢复。以此,避免了值的继承传递
                restore(backup);
            }
        }
    
    }
    

    如果注意到 TransmittableThreadLocal 是继承 InheritableThreadLocal,就应该知道,子线程创建时,值还是会被传递过去,这也就可能带来内存泄漏问题。所以,同时提供 DisableInheritableThreadFactoryWrapper,以方便业务代码自定义线程池,禁止值的继承传递。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论