如何优雅的关闭channel

如何优雅的关闭channel

关闭channel会出现的问题

1.在没有改变 channel 的状态情况下,没有简单的方法来检查通道是否关闭。

2.关闭已经关闭的 channel 会造成 panic

3.向一个已经关闭的 channel 中,发送值会造成 panic

在已经确保通道没有值发送的情况下可以采用下面的方式进行检测

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }

    return false
}

func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

以上方式不能实时检测 channel 的状态,若向已经关闭的 channel 发送值或重复关闭则会导致 panic

channel 关闭的一般原则:

1.不在接收方关闭 channel

2.如果 channel 同时有多个发送者,不要关闭 channel 。(这里可以用原子锁或 sync.WaitGroup)

暴力关闭 channel

func SafeClose(ch chan T)(closed bool){
    defer func(){
        if recover() != nil {
            log.Println("already closed")
            closed= false
        }
    }()

    close(ch)
    closed = true
    return  
}

以上方式虽然可以避免程序崩溃但是破坏了 channel 的使用规则。

解决方案

使用 sync.Once

type MyChannel struct{
    C chan T
   once  sync.Once
}

// New genarate MyChannel
func New() *MyChannel {
    return &MyChannel {
        C: make(chan T),
    }
}

func (m *MyChannel) SafeClose() {
    m.once.Do(func(){
        close(m.C)
    })
}

使用 sync.Mutex

type MyChannel struct {
    C      chan T
    closed bool
    mu  sync.Mutex
}

// New genarate MyChannel
func New() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (m *MyChannel) SafeClose() {
    m.mu.Lock()
    defer m.mu.Unlock()
    if !mc.closed {
        close(m.C)
        m.closed = true
    }
}

func (m *MyChannel) IsClosed() bool {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.closed
}

以上方式有一些小弊端,如可能会产生竞态(关闭和发送操作发生在同一时刻),尽管此类数据竞态通常不会造成任何伤害;不能用在 select 语句中进行检查 channel 的状态。

优雅的关闭 channel

情况1:多个接受者,一个发送者。

func close(){
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    // ...
    const Max = 100000
    const NumReceivers = 100

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)

    // the sender
    go func() {
        for {
            if value := rand.Intn(Max); value == 0 {
                // The only sender can close the
                // channel at any time safely.
                close(dataCh)
                break
            } else {
                dataCh <- value
            }
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()

            // Receive values until dataCh is
            // closed and the value buffer queue
            // of dataCh becomes empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }

    wgReceivers.Wait()
}

情况2:一个接受者,多个发送者。

func close(){
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    // ...
    const Max = 100000
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)

    // ...
    dataCh := make(chan int)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel
        // dataCh, and its receivers are the
        // senders of channel dataCh.

    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                // The try-receive operation is to try
                // to exit the goroutine as early as
                // possible. For this specified example,
                // it is not essential.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in the second select may be
                // still not selected for some loops if
                // the send to dataCh is also unblocked.
                // But this is acceptable for this
                // example, so the first select block
                // above can be omitted.
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // the receiver
    go func() {
        defer wgReceivers.Done()

        for value := range dataCh {
            if value == Max-1 {
                // The receiver of channel dataCh is
                // also the sender of stopCh. It is
                // safe to close the stop channel here.
                close(stopCh)
                return
            }

            log.Println(value)
        }
    }()

    // ...
    wgReceivers.Wait()
}

情况3:多个接受者,多个发送者。

func close(){
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    // ...
    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown
        // below, and its receivers are all senders
        // and receivers of dataCh.
    toStop := make(chan string, 1)
        // The channel toStop is used to notify the
        // moderator to close the additional signal
        // channel (stopCh). Its senders are any senders
        // and receivers of dataCh, and its receiver is
        // the moderator goroutine shown below.
        // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    // Here, the try-send operation is
                    // to notify the moderator to close
                    // the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // The try-receive operation here is to
                // try to exit the sender goroutine as
                // early as possible. Try-receive and
                // try-send select blocks are specially
                // optimized by the standard Go
                // compiler, so they are very efficient.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and for ever in theory) if the send
                // to dataCh is also non-blocking. If
                // this is unacceptable, then the above
                // try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()

            for {
                // Same as the sender goroutine, the
                // try-receive operation here is to
                // try to exit the receiver goroutine
                // as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and forever in theory) if the receive
                // from dataCh is also non-blocking. If
                // this is not acceptable, then the above
                // try-receive operation is essential.
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        // Here, the same trick is
                        // used to notify the moderator
                        // to close the additional
                        // signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

上面是最复杂的关闭方式,采用了中间者来进行关闭。

情况4:一个发送者,多个接收者的变种。(需要额外的 gorutine 进行关闭)

func close(){
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    // ...
    const Max = 100000
    const NumReceivers = 100
    const NumThirdParties = 15

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)
    closing := make(chan struct{}) // signal channel
    closed := make(chan struct{})

    // The stop function can be called
    // multiple times safely.
    stop := func() {
        select {
        case closing<-struct{}{}:
            <-closed
        case <-closed:
        }
    }

    // some third-party goroutines
    for i := 0; i < NumThirdParties; i++ {
        go func() {
            r := 1 + rand.Intn(3)
            time.Sleep(time.Duration(r) * time.Second)
            stop()
        }()
    }

    // the sender
    go func() {
        defer func() {
            close(closed)
            close(dataCh)
        }()

        for {
            select{
            case <-closing: return
            default:
            }

            select{
            case <-closing: return
            case dataCh <- rand.Intn(Max):
            }
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()

            for value := range dataCh {
                log.Println(value)
            }
        }()
    }

    wgReceivers.Wait()
}

情况5:多个发送者的变种。(数据通道必须关闭)

func close(){
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    // ...
    const Max = 1000000
    const NumReceivers = 10
    const NumSenders = 1000
    const NumThirdParties = 15

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)     // will be closed
    middleCh := make(chan int)   // will never be closed
    closing := make(chan string) // signal channel
    closed := make(chan struct{})

    var stoppedBy string

    // The stop function can be called
    // multiple times safely.
    stop := func(by string) {
        select {
        case closing <- by:
            <-closed
        case <-closed:
        }
    }

    // the middle layer
    go func() {
        exit := func(v int, needSend bool) {
            close(closed)
            if needSend {
                dataCh <- v
            }
            close(dataCh)
        }

        for {
            select {
            case stoppedBy = <-closing:
                exit(0, false)
                return
            case v := <- middleCh:
                select {
                case stoppedBy = <-closing:
                    exit(v, true)
                    return
                case dataCh <- v:
                }
            }
        }
    }()

    // some third-party goroutines
    for i := 0; i < NumThirdParties; i++ {
        go func(id string) {
            r := 1 + rand.Intn(3)
            time.Sleep(time.Duration(r) * time.Second)
            stop("3rd-party#" + id)
        }(strconv.Itoa(i))
    }

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    stop("sender#" + id)
                    return
                }

                select {
                case <- closed:
                    return
                default:
                }

                select {
                case <- closed:
                    return
                case middleCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for range [NumReceivers]struct{}{} {
        go func() {
            defer wgReceivers.Done()

            for value := range dataCh {
                log.Println(value)
            }
        }()
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

这里将多个发送者转为中间层的一个发送者。


 上一篇
Go调用JS脚本 Go调用JS脚本
Go调用JS脚本Goja项目地址:goja 基本使用func main(){ vm := goja.New() val, err := vm.RunString(` 2+2 `) if err
2020-12-18
下一篇 
Tcp/Ip详解读书笔记 Tcp/Ip详解读书笔记
Tcp/IP 详解读书笔记网络分层为应用层,传输层,网络层,数据链路层,物理层。 物理层: 网线连接电脑,路由器。传递0,1的电信号! 链路层: 确定0和1的分组方式,使用以太网协议,规定使用一组电信号作为一个数据包,被曾为帧(Frame)
2020-11-02
  目录