22 Go常见的并发模式和并发模型
一 Go并发模型
传统的编程语言C++ Java Python等,他们的并发逻辑多事基于操作系统的线程。并发执行单元(线程)之间的通信利用的就是操作系统提供的线程或进程间通信的原语。如:共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最广泛的就是共享内存。
如果你使用过这种共享内存的并发模型,其实是难用的和容易发生错误的,特别是在大型或复杂的业务场景中。
Go语言从程序设计当初,就将解决上面传统并发模型问题作为目标,并在新并发模型设计中借鉴注明的CSP(Communicationing Sequential Processes-通信顺序进程)并发模型。
CSP模型目的在于简化并发程序的编写,让并发程序的编写顺序与编写顺序程序一样简单。
生产者 —》输出数据 — 输入/输出原语 —》输出数据
为了实现CSP模型,GO语言引入了Channel.Goroutine可以读写channel中的数据,通过channel将goroutine组合连接在一起。
Go语言中CSP虽然是主流并发模型,但是还是支持共享内存并发模型。主要是在sync包中的互斥锁、读写锁、条件变量、原子操作等。那么我们该如何选择呢?
第一种:创建模式
通常会使用下面的方式:
type Worker struct {
}
func Do(f func()) chan Worker {
w:= make(chan Worker)
go func() {
f()
w<-Worker{}
}()
return w
}
func main() {
c:=Do(func() {
fmt.Print('到下班时间了...')
})
<-c
}
Do函数内部创建了一个gorutine并且返回了一个channel类型的变量。Do函数创建的新goroutine与调用的Do函数的goroutine之间通过一个channel联系了起来,2个goroutine可以通过channel进行通讯。Do函数的实现因为channel在Go语言中是一等公民,channel可以像变量一样初始化、传递和赋值。上面的例子Do返回了一个变量,这个变量就是通道,实现了主goroutine和子goroutine的通信。
第二种:退出模式
a) 分离模式
分离模式使用最广泛的是goroutine退出模式。所谓分离模式就是创建它的goroutine不需要关心它的退出,这类goroutine启动后与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。
场景1:一次性任务
// $GOROOT/src/net/dial.go
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
... ...
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
}
... ...
}
在DialContext方法中创建了一个goroutine,用来监听量个channel是否有数据,一旦有数据,处理后即退出。
场景2 常驻后台执行一些特定任务,比如常用for{…}或for{select{…}}形式,还可以用定时器或事件驱动执行。下面是Go给每个P内置的GC goroutine就是这种场景的。
// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
// Background marking is performed by per-P G's. Ensure that
// each P has a background GC G.
for _, p := range allp {
if p.gcBgMarkWorker == 0 {
go gcBgMarkWorker(p) // 这里每个P创建一个goroutine,以运行gcBgMarkWorker
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
func gcBgMarkWorker(_p_ *p) {
gp := getg()
... ...
for {
// 处理GC
... ...
}
}
b) join模式
在线程模型中,父线程可以通过pthread join来等待子线程结束并获取子线程的结束状态。在Go中,我们有时候也有这种需求:goroutine的创建者需要等待新goroutine的结果。
type Worker struct {
}
func Do(f func()) chan Worker {
w:= make(chan Worker)
go func() {
f()
w<-Worker{}
}()
return w
}
func main() {
c:=Do(func() {
fmt.Print('到下班时间了...')
})
<-c
}
我们还是看刚刚上面的这个例子,Do函数使用典型的goroutine的创建模式创建了一个groutine,main的goroutine作为创建通过Do函数返回的channel与新goroutine建立关系,这个channel得用途就是在goroutine之间建立退出时间的“信号”通信机制。main goroutine在创建完新goroutine后就在该channel上阻塞等待了,直到新的goroutine退出前向该channel发送了一个”信号”。
运行代码,结果如下:
到下班时间了...
Process finished with exit code 0
获取goroutine的退出状态
如果新goroutine的创建者不仅仅要等待goroutine的退出,还要知道结束状态,我们可以通过自定义类型的channel来实现这样的需求。
func add(a,b int) int{
return a+b
}
func Do(f func(a,b int) int,a,b int) chan int{
c:=make(chan int)
go func() {
r:=f(a,b)
c<-r
}()
return c
}
func main() {
c:=Do(add,1,5)
fmt.Println(<-c)
}
运行结果是 6
等待多个goroutine退出
func add(a,b int) int{
return a+b
}
func Do(f func(a,b int) int,a,b,n int) chan int{
c:=make(chan int)
var wg sync.WaitGroup
for i:=0;i<n;i++{
wg.Add(1)
go func() {
r:=f(a,b)
fmt.Println(r)
wg.Done()
}()
}
go func() {
wg.Wait()
c<-100
}()
go func() {
}()
return c
}
func main() {
c:=Do(add,1,5,5)
fmt.Println(<-c)
}
运行结果
6
6
6
6
6
100
c) notify-wait模式
前面的场景中,goroutine的创建者都是在被动地等待新goroutine的退出。有些场景,goroutine的创建者需要主动通知那些新goroutine退出。
通知并等待一个goroutine的退出
func add(a, b int) int {
return a + b
}
func Do(f func(a, b int) int, a, b int) chan int {
quit := make(chan int)
go func() {
var job chan string
for {
select {
case x := <-job:
f(a, b)
fmt.Println(x)
case y := <-quit:
quit <- y
}
}
}()
return quit
}
func main() {
c := Do(add, 1, 5)
fmt.Println('开始干活')
time.Sleep(1 * time.Second)
c <- 0
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-c:
fmt.Println(status)
case <-timer.C:
fmt.Println('等待...')
}
}
执行代码结果如下
开始干活
0
通知并等待多个goroutine退出
下面是通知并等待多个goroutine退出的场景。Go语言的channel有一个特性,那就是当使用close函数关闭channel时,所有阻塞到该channel上的goroutine都会得到通知。
func worker(x int) {
time.Sleep(time.Second * time.Duration(x))
}
func Do(f func(a int), n int) chan int {
quit := make(chan int)
job:=make(chan int)
var wg sync.WaitGroup
for i:=0;i<n;i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
name := fmt.Sprintf('worker-%d',i)
for {
j,ok:=<-job
if !ok{
fmt.Println(name,'done')
return
}
worker(j)
}
}(i)
}
go func() {
<-quit
close(job)
wg.Wait()
quit<-200
}()
return quit
}
func main() {
quit:=Do(worker,5)
fmt.Println('func Work...')
quit<-1
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-quit:
fmt.Println(status)
case <-timer.C:
fmt.Println('等待...')
}
}
运行结果
func Work...
worker-1 done
worker-2 done
worker-3 done
worker-4 done
worker-0 done
200