Go并发模式:Pipeline
简介
Go 语言原生支持并发,所以构建数据流管道会很容易,数据流管道可有效利用I / O和多个CPU。
管道定义
在 Go 语言中没有对管道进行正式定义,常规来说就是通过 channel 来连接的多个阶段,每个阶段等效于在一个函数中运行的一组 gorutines 。这些 gorutines 的特征如下
- 通过入站 channel 从上游接收值。
- 对数据执行一些功能,通常来说是生产新数据。
- 通过出站 channel 发送值到下游。
除了第一个和最后一个阶段,每个阶段都有任意数量的出站和入站 channel 。通常第一个阶段称为生成者(或源数据),最后一个阶段被称为消费者(或接收器)。
代码示例
建立包含三个阶段的管道并输出
第一个阶段
func gen(nums ...int) <-chan int{
out := make(chan int)
go func(){
for _, n :=range nums {
out <-n
}
close(out)
}()
return out
}
第二阶段
func sq(in <-chan int) <-chan int{
out := make(chan int)
go func(){
for n :=range in {
out <- n*n
}
close(out)
}()
return out
}
最后一个阶段
func main(){
// 建立管道
c := gen(2,3)
out := sq(c)
//获取输出
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
// 另一种建立管道的形式
for n := range sq(sq(gen(2,3))){
fmt.Println(n) // 16 then 81
}
}
Fan-out, fan-in 模式
fan-out: 多个函数从同一个 channel 中读取数据直到 channel 被关闭。这种方式提供了在一组 workers 之间分配 work 并行化使用 CPU 和 I/O 的方法
fan-in: 一个函数可以从多个 channel 中读取并继续操作,直到所有 channel 复用到一个 channel 上。当所有复用它的通道都关闭时,该通道关闭。
将上面的代码改成 fan-in 模式
func main(){
in:=gen(2,3)
// 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
c1 := sq(in)
c2 := sq(in)
// 读取输出
for n :=range merge(c1,c2){
fmt.Println(n) // 4 then 9, 9 then 4
}
}
func merge(cs ...<-chan int) <-chan int{
// 发送完成后要关闭 out,用 sync.WaitGroup 方式来同步 goroutine ,完成关闭。
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int){
for n :=range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c :=range cs {
go output(c)
}
// 不阻塞返回 out
go func(){
wg.Wait()
close(out)
}
return out
}
以上代码形成的模式
- 当所有的发送操作结束时关闭它们的出站 channel 。
- 从入站的 channels 接收值直到它们关闭。
上面代码会造成内存泄漏,如果此时只接收一个值,那么一个生产者所在的 gorutine 会挂起等待发送。
func main(){
in:=gen(2,3)
// 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
c1 := sq(in)
c2 := sq(in)
out := merge(c1,c2)
fmt.Println(<-out) // 4 or 9
// c1 或 c2 中的一个 goruntine 会挂起等待发送剩余的值。
// 本例中的协程会随着 main 函数的退出而退出,在服务器程序中可以浮现这个问题。
}
在我们知道发送数据的数量时,可以通过设置带缓冲的 channel 。但是这种方式不太好,可以通过在消费者处发送关闭信号到生产者处来解决这个问题。
func main() {
in := gen(2, 3)
// 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
c1 := sq(in)
c2 := sq(in)
// 读取输出
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 通知其他协程不再需要其他值了
done <- struct{}{}
done <- struct{}{}
}
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 使用 select 语句接收结束信号 done
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// 剩余部分代码没有改变
上面代码会有一个问题,就是消费者需要知道生产者发送的数量,这样才能发送对应数量的结束信号,所以应该采用 close channel 的方式来实现,在关闭通道上接收操作可以立即执行产生元素类型的零值。
func main(){
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
}
func gen(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n :
case <-done:
return
}
}
}()
return out
}
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 使用 select 语句接收结束信号 done
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// 剩余部分代码没有改变
建立管道应该遵守如下准则:
- 在一个阶段,当所有的发送操作完成时关闭出站 channel 。
- 在一个阶段,会一直从入站通道接收值,直到这些通道关闭或发送方被阻止发送。
真实场景下管道的使用
使用 md5 计算文件的摘要
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
将上面代码改成两种管道模式进行实现
方式一:将 MD5All 分成两个阶段的管道,第一阶段是开启一个新的 gorutine去 sumFiles, 遍历树,计算每个文件的摘要然后将结果和 file.Walk
函数的错误返回。
type result struct{
path string
sum [md5.Size]byte
err error
}
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
// 开启一个新的协程去处理文件
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
这种方式为每个文件单独开启了一个协程处理,但是如果在文件夹中有许多大文件,内存分配可能不足,我们可以通过限制并行读取的文件数来限制这些分配。
方式二:创建多个 gorutines 读取文件,现在管道分为三个阶段遍历树结构 、获取文件摘要、 收集文件摘要信息。
func walkFiles(done <-chan struct{},root string)(<-chan string, <-chan error){
paths := make(chan string)
errc := make(chan error, 1)
go func(){
defer close(paths)
// 该发送操作不需要 select 语句,因为 errc 是缓存的。
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths,errc
}
// 这里没有关闭 paths,是因为我们在 MD5All 中进行关闭。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
func MD5All(root string) (map[string][md5.Size]byte, error) {
done := make(chan struct{})
defer close(done)
paths, errc := walkFiles(done, root)
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}