Go并发模式:Context
应用场景
在 Go 服务器中,每一个传入请求都在一个单独的协程中进行处理。如果在该请求中需要处理其他耗时任务,或其他服务请求,例如数据库操作、RPC 服务;一般都会启动另外的协程去执行这些操作,此刻若传入请求被关闭或者超时,所有工作在该请求的其他协程都应该立刻推出,并且由系统释放它们使用的资源。在一些简单的场景下可以自己使用 channel 实现,但是在一些复杂的场景下,要想实现一种较好的模式代价太高,所以此时可以通过 1.7 版本引入标准库的 context 包去控制它们能正确退出和释放资源。
Context结构
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
Done() <-chan struct{}
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
Value(key interface{}) interface{}
}
Context 是一个接口类型,其中包括了4个方法分别为 Deadline(最后期限[时间点]),Done(关闭信号),Err(关闭原因)和 Value(context 传递中保存的值,如 session)。
使用规则:
- 在各个不同的程序包之间保证接口一致性,可以用静态分析工具(go vet)追踪传播链路。
- 不再结构体类型中存储,显示声明函数需要 Context 类型。
- Context 应该作为第一个参数,命名为
ctx
。 - 不能传递
nil
Context,不确定使用哪个context , 使用context.TODO
。 context value
用于传递过程和API
的请求范围数据,而不用于将可选参数传递给函数。- context 是并发安全的。
func DoSomething(ctx context.Context, arg Arg) error {
// use ctx ...
}
可以通过 context.Background()
方法创建使用在主函数,init
函数和测试中,通常作为根节点 context。还可以通过 context.TODO()
方法创建,在不确定使用哪个 context 时。
Context使用
在 context 包中提供了以下4中方法,让我们使用:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key interface{}, val interface{}) Context
WithCancel
方法返回一个可以取消的 context ,当需要取消时调用 cancel 方法。WithTimeout
和 WithDeadline
方法可以设置超时 context 自动取消,也可以手动关闭。 WithValue
方法可以设置在多个 goroutine
中使用的值。
示例代码:
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-time.After(2 * time.Second):
fmt.Println("timeout")
case <-ctx.Done():
fmt.Println("end")
}
fmt.Println("return")
}()
go func() {
time.Sleep(time.Second)
cancel()
}()
select {
case <-ctx.Done():
fmt.Println("main exit")
}
}
Context 源码分析和细节说明
在 context 包中定义了两个错误类型
Canceled
表示当 context 已经关闭时,调用了Context.Err
方法返回的错误。DeadlineExceeded
表示当超过 context 的截至时间,调用了Context.Err
方法返回的错误。
emptyCtx
是实现了Context 接口的不能关闭,没有值,没有截止时间的类型。它不是一个结构体,因为这种类型的变量不能是同一种类型。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time,ok bool){
return
}
func (*emptyCtx) Done() <-chan struct{}{
return nil
}
func (*emptyCtx) Err() error{
return nil
}
func (*emptyCtx) Value(key interface{}) interface{}{
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
context.Backgroud
方法返回的是 context 包中的 backgroud
,它通常用在 main 函数,初始化,测试和传入请求的顶层 Context 即跟节点。
context.TODO
方法返回的是 context 包中的 todo
,它通常是在不确定使用哪种 Context 或不可用时。
注:because the surrounding function has not yet been extended to accept a Context parameter
上面是官方对
context.TODO
在不可用时的说明,暂时还没有理解到什么意思。
CancelFunc
是一个类型,用于结束工作,它不会等待工作完成在调用。若连续调用,不再生效。
type CancelFunc func()
closedchan
是一个可重用的已经关闭的 channel。
var closedchan = make(chan struct{})
func init() {
close(closedchan)
}
canceler
是一个接口,是一个 context 类型能够被直接关闭,*cancelCtx
和 *timerCtx
实现了该接口。
type canceler interface{
cancel(removeFromParent bool, err error)
Done()<-chan struct{}
}
propagateCancel
方法的作用是当父 context 被关闭时它的子 context 同时被取消。
func propagateCancel(parent Context, child canceler){
if parent.Done() == nil {
return // 父节点不能被关闭(当 context 为 backgroud 和 todo 时)
}
if p, ok := parentCancelCtx(parent); ok { // 找到为 *cancelCtx 类型的父 context
p.mu.Lock()
if p.err != nil {
child.cancel(false,p.err)
}else{
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
} else {
go func(){
select {
case <-parent.Done():
child.cancel(false.parent.Err())
case <- child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
cancelCtx
用来关闭 context ,实现了 canceler
接口的子 context 也会被关闭 。它嵌入了 Context 接口,实现了 Done
和 Err
方法有一个互斥锁用于保护 done,children,err 字段。
type cancelCtx struct{
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
timerCtx
嵌入了 cancelCtx
、timer
计时器 、deadline
截止时间。
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, time.Until(c.deadline))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
valueCtx
携带了一个键值对,实现了 Context 的 Value 方法。
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
从整个 context 包中的实现过程来看,通过 cancelCtx
基础结构衍生出timerCtx
和 valueCtx
结构。通过 cancelFunc
方法关闭当前 context 以及它的子 context(如果存在)。对整个 context 树结构进行控制,通过 context.Backgroud
作为跟节点,通过 WithCancel
、WithDeadline
、 WithTimeout
、WithValue
派生子 context ,然后在调用链上传递 context 以达到控制当前工作何时结束以及正确释放资源。
其中重点在于其 context.Done
方法,即何时收到关闭信号。用法如下
func DoSomething(ctx context.Context, arg Arg) error {
select {
case <-ctx.Done():
fmt.Println("context is canceled")
case <-time.Tick(5*time.Second):
fmt.Println("time out")
}
}
以上是自己总结的,其他的细节说明不想写了直接贴参考链接:
https://blog.golang.org/context
https://blog.golang.org/pipelines
https://www.sohamkamani.com/golang/2018-06-17-golang-using-context-cancellation/