介紹
Go 的并發原語可以輕松構建流數據管道,從而高效利用 I/O 和多個 CPU。 本文展示了此類pipelines的示例,強調了操作失敗時出現的細微之處,并介紹了干凈地處理失敗的技術。
什么是pipeline?
pipeline在Go中并沒有書面的定義,只是眾多并發程序中的一種。非正式地,pipeline由一系列stage組成。每個stage是運行著同一個function的協程組。在每個stage,協程們
通過inbound channel從上游獲取數據
在data上進行運算,通常會產生新的值
通過outbound channel向下游發送數據
每個Stage都有數個inbound channel和outbound channel,除了第一個和最后一個Stage,分別只有outbound和inbound channel。第一個Stage通常叫做Source或Producer。最后一個Stage通常叫做Sink或Consumer。
我們將從一個簡單的示例pipeline開始來解釋這些想法和技術。 稍后,我們將提供一個更實際的例子。
Squaring numbers 平方數
考慮一個有著三個階段的流水線。
第一階段,gen,是個將整數列表轉換為一個發射列表中整數的channel的函數。gen函數啟動一個go routine,用來發送channel中的整數,然后當所有的整數都被發出后,將channel關閉:
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二階段,sq從上面的channel中接收數據,返回一個發射對應整數平方數的channel。當inbound channel關閉后,并且這一階段將所有的value發送到下游后,再將這個outbound channel關閉
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
main函數組織整個pipeline,并且運行最終的stage:從第二個stage中接收數據然后逐個打印,直到channel被關閉
func main() { // Set up the pipeline c := gen(2, 3) out := sq(c) // Consume the output // 4 fmt.Println(<-out) // 9 fmt.Println(<-out) }
既然sq的inbound channel和outbound channel類型相同,我們可以將其進行任意數量的組合。我們還可以將main函數重寫為循環,就像在其他Stage中做的那樣一樣。
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
扇入和扇出
許多函數可以從一個channel中獲取數據直到channel被關閉,這被叫做扇出。這提供了一種在worker之間分配工作以并行化 CPU 使用和 I/O 的方法。
一個函數可以通過將多個input channel多路復用到同一個channel,當所有的channel關閉時,該多路復用channel才關閉。從而達到從多個input獲取數據并處理,直到所有input channel都關閉才停止的效果。這叫做扇入。
我們可以將我們的流水線改為運行兩個sq,每個都從相同的channel讀取數據。我們引入一個新的函數merge,來做扇入的工作
func main() {
in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
merge函數通過對每個channel開啟一個協程,把數據拷貝到另一個out channel中,實現將channel列表轉換為一個channel的效果。當所有send操作完成后,再將out channel關閉。
向一個已經關閉上的channel發送數據會導致panic,所以保證發送完所有再關閉channel至關重要。sync.WaitGroup提供了一個簡單地方式來編排這個同步
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
短暫的停頓
我們的pipeline函數有這樣的模式:
當發送任務結束后,關閉發送output channel
直到input channel關閉前,一直從input channel中接收消息
這個模式下,每個階段都可以用協程+for循環的模式來書寫,保證每個數據發送到下游后再關閉所有協程。
但是在實際的pipeline中,階段并不總是接收所有來自inbound channel的數據。通常,如果inbound的值出現了錯誤,pipeline會提前退出。 在任何一種情況下,接收者都不必等待剩余值到達,并且我們希望fast fail(較早階段的Stage盡早停止后期Stage不需要的值)。
在我們的示例pipeline中,如果一個Stage未能消費所有inbound值,則嘗試計算后并發送這些值的 goroutine 將無限期阻塞:
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
這就導致了資源泄漏:協程消耗內存、運行資源,并且在協程棧內的golang堆引用導致垃圾無法回收。協程只能自己退出,不能由垃圾回收機制回收。
即使下游的Stage無法接收所有inbound value,我們也需要把上游的協程退出。如果把上游的協程改為有buffer的,可以解決上面的問題。如果Buffer中還有空間,則發送操作可以立刻完成
c := make(chan int, 2) // buffer size 2 c <- 1 // succeeds immediately c <- 2 // succeeds immediately c <- 3 // blocks until another goroutine does <-c and receives 1
當要發送的數目可以在channel創建時知道時,buffer可以簡化代碼。舉個例子,讓我們來使用buffer channel,不開辟新的協程來重寫gen方法:
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
在我們的pipeline中,我們就需要在merge方法中使用的channel添加buffer:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup out := make(chan int, 1) // enough space for the unread inputs // ... 其余的沒有變更 ...
盡管上面這個方案修復了阻塞的問題,但它是很差的方案。這里有一個對1的硬編碼,這太脆弱了?你真的能預料到有多少個值不能被正常發送嗎?一旦兩個值不能正常發送,你的協程又阻塞了。
作為替代,我們需要給下游階段提供一個機制,知會下游階段,發送者已經停止發送了。
Explicity cancellation 顯示取消
當main函數決定不從out處接收所有數據,而是退出時,它必須知會上游階段的協程放棄接下來的發送。它通過向一個名叫done的channel發送數據來完成這個動作。因為發送方有兩個,所以 向done發送兩次數據。
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{} }
發送到out channel的發送者把原來的邏輯替換成一個select操作,select或者發送一個數據,抑或從done處接收到數據。因為done中數據值的類型根本不重要,主要是接收到值這個事件本身很重要,所以donechannel的類型時struct {}。output循環繼續在inboundchannel上執行,所以上游的階段并沒有被阻塞。(我們稍后會討論如何讓循環迅速返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
這個方法有一個問題:每一個下游接收者都需要知道可能阻塞的上游發送者總數。維護它們的數目,是一個瑣碎又容易出錯的事情。
我們需要一個機制來讓不可知的、無界的發送協程來停止發送到下游的值。在Go,我們可以通過關閉channel來完成這件事,因為在已經關閉的channel上執行receive操作,會立刻返回該元素的零值。
這說明main函數可以簡單地通過關閉donechannel來讓所有的發送者不阻塞。關閉操作是一個高效的廣播。我們把pipeline中的每個函數都接受done作為參數,并把done在defer語句中關閉, 這樣,如果在main函數中返回,都會通知pipeline中的階段退出。
func main() { // Set up a done channel that's shared by the whole pipeline, // and close that channel when this pipeline exits, as a signal // for all the goroutines we started to exit. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in) // Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done will be closed by the deferred call. }
現在當donechannel關閉后,接收到close信息的階段,都可以直接退出了。merge函數中的outout協程可以不從inboundchannel中取數據直接退出,因為它知道,上游的發送sq,接收到close信息,也會直接退出。output通過defer語句來保證wg.Done()一定被調用。(譯者注:來關閉out channel)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
相似的,當接收到close信號時,sq函數也可以立刻返回。sq通過defer語句來保證outchannel一定被關閉。
這是給構建pipeline的一些指導:
當所有的發送操作完成后,關閉outbound channel
如果發送發不阻塞,或是channel沒有關閉,接收者會一直從channel中接收數據
Pipeline通過如下兩個方式來解除發送者的阻塞
確保channel的buffer足夠大
顯示知會發送者,接收者已經放棄接收
Digesting a tree 對樹進行摘要
讓我們來考慮一個更實際的pipeline
MD5 是一種消息摘要算法,可用作文件校驗和。 命令行實用程序 md5sum 打印文件列表的摘要值。
% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的示例程序類似于 md5sum,但將單個目錄作為參數并打印該目錄下每個常規文件的摘要值,按路徑名排序。
% go run serial.go . d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的主函數調MD5All這個輔助函數,返回路徑名和摘要值的map,main函數再將它們排序打印
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s ", m[path], path) } }
MD5All函數是我們討論的重點。在如下串行化的實現中,沒有使用并發技術,只是簡單對文件進行了遍歷
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
并行計算摘要
在并行的解法中,我們將MD5All分割為兩個階段的pipeline。第一個階段,sumFiles,遍歷文件樹,針對每個文件,在新的協程中計算摘要,然后把結果發送到channel中,這是result的類型
type result struct { path string sum [md5.Size]byte err error }
sumFiles返回兩個channel:一個是result channel,另一個是filepath.Walk中產生的錯誤。walk函數針對每個文件啟動一個新的協程來處理,然后檢查donechannel。如果done已經被關閉,walk函數會立刻停止:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and // sends the result on c. // Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup // If any error occurred, walk method will return err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{ path: path, sum: md5.Sum(data), err: err, }: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. // Start a goroutine to close c once all the sends are done. // No select needed here, since errc is buffered. errc <- err }() return c, errc }
MD5All從c中接收到摘要數據。當發生錯誤時,MD5All會迅速返回,通過defer語句來關閉donechannel
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
有界的并行
parallel.go 中的 MD5All 實現為每個文件啟動一個新的 goroutine。 在包含許多大文件的目錄中,這可能會分配比機器上可用的內存更多的內存。
我們可以通過限制并行讀取的文件數量來限制這些分配。 在新的解決方式中,我們通過創建固定數量的 goroutine 來讀取文件來做到這一點。 我們的pipeline現在分為三個階段:遍歷樹、讀取并計算文件摘要以及收集摘要。
第一階段 walkFiles 發射出文件樹中常規文件的路徑:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
第二階段啟動固定數量的協程來計算文件摘要,然后發送到c channel中
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { for path := range paths { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: return } } }
和之前的示例不同,因為多個協程都在共享channel上發送數據,digester函數并沒有關閉output channel。作為替代,當所有的digesters跑完之后,MD5All會關閉channel
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
這里也可以針對每個digester開啟獨立的channel,不過到時候就要對channel進行扇入處理。
最終階段從c中取得所有結果,并且檢查errc中的錯誤。此檢查不能更早發生,因為在此之前,walkFiles 可能會阻塞:
(譯者注:要保證檢查errc的錯誤,發生在filePath.Walk啟動后,done不會再次發送了,協程就不會退出)
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil }
總結
本文介紹了在 Go 中構建流數據pipeline的技術。 處理此類pipeline中的故障很棘手,因為pipeline中的每個階段可能會阻止嘗試向下游發送值,并且下游階段可能不再關心傳入的數據。 我們展示了關閉通道如何向管道啟動的所有 goroutine 廣播“done”信號,并定義了正確構建管道的指南。
審核編輯:黃飛
-
Pipeline
+關注
關注
0文章
28瀏覽量
9383
原文標題:實例詳解在Go中構建流數據pipeline的騷操作
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論