博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程之间数据传递ThreadLocal,InheritableThreadLocal,TransmittableThreadLocal
阅读量:4041 次
发布时间:2019-05-24

本文共 8853 字,大约阅读时间需要 29 分钟。

项目中异步处理方法获取用户对象,发现获取不到,发现用户对象是ThreadLocal进行传递,但是异步属于子线程,ThreadLocal对父子线程传递有问题的,后来将对象的传递值改为了TransmittableThreadLocal进行传递。

public static TransmittableThreadLocal
> transmittableThreadLocal = new TransmittableThreadLocal() ;

ThreadLocal 大家都熟知它作为本地线程传递,原理是Thread中存在ThreadLocal.ThreadLocalMap threadLocals 变量的ThreadLocal,而再看ThreadLocal的get set方法,都是获取当前的线程,然后进行对当前线程中的ThreadLocalMap变量进行赋值或者取值,这样便实现了线程之间数据传递。

public T get() {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null) {            ThreadLocalMap.Entry e = map.getEntry(this);            if (e != null) {                @SuppressWarnings("unchecked")                T result = (T)e.value;                return result;            }        }        return setInitialValue();    }    public void set(T value) {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null)            map.set(this, value);        else            createMap(t, value);    }

但是当子线程要获取父进程中的ThreadLocal值,得到的确是null,原因是子线程ThreadLocal调用get方法时的当前线程是子线程,所以取不到父线程的值。

这时InheritableThreadLocal就出现了,进行了父子线程之间的数据传递。具体实现原理如下:

Thread中用threadLocals、inheritableThreadLocals两个ThreadLocal.ThreadLocalMap类型的变量

ThreadLocal.ThreadLocalMap threadLocals = null;    /*     * InheritableThreadLocal values pertaining to this thread. This map is     * maintained by the InheritableThreadLocal class.     */    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

而InheritableThreadLocal中重写了getmap createmap方法,分别赋值、取值于thread中的子线程inheritableThreadLocals 变量的ThreadLocalMap。当InheritableThreadLocal进行get set时,getmap方法获取的就行子线程变量inheritableThreadLocals的值

public T get() {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null) {            ThreadLocalMap.Entry e = map.getEntry(this);            if (e != null) {                @SuppressWarnings("unchecked")                T result = (T)e.value;                return result;            }        }        return setInitialValue();    }ThreadLocalMap getMap(Thread t) {   return t.inheritableThreadLocals;}void createMap(Thread t, T firstValue) {    t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);}

而父线程是如何把数据传递给子线程的哪?

可以看到Thread创建时,将parent的map赋值copy给了当前线程中的inheritableThreadLocals。而copy过程都是浅copy,key,value都是引用地址的赋值

public Thread() {        init(null, null, "Thread-" + nextThreadNum(), 0);    }追踪下去init方法中this.group = g;        this.daemon = parent.isDaemon();        this.priority = parent.getPriority();        if (security == null || isCCLOverridden(parent.getClass()))            this.contextClassLoader = parent.getContextClassLoader();        else            this.contextClassLoader = parent.contextClassLoader;        this.inheritedAccessControlContext =                acc != null ? acc : AccessController.getContext();        this.target = target;        setPriority(priority);        if (parent.inheritableThreadLocals != null)            this.inheritableThreadLocals =                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);        /* Stash the specified stack size in case the VM cares */        this.stackSize = stackSize;        /* Set thread ID */        tid = nextThreadID();//ThreadLocal.createInheritedMap追踪下去private ThreadLocalMap(ThreadLocalMap parentMap) {            Entry[] parentTable = parentMap.table;            int len = parentTable.length;            setThreshold(len);            table = new Entry[len];            for (int j = 0; j < len; j++) {                Entry e = parentTable[j];                if (e != null) {                    @SuppressWarnings("unchecked")                    ThreadLocal key = (ThreadLocal) e.get();                    if (key != null) {                        Object value = key.childValue(e.value);                        Entry c = new Entry(key, value);                        int h = key.threadLocalHashCode & (len - 1);                        while (table[h] != null)                            h = nextIndex(h, len);                        table[h] = c;                        size++;                    }                }            }        }

到此就理解InheritableThreadLocal是怎样实现父子线程之间数据传递的,但是InheritableThreadLocal有没有其他问题哪?

线程池的场景中,线程是共享的,这样线程中的inheritableThreadLocals的便也共享了。下面的demo代码(参考的网上的)输出结果可以看出inheritableThreadLocals的值是共享了的,另外一个线程执行时没有获取到父线程中的对应的值。

输出结果:

xiexiexie

========
xiexiexie
zhangzhangzhang
========
zhangzhangzhang
zhangzhangzhang
========
liuliuliu

上述原因是线程之间共享了,如果解决那,就是子线程在生命周期结束时clear清空子线程中的Threadlocal值。

这时Alibaba的TransmittableThreadLocal便出现了。

输出结果:

xiexiexie

========
xiexiexie
zhangzhangzhang
========
xiexiexie
zhangzhangzhang
========
xiexiexie
 

跟上面的类似的demo一样,输出结果子线程便是从父线程中获取了,TransmittableThreadLocal是采用了装饰模式进行处理的,使用线程的情况,也需要transmittable-thread-local包中相应的类进行装饰使用。详细使用说明可以参考其官网:

针对更新父线程中的值,因为Thread创建时,子线程中的值就是从父线程中浅copy过来的,所以获取其对象的引用后,直接对象的修改,便可以实现,可以看上面截图的修改。

学习理解中主要参考了,讲解的比较好

 

后续补充:

今天线上出了问题,feign之间调用老是报登录失效,moduleA异步方法中feign调用moduleB,B进行校验,理论上A模块登录了,B中登录校验是没问题。继续追查,发现moduleA中其他人增加了异步线程池,校验用户是用的TransmittableThreadLocal进行存储。

异步线程池配置如下:

public class AsyncConfig implements AsyncConfigurer {    @Override    public Executor getAsyncExecutor() {        //定义一个最大为10个线程数量的线程池        ExecutorService service = Executors.newFixedThreadPool(10);        return service;    }    @Override    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return new AsyncExceptionHandler();    }    class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {        // 捕获异步线程的异常,统一处理        @Override        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {            log.error("asyncExceptionHandler, exception message:{},method name:{}", throwable.getMessage(), method.getName());            for (Object param : objects) {                log.info("exception method parameter:{}", JacksonUtils.toJSONString(objects));            }            throwable.printStackTrace();        }    }}

而针对线程池共用线程,子线程在生命周期结束时需要clear清空子线程中的Threadlocal值,否则会出现共享线程池中threadlocal数据混乱(本次用户登录校验失败的原因),而TransmittableThreadLocal针对线程池线程数据执行完后不共享,则需要3种处理办法

2.1 修饰RunnableCallable

2.2 修饰线程池

2.3 使用Java Agent来修饰JDK线程池实现类

具体使用办法参看阿里云的技术说明文档:

我们进行修改采用的装饰线程池办法:

service = TtlExecutors.getTtlExecutorService(service);

 

同时自己写了test类,输出结果能验证线程池的共享

@ActiveProfiles("local")//@RunWith(SpringRunner.class)@SpringBootTest(classes= ThreadTest.class)@Slf4jpublic class ThreadTest {    @Autowired    private IntegralService integralService;    @Test    public void testThread() throws InterruptedException {        for(int i=0;i<10;i++){            MyThread m = new MyThread();            m.setIndex(i);            m.setMyName("thread"+i);            m.start();            Thread.sleep(1000);        }    }    ExecutorService service = Executors.newFixedThreadPool(3);    public class MyThread extends Thread{        public int index ;        public void run(){            BaseContextHandler.setToken("thread"+index);            System.out.println("===thread====="+index+"===token====="+BaseContextHandler.getToken());            ays();//            exeAys();            ttAys();//            BaseContextHandler.setToken("thread"+index+index);            try {                Thread.sleep(500);            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("TTTTaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName());        }        public void setIndex(int index) {            this.index = index;        }        public void setMyName (String name){            this.setName(name);        }    }    @Async    public void ays(){        System.out.println("accessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName());    }    @Async    public void exeAys(){        service.execute(new AysThread());    }    public class AysThread extends Thread{        public void run(){            System.out.println("AAAAAAaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName());        }    }    ExecutorService serviceT = Executors.newFixedThreadPool(3);    @Async    public void ttAys(){        serviceT = TtlExecutors.getTtlExecutorService(serviceT) ;        serviceT.execute(new TtThread());    }    public class TtThread extends Thread {        public void run(){            System.out.println("TTaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName());            BaseContextHandler.setToken("ZZZZ:"+BaseContextHandler.getToken()+"ZZZ=====================");        }    }}

另外一种使用线程池的情况,feign调用,如果使用了ttl,需要注意,feign调用红pom引入httpclient或者OKhttp便会使用线程池,这样使用线程池会有线程共享的问题,需要对线程进行ttl装饰,否则threadlocal中数据会混乱

(Feign-使用HttpClient和OkHttp可以参考:)

 

 

 

转载地址:http://rgadi.baihongyu.com/

你可能感兴趣的文章
56个民族的C++ map定义
查看>>
Yotta企业云盘如何实现保护文件安全
查看>>
区块链技术让Yotta企业云盘为行政事业服务助力
查看>>
yotta企业云盘助力制造行业创高峰
查看>>
Yotta企业云盘更好的为媒体广告业服务
查看>>
Yotta企业云盘助力旅游行业新发展
查看>>
Yotta企业云盘助力科技行业创高峰
查看>>
Yotta企业云盘更好地为教育行业服务
查看>>
Yotta企业云盘怎么帮助到能源化工行业
查看>>
企业云盘如何助力商业新发展
查看>>
医疗行业运用企业云盘可以带来什么样的提升
查看>>
教育数字智能化能为现有体系带来新的起点
查看>>
媒体广告业如何将内容资产进行高效地综合管理与利用
查看>>
能源化工要怎么管控核心数据
查看>>
制药医疗使用云盘能带来什么样的好处
查看>>
媒体广告业如何运用云盘提升效率
查看>>
企业如何运用企业云盘进行数字化转型-实现新发展
查看>>
司法如何运用电子智能化加快现代化建设
查看>>
设计行业运用企业云盘能带来什么样的变化
查看>>
如何运用企业云盘助力企业数字化新发展
查看>>