特別說(shuō)明CountDownLatch
CountDownLatch的用法
CountDownLatch(num) 簡(jiǎn)單說(shuō)明
主線程:mainThreadLatch.await() 和mainThreadLatch.countDown()
子線程:rollBackLatch.await() 和rollBackLatch.countDown()
為什么所有的子線程會(huì)在一瞬間就被所有都釋放了?
事務(wù)的回滾是怎么結(jié)合進(jìn)去的?
主線程類Entry
子線程類WorkThread
代碼實(shí)際運(yùn)用踩坑!!!!
特別說(shuō)明CountDownLatch
CountDownLatch是一個(gè)類springboot自帶的類,可以直接用 ,變量AtomicBoolean 也是可以直接使用
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
CountDownLatch的用法
CountDownLatch典型用法:
1、某一線程在開(kāi)始運(yùn)行前等待n個(gè)線程執(zhí)行完畢。 將CountDownLatch的計(jì)數(shù)器初始化為new CountDownLatch(n),每當(dāng)一個(gè)任務(wù)線程執(zhí)行完畢,就將計(jì)數(shù)器減1 countdownLatch.countDown(),當(dāng)計(jì)數(shù)器的值變?yōu)?時(shí),在CountDownLatch上await()的線程就會(huì)被喚醒。一個(gè)典型應(yīng)用場(chǎng)景就是啟動(dòng)一個(gè)服務(wù)時(shí),主線程需要等待多個(gè)組件加載完畢,之后再繼續(xù)執(zhí)行。
2、實(shí)現(xiàn)多個(gè)線程開(kāi)始執(zhí)行任務(wù)的最大并行性。 注意是并行性,不是并發(fā),強(qiáng)調(diào)的是多個(gè)線程在某一時(shí)刻同時(shí)開(kāi)始執(zhí)行。類似于賽跑,將多個(gè)線程放到起點(diǎn),等待發(fā)令槍響,然后同時(shí)開(kāi)跑。做法是初始化一個(gè)共享的CountDownLatch(1),將其計(jì)算器初始化為1,多個(gè)線程在開(kāi)始執(zhí)行任務(wù)前首先countdownlatch.await(),當(dāng)主線程調(diào)用countDown()時(shí),計(jì)數(shù)器變?yōu)?,多個(gè)線程同時(shí)被喚醒。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
CountDownLatch(num) 簡(jiǎn)單說(shuō)明
new 一個(gè) CountDownLatch(num) 對(duì)象
建立對(duì)象的時(shí)候 num 代表的是需要等待 num 個(gè)線程
//?建立對(duì)象的時(shí)候?num?代表的是需要等待?num?個(gè)線程 //主線程 CountDownLatch?mainThreadLatch?=?new?CountDownLatch(num); //子線程 CountDownLatch?rollBackLatch??=?new?CountDownLatch(1);
主線程:mainThreadLatch.await() 和mainThreadLatch.countDown()
新建對(duì)象
CountDownLatch?mainThreadLatch?=?new?CountDownLatch(num);
卡住主線程,讓其等待子線程,代碼mainThreadLatch.await(),放在主線程里
mainThreadLatch.await();
?
?
代碼mainThreadLatch.countDown(),放在子線程里,每一個(gè)子線程運(yùn)行一到這個(gè)代碼,意味著CountDownLatch(num),里面的num-1(自動(dòng)減一)
mainThreadLatch.countDown();
CountDownLatch(num)里面的num減到0,也就是CountDownLatch(0),被卡住的主線程mainThreadLatch.await(),就會(huì)往下執(zhí)行
子線程:rollBackLatch.await() 和rollBackLatch.countDown()
新建對(duì)象,特別注意:子線程這個(gè)num就是1(關(guān)于只能為1的解答在后面)
CountDownLatch?rollBackLatch??=?new?CountDownLatch(1);
卡住子線程,阻止每一個(gè)子線程的事務(wù)提交和回滾
rollBackLatch.await();
?
?
代碼rollBackLatch.countDown();放在主線程里,而且是放在主線程的等待代碼mainThreadLatch.await();后面。
rollBackLatch.countDown();
?
?
為什么所有的子線程會(huì)在一瞬間就被所有都釋放了?
事務(wù)的回滾是怎么結(jié)合進(jìn)去的?
假設(shè)總共20個(gè)子線程,那么其中一個(gè)線程報(bào)錯(cuò)了怎么實(shí)現(xiàn)所有線程回滾。
引入變量
AtomicBoolean?rollbackFlag?=?new?AtomicBoolean(false)
和字面意思是一樣的:根據(jù) rollbackFlag 的true或者false 判斷子線程里面,是否回滾。
首先我們確定的一點(diǎn):rollbackFlag 是所有的子線程都用著這一個(gè)判斷
?
主線程類Entry
package?org.apache.dolphinscheduler.api.utils; import?com.alibaba.fastjson.JSONArray; import?com.alibaba.fastjson.JSONObject; import?org.apache.dolphinscheduler.api.controller.WorkThread; import?org.apache.dolphinscheduler.common.enums.DbType; import?org.springframework.web.bind.annotation.*; import?java.text.SimpleDateFormat; import?java.util.ArrayList; import?java.util.Date; import?java.util.List; import?java.util.TimeZone; import?java.util.concurrent.CountDownLatch; import?java.util.concurrent.atomic.AtomicBoolean; @RestController @RequestMapping("importDatabase") public?class?Entry?{ ????/** ?????*?@param?dbid?數(shù)據(jù)庫(kù)的id ?????*?@param?tablename?表名 ?????*?@param?sftpFileName?文件名稱 ?????*?@param?head?是否有頭文件 ?????*?@param?splitSign?分隔符 ?????*?@param?type?數(shù)據(jù)庫(kù)類型 ?????*/ ????private?static?String?SFTP_HOST?=?"192.168.1.92"; ????private?static?int?SFTP_PORT?=?22; ????private?static?String?SFTP_USERNAME?=?"root"; ????private?static?String?SFTP_PASSWORD?=?"rootroot"; ????private?static?String?SFTP_BASEPATH?=?"/opt/testSFTP/"; ????@PostMapping("/thread") ????@ResponseBody ????public?static?JSONObject?importDatabase(@RequestParam("dbid")?int?dbid ????????????,@RequestParam("tablename")?String?tablename ????????????,@RequestParam("sftpFileName")?String?sftpFileName ????????????,@RequestParam("head")?String?head ????????????,@RequestParam("splitSign")?String?splitSign ????????????,@RequestParam("type")?DbType?type ????????????,@RequestParam("heads")?String?heads ????????????,@RequestParam("scolumns")?String?scolumns ????????????,@RequestParam("tcolumns")?String?tcolumns?)?throws?Exception?{ ????????JSONObject?obForRetrun?=?new?JSONObject(); ????????try?{ ????????????JSONArray?jsonArray?=?JSONArray.parseArray(tcolumns); ????????????JSONArray?scolumnArray?=?JSONArray.parseArray(scolumns); ????????????JSONArray?headsArray?=?JSONArray.parseArray(heads); ????????????List?listInteger?=?getRrightDataNum(headsArray,scolumnArray); ????????????JSONArray?bodys?=?SFTPUtils.getSftpContent(SFTP_HOST,SFTP_PORT,SFTP_USERNAME,SFTP_PASSWORD,SFTP_BASEPATH,sftpFileName,head,splitSign); ????????????int?total??=?bodys.size(); ????????????int?num?=?20;?//一個(gè)批次的數(shù)據(jù)有多少 ????????????int?count?=?total/num;//周期 ????????????int?lastNum?=total-?count*num;//余數(shù) ????????????List ?list?=?new?ArrayList (); ????????????SimpleDateFormat?sdf?=?new?SimpleDateFormat("HHss:SS"); ????????????TimeZone?t?=?sdf.getTimeZone(); ????????????t.setRawOffset(0); ????????????sdf.setTimeZone(t); ????????????Long?startTime=System.currentTimeMillis(); ????????????int?countForCountDownLatch?=?0; ????????????if(lastNum==0){//整除 ????????????????countForCountDownLatch=?count; ????????????}else{ ????????????????countForCountDownLatch=?count?+?1; ????????????} ????????????//子線程 ????????????CountDownLatch?rollBackLatch??=?new?CountDownLatch(1); ????????????//主線程 ????????????CountDownLatch?mainThreadLatch?=?new?CountDownLatch(countForCountDownLatch); ????????????AtomicBoolean?rollbackFlag?=?new?AtomicBoolean(false); ????????????StringBuffer?message?=?new?StringBuffer(); ????????????message.append("報(bào)錯(cuò)信息:"); ????????????//子線程 ????????????for(int?i=0;i ?getRrightDataNum(JSONArray?headsArray,?JSONArray?scolumnArray){ ????????List ?list?=?new?ArrayList (); ????????String?arrayA?[]?=?new?String[headsArray.size()]; ????????for(int?i=0;i 子線程類WorkThread
package?org.apache.dolphinscheduler.api.controller; import?com.alibaba.fastjson.JSONArray; import?com.alibaba.fastjson.JSONObject; import?org.apache.dolphinscheduler.api.service.DataSourceService; import?org.apache.dolphinscheduler.common.enums.DbType; import?org.apache.dolphinscheduler.dao.entity.DataSource; import?org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import?org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import?org.springframework.transaction.PlatformTransactionManager; import?java.sql.Connection; import?java.sql.PreparedStatement; import?java.sql.SQLException; import?java.text.ParseException; import?java.text.SimpleDateFormat; import?java.util.Date; import?java.util.List; import?java.util.TimeZone; import?java.util.concurrent.CountDownLatch; import?java.util.concurrent.atomic.AtomicBoolean; /** ?*?多線程 ?*/ public?class?WorkThread?implements?Runnable{?//建立線程的兩種方法?1?實(shí)現(xiàn)Runnable?接口?2?繼承?Thread?類 ????private?DataSourceService?dataSourceService; ????private?DataSourceMapper?dataSourceMapper; ????private?Integer?begin; ????private?Integer?end; ????private?String?tableName; ????private?JSONArray?columnArray; ????private?Integer?dbid; ????private?DbType?type; ????private?JSONArray?bodys; ????private??List?listInteger; ????private?PlatformTransactionManager?transactionManager; ????private?CountDownLatch?mainThreadLatch; ????private?CountDownLatch?rollBackLatch; ????private?AtomicBoolean?rollbackFlag; ????private?StringBuffer?message; ????/** ?????*?@param?i ?????*?@param?num ?????*?@param?tableFrom ?????*?@param?columnArrayFrom ?????*?@param?dbidFrom ?????*?@param?typeFrom ?????*/ ????public?WorkThread(int?i,?int?num,?String?tableFrom,?JSONArray?columnArrayFrom,?int?dbidFrom ????????????,?DbType?typeFrom,?JSONArray?bodysFrom,?List ?listIntegerFrom ????????????,CountDownLatch?mainThreadLatch,CountDownLatch?rollBackLatch,AtomicBoolean?rollbackFlag ????????????,StringBuffer?messageFrom)?{ ????????begin=i*num; ????????end=begin+num; ????????tableName?=?tableFrom; ????????columnArray?=?columnArrayFrom; ????????dbid?=?dbidFrom; ????????type?=?typeFrom; ????????bodys?=?bodysFrom; ????????listInteger?=?listIntegerFrom; ????????this.dataSourceMapper?=?SpringApplicationContext.getBean(DataSourceMapper.class); ????????this.dataSourceService?=?SpringApplicationContext.getBean(DataSourceService.class); ????????this.transactionManager?=?SpringApplicationContext.getBean(PlatformTransactionManager.class); ????????this.mainThreadLatch?=?mainThreadLatch; ????????this.rollBackLatch?=?rollBackLatch; ????????this.rollbackFlag?=?rollbackFlag; ????????this.message?=?messageFrom; ????} ????public?void?run()?{ ????????DataSource?dataSource?=?dataSourceMapper.queryDataSourceByID(dbid); ????????String?cp?=?dataSource.getConnectionParams(); ????????Connection?con=null; ????????????con?=??dataSourceService.getConnection(type,cp); ????????if(con!=null) ????????{ ????????????SimpleDateFormat?sdf?=?new?SimpleDateFormat("HHss:SS"); ????????????TimeZone?t?=?sdf.getTimeZone(); ????????????t.setRawOffset(0); ????????????sdf.setTimeZone(t); ????????????Long?startTime?=?System.currentTimeMillis(); ????????????try?{ ????????????????con.setAutoCommit(false); //----------------------------?獲取字段和類型 ????????????????String?columnString?=?null;//活動(dòng)的字段 ????????????????int?intForType?=?0; ????????????????String?type[]?=?new?String[columnArray.size()];//類型集合 ????????????????for(int?i=0;i ?listInteger,String[]?array){ ????????String?[]?arrayFinal?=?new?String?[listInteger.size()]; ????????for(int?i=0;i 代碼實(shí)際運(yùn)用踩坑!!!!
還記得這里有個(gè)一批次處理多少數(shù)據(jù)么,我這邊設(shè)置了20,實(shí)際到運(yùn)用中的時(shí)候客戶給了個(gè)20W的數(shù)據(jù),我批次設(shè)置為20,那就有1W個(gè)子線程!!!!
這還不是最糟糕的,最糟糕的是每個(gè)子線程都會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接,數(shù)據(jù)庫(kù)直接被我搞炸了
所以這里需要把:
int?num?=?20;?//一個(gè)批次的數(shù)據(jù)有多少
改成:
int?num?=?20000;?//一個(gè)批次的數(shù)據(jù)有多少
編輯:黃飛
?
評(píng)論
查看更多