intro

入门

for range 实现

range for slice

for range底层是通过调用for i = 0; i < len; i++来实现的。通过将元素赋值给indexvalue

特定情况下有问题。

  1. 循环取地址,所有的,取到的地址是那个临时的value的地址。
arr := [2]int{1, 2}
res := []*int{}
for _, v := range arr {
    res = append(res, &v)  
 }
fmt.Println(*res[0],*res[1])  //expect: 1 2, actual 2 2. 
  1. 循环开go协程。
var m = []int{1, 2, 3}
for i := range m {
    go func() {
        fmt.Print(i)
    }()
}
//阻塞1分钟等待所有goroutine运行完
time.Sleep(time.Millisecond) 
//结果可能是3 3 3。 原因类似

range for map

对map遍历时删除元素能遍历到么?

var m = map[int]int{1: 1, 2: 2, 3: 3} //only del key once, and not del the current iteration key
var once sync.Once 
for i := range m {
    once.Do(func() {
        for _, key := range []int{1, 2, 3} {
            if key != i {
                fmt.Printf("当前 i:%d, del key:%d\n", i, key)
                delete(m, key)
                break
             }
         }
    })
    fmt.Printf("%d%d\n", i, m[i])
}

运行结果:

当前 i:1, del key:2
11
33

上述代码once.Do保证只删除一次。

map内部实现是一个链式hash表,为保证每次无序,初始化时会随机一个遍历开始的位置, 这样,如果删除的元素开始没被遍历到,那就后边就不会出现。

对map遍历时新增元素能遍历到么?

var m = map[int]int{1:1, 2:2, 3:3}
for i, _ := range m {
    m[4] = 4
    fmt.Printf("%d%d ", i, m[i])
}

答案是:可能会

原因同上,map的hash表初始化时会随机一个遍历开始的位置

range for channel

遍历channel是最特殊的,这是由channel的实现机制决定的:

// The loop we generate:
//   for {
//           index_temp, ok_temp = <-range
//           if !ok_temp {
//                   break
//           }
//           index = index_temp
//           original body
//   }

channel遍历是依次从channel中读取数据,读取前是不知道里面有多少个元素的。如果channel中没有元素,则会阻塞等待,如果channel已被关闭,则会解除阻塞并退出循环。

注:

  • 上述注释中index_temp实际上描述是有误的,应该为value_temp,因为index对于channel是没有意义的。
  • 使用for-range遍历channel时只能获取一个返回值。

总结

  • for-range的实现实际上是C风格的for循环
  • 使用index,value接收range返回值会发生一次数据拷贝

Select实现原理

数据结构

源码包src/runtime/select.go:scase定义了表示case语句的数据结构:

type scase struct {
    c           *hchan         // channel
    kind        uint16  
    elem        unsafe.Pointer // data element
}

scase.c为当前case语句所操作的channel指针,这也说明了一个case语句只能操作一个channel。
scase.kind表示该case的类型,分为读channel、写channel和default,三种类型分别由常量定义:

  • caseRecv:case语句中尝试读取scase.c中的数据;
  • caseSend:case语句中尝试向scase.c中写入数据;
  • caseDefault: default语句

scase.elem表示缓冲区地址,跟据scase.kind不同,有不同的用途:

  • scase.kind == caseRecv : scase.elem表示读出channel的数据存放地址;
  • scase.kind == caseSend : scase.elem表示将要写入channel的数据存放地址;

Select

源码包src/runtime/select.go:selectgo()定义了select选择case的函数:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool)

函数参数:

  • cas0为scase数组的首地址,selectgo()就是从这些scase中找出一个返回。
  • order0为一个两倍cas0数组长度的buffer,保存scase随机序列pollorder和scase中channel地址序列lockorder
    • pollorder:每次selectgo执行都会把scase序列打乱,以达到随机检测case的目的。
    • lockorder:所有case语句中channel序列,以达到去重防止对channel加锁时重复加锁的目的。
  • ncases表示scase数组的长度

函数返回值:

  1. int: 选中case的编号,这个case编号跟代码一致
  2. bool: 是否成功从channle中读取了数据,如果选中的case是从channel中读数据,则该返回值表示是否读取成功。

selectgo实现伪代码如下:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    //1. 锁定scase语句中所有的channel
    //2. 按照随机顺序检测scase中的channel是否ready
    //   2.1 如果case可读,则读取channel中数据,解锁所有的channel,然后返回(case index, true)
    //   2.2 如果case可写,则将数据写入channel,解锁所有的channel,然后返回(case index, false)
    //   2.3 所有case都未ready,则解锁所有的channel,然后返回(default index, false)
    //3. 所有case都未ready,且没有default语句
    //   3.1 将当前协程加入到所有channel的等待队列
    //   3.2 当将协程转入阻塞,等待被唤醒
    //4. 唤醒后返回channel对应的case index
    //   4.1 如果是读操作,解锁所有的channel,然后返回(case index, true)
    //   4.2 如果是写操作,解锁所有的channel,然后返回(case index, false)
}

特别说明:对于读channel的case来说,如case elem, ok := <-chan1:, 如果channel有可能被其他协程关闭的情况下,一定要检测读取是否成功,因为close的channel也有可能返回,此时ok == false。

Context实现

https://zhuanlan.zhihu.com/p/68792989

Timer实现原理

https://www.cyhone.com/articles/analysis-of-golang-timer/

时间轮Timer

Timer

Timer的数据结构如下

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

Timer只有两个成员:

  • C: 管道,上层应用跟据此管道接收事件;
  • r: runtime定时器,该定时器即系统管理的定时器,对上层应用不可见;

这里应该按照层次来理解Timer数据结构,Timer.C即面向Timer用户的,Timer.r是面向底层的定时器实现,用户无法感知Timer.r的存在。

runtimeTimer

创建一个Timer实质上是把一个定时任务交给专门的协程进行监控,这个任务的载体便是runtimeTimer,简单的讲,每创建一个Timer意味着创建一个runtimeTimer变量,然后把它交给系统进行监控。我们通过设置runtimeTimer过期后的行为来达到定时的目的。

type runtimeTimer struct {
    tb uintptr                          // 存储当前定时器的数组地址
    i  int                              // 存储当前定时器的数组下标

    when   int64                        // 当前定时器触发时间
    period int64                        // 当前定时器周期触发间隔
    f      func(interface{}, uintptr)   // 定时器触发时执行的函数
    arg    interface{}                  // 定时器触发时执行函数传递的参数一
    seq    uintptr                      // 定时器触发时执行函数传递的参数二(该参数只在网络收发场景下使用)
}
  • tb: 系统底层存储runtimeTimer的数组地址;
  • i: 当前runtimeTimer在tb数组中的下标;
  • when: 定时器触发事件的时间;
  • period: 定时器周期性触发间隔(对于Timer来说,此值恒为0);
  • f: 定时器触发时执行的回调函数,回调函数接收两个参数;
  • arg: 定时器触发时执行回调函数的参数一;
  • seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数);

一个进程中的多个Timer都由一个Go底层的协程来管理,为了描述方便我们把这个协程称为系统协程。

系统协程把runtimeTimer存放在数组中,并按照when字段对所有的runtimeTimer进行堆排序,定时器触发时执行runtimeTimer中的预定义函数f,即完成了一次定时任务。

Timer对外接口

创建Timer

创建Timer实际上就是生成一个Timer,并加入系统协程管理的过程。

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)  // 创建一个管道
    t := &Timer{ // 构造Timer数据结构
        C: c,               // 新创建的管道
        r: runtimeTimer{
            when: when(d),  // 触发时间
            f:    sendTime, // 触发后执行函数sendTime
            arg:  c,        // 触发后执行函数sendTime时附带的参数
        },
    }
    startTimer(&t.r) // 此处启动定时器,只是把runtimeTimer放到系统协程的堆中,由系统协程维护
    return t
}

NewTimer()只是构造了一个Timer,然后把Timer.r通过startTimer()交给系统协程维护。

其中when()方法是计算下一次定时器触发的绝对时间,即当前时间+NewTimer()参数d。整体流程如下

用户协程NewTimer,创建Timer,并startTimer,即加入系统协程的堆中由系统协程进行管理。

等时间到期时,系统协程调用创建时给定的函数f,即sendTime函数。sendtime函数将时间写入channel,此时channel会醒来,唤醒主线线程。

停止Timer

停止Timer,只是简单的把Timer从系统协程中移除。函数主要实现如下:

func (t *Timer) Stop() bool {
    return stopTimer(&t.r)
}

stopTimer()即通知系统协程把该Timer移除,即不再监控。系统协程只是移除Timer并不会关闭管道,以避免用户协程读取错误。

系统协程监控Timer是否需要触发,Timer触发后,系统协程会删除该Timer。所以在Stop()执行时有两种情况:

  • Timer还未触发,系统协程已经删除该Timer,Stop()返回false;
  • Timer已经触发,系统协程还未删除该Timer,Stop()返回true;

综上,停止一个Timer示意图如下:

重置Timer

重置Timer时会先timer先从系统协程中删除,修改周期后重新添加到系统协程中。

func (t *Timer) Reset(d Duration) bool {
    w := when(d)
    active := stopTimer(&t.r)
    t.r.when = w
    startTimer(&t.r)
    return active
}

注意

需要注意的是,按照官方说明,Reset()应该作用于已经停掉的Timer或者已经触发的Timer,按照这个约定其返回值将总是返回false,之所以仍然保留是为了保持向前兼容,使用老版本Go编写的应用不需要因为Go升级而修改代码。

如果不按照此约定使用Reset(),有可能遇到Reset()和Timer触发同时执行的情况,此时有可能会收到两个事件,从而对应用程序造成一些负面影响,使用时一定要注意。



type timer struct {
    tb *timersBucket // the bucket the timer lives in   // 当前定时器寄存于系统timer堆的地址
    i  int           // heap index                      // 当前定时器寄存于系统timer堆的下标

    when   int64                                        // 当前定时器下次触发时间
    period int64                                        // 当前定时器周期触发间隔(如果是Timer,间隔为0,表示不重复触发)
    f      func(interface{}, uintptr)                 // 定时器触发时执行的函数
    arg    interface{}                                // 定时器触发时执行函数传递的参数一
    seq    uintptr                                     // 定时器触发时执行函数传递的参数二(该参数只在网络收发场景下使用)
}

type timersBucket struct {
    lock         mutex
    gp           *g          // 处理堆中事件的协程
    created      bool        // 事件处理协程是否已创建,默认为false,添加首个定时器时置为true
    sleeping     bool        // 事件处理协程(gp)是否在睡眠(如果t中有定时器,还未到触发的时间,那么gp会投入睡眠)
    rescheduling bool        // 事件处理协程(gp)是否已暂停(如果t中定时器均已删除,那么gp会暂停)
    sleepUntil   int64       // 事件处理协程睡眠时间
    waitnote     note        // 事件处理协程睡眠事件(据此唤醒协程)
    t            []*timer    // 定时器切片
}

timer数据结构

Timer和Ticker数据结构除名字外完全一样,二者都含有一个runtimeTimer类型的成员,这个就是系统协程所维护的对象。 runtimeTimer类型是time包的名称,在runtime包中,这个类型叫做timer

timer数据结构如下所示:

type timer struct {
    tb *timersBucket // the bucket the timer lives in   // 当前定时器寄存于系统timer堆的地址
    i  int           // heap index                      // 当前定时器寄存于系统timer堆的下标

    when   int64                                        // 当前定时器下次触发时间
    period int64                                        // 当前定时器周期触发间隔(如果是Timer,间隔为0,表示不重复触发)
    f      func(interface{}, uintptr)                 // 定时器触发时执行的函数
    arg    interface{}                                // 定时器触发时执行函数传递的参数一
    seq    uintptr                                     // 定时器触发时执行函数传递的参数二(该参数只在网络收发场景下使用)
}

其中timersBucket便是系统协程存储timer的容器,里面有个切片来存储timer,而i便是timer所在切片的下标。

存储拓扑

以Ticker为例,我们回顾一下Ticker、timer和timersBucket关系,假设我们已经创建了3个Ticker,那么它们之间的关系如下:

用户创建Ticker时会生成一个timer,这个timer指向timersBucket,timersBucket记录timer的指针。

timersBucket数据结构

我们来看一下timersBucket数据结构:

type timersBucket struct {
    lock         mutex
    gp           *g          // 处理堆中事件的协程
    created      bool        // 事件处理协程是否已创建,默认为false,添加首个定时器时置为true
    sleeping     bool        // 事件处理协程(gp)是否在睡眠(如果t中有定时器,还未到触发的时间,那么gp会投入睡眠)
    rescheduling bool        // 事件处理协程(gp)是否已暂停(如果t中定时器均已删除,那么gp会暂停)
    sleepUntil   int64       // 事件处理协程睡眠时间
    waitnote     note        // 事件处理协程睡眠事件(据此唤醒协程)
    t            []*timer    // 定时器切片
}

“Bucket”译成中文意为”桶”,顾名思义,timersBucket意为存储timer的容器。

  • lock: 互斥锁,在timer增加和删除时需要使用;
  • gp: 事件处理协程,就是我们所说的系统协程,这个协程在首次创建Timer或Ticker时生成;
  • create: 状态值,表示系统协程是否创建;
  • sleeping: 系统协程是否在睡眠;
  • rescheduling: 系统协程是否已暂停;
  • sleepUntil: 系统协程睡眠到指定的时间(如果有新的定时任务可能会提前唤醒);
  • waitnote: 提前唤醒时使用的通知;
  • t: 保存timer的切片,当调用NewTimer()或NewTicker()时便会有新的timer存到此切片中;

看到这里应该能明白,系统协程在首次创建定时器时创建,定时器存储在切片中,系统协程负责计时并维护这个切片。

timersBucket数组

通过timersBucket数据结构可以看到,系统协程负责计时并维护其中的多个timer,一个timersBucket包含一个系统协程。

当系统中定时器非常多时,一个系统协程可能处理能力跟不上,所以Go在实现时实际上提供了多个timersBucket,也就有多个系统协程来处理定时器。

最理想的情况,应该预留GOMAXPROCS个timersBucket,以便充分使用CPU资源,但需要跟据实际环境动态分配。为了实现简单,Go在实现时预留了64个timersBucket,绝大部分场景下这些足够了。

每当协程创建定时器时,使用协程所属的ProcessID%64来计算定时器存入的timersBucket。

下图三个协程创建定时器时,定时器分布如下图所示:

为描述方便,上图中3个协程均分布于3个Process中。

timerproc

timerproc为系统协程的具体实现。它是在首次创建定时器创建并启动的,一旦启动永不销毁。 如果timersBucket中有定时器,取出堆顶定时器,计算睡眠时间,然后进入睡眠,醒来后触发事件。

某个timer的事件触发后,跟据其是否是周期性定时器来决定将其删除还是修改时间后重新加入堆。

如果堆中已没有事件需要触发,则系统协程将进入暂停态,也可认为是无限时睡眠,直到有新的timer加入才会被唤醒。

timerproc处理事件的流程图如下:

Ticker

Ticker与Tiimer类似,不再赘述。

与Timer不同的是,Ticker停止时没有返回值,即不需要关注返回值,实际上返回值也没啥用途。

Ticker没有重置接口,也即Ticker创建后不能通过重置修改周期。

需要格外注意的是Ticker用完后必须主动停止,否则会产生资源泄露,会持续消耗CPU资源。

资源泄露问题

前面介绍Ticker时格外提醒不使用的Ticker需要显式的Stop(),否则会产生资源泄露。研究过timer实现机制后,可以很好的解释这个问题了。

首先,创建Ticker的协程并不负责计时,只负责从Ticker的管道中获取事件; 其次,系统协程只负责定时器计时,向管道中发送事件,并不关心上层协程如何处理事件;

如果创建了Ticker,则系统协程将持续监控该Ticker的timer,定期触发事件。如果Ticker不再使用且没有Stop(),那么系统协程负担会越来越重,最终将消耗大量的CPU资源。

疑问

  1. 默认创建64个timersBucket,极端情况下会创建出64个协程来处理事件吗?会
  2. Timer 1.20版本的新原理
  3. Timer警告reset部分是出于什么情况考虑?

Defer

  1. 延迟函数的参数在defer语句出现时就已经确定下来了
  2. 延迟函数执行按后进先出顺序执行,即先出现的defer最后执行
  3. 延迟函数可能操作主函数的具名返回值

延迟函数的参数在defer语句出现时就已经确定下来了

参数会被拷贝,如果参数是地址,则元素是最新值。

func a() {
    i := 0
    defer fmt.Println(i) // 0
    i++
    return
}

func deferFuncParameter() {
    var aArray = [3]int{1, 2, 3}
	//由于只拷贝aArray地址,并未拷贝元素。
    defer printArray(&aArray) // 10, 2, 3

    aArray[0] = 10 
    return
}

延迟函数执行按后进先出顺序执行,即先出现的defer最后执行

这个规则很好理解,定义defer类似于入栈操作,执行defer类似于出栈操作。

设计defer的初衷是简化函数返回时资源清理的动作,资源往往有依赖顺序,比如先申请A资源,再跟据A资源申请B资源,跟据B资源申请C资源,即申请顺序是:A–>B–>C,释放时往往又要反向进行。这就是把deffer设计成FIFO的原因。

每申请到一个用完需要释放的资源时,立即定义一个defer来释放资源是个很好的习惯。

延迟函数可能操作主函数的具名返回值

有一个事实必须要了解,关键字return不是一个原子操作,实际上return只代理汇编指令ret,即将跳转程序执行。比如语句return i,实际上分两步进行,即将i值存入栈中作为返回值,然后执行跳转,而defer的执行时机正是跳转前,所以说defer执行时还是有机会操作返回值的。

举个实际的例子进行说明这个过程:

func deferFuncReturn() (result int) {
    i := 1

    defer func() {
       result++
    }()

    return i
}

该函数的return语句可以拆分成下面两行:

result = i
return

而延迟函数的执行正是在return之前,即加入defer后的执行过程如下:

result = i
result++
return

所以上面函数实际返回i++值。

关于主函数有不同的返回方式,但返回机制就如上机介绍所说,只要把return语句拆开都可以很好的理解,下面分别举例说明

分多重情况进行分析

  1. 主函数拥有匿名返回值,返回字面值
  2. 主函数拥有匿名返回值,返回变量
  3. 主函数拥有具名返回值

主函数含有匿名返回值,返回字面值

一个主函数拥有一个匿名的返回值,返回时使用字面值,比如返回”1”、”2”、”Hello”这样的值,这种情况下defer语句是无法操作返回值的。

一个返回字面值的函数,如下所示:

func foo() int {
    var i int

    defer func() {
        i++
    }()

    return 1
}

上面的return语句,直接把1写入栈中作为返回值,延迟函数无法操作该返回值,所以就无法影响返回值。

主函数拥有匿名返回值,返回变量

一个主函数拥有一个匿名的返回值,返回使用本地或全局变量,这种情况下defer语句可以引用到返回值,但不会改变返回值。

一个返回本地变量的函数,如下所示:

func foo() int {
    var i int

    defer func() {
        i++
    }()

    return i
}

上面的函数,返回一个局部变量,同时defer函数也会操作这个局部变量。对于匿名返回值来说,可以假定仍然有一个变量存储返回值,假定返回值变量为”anony”,上面的返回语句可以拆分成以下过程:

anony = i
i++
return

由于i是整型,会将值拷贝给anony,所以defer语句中修改i值,对函数返回值不造成影响。

主函数拥有具名返回值

主函声明语句中带名字的返回值,会被初始化成一个局部变量,函数内部可以像使用局部变量一样使用该返回值。如果defer语句操作该返回值,可能会改变返回结果。

一个影响函返回值的例子:

func foo() (ret int) {
    defer func() {
        ret++
    }()

    return 0
}

上面的函数拆解出来,如下所示:

ret = 0
ret++
return

函数真正返回前,在defer中对返回值做了+1操作,所以函数最终返回1。

defer实现原理

本节我们尝试了解一些defer的实现机制。

defer数据结构

源码包src/src/runtime/runtime2.go:_defer定义了defer的数据结构:

type _defer struct {
    sp      uintptr   //函数栈指针
    pc      uintptr   //程序计数器
    fn      *funcval  //函数地址
    link    *_defer   //指向自身结构的指针,用于链接多个defer
}

我们知道defer后面一定要接一个函数的,所以defer的数据结构跟一般函数类似,也有栈地址、程序计数器、函数地址等等。

与函数不同的一点是它含有一个指针,可用于指向另一个defer,每个goroutine数据结构中实际上也有一个defer指针,该指针指向一个defer的单链表,每次声明一个defer时就将defer插入到单链表表头,每次执行defer时就从单链表表头取出一个defer执行。

下图展示多个defer被链接的过程:

从上图可以看到,新声明的defer总是添加到链表头部。

函数返回前执行defer则是从链表首部依次取出执行,不再赘述。

一个goroutine可能连续调用多个函数,defer添加过程跟上述流程一致,进入函数时添加defer,离开函数时取出defer,所以即便调用多个函数,也总是能保证defer是按FIFO方式执行的。

4.2 defer的创建和执行

源码包src/runtime/panic.go定义了两个方法分别用于创建defer和执行defer。

  • deferproc(): 在声明defer处调用,其将defer函数存入goroutine的链表中;
  • deferreturn():在return指令,准确的讲是在ret指令前调用,其将defer从goroutine链表中取出并执行。

可以简单这么理解,在编译在阶段,声明defer处插入了函数deferproc(),在函数return前插入了函数deferreturn()。

总结

  • defer定义的延迟函数参数在defer语句出时就已经确定下来了
  • defer定义顺序与实际执行顺序相反
  • return不是原子操作,执行过程是: 保存返回值(若有)–>执行defer(若有)–>执行ret跳转
  • 申请资源后立即使用defer关闭资源是好习惯

Mutex实现原理

type Mutex struct {
	state int32
	sema  uint32
}

Go中的互斥锁是使用信号量实现的。由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被从唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

正常模式与饥饿模式

sync.Mutex 有两种模式 — 正常模式和饥饿模式。

在刚开始的时候,是处于正常模式(Barging),也就是,当一个G1持有着一个锁的时候,G2会自旋的去尝试获取这个锁

自旋超过4次还没有能获取到锁的时候,这个G2就会被加入到获取锁的等待队列里面,并阻塞等待唤醒。

正常来说,获取的锁的顺序是按照FIFO先入先出的顺序。

但唤醒的goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 竞争锁。新请求锁的 goroutine 具有优势:它正在 CPU 上执行,而且可能有好几个,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败,长时间获取不到锁,此时就会进入饥饿状态。这就是引入饥饿模式的原因,防止等待队列中的goroutine多次与正在运行的goroutine竞争失败。

那么也不可能说永远的保持一个饥饿的状态,总归会有吃饱的时候,也就是总有那么一刻Mutex会回归到正常模式,那么回归正常模式必须具备的条件有以下几种:

  1. G的执行时间小于1ms
  2. 等待队列已经全部清空了

当满足上述两个条件的任意一个的时候,Mutex会切换回正常模式。

Lock

  1. 首先假设当前mutex是无竞争、无等待的情况,尝试原子加锁。成功表示加上了锁。对应就是最简单、也可能是最常见的情况,只有一个goruntine在加锁。
  2. 不成功则进入慢速加锁。
    1. 尝试自旋、避免频繁的协程上下文切换。自旋4次。
    2. 自旋4次也拿不到锁,用信号量控制并发,进入睡眠。
    3. 等待唤醒后拿到锁。

当然这个流程中还有设置唤醒和饥饿标记、重置自旋次数并重新获取锁的一些步骤。

Unlock

Go的Metex是由semaphore实现的,

  1. 那么posix库中的pmutex也是sem实现的吗?
  2. C++的thread库中的mutex也是sem实现的吗?从封装posix库而来呢?

todo

  • 第一类对象是指什么?

参考


intro
https://messenger1th.github.io/2024/07/24/Golang/intro/
作者
Epoch
发布于
2024年7月24日
许可协议