Golang不可重入函数


在同一时间点只会被运行一次的函数,我们称其为不可重入函数。本文介绍Go语言不可重入函数的实现方式。

使用场景

某个服务需要在多个goroutine中完成同一个任务,而这个任务中某一个步骤涉及goroutine之间共享的资源,如果我们不对这个过程做特殊处理,则可能会出现一些问题,比如下面的代码。

package main
  
import (
    "fmt"
    "time"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}
    wg.Add(3)
    for i,_ := range []int{1,2,3} {
        index := i
        go func() {
            fmt.Printf("Goroutine %d do something...\n", index)
            time.Sleep(2 * time.Second)
            fmt.Printf("Goroutine %d end...\n", index)
            wg.Done()
        }()
    }
    wg.Wait()
}

我们启动了三个goroutine,执行后会发现三个goroutine同时执行,共同等待2s之后一同退出。

go run main.go 
Goroutine 2 do something...
Goroutine 0 do something...
Goroutine 1 do something...
Goroutine 0 end...
Goroutine 2 end...
Goroutine 1 end...

阻塞不可重用

一般来说,我们可以使用锁来在解决不同goroutine之间抢占资源的问题。Go语言中的sync.Mutex即可以用来实现锁,使用方式如下:

package main                                                                                 
                                                                                             
import (                                                                                     
    "fmt"                                                                                    
    "time"                                                                                   
    "sync"                                                                                   
)                                                                                            
                                                                                             
func main() {                                                                                
    var mu sync.Mutex
    wg := sync.WaitGroup{}                                                                   
    wg.Add(3)
    for i,_ := range []int{1,2,3} {                                                          
        index := i
        go func() {
            mu.Lock()
            defer mu.Unlock()
            fmt.Printf("Goroutine %d do something...\n", index)                              
            time.Sleep(2 * time.Second)                                                      
            fmt.Printf("Goroutine %d end...\n", index)                                       
            wg.Done()                                                                        
        }()                                                                                  
    }
    wg.Wait()
}

执行过程与前面的例子大不相同,没有拿到锁的goroutine会阻塞,直到获取到锁后则继续运行。所以我们看到的是三个goroutine依次完成自己的运行过程。

go run main.go 
Goroutine 2 do something...
Goroutine 2 end...
Goroutine 0 do something...
Goroutine 0 end...
Goroutine 1 do something...
Goroutine 1 end...

这种方式在很多场景是可以满足需求的,但有时我们不希望goroutine阻塞等待,而是直接返回,继续做其他工作,那么这种方式就不适用了。

非阻塞不可重用

我们知道使用mutex的话是会使goroutine阻塞的,那么有没有什么办法能让获取mutex的过程不阻塞呢?有些朋友想到了下面的方式来实现:

package locker
  
import "sync"

type Locker struct {
    l1 sync.Mutex
    l2 sync.Mutex
    locked bool
}

func (doubleLocker *Locker) Lock() (success bool) {
    doubleLocker.l1.Lock()
    defer doubleLocker.l1.Unlock()
    if doubleLocker.locked == false {
        doubleLocker.locked = true
        success = true
        doubleLocker.l2.Lock()
    }
    return
}

func (doubleLocker *Locker) Unlock() {
    doubleLocker.l1.Lock()
    defer doubleLocker.l1.Unlock()
    doubleLocker.locked = false
    doubleLocker.l2.Unlock()
}

我们定义了一个Locker结构,其中的l2是为读写数据而加的锁。success用来做状态判断,如果其值为false,那么获取l2锁,并对数据进行处理;否则直接返回。这个过程肯定不是原子的,放在任何并发环境里都会出现问题。所以我们需要另一个锁l1来对这个判断过程加锁,使其成为“原子”操作。而每个goroutine获取l1的持续时间都很短,造成的阻塞可以忽略不记,从而达到非阻塞的效果。

调用方式如下:

package main
  
import (
    "fmt"
    "time"
    "sync"
    "github.com/Jnchk/codecollection/go/others/nonreentrant/asyncmutex/locker"
)

func main() {
    var doubleLocker locker.Locker
    wg := sync.WaitGroup{}
    wg.Add(3)
    for i,_ := range []int{1,2,3} {
        index := i
        go func() {
            if doubleLocker.Lock() {
                defer doubleLocker.Unlock()
            } else {
                wg.Done()
                return
            }
            fmt.Printf("Goroutine %d do something...\n", index)
            time.Sleep(2 * time.Second)
            fmt.Printf("Goroutine %d end...\n", index)
            wg.Done()
        }()
    }
    wg.Wait()
}

执行效果如下(只运行了一个goroutine):

go run main.go 
Goroutine 2 do something...
Goroutine 2 end...

非阻塞不可重入1

事实上,go语言已经帮助我们完成了上面做的事情,功能包含在了sync/atomic包中。代码如下:

package main
  
import (
    "fmt"
    "time"
    "sync"
    "sync/atomic"
)

func main() {
    wg := sync.WaitGroup{}
    var reentrance int64
    wg.Add(3)
    for i,_ := range []int{1,2,3} {
        index := i
        go func() {
            if atomic.CompareAndSwapInt64(&reentrance, 0, 1) {
                defer atomic.StoreInt64(&reentrance, 0)
            } else {
                wg.Done()
                return
            }
            fmt.Printf("Goroutine %d do something...\n", index)
            time.Sleep(2 * time.Second)
            fmt.Printf("Goroutine %d end...\n", index)
            wg.Done()
        }()
    }
    wg.Wait()
}

上面的代码中我们使用了atomic.CompareAndSwapInt64这个函数。调用时,函数会首先判断reentrance是否为0,如果是0则将其赋值为1,并返回true。否则(即reentrance为1)直接返回false。这所有操作整体是原子的。

执行结果与我们自己实现的版本是一样的。

go run main.go 
Goroutine 2 do something...
Goroutine 2 end...

两个非阻塞版本有何区别

实现逻辑上没有明显区别,我们自己实现的版本其实就是把atomic.CompareAndSwapInt64atomic.StoreInt64的实现逻辑描述了一遍。

而论具体细节,我们先看atomic.CompareAndSwapInt64。这个函数实际上在go源码中是通过汇编实现的,最终实现内容在src/runtime/internal/atomic目录下。

mutex,也就是mutex.go,我们可以看到其Lock()Unlock()也是通过atomic系列函数来实现的。也就是说我们自己实现的版本虽然功能相同,但却在无形当中增加了许多不必要的逻辑。从这方面考虑,还是使用atomic更直接一些。

总结

本文介绍了不可重用函数在阻塞与非阻塞两种场景下的基本实现。

参考链接

Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
comments powered by Disqus