本文最后更新于:2023年12月10日 晚上

1、作用

context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等,

context的数据结构一棵倒排树,具体会在下文源码走读中剖析。
image

2、代码结构

context包的核心是context interface,及实现了context接口的emptyCtx、valueCtx、cancelCtx、timerCtx。
image

2.1、Context interface

Context 是一个接口,定义了 4 个方法,它们都是幂等的。也就是说连续多次调用同一个方法,得到的结果都是相同的。

type Context interface {
        // 返回一个 channel,可以表示 context 被取消的信号:当这个 channel 被关闭时,
        // 说明 context 被取消了。注意,这是一个只读的channel,读一个关闭的 channel 会读出相应类型的零值。
        // 并且源码里没有地方会向这个 channel 里面塞入值。换句话说,这是一个 receive-only 的 channel。
        // 因此在子协程里读这个 channel,除非被关闭,否则读不出来任何东西。
        // 也正是利用了这一点,子协程从 channel 里读出了值(零值)后,就可以做一些收尾工作,尽快退出。
        Done() <-chan struct{}

        // 在 channel Done 关闭后,返回 context 取消原因
        Err() error
        
        // 返回 context 是否会被取消以及自动取消时间(即 deadline)
        Deadline() (deadline time.Time, ok bool)

        // 获取 key 对应的 value
        Value(key interface{}) interface{}
}

2.2、emptyCtx

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key any) any {
    return 
}
  • emptyCtx 是一个空的 context,类型为一个整型;
  • Deadline 方法会返回一个公元元年时间以及 false 的 flag,标识当前 context 不存在过期时间;
  • Done 方法返回一个 nil 值,用户无论往 nil 中写入或者读取数据,均会陷入阻塞;
  • Err 方法返回的错误为 nil;
  • Value 方法返回的 value 为 nil.

2.2.1、Background & TODO

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

Background 和TODO是一个空的 context, context.Background() 和 context.TODO() 方法返回的均是 emptyCtx 类型的一个实例。

2.3、context.WithValue() & valueCtx

context.WithValue() 用于基于传入的ctx作为parent ctx构造一个valueCtx

func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}

valueCtx继承了Context interface,并实现了Stringer 接口

从源码的value方法,可以发现context取值的查找方向是往上走的,且一般最终会找到的父ctx是emptyCtx,并调用emptyCtx的Value方法返回nil。所以,父节点没法获取子节点存储的值,子节点却可以获取父节点的值。

// 直接打印ctx 出现的.WithValue前缀,就是因为valueCtx重写了stringer接口
func (c *valueCtx) String() string {
	return contextName(c.Context) + ".WithValue(type " +
		reflectlite.TypeOf(c.key).String() +
		", val " + stringify(c.val) + ")"
}

func (c *valueCtx) Value(key any) any {
        // 假如当前 valueCtx 的 key 等于用户传入的 key,则直接返回其 value;
	if c.key == key {
		return c.val
	}
        // 否则从 parent context 中依次向上寻找
	return value(c.Context, key)
}

func value(c Context, key any) any {
   for {
      switch ctx := c.(type) {
      // valueCtx 正常递归向上查找
      case *valueCtx:
         if key == ctx.key {
            return ctx.val
         }
         c = ctx.Context
     // cancelCtx timerCtx 则需要先判断key是否是cancelCtxKey
     // 用于cancel的内部机制
      case *cancelCtx:
         if key == &cancelCtxKey {
            return c
         }
         c = ctx.Context
      case *timerCtx:
         if key == &cancelCtxKey {
            return &ctx.cancelCtx
         }
         c = ctx.Context
      case *emptyCtx:
         return nil
      default:
         return c.Value(key)
      }
   }
}

2.4、context.WithCancel & cancelCtx

withCancel 方法返回了一个cancelCtx 和一个cancel闭包函数

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

2.4.1、cancelCtx

先看cancelCtx ,同样继承了Context 接口,并实现了canceler 接口

type cancelCtx struct {
    Context
   
    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // 实际类型为 chan struct{},即用以反映 cancelCtx 生命周期的通道;
    children map[canceler]struct{} // 指向 cancelCtx 的所有子 context,子ctx只需要实现Done和cancel方法,因此重新定义了canceler接口,方便管理;
    err      error                 // set to non-nil by the first cancel call
}

type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}

2.4.2、cancelCtx.Done()

返回的是无缓冲区只读channel,用于配合select监听该ctx 的取消信号

func (c *cancelCtx) Done() <-chan struct{} {
    //  基于 atomic 包,读取 cancelCtx 中的 chan;倘若已存在,则直接返回;
    d := c.done.Load()
    if d != nil {
        return d.(chan struct{})
    } 
    // 上锁
    c.mu.Lock()
    defer c.mu.Unlock()
    // 再次判断 chan是否创建(double check)
    d = c.done.Load()
    if d == nil {
        // 创建并返回chan(懒汉式,只有调用Done方法才会实际创建该channel)
        d = make(chan struct{})
        c.done.Store(d)
    }
    return d.(chan struct{})
}

2.4.3、cancelCtx.cancel()

// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})

func init() {
	close(closedchan)
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	// 必须要传 err
        if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
        // 处理 cancelCtx 的 channel,若 channel 此前未初始化,则直接注入一个 closedChan,
        // 否则关闭该 channel;
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
        // 循环调用所用子ctx的cancel方法,通知取消
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
        // 移除child ctx
	c.children = nil
	c.mu.Unlock()
        // 判断是否需要将自身从父ctx的children map中删除
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

func removeChild(parent Context, child canceler) {
    // 如果 parent 不是 cancelCtx,直接返回(因为只有 cancelCtx 才有 children set) 
    p, ok := parentCancelCtx(parent)
    if !ok {
        return
    }
    p.mu.Lock()
    if p.children != nil {
        // 从 parent 的 children set 中删除对应 child
        delete(p.children, child)
    }
    p.mu.Unlock()
}

2.4.4、WithCancel()

var Canceled = errors.New("context canceled")

type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {   
    //  校验父 context 非空;
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    // 注入父 context 构造好一个新的 cancelCtx;
    c := newCancelCtx(parent)
    
    // 在 propagateCancel 方法内启动一个守护协程,以保证父 context 终止时,该 cancelCtx 也会被终止;、
    // 因为父ctx不一定是cancelCtx
    propagateCancel(parent, &c)
    
    // 将 cancelCtx 返回,连带返回一个用以终止该 cancelCtx 的闭包函数
    return &c, func() { c.cancel(true, Canceled) }
}

2.4.5、propagateCancel()

传递父子 context 之间的 cancel 事件

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled(不可取消类型)
	}

	select {
	case <-done: // 从parent 的done channel中尝试读数据,如果未取消,goroutine会阻塞在这
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}
   
	if p, ok := parentCancelCtx(parent); ok { // parentCtx是cancelCtx类型
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
                        // 将自身注册进parent的children中,依靠parent的cancel机制取消
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
                // parent是用户实现的canceler 接口,额外启动一个goroutine去处理该ctx的取消回收
                // 非必要情况避免自己实现canceler,会有额外的goroutine开销
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done(): // parent退出,透传parent err 并cancel child
				child.cancel(false, parent.Err())
			case <-child.Done(): // 自身被cancel时,可以退出该goroutine,避免内存泄露
			}
		}()
	}
}

2.4.6、parentCancelCtx()

向上获取一个最近的cancelCtx

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
   done := parent.Done()
   // parent 的 channel 已关闭或者是不会被 cancel 的类型,则返回 false
   if done == closedchan || done == nil {
      return nil, false
   }
   
   // 若以特定的 cancelCtxKey 从 parent 中取值,取得的 value 是 parent 本身,则返回 true. 
   //(基于 cancelCtxKey 为 key 取值时返回 cancelCtx 自身,是 cancelCtx 特有的协议)
   // 作用是向上追溯到一个最近的cancelCtx,将child添加到它的children map中,
   // 从而少启动一个goroutine协程
   p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
   if !ok {
      return nil, false
   }
   pdone, _ := p.done.Load().(chan struct{})
   if pdone != done {
      return nil, false
   }
   return p, true
}

var cancelCtxKey int

func (c *cancelCtx) Value(key any) any {
   if key == &cancelCtxKey { // cancelCtxKey的指针 可以作为特殊标识符,因为它不会被外部修改
      return c
   }
   return value(c.Context, key)
}

关于cancel函数的 removeFromParent参数,分两种情况:

1、在context包内部 propagateCancel 调用时,传入为false,因为该方法是由parent cancel而发起的,children 只需负责将自身和自身的ctx取消即可,不需要再重复的将自身从parent中的children map中删除,否则会有同时遍历和删除一个 map 的问题

2、在WithCancel 返回的闭包cancel方法中,传入的是true。因为自身cancel,需要把自己从parent children map中移除(parent还未cancel)

2.5、timeCtx & context.WithTimeout & context.WithDeadline

timerCtx 基于 cancelCtx,只是多了一个 time.Timer 和一个 deadline。Timer 会在 deadline 到来时,自动取消 context。

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.
    deadline time.Time
}

2.5.1、timerCtx.Deadline()

仅timerCtx实现了Deadline,返回过期时间

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

2.5.2、timerCtx.cancel()

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    // 复用继承的 cancelCtx 的 cancel 能力,进行 cancel 处理;
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // 从父节点中删除子节点
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        // 关掉定时器,这样在deadline 到来时,不会再次取消
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

2.5.3、context.WithTimeout & context.WithDeadline

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

// Before reports whether the time instant t is before u.
func (t Time) Before(u Time) bool {
	if t.wall&u.wall&hasMonotonic != 0 {
		return t.ext < u.ext
	}
	ts := t.sec()
	us := u.sec()
	return ts < us || ts == us && t.nsec() < u.nsec()
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
                // 如果parent的deadline早于本ctx指定的deadline时间,则直接构造一个cancelCtx
                // 当parent超时时,直接调用cancel取消本ctx,不必在校验本ctx的超时逻辑
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
        // 传递cancel事件,并注册进parent的children map
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
                // 已经到期,取消ctx,err为DeadlineExceeded
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
                // c.timer 会在 d 时间间隔后,自动调用 cancel 函数,
                // 并且传入的错误就是 DeadlineExceeded:超时错误
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

3、例子

最后举例一个实现一个canceler interface,使上文2.4.5说的propagateCancel 方法进入else分支,创建额外goroutine来处理ctx cancel的情况:

在propagateCancel else分支处打断点即可

package context

import (
	"context"
	"sync"
	"testing"
)

type tachiuCancelCtx struct {
	context.Context               // 嵌入内置的 Context 接口
	done            chan struct{} // 自定义的 done 通道
	cancelOnce      sync.Once     // 保证取消操作只执行一次的 sync.Once
}

func (m *tachiuCancelCtx) Done() <-chan struct{} {
	return m.done
}

func (m *tachiuCancelCtx) Err() error {
	select {
	case <-m.done:
		return context.Canceled
	default:
		return nil
	}
}

func (m *tachiuCancelCtx) Value(key interface{}) interface{} {
	return nil
}

func (m *tachiuCancelCtx) cancel() {
	m.cancelOnce.Do(func() {
		close(m.done)
	})
}

func WithMyCancel(parent context.Context) (context.Context, context.CancelFunc) {
	ctx := &tachiuCancelCtx{
		Context: parent,
		done:    make(chan struct{}),
	}
	return ctx, ctx.cancel
}

func TestContext(t *testing.T) {
	parent := context.Background()

	// 使用自定义的 WithMyCancel 函数创建一个新的上下文
	ctx, _ := WithMyCancel(parent)

	childCtx, _ := context.WithCancel(ctx)
	t.Log(childCtx)
}

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!

golang对unicode的读写方式 下一篇