Spring Boot 自定義線程池實(shí)現(xiàn)異步開(kāi)發(fā)相信看過(guò)陳某的文章都了解,但是在實(shí)際開(kāi)發(fā)中需要在父子線程之間傳遞一些數(shù)據(jù),比如用戶(hù)信息,鏈路信息等等
比如用戶(hù)登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:
/** *@description用戶(hù)上下文信息 */ publicclassOauthContext{ privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
那么子線程想要獲取這個(gè)LoginVal如何做呢?
今天就來(lái)介紹幾種優(yōu)雅的方式實(shí)現(xiàn)Spring Boot 內(nèi)部的父子線程的數(shù)據(jù)傳遞。
1. 手動(dòng)設(shè)置
每執(zhí)行一次異步線程都要分為兩步:
獲取父線程的LoginVal
將LoginVal設(shè)置到子線程,達(dá)到復(fù)用
代碼如下:
publicvoidhandlerAsync(){ //1.獲取父線程的loginVal LoginValloginVal=OauthContext.get(); log.info("父線程的值:{}",OauthContext.get()); CompletableFuture.runAsync(()->{ //2.設(shè)置子線程的值,復(fù)用 OauthContext.set(loginVal); log.info("子線程的值:{}",OauthContext.get()); }); }
雖然能夠?qū)崿F(xiàn)目的,但是每次開(kāi)異步線程都需要手動(dòng)設(shè)置,重復(fù)代碼太多,看了頭疼,你認(rèn)為優(yōu)雅嗎?
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
2. 線程池設(shè)置TaskDecorator
TaskDecorator是什么?官方api的大致意思:這是一個(gè)執(zhí)行回調(diào)方法的裝飾器,主要應(yīng)用于傳遞上下文,或者提供任務(wù)的監(jiān)控/統(tǒng)計(jì)信息。
知道有這么一個(gè)東西,如何去使用?
TaskDecorator是一個(gè)接口,首先需要去實(shí)現(xiàn)它,代碼如下:
/** *@description上下文裝飾器 */ publicclassContextTaskDecoratorimplementsTaskDecorator{ @Override publicRunnabledecorate(Runnablerunnable){ //獲取父線程的loginVal LoginValloginVal=OauthContext.get(); return()->{ try{ //將主線程的請(qǐng)求信息,設(shè)置到子線程中 OauthContext.set(loginVal); //執(zhí)行子線程,這一步不要忘了 runnable.run(); }finally{ //線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 OauthContext.clear(); } }; } }
這里我只是設(shè)置了LoginVal,實(shí)際開(kāi)發(fā)中其他的共享數(shù)據(jù),比如SecurityContext,RequestAttributes....
TaskDecorator需要結(jié)合線程池使用,實(shí)際開(kāi)發(fā)中異步線程建議使用線程池,只需要在對(duì)應(yīng)的線程池配置一下,代碼如下:
@Bean("taskExecutor") publicThreadPoolTaskExecutortaskExecutor(){ ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor(); poolTaskExecutor.setCorePoolSize(xx); poolTaskExecutor.setMaxPoolSize(xx); //設(shè)置線程活躍時(shí)間(秒) poolTaskExecutor.setKeepAliveSeconds(xx); //設(shè)置隊(duì)列容量 poolTaskExecutor.setQueueCapacity(xx); //設(shè)置TaskDecorator,用于解決父子線程間的數(shù)據(jù)復(fù)用 poolTaskExecutor.setTaskDecorator(newContextTaskDecorator()); poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy()); //等待所有任務(wù)結(jié)束后再關(guān)閉線程池 poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); returnpoolTaskExecutor; }
此時(shí)業(yè)務(wù)代碼就不需要去設(shè)置子線程的值,直接使用即可,代碼如下:
publicvoidhandlerAsync(){ log.info("父線程的用戶(hù)信息:{}",OauthContext.get()); //執(zhí)行異步任務(wù),需要指定的線程池 CompletableFuture.runAsync(()->log.info("子線程的用戶(hù)信息:{}",OauthContext.get()),taskExecutor); }
來(lái)看一下結(jié)果,如下圖:
這里使用的是CompletableFuture執(zhí)行異步任務(wù),使用@Async這個(gè)注解同樣是可行的。
注意 :無(wú)論使用何種方式,都需要指定線程池
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/yudao-cloud
3. InheritableThreadLocal
這種方案不建議使用,InheritableThreadLocal雖然能夠?qū)崿F(xiàn)父子線程間的復(fù)用,但是在線程池中使用會(huì)存在復(fù)用的問(wèn)題。
這種方案使用也是非常簡(jiǎn)單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:
/** *@description用戶(hù)上下文信息 */ publicclassOauthContext{ privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
4. TransmittableThreadLocal
TransmittableThreadLocal是阿里開(kāi)源的工具,彌補(bǔ)了InheritableThreadLocal的缺陷,在使用線程池等會(huì)池化復(fù)用線程的執(zhí)行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時(shí)上下文傳遞的問(wèn)題。
使用起來(lái)也是非常簡(jiǎn)單,添加依賴(lài)如下:
com.alibaba transmittable-thread-local 2.14.2
OauthContext改造代碼如下:
/** *@description用戶(hù)上下文信息 */ publicclassOauthContext{ privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
TransmittableThreadLocal原理
從定義來(lái)看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實(shí)現(xiàn)TtlCopier接口,它里面只有一個(gè)copy方法。所以主要是對(duì)InheritableThreadLocal的擴(kuò)展。
publicclassTransmittableThreadLocalextendsInheritableThreadLocal implementsTtlCopier
在TransimittableThreadLocal中添加holder屬性。這個(gè)屬性的作用就是被標(biāo)記為具備線程傳遞資格的對(duì)象都會(huì)被添加到這個(gè)對(duì)象中。
要標(biāo)記一個(gè)類(lèi),比較容易想到的方式,就是給這個(gè)類(lèi)新增一個(gè)Type字段,還有一個(gè)方法就是將具備這種類(lèi)型的的對(duì)象都添加到一個(gè)靜態(tài)全局集合中。之后使用時(shí),這個(gè)集合里的所有值都具備這個(gè)標(biāo)記。
//1.holder本身是一個(gè)InheritableThreadLocal對(duì)象 //2.這個(gè)holder對(duì)象的value是WeakHashMap,?> //2.1WeekHashMap的value總是null,且不可能被使用。 //2.2WeekHasshMap支持value=null privatestaticInheritableThreadLocal ,?>>holder=newInheritableThreadLocal ,?>>(){ @Override protectedWeakHashMap ,?>initialValue(){ returnnewWeakHashMap ,Object>(); } /** *重寫(xiě)了childValue方法,實(shí)現(xiàn)上直接將父線程的屬性作為子線程的本地變量對(duì)象。 */ @Override protectedWeakHashMap ,?>childValue(WeakHashMap ,?>parentValue){ returnnewWeakHashMap ,Object>(parentValue); } };
應(yīng)用代碼是通過(guò)TtlExecutors工具類(lèi)對(duì)線程池對(duì)象進(jìn)行包裝。工具類(lèi)只是簡(jiǎn)單的判斷,輸入的線程池是否已經(jīng)被包裝過(guò)、非空校驗(yàn)等,然后返回包裝類(lèi)ExecutorServiceTtlWrapper。根據(jù)不同的線程池類(lèi)型,有不同和的包裝類(lèi)。
@Nullable publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){ if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){ returnexecutorService; } returnnewExecutorServiceTtlWrapper(executorService); }
進(jìn)入包裝類(lèi)ExecutorServiceTtlWrapper。可以注意到不論是通過(guò)ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會(huì)將線程對(duì)象包裝成TtlCallable或者TtlRunnable,用于在真正執(zhí)行run方法前做一些業(yè)務(wù)邏輯。
/** *在ExecutorServiceTtlWrapper實(shí)現(xiàn)submit方法 */ @NonNull @Override publicFuture submit(@NonNullCallable task){ returnexecutorService.submit(TtlCallable.get(task)); } /** *在ExecutorTtlWrapper實(shí)現(xiàn)execute方法 */ @Override publicvoidexecute(@NonNullRunnablecommand){ executor.execute(TtlRunnable.get(command)); }
所以,重點(diǎn)的核心邏輯應(yīng)該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類(lèi)似。在分析call()方法之前,先看一個(gè)類(lèi)Transmitter
publicstaticclassTransmitter{ /** *捕獲當(dāng)前線程中的是所有TransimittableThreadLocal和注冊(cè)ThreadLocal的值。 */ @NonNull publicstaticObjectcapture(){ returnnewSnapshot(captureTtlValues(),captureThreadLocalValues()); } /** *捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 */ privatestaticHashMap,Object>captureTtlValues(){ HashMap ,Object>ttl2Value= newHashMap ,Object>(); for(TransmittableThreadLocal
進(jìn)入TtlCallable#call()方法。
@Override publicVcall()throwsException{ Objectcaptured=capturedRef.get(); if(captured==null||releaseTtlValueReferenceAfterCall&& !capturedRef.compareAndSet(captured,null)){ thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!"); } //調(diào)用replay方法將捕獲到的當(dāng)前線程的本地變量,傳遞給線程池線程的本地變量, //并且獲取到線程池線程覆蓋之前的本地變量副本。 Objectbackup=replay(captured); try{ //線程方法調(diào)用 returncallable.call(); }finally{ //使用副本進(jìn)行恢復(fù)。 restore(backup); } }
到這基本上線程池方式傳遞本地變量的核心代碼已經(jīng)大概看完了。總的來(lái)說(shuō)在創(chuàng)建TtlCallable對(duì)象是,調(diào)用capture()方法捕獲調(diào)用方的本地線程變量,在call()執(zhí)行時(shí),將捕獲到的線程變量,替換到線程池所對(duì)應(yīng)獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復(fù)到調(diào)用之前。
總結(jié)
上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點(diǎn)非常明顯,實(shí)際開(kāi)發(fā)中也是采用的方案2或者方案4。
-
接口
+關(guān)注
關(guān)注
33文章
8691瀏覽量
151920 -
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14390 -
Boot
+關(guān)注
關(guān)注
0文章
150瀏覽量
35946 -
線程
+關(guān)注
關(guān)注
0文章
505瀏覽量
19758 -
數(shù)據(jù)傳遞
+關(guān)注
關(guān)注
1文章
3瀏覽量
1770
原文標(biāo)題:用這4招 優(yōu)雅的實(shí)現(xiàn)Spring Boot 異步線程間數(shù)據(jù)傳遞
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論