Go并发模式:Pipeline

Go并发模式:Pipeline

简介

Go 语言原生支持并发,所以构建数据流管道会很容易,数据流管道可有效利用I / O和多个CPU。

管道定义

在 Go 语言中没有对管道进行正式定义,常规来说就是通过 channel 来连接的多个阶段,每个阶段等效于在一个函数中运行的一组 gorutines 。这些 gorutines 的特征如下

  • 通过入站 channel 从上游接收值。
  • 对数据执行一些功能,通常来说是生产新数据。
  • 通过出站 channel 发送值到下游。

除了第一个和最后一个阶段,每个阶段都有任意数量的出站和入站 channel 。通常第一个阶段称为生成者(或源数据),最后一个阶段被称为消费者(或接收器)。

代码示例

建立包含三个阶段的管道并输出

第一个阶段

func gen(nums ...int) <-chan int{
    out := make(chan int)
    go func(){
        for _, n :=range nums {
            out <-n
        } 
        close(out)
    }()
    return out
}

第二阶段

func sq(in <-chan int) <-chan int{
    out := make(chan int)
    go func(){
        for n :=range in {
            out <- n*n
        }
        close(out)
    }()
    return out
}

最后一个阶段

func main(){
    // 建立管道
    c := gen(2,3)
    out := sq(c)

    //获取输出
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9

    // 另一种建立管道的形式
    for n := range sq(sq(gen(2,3))){
        fmt.Println(n) // 16 then 81
    }
}

Fan-out, fan-in 模式

fan-out: 多个函数从同一个 channel 中读取数据直到 channel 被关闭。这种方式提供了在一组 workers 之间分配 work 并行化使用 CPU 和 I/O 的方法

fan-in: 一个函数可以从多个 channel 中读取并继续操作,直到所有 channel 复用到一个 channel 上。当所有复用它的通道都关闭时,该通道关闭。

将上面的代码改成 fan-in 模式

func main(){
    in:=gen(2,3)

    // 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
    c1 := sq(in)
    c2 := sq(in)

    // 读取输出
    for n :=range merge(c1,c2){
        fmt.Println(n) // 4 then 9, 9 then 4 
    }
}

func merge(cs ...<-chan int) <-chan int{
    // 发送完成后要关闭 out,用 sync.WaitGroup 方式来同步 goroutine ,完成关闭。
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int){
        for n :=range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c :=range cs {
        go output(c)
    }
    // 不阻塞返回 out
    go func(){
        wg.Wait()
        close(out)
    }
    return out
}

以上代码形成的模式

  • 当所有的发送操作结束时关闭它们的出站 channel 。
  • 从入站的 channels 接收值直到它们关闭。

上面代码会造成内存泄漏,如果此时只接收一个值,那么一个生产者所在的 gorutine 会挂起等待发送。

func main(){
    in:=gen(2,3)
    // 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
    c1 := sq(in)
    c2 := sq(in)
    out := merge(c1,c2)
    fmt.Println(<-out) // 4 or 9
    // c1 或 c2 中的一个 goruntine 会挂起等待发送剩余的值。
    // 本例中的协程会随着 main 函数的退出而退出,在服务器程序中可以浮现这个问题。
}

在我们知道发送数据的数量时,可以通过设置带缓冲的 channel 。但是这种方式不太好,可以通过在消费者处发送关闭信号到生产者处来解决这个问题。

func main() {
    in := gen(2, 3)

    // 将 sq 任务分到两个 gorutine 中处理, 它们都从 in 中读取数据。
    c1 := sq(in)
    c2 := sq(in)

    // 读取输出
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 通知其他协程不再需要其他值了
    done <- struct{}{}
    done <- struct{}{}
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 使用 select 语句接收结束信号 done 
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // 剩余部分代码没有改变

上面代码会有一个问题,就是消费者需要知道生产者发送的数量,这样才能发送对应数量的结束信号,所以应该采用 close channel 的方式来实现,在关闭通道上接收操作可以立即执行产生元素类型的零值。

func main(){
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9
}

func gen(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n :
            case <-done:
                return
            }
        }
    }()
    return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 使用 select 语句接收结束信号 done 
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return 
            }
        }
    }
    // 剩余部分代码没有改变

建立管道应该遵守如下准则:

  • 在一个阶段,当所有的发送操作完成时关闭出站 channel 。
  • 在一个阶段,会一直从入站通道接收值,直到这些通道关闭或发送方被阻止发送。

真实场景下管道的使用

使用 md5 计算文件的摘要

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

将上面代码改成两种管道模式进行实现

方式一:将 MD5All 分成两个阶段的管道,第一阶段是开启一个新的 gorutine去 sumFiles, 遍历树,计算每个文件的摘要然后将结果和 file.Walk函数的错误返回。

type result struct{
    path string 
    sum [md5.Size]byte
    err error
} 

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            // 开启一个新的协程去处理文件
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

这种方式为每个文件单独开启了一个协程处理,但是如果在文件夹中有许多大文件,内存分配可能不足,我们可以通过限制并行读取的文件数来限制这些分配。

方式二:创建多个 gorutines 读取文件,现在管道分为三个阶段遍历树结构 、获取文件摘要、 收集文件摘要信息。

func walkFiles(done <-chan struct{},root string)(<-chan string, <-chan error){
    paths := make(chan string)
    errc := make(chan error, 1)
    go func(){
        defer close(paths)
        // 该发送操作不需要 select 语句,因为 errc 是缓存的。
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths,errc
}
// 这里没有关闭 paths,是因为我们在 MD5All 中进行关闭。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

func MD5All(root string) (map[string][md5.Size]byte, error) {
    done := make(chan struct{})
    defer close(done)
    paths, errc := walkFiles(done, root)
    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

来源

Go Concurrency Patterns: Pipelines and cancellation


 上一篇
数据库学习笔记 数据库学习笔记
数据库学习笔记MySQL基础知识SELECT语法结构 SELECT select_list FROM table_name; 注意事项 SQL 语法不区分大小写,但为了格式化和强调性关键字最好大写。 FROM 关键字最好另起一行,为了更好
2020-09-15
下一篇 
docker学习笔记 docker学习笔记
docker学习笔记简介Docker是一个用于开发,发布和运行应用程序的开放平台。 Docker使您能够将应用程序与基础架构分开,从而可以快速交付软件。 借助Docker,您可以以与管理应用程序相同的方式来管理基础架构。 通过利用Docke
2020-09-01
  目录