本文共 8853 字,大约阅读时间需要 29 分钟。
项目中异步处理方法获取用户对象,发现获取不到,发现用户对象是ThreadLocal进行传递,但是异步属于子线程,ThreadLocal对父子线程传递有问题的,后来将对象的传递值改为了TransmittableThreadLocal进行传递。
public static TransmittableThreadLocal
ThreadLocal 大家都熟知它作为本地线程传递,原理是Thread中存在ThreadLocal.ThreadLocalMap threadLocals 变量的ThreadLocal,而再看ThreadLocal的get set方法,都是获取当前的线程,然后进行对当前线程中的ThreadLocalMap变量进行赋值或者取值,这样便实现了线程之间数据传递。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }
但是当子线程要获取父进程中的ThreadLocal值,得到的确是null,原因是子线程ThreadLocal调用get方法时的当前线程是子线程,所以取不到父线程的值。
这时InheritableThreadLocal就出现了,进行了父子线程之间的数据传递。具体实现原理如下:
Thread中用threadLocals、inheritableThreadLocals两个ThreadLocal.ThreadLocalMap类型的变量
ThreadLocal.ThreadLocalMap threadLocals = null; /* * InheritableThreadLocal values pertaining to this thread. This map is * maintained by the InheritableThreadLocal class. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
而InheritableThreadLocal中重写了getmap createmap方法,分别赋值、取值于thread中的子线程inheritableThreadLocals 变量的ThreadLocalMap。当InheritableThreadLocal进行get set时,getmap方法获取的就行子线程变量inheritableThreadLocals的值
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals;}void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);}
而父线程是如何把数据传递给子线程的哪?
可以看到Thread创建时,将parent的map赋值copy给了当前线程中的inheritableThreadLocals。而copy过程都是浅copy,key,value都是引用地址的赋值
public Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); }追踪下去init方法中this.group = g; this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); this.target = target; setPriority(priority); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID();//ThreadLocal.createInheritedMap追踪下去private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal
到此就理解InheritableThreadLocal是怎样实现父子线程之间数据传递的,但是InheritableThreadLocal有没有其他问题哪?
线程池的场景中,线程是共享的,这样线程中的inheritableThreadLocals的便也共享了。下面的demo代码(参考的网上的)输出结果可以看出inheritableThreadLocals的值是共享了的,另外一个线程执行时没有获取到父线程中的对应的值。
输出结果:
xiexiexie
======== xiexiexie zhangzhangzhang ======== zhangzhangzhang zhangzhangzhang ======== liuliuliu上述原因是线程之间共享了,如果解决那,就是子线程在生命周期结束时clear清空子线程中的Threadlocal值。
这时Alibaba的TransmittableThreadLocal便出现了。
输出结果:
xiexiexie ======== xiexiexie zhangzhangzhang ======== xiexiexie zhangzhangzhang ======== xiexiexie
跟上面的类似的demo一样,输出结果子线程便是从父线程中获取了,TransmittableThreadLocal是采用了装饰模式进行处理的,使用线程的情况,也需要transmittable-thread-local包中相应的类进行装饰使用。详细使用说明可以参考其官网:
针对更新父线程中的值,因为Thread创建时,子线程中的值就是从父线程中浅copy过来的,所以获取其对象的引用后,直接对象的修改,便可以实现,可以看上面截图的修改。
学习理解中主要参考了,讲解的比较好
后续补充:
今天线上出了问题,feign之间调用老是报登录失效,moduleA异步方法中feign调用moduleB,B进行校验,理论上A模块登录了,B中登录校验是没问题。继续追查,发现moduleA中其他人增加了异步线程池,校验用户是用的TransmittableThreadLocal进行存储。
异步线程池配置如下:
public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { //定义一个最大为10个线程数量的线程池 ExecutorService service = Executors.newFixedThreadPool(10); return service; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { // 捕获异步线程的异常,统一处理 @Override public void handleUncaughtException(Throwable throwable, Method method, Object... objects) { log.error("asyncExceptionHandler, exception message:{},method name:{}", throwable.getMessage(), method.getName()); for (Object param : objects) { log.info("exception method parameter:{}", JacksonUtils.toJSONString(objects)); } throwable.printStackTrace(); } }}
而针对线程池共用线程,子线程在生命周期结束时需要clear清空子线程中的Threadlocal值,否则会出现共享线程池中threadlocal数据混乱(本次用户登录校验失败的原因),而TransmittableThreadLocal针对线程池线程数据执行完后不共享,则需要3种处理办法
Runnable
和Callable
Java Agent
来修饰JDK
线程池实现类具体使用办法参看阿里云的技术说明文档:
我们进行修改采用的装饰线程池办法:
service = TtlExecutors.getTtlExecutorService(service);
同时自己写了test类,输出结果能验证线程池的共享
@ActiveProfiles("local")//@RunWith(SpringRunner.class)@SpringBootTest(classes= ThreadTest.class)@Slf4jpublic class ThreadTest { @Autowired private IntegralService integralService; @Test public void testThread() throws InterruptedException { for(int i=0;i<10;i++){ MyThread m = new MyThread(); m.setIndex(i); m.setMyName("thread"+i); m.start(); Thread.sleep(1000); } } ExecutorService service = Executors.newFixedThreadPool(3); public class MyThread extends Thread{ public int index ; public void run(){ BaseContextHandler.setToken("thread"+index); System.out.println("===thread====="+index+"===token====="+BaseContextHandler.getToken()); ays();// exeAys(); ttAys();// BaseContextHandler.setToken("thread"+index+index); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("TTTTaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName()); } public void setIndex(int index) { this.index = index; } public void setMyName (String name){ this.setName(name); } } @Async public void ays(){ System.out.println("accessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName()); } @Async public void exeAys(){ service.execute(new AysThread()); } public class AysThread extends Thread{ public void run(){ System.out.println("AAAAAAaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName()); } } ExecutorService serviceT = Executors.newFixedThreadPool(3); @Async public void ttAys(){ serviceT = TtlExecutors.getTtlExecutorService(serviceT) ; serviceT.execute(new TtThread()); } public class TtThread extends Thread { public void run(){ System.out.println("TTaccessToken:"+ BaseContextHandler.getToken()+"-----------------thread:"+Thread.currentThread().getName()); BaseContextHandler.setToken("ZZZZ:"+BaseContextHandler.getToken()+"ZZZ====================="); } }}
另外一种使用线程池的情况,feign调用,如果使用了ttl,需要注意,feign调用红pom引入httpclient或者OKhttp便会使用线程池,这样使用线程池会有线程共享的问题,需要对线程进行ttl装饰,否则threadlocal中数据会混乱
(Feign-使用HttpClient和OkHttp可以参考:)
转载地址:http://rgadi.baihongyu.com/