一、何為原子操作
原子操作:顧名思義就是不可分割的操作,該操作只存在未開始和已完成兩種狀態,不存在中間狀態;
原子類型:原子庫中定義的數據類型,對這些類型的所有操作都是原子的,包括通過原子類模板std::atomic< T >實例化的數據類型,也都是支持原子操作的。
二、如何使用原子類型
2.1 原子庫atomic支持的原子操作
原子庫< atomic >中提供了一些基本原子類型,也可以通過原子類模板實例化一個原子對象,下面列出一些基本原子類型及相應的特化模板如下:
對原子類型的訪問,最主要的就是讀和寫,但原子庫提供的對應原子操作是load()與store(val)。原子類型支持的原子操作如下:
2.2 原子操作中的內存訪問模型
原子操作保證了對數據的訪問只有未開始和已完成兩種狀態,不會訪問到中間狀態,但我們訪問數據一般是需要特定順序的,比如想讀取寫入后的最新數據,原子操作函數是支持控制讀寫順序的,即帶有一個數據同步內存模型參數std::memory_order,用于對同一時間的讀寫操作進行排序。C++11定義的6種類型如下:
- memory_order_relaxed: 寬松操作,沒有同步或順序制約,僅對此操作要求原子性;
- memory_order_release & memory_order_acquire: 兩個線程A&B,A線程Release后,B線程Acquire能保證一定讀到的是最新被修改過的值;這種模型更強大的地方在于它能保證發生在A-Release前的所有寫操作,在B-Acquire后都能讀到最新值;
- memory_order_release & memory_order_consume: 上一個模型的同步是針對所有對象的,這種模型只針對依賴于該操作涉及的對象:比如這個操作發生在變量a上,而s = a + b; 那s依賴于a,但b不依賴于a; 當然這里也有循環依賴的問題,例如:t = s + 1,因為s依賴于a,那t其實也是依賴于a的;
- memory_order_seq_cst: 順序一致性模型,這是C++11原子操作的默認模型;大概行為為對每一個變量都進行Release-Acquire操作,當然這也是一個最慢的同步模型;
內存訪問模型屬于比較底層的控制接口,如果對編譯原理和CPU指令執行過程不了解的話,容易引入bug。內存模型不是本章重點,這里不再展開介紹,后續的代碼都使用默認的順序一致性模型或比較穩妥的Release-Acquire模型。
2.3 使用原子類型替代互斥鎖編程
為便于比較,直接基于前篇文章:線程同步之互斥鎖中的示例程序進行修改,用原子庫取代互斥庫的代碼如下:
//atomic1.cpp 使用原子庫取代互斥庫實現線程同步
#include < chrono >
#include < atomic >
#include < thread >
#include < iostream >
std::chrono::milliseconds interval(100);
std::atomic< bool > readyFlag(false); //原子布爾類型,取代互斥量
std::atomic< int > job_shared(0); //兩個線程都能修改'job_shared',將該變量特化為原子類型
int job_exclusive = 0; //只有一個線程能修改'job_exclusive',不需要保護
//此線程只能修改 'job_shared'
void job_1()
{
std::this_thread::sleep_for(5 * interval);
job_shared.fetch_add(1);
std::cout < < "job_1 shared (" < < job_shared.load() < < ")n";
readyFlag.store(true); //改變布爾標記狀態為真
}
// 此線程能修改'job_shared'和'job_exclusive'
void job_2()
{
while (true) { //無限循環,直到可訪問并修改'job_shared'
if (readyFlag.load()) { //判斷布爾標記狀態是否為真,為真則修改‘job_shared’
job_shared.fetch_add(1);
std::cout < < "job_2 shared (" < < job_shared.load() < < ")n";
return;
} else { //布爾標記為假,則修改'job_exclusive'
++job_exclusive;
std::cout < < "job_2 exclusive (" < < job_exclusive < < ")n";
std::this_thread::sleep_for(interval);
}
}
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
getchar();
return 0;
}
由示例程序可以看出,原子布爾類型可以實現互斥鎖的部分功能,但在使用條件變量condition variable時,仍然需要mutex保護對condition variable的消費,即使condition variable是一個atomic object。
2.4 使用原子類型實現自旋鎖
自旋鎖(spinlock)與互斥鎖(mutex)類似,在任一時刻最多只能有一個持有者,但如果資源已被占用,互斥鎖會讓資源申請者進入睡眠狀態,而自旋鎖不會引起調用者睡眠,會一直循環判斷該鎖是否成功獲取。自旋鎖是專為防止多處理器并發而引入的一種鎖,它在內核中大量應用于中斷處理等部分(對于單處理器來說,防止中斷處理中的并發可簡單采用關閉中斷的方式,即在標志寄存器中關閉/打開中斷標志位,不需要自旋鎖)。
對于多核處理器來說,檢測到鎖可用與設置鎖狀態兩個動作需要實現為一個原子操作,如果分為兩個原子操作,則可能一個線程在獲得鎖后設置鎖前被其余線程搶到該鎖,導致執行錯誤。這就需要原子庫提供對原子變量“讀-修改-寫(Read-Modify-Write)”的原子操作,上文原子類型支持的操作中就提供了RMW(Read-Modify-Write)原子操作,比如a.exchange(val)與a.compare_exchange(expected,desired)。
標準庫還專門提供了一個原子布爾類型std::atomic_flag,不同于所有 std::atomic 的特化,它保證是免鎖的,不提供load()與store(val)操作,但提供了test_and_set()與clear()操作,其中test_and_set()就是支持RMW的原子操作,可用std::atomic_flag實現自旋鎖的功能,代碼如下:
//atomic2.cpp 使用原子布爾類型實現自旋鎖的功能
#include < thread >
#include < vector >
#include < iostream >
#include < atomic >
std::atomic_flag lock = ATOMIC_FLAG_INIT; //初始化原子布爾類型
void f(int n)
{
for (int cnt = 0; cnt < 100; ++cnt) {
while (lock.test_and_set(std::memory_order_acquire)) // 獲得鎖
; // 自旋
std::cout < < n < < " thread Output: " < < cnt < < 'n';
lock.clear(std::memory_order_release); // 釋放鎖
}
}
int main()
{
std::vector< std::thread > v; //實例化一個元素類型為std::thread的向量
for (int n = 0; n < 10; ++n) {
v.emplace_back(f, n); //以參數(f,n)為初值的元素放到向量末尾,相當于啟動新線程f(n)
}
for (auto& t : v) { //遍歷向量v中的元素,基于范圍的for循環,auto&自動推導變量類型并引用指針指向的內容
t.join(); //阻塞主線程直至子線程執行完畢
}
getchar();
return 0;
}
自旋鎖除了使用atomic_flag的TAS(Test And Set)原子操作實現外,還可以使用普通的原子類型std::atomic實現:其中a.exchange(val)是支持TAS原子操作的,a.compare_exchange(expected,desired)是支持CAS(Compare And Swap)原子操作的,感興趣可以自己實現出來。其中CAS原子操作是無鎖編程的主要實現手段,我們接著往下介紹無鎖編程。
三、如何進行無鎖編程
3.1 什么是無鎖編程
在原子操作出現之前,對共享數據的讀寫可能得到不確定的結果,所以多線程并發編程時要對使用鎖機制對共享數據的訪問過程進行保護。但鎖的申請釋放增加了訪問共享資源的消耗,且可能引起線程阻塞、鎖競爭、死鎖、優先級反轉、難以調試等問題。
現在有了原子操作的支持,對單個基礎數據類型的讀、寫訪問可以不用鎖保護了,但對于復雜數據類型比如鏈表,有可能出現多個核心在鏈表同一位置同時增刪節點的情況,這將會導致操作失敗或錯序。所以我們在對某節點操作前,需要先判斷該節點的值是否跟預期的一致,如果一致則進行操作,不一致則更新期望值,這幾步操作依然需要實現為一個RMW(Read-Modify-Write)原子操作,這就是前面提到的CAS(Compare And Swap)原子操作,它是無鎖編程中最常用的操作。
既然無鎖編程是為了解決鎖機制帶來的一些問題而出現的,那么無鎖編程就可以理解為不使用鎖機制就可保證多線程間原子變量同步的編程。無鎖(lock-free)的實現只是將多條指令合并成了一條指令形成一個邏輯完備的最小單元,通過兼容CPU指令執行邏輯形成的一種多線程編程模型。
無鎖編程是基于原子操作的,對基本原子類型的共享訪問由load()與store(val)即可保證其并發同步,對抽象復雜類型的共享訪問則需要更復雜的CAS來保證其并發同步,并發訪問過程只是不使用鎖機制了,但還是可以理解為有鎖止行為的,其粒度很小,性能更高。對于某個無法實現為一個原子操作的并發訪問過程還是需要借助鎖機制來實現。
3.1 CAS原子操作實現無鎖編程
CAS原子操作主要是通過函數a.compare_exchange(expected,desired)實現的,其語義為“我認為V的值應該為A,如果是,那么將V的值更新為B,否則不修改并告訴V的值實際為多少”,CAS算法的實現偽碼如下:
bool compare_exchange_strong(T& expected, T desired)
{
if( this- >load() == expected ) {
this- >store(desired);
return true;
} else {
expected = this- >load();
return false;
}
}
下面嘗試實現一個無鎖棧,代碼如下:
//atomic3.cpp 使用CAS操作實現一個無鎖棧
#include < atomic >
#include < iostream >
template< typename T >
class lock_free_stack
{
private:
struct node
{
T data;
node* next;
node(const T& data) : data(data), next(nullptr) {}
};
std::atomic< node* > head;
public:
lock_free_stack(): head(nullptr) {}
void push(const T& data)
{
node* new_node = new node(data);
do{
new_node- >next = head.load(); //將 head 的當前值放入new_node- >next
}while(!head.compare_exchange_strong(new_node- >next, new_node));
// 如果新元素new_node的next和棧頂head一樣,證明在你之前沒人操作它,使用新元素替換棧頂退出即可;
// 如果不一樣,證明在你之前已經有人操作它,棧頂已發生改變,該函數會自動更新新元素的next值為改變后的棧頂;
// 然后繼續循環檢測直到狀態1成立退出;
}
T pop()
{
node* node;
do{
node = head.load();
}while (node && !head.compare_exchange_strong(node, node- >next));
if(node)
return node- >data;
}
};
int main()
{
lock_free_stack< int > s;
s.push(1);
s.push(2);
s.push(3);
std::cout < < s.pop() < < std::endl;
std::cout < < s.pop() < < std::endl;
getchar();
return 0;
}
程序注釋中已經解釋的很清楚了,在將數據壓棧前,先通過比較原子類型head與新元素的next指向對象是否相等來判斷head是否已被其他線程修改,根據判斷結果選擇是繼續操作還是更新期望,而這一切都是在一個原子操作中完成的,保證了在不使用鎖的情況下實現共享數據的并發同步。
CAS 看起來很厲害,但也有缺點,最著名的就是 ABA 問題,假設一個變量 A ,修改為 B之后又修改為 A,CAS 的機制是無法察覺的,但實際上已經被修改過了。如果在基本類型上是沒有問題的,但是如果是引用類型呢?這個對象中有多個變量,我怎么知道有沒有被改過?聰明的你一定想到了,加個版本號啊。每次修改就檢查版本號,如果版本號變了,說明改過,就算你還是 A,也不行。
上面的例子節點指針也屬于引用類型,自然也存在ABA問題,比如在線程2執行pop操作,將A,B都刪掉,然后創建一個新元素push進去,因為操作系統的內存分配機制會重復使用之前釋放的內存,恰好push進去的內存地址和A一樣,我們記為A’,這時候切換到線程1,CAS操作檢查到A沒變化成功將B設為棧頂,但B是一個已經被釋放的內存塊。該問題的解決方案就是上面說的通過打標簽標識A和A’為不同的指針,具體實現代碼讀者可以嘗試實現。
-
數據
+關注
關注
8文章
7145瀏覽量
89582 -
編程
+關注
關注
88文章
3637瀏覽量
93986 -
函數
+關注
關注
3文章
4346瀏覽量
62975 -
C++
+關注
關注
22文章
2114瀏覽量
73858 -
原子
+關注
關注
0文章
89瀏覽量
20344
發布評論請先 登錄
相關推薦
評論