Go并发模式:Context

Go并发模式:Context

应用场景

在 Go 服务器中,每一个传入请求都在一个单独的协程中进行处理。如果在该请求中需要处理其他耗时任务,或其他服务请求,例如数据库操作、RPC 服务;一般都会启动另外的协程去执行这些操作,此刻若传入请求被关闭或者超时,所有工作在该请求的其他协程都应该立刻推出,并且由系统释放它们使用的资源。在一些简单的场景下可以自己使用 channel 实现,但是在一些复杂的场景下,要想实现一种较好的模式代价太高,所以此时可以通过 1.7 版本引入标准库的 context 包去控制它们能正确退出和释放资源。

Context结构

// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
    // Deadline returns the time when work done on behalf of this context
    // should be canceled. Deadline returns ok==false when no deadline is
    // set. Successive calls to Deadline return the same results.
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that's closed when work done on behalf of this
    // context should be canceled. Done may return nil if this context can
    // never be canceled. Successive calls to Done return the same value.
    //
    Done() <-chan struct{}

    // If Done is not yet closed, Err returns nil.
    // If Done is closed, Err returns a non-nil error explaining why:
    // Canceled if the context was canceled
    // or DeadlineExceeded if the context's deadline passed.
    // After Err returns a non-nil error, successive calls to Err return the same error.
    Err() error

    // Value returns the value associated with this context for key, or nil
    // if no value is associated with key. Successive calls to Value with
    // the same key returns the same result.
    //
    Value(key interface{}) interface{}
}

Context 是一个接口类型,其中包括了4个方法分别为 Deadline(最后期限[时间点]),Done(关闭信号),Err(关闭原因)和 Value(context 传递中保存的值,如 session)。

使用规则:

  • 在各个不同的程序包之间保证接口一致性,可以用静态分析工具(go vet)追踪传播链路。
  • 不再结构体类型中存储,显示声明函数需要 Context 类型。
  • Context 应该作为第一个参数,命名为 ctx
  • 不能传递 nil Context,不确定使用哪个context , 使用 context.TODO
  • context value用于传递过程和 API 的请求范围数据,而不用于将可选参数传递给函数。
  • context 是并发安全的。
func DoSomething(ctx context.Context, arg Arg) error {
    // use ctx ...
}

可以通过 context.Background() 方法创建使用在主函数,init 函数和测试中,通常作为根节点 context。还可以通过 context.TODO() 方法创建,在不确定使用哪个 context 时。

Context使用

在 context 包中提供了以下4中方法,让我们使用:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key interface{}, val interface{}) Context

WithCancel 方法返回一个可以取消的 context ,当需要取消时调用 cancel 方法。WithTimeoutWithDeadline 方法可以设置超时 context 自动取消,也可以手动关闭。 WithValue 方法可以设置在多个 goroutine 中使用的值。

示例代码:

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        select {
        case <-time.After(2 * time.Second):
            fmt.Println("timeout")
        case <-ctx.Done():
            fmt.Println("end")
        }
        fmt.Println("return")
    }()

    go func() {
        time.Sleep(time.Second)
        cancel()
    }()

    select {
    case <-ctx.Done():
        fmt.Println("main exit")
    }
}

Context 源码分析和细节说明

在 context 包中定义了两个错误类型

  • Canceled 表示当 context 已经关闭时,调用了 Context.Err 方法返回的错误。
  • DeadlineExceeded 表示当超过 context 的截至时间,调用了 Context.Err 方法返回的错误。

emptyCtx 是实现了Context 接口的不能关闭,没有值,没有截止时间的类型。它不是一个结构体,因为这种类型的变量不能是同一种类型。

type emptyCtx int 

func (*emptyCtx) Deadline() (deadline time.Time,ok bool){
    return 
}

func (*emptyCtx) Done() <-chan struct{}{
    return nil
}

func (*emptyCtx) Err() error{
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{}{
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

context.Backgroud 方法返回的是 context 包中的 backgroud,它通常用在 main 函数,初始化,测试和传入请求的顶层 Context 即跟节点。

context.TODO 方法返回的是 context 包中的 todo,它通常是在不确定使用哪种 Context 或不可用时。

注:because the surrounding function has not yet been extended to accept a Context parameter

上面是官方对context.TODO 在不可用时的说明,暂时还没有理解到什么意思。

CancelFunc 是一个类型,用于结束工作,它不会等待工作完成在调用。若连续调用,不再生效。

type CancelFunc func()

closedchan 是一个可重用的已经关闭的 channel。

var closedchan = make(chan struct{})

func init() {
    close(closedchan)    
}

canceler 是一个接口,是一个 context 类型能够被直接关闭,*cancelCtx*timerCtx 实现了该接口。

type canceler interface{
    cancel(removeFromParent bool, err error)
    Done()<-chan struct{}
}

propagateCancel 方法的作用是当父 context 被关闭时它的子 context 同时被取消。

func propagateCancel(parent Context, child canceler){
    if parent.Done() == nil {
        return // 父节点不能被关闭(当 context 为 backgroud 和 todo 时)
    }
    if p, ok := parentCancelCtx(parent); ok { // 找到为 *cancelCtx 类型的父 context 
        p.mu.Lock()
        if p.err != nil {
            child.cancel(false,p.err)
        }else{
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
    } else {
        go func(){
            select {
                case <-parent.Done():
                child.cancel(false.parent.Err())
                case <- child.Done():
            }
        }()
    }
}


// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true
        case *timerCtx:
            return &c.cancelCtx, true
        case *valueCtx:
            parent = c.Context
        default:
            return nil, false
        }
    }
}

cancelCtx 用来关闭 context ,实现了 canceler 接口的子 context 也会被关闭 。它嵌入了 Context 接口,实现了 DoneErr 方法有一个互斥锁用于保护 done,children,err 字段。

type cancelCtx struct{
    Context 

    mu sync.Mutex // protects following fields 
    done chan struct{} // created lazily, closed by first cancel call 
    children map[canceler]struct{} // set to nil by the first cancel call
    err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
        c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}

func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
}

func (c *cancelCtx) String() string {
    return fmt.Sprintf("%v.WithCancel", c.Context)
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

timerCtx 嵌入了 cancelCtxtimer 计时器 、deadline 截止时间。

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

func (c *timerCtx) String() string {
    return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, time.Until(c.deadline))
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

valueCtx 携带了一个键值对,实现了 Context 的 Value 方法。

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val interface{}
}

func (c *valueCtx) String() string {
    return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
        return c.val
    }
    return c.Context.Value(key)
}

从整个 context 包中的实现过程来看,通过 cancelCtx 基础结构衍生出timerCtxvalueCtx 结构。通过 cancelFunc 方法关闭当前 context 以及它的子 context(如果存在)。对整个 context 树结构进行控制,通过 context.Backgroud 作为跟节点,通过 WithCancelWithDeadlineWithTimeoutWithValue 派生子 context ,然后在调用链上传递 context 以达到控制当前工作何时结束以及正确释放资源。

其中重点在于其 context.Done 方法,即何时收到关闭信号。用法如下

func DoSomething(ctx context.Context, arg Arg) error {
    select {
        case <-ctx.Done(): 
        fmt.Println("context is canceled")
        case <-time.Tick(5*time.Second):
        fmt.Println("time out")
    }
}

以上是自己总结的,其他的细节说明不想写了直接贴参考链接:

上下文 Context

https://blog.golang.org/context

https://blog.golang.org/pipelines

https://www.sohamkamani.com/golang/2018-06-17-golang-using-context-cancellation/


 上一篇
基础知识记录 基础知识记录
基础知识记录Hash 函数hash 也称散列,哈希。基本原理是将任意长度的输入,通过 hash 函数变成固定长度的输出。原始数据映射后的二进制串就是哈希值 Hash 表hash 表是一个存储键值映射的数据结构,它的读写效率在装载因子在正常水
下一篇 
高性能MySQL读书笔记 高性能MySQL读书笔记
高性能MySQL读书笔记Chapter1MySQL的架构分为三层:第一层是客户端,通过网络连接服务端获取对应的数据。第二层是它的核心架构设计包括解析器,查询缓存,优化器。第三层是存储引擎,引擎之间不会互相通信,通过 API 接口向上层暴露端
2020-07-23
  目录