go编程:说说channel哪些事
channel是什么
channel中文翻译为通道,它是Go语言内置的数据类型,使用channel不需要导入任何包,像int/float一样直接使用。它主要用于goroutine之间的消息传递和事件通知。 在Go语言中流传着一句话,就是说不要通过共享内存来通信,而是应该通过通信来共享内存。
❝
Don't communicate by sharing memory, share memory by communicating.
❞
上面这句话也包含了通信的两种方式:1是通过共享内存 2是通过通信。channel来源灵感要追溯到CSP(communicating sequential process)模型,CSP是用来描述并发系统中进行交互的一种模式。CSP概念最早由Tony Hoare提出,对这个人我们可能很陌生。但说到快速排序算法(Quicksort),你一定不陌生,快速排序太经典了,对Tony Hoare就是快速排序算法的作者。CSP允许使用进程组件来描述系统,各个组件独立运行,它们之间通过消息传递的方式进行通信。 Go语言中使用channel实现CSP思想,整个channel实现只有短短的700行代码,非常精炼,非常值得一读。channel和goroutine的结合,为并发编程提供了优雅的、便利的、 与传统并发控制不同的方案,并产生了在Go中特有的并发模式。
为什么需要channel
channel不是必需的,不用channel也可以完成goroutine之间的消息传递和事件通知,比如通过共享变量的方式。但使用了channel,会大大提升开发的效率,因为channel是并发安全的,channel的设计与goroutine之间完美配合,降低了并发编程的难度,减少了data race产生,大幅提升了生产力,嗯,程序员的福音。
channel基本用法
channel创建
channel有3种类型,分别为只能接收、只能发送、既能接收又能发送。定义方法如下:
func main() { // readChan只能接收 var readChan <-chan int // writeChan只能发送 var writeChan chan<- int // rwChan既能接收又能发送 var rwChan chan int fmt.Println(readChan == nil) //true fmt.Println(writeChan == nil) //true fmt.Println(rwChan == nil) //true}
channel中的元素对类型没有限制,任意类型都可以,所以元素的类型也可以是chan类型。那怎么判断<-属于哪个chan呢? <-的匹配规则是总是尽量与它左边的chan结合(the <-operator associates with the leftmost chan possible)。所以writeIntChan是一个只能发送的chan,发送的元素为chan int(双向的int型chan), readIntChan是一个只能接收的chan,发送的元素为chan int, rwIntChan是一个双向的chan, 发送的元素为chan int.
func elemIsChan(){ // chan元素类型也可以是channel var writeIntCahn chan<- chan int var readIntChan <- chan chan int var rwIntChan chan chan int}
channel定义完成之后,并不能直接使用,需要初始化。上面readChan/writeChan/rwChan都是未被初始化的,它们的值都是nil. channel用make初始化,不能用new方法。make创建的时候可以传一个数字,表示创建有缓冲区的chan, 如果没有设置,它是无缓冲区的。
func makeChan(){ // 无缓冲区的chan unbufferedCh:=make(chan int) // 有缓冲区的chan,可以缓存10个int数据 bufferedCh:=make(chan int,10)}
向channel中发送数据
往chan中发送一个数据使用“ch<-”,下面的操作往ch发送一个int数据200. ch是一个双向的chan,可以往里面发送数据。 ch2是一个只能发送的ch,也可以往里面发送数据。
func sendDataToChan() { ch := make(chan int, 1) ch <- 200 ch2 := make(chan<- int, 1) ch2 <- 100}
从channel中取数据
使用<-ch从chan中接收一条数据,ch是一个双向chan或者只读chan.下面的操作从ch读取数据,ch2是一个只读chan,也可以进行读取数据的操作。 chan接收操作,可以返回一个值可以可以返回两个值,第一个值是返回chan中的元素,第二个值是个bool类型,表示是否成功地从chan中读取到了一个值。如果chan已经被关闭而且所有的数据都已经读完,第一个值将是零值。需要注意的是,如果从chan读取到一个零值,可能是sender真实的在发送零值,也有可能是chan被关闭且没有缓存的元素了产生的零值。
func recvDataFromChan() { // 双向chan ch := make(chan int, 1) <-ch // 只读chan ch2 := make(<-chan int, 1) <-ch2 // 只读取数据 _=<-ch2 // 读取数据并想知道ch2是否已关闭 _,_=<-ch2}
关闭channel
close(ch)直接将一个chan关闭,需要注意的是,如果一个chan未被初始化,也就是没有执行make操作,是不能close的,否则引发panic.还有就是不能重复关闭一个chan,重复关闭一个chan也会产生panic.还有就是不能往一个关闭的chan中发送数据,也会产生panic. 最后一个需要注意的是不能close一个只读的chan,直接编译不会通过。
func closeNilChan() { var ch chan int close(ch) //panic:close of nil channel}func closeOnlyReadChan() { var ch <-chan int ch = make(<-chan int) close(ch) // invalid operation: close(ch) (cannot close receive-only channel)}
其他操作
Go内置的cap、len都可以操作chan,cap返回chan的容量,len返回chan中缓存的还未被取走的元素数量。还可以在select case中向chan发送数据或从chan中接收数据。 for-range操作chan,从chan读取数据。当ch被close之后,for-rang循环都会结束。
func forRangeChan() { ch := make(chan int, 1) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() // ch被close后,for-range会结束循环 for v := range ch { fmt.Println(v) } fmt.Println('for-range end') }() ch <- 1 close(ch) wg.Wait()}func forRangeChan2() { ch := make(chan int, 1) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() // ch被close后,for-range结束循环 for range ch { } fmt.Println('for-range end') }() close(ch) wg.Wait()}
channel实现原理
下面介绍channel底层是怎么实现的,源码在runtime/chan.go文件中。chan分配的是一个hchan的数据结构,定义如下:
const ( maxAlign = 8 // hchanSize为8的倍数,如果不是调整到下一个最小的8的倍数,假如hchan大小9,hchanSize为16 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) debugChan = false)type hchan struct { // 队列中元素的个数 qcount uint // 环形队列的大小 dataqsiz uint // 指向含有dataqsiz个元素的数组的地址 buf unsafe.Pointer // 元素的大小 elemsize uint16 // 通道是否关闭标识 closed uint32 // 元素的类型 elemtype *_type // 发送下标,即发送元素在环形队列的位置 sendx uint // 接收下标,即接收元素在环形队列的位置 recvx uint // 等待接收元素的goroutine队列 recvq waitq // 等待发送元素的goroutine队列 sendq waitq // 互斥锁,保护所有字段,包括sudogs中的字段,当持有锁的时候,不要修改其他G的状态 // 因为这可能导致堆栈在收缩时产生死锁。 lock mutex}
qcount:代表chan中已经接收但还没被取走的元素的个数,使用len函数可以返回qcount的值
dataqsiz:队列的大小,chan中用一个循环队列来存放数据。
buf:存放元素的循环队列的buffer地址。
elemtype和elemsize:表示chan中元素的类型和size。chan一旦申明,它的元素类型是固定的,即普通类型或指针类型,所以元素大小也是固定的。
sendx:处理发送数据的指针在buf中的位置,一旦接收了新数据,指针就会加上elemsize,移动到下一个位置。buf的总大小是elemsize的整数倍,而且buf是一个循环列表。
recvx:处理接收请求时的指针在buf中的位置,一旦取出数据,该指针会移动到下一个位置。
recvq:chan是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到recq队列中。
sendq: 如果生产者因为buf满了而阻塞,会被加入到sendq队列中。
上述的字段概括起来分为3部分,第一个部分是存储元素相关的,元素的类型,元素的大小,第二部分是buf相关的,存储数据buf的地址,buf的大小,buf中数据元素的个数,收发下标在buf中的索引位置,第三部分是调用相关的,recvq和sendq等待被调度的G队列。
channel创建实现
在编译的时候,编译器会根据容量的大小选择调用makechan64还是makechan。 makechan64只是做了size检查,后面调用的还是makechan函数,makechan函数的目的就是产生hchan结构体对象。下面对源码做了比较详细的解析。在分配buf的时候,根据存放元素是否含有指针做了不同的分配策略,如果通道中存放的元素中不含有指针类型,一次性分配hchanSize+mem大小的空间,如果含有指针,需要进行两次分配,第一次分配通道结构头hchan,第二次为通道中元素分配存储空间buf。需要注意的时,产生的hchan对象肯定是8字节的倍数,不够8字节时候,分配时候会调整。
func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError('makechan: size out of range')) } return makechan(t, int(size))}// 创建一个chan类型的结构体指针*hchan,传入参数有2个:t表示// 通道中放的元素类型,size表示通道的大小func makechan(t *chantype, size int) *hchan { elem := t.elem // 检查通道中元素大小是否小于2^16(65536),如果大于这个将抛异常 if elem.size >= 1<<16 { throw('makechan: invalid channel element type') } // 检查hchanSize是否为8的倍数,如果不是将抛出异常 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw('makechan: bad alignment') } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 如果要申请的空间大小溢出或者超过最大分配值(maxAlloc)-hchanSize的值,或者申请通道 // 的大小为负数,将引发panic if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError('makechan: size out of range')) } var c *hchan switch { case mem == 0: // 申请的是无缓冲区的channel // 直接申请hchanSize字节大小的空间,hchanSize大小就是hchan结构体的大小 // 如果不是8的倍数,调整到8的倍数 c = (*hchan)(mallocgc(hchanSize, nil, true)) // c.buf指向自己(即buf的位置) c.buf = c.raceaddr() case elem.ptrdata == 0: // 通道中存放的元素中不含有指针类型,一次性分配hchanSize+mem大小的空间 // 存放元素占用的空间是一个连续的数组,跟hchan分配在一起 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // c.buf指向连续数组的起始位置 c.buf = add(unsafe.Pointer(c), hchanSize) default: // 通道中存放的元素中含有指针,需要进行两次分配,第一次分配通道结构头hchan // 第二次为通道中元素分配存储空间,调用mallocgc分配mem个字节空间,并将分配 // 的地址位置赋值给c.buf c = new(hchan) c.buf = mallocgc(mem, elem, true) } //填充hchan元素大小、类型和环形队列大小字段 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) // 调试打印 if debugChan { print('makechan: chan=', c, '; elemsize=', elem.size, '; dataqsiz=', size, '\n') } return c}const MaxUintptr = ^uintptr(0)// MulUintPtr返回a*b的值和一个bool类型,bool类型表示a*b是否溢出func MulUintptr(a, b uintptr) (uintptr, bool) { if a|b < 1<<(4*sys.PtrSize) || a == 0 { return a * b, false } overflow := b > MaxUintptr/a return a * b, overflow}
发送操作实现原理
在编译发送数据给chan的时候,会把ch<-语句转换成chansend1函数,chansend1函数会调用chansend。chansend源码解析如下,整个chansend函数处理逻辑比较复杂,可以分为6部分处理逻辑。第一部分是对chan进行判空操作,如果chan是nil,调用者就永远被阻塞住了。第二部分是向一个缓存队列满的chan对象发送数据的时候,并且想不阻塞当前调用,这里的处理方式是直接返回。这里多说一点,什么时候不想阻塞呢?看chansend1调用chansend的时候block传递的是true。什么时候传递false呢?在处理select case + default时候,如果chan被阻塞,需要执行default逻辑,像这种情况下传递的就是false.第三部分是如果chan已经被关闭,在向里面发送数据的话会panic.第四部分,如果等待队列中有等待的receiver,直接将数据拷贝的目标位置,不需要经过buf进行中转,只有1次拷贝。第五部分是说当前没有receiver,需要将数据放到buf,然后成功返回。第六部分是处理buf满的情况,如果buf满了,发送者的goroutine会被加入到发送者的等待队列中,直到被唤醒。
// 发送操作c<-x调用入口func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc())}// 往通道发送元素处理逻辑,func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // c为nil表示通道还未初始化,如果不阻塞即block传false,会直接返回false // chansend1中调用block参数传的是true,chansend1是发送操作的入口, // 所以这里可知,往未初始化的通道发送数据,会走到下面的gopark逻辑里。 if c == nil { if !block { return false } // 挂起当前G gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw('unreachable') } if debugChan { print('chansend: chan=', c, '\n') } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回, // 对于缓冲性通道,但通道满了,也直接返回 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) // 通过已经关闭,不能再发送元素,直接报panic if c.closed != 0 { unlock(&c.lock) panic(plainError('send on closed channel')) } // 找到第一个等待的G, 直接将数据ep拷贝给它,存在等待的G有两种情况 // 情况1:通道非缓冲,receiver先被运行,被gopark了 // 情况2:缓冲通道,通道中ring buffer是空的,receiver先被运行,被gopark了 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 有缓冲的通道,通道还没满,将元素加入到缓冲区 if c.qcount < c.dataqsiz { // qp是当前元素应该放入在环形队列的位置 qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } // 将ep的内容拷贝到qp的位置,即将发送元素拷贝到环形队列中 typedmemmove(c.elemtype, qp, ep) // [c.recex,c.sendx]位置已填充带接收得元素,发送缓存区的位置c.sendx加1 // 如果c.sendx加1操作之后已到达数组的尾部,即数组满了,回到起始位置0 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // 走到这里表示,缓冲性通道已经满了,要挂起当前的G,构造一个sudog, 加入到sendq队列 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将mysg加入到sendq c.sendq.enqueue(mysg) // gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 确保要发送的数据已经被接收者copy获取到,因为ep是一个栈上的对象,mysg.elem指向的 // 是一个栈上的对象 KeepAlive(ep) // 被唤醒后要执行的逻辑 if mysg != gp.waiting { throw('G waiting list is corrupted') } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw('chansend: spurious wakeup') } panic(plainError('send on closed channel')) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 数据竞争检查,执行 go run -race会进入 if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // 修改缓冲区中发送者和接收者的下标位置 qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // 发送的数据不为空,直接调用sendDirect将数据ep拷贝到sg的elem中 if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 切换gp的状态为runnable goready(gp, skip+1)}// 直接在两个栈上进行数据拷贝,src是发送者栈G上待发送的数据,sg是接收Gfunc sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // 一旦对sg.elem进行了读操作,如果发送栈扩容,它将永不会被更新 dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // 从src拷贝t.size字节到dst memmove(dst, src, t.size)}
接收操作实现原理
在编译器处理<-ch操作时,会将其转成chanrecv1函数,如果要返回两个返回值,会转成chanrecv2, chanrecv1和chanrecv2都会调用chanrecv。chanrecv1和chanrecv2传入的block参数都是true。分析的时候先不考虑block为false的情况。接收操作源码解析如下
// 接收操作入口<-c, 有2个入参,c表示通道结构体指针// elem是接收通道元素的变量地址,即<-c左边的接收者func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true)}// _,_:=<-c chanrecv2比chanrecv1会返回一个bool类型// 该bool表示是否成功地从chan中读取到了一个值,chanrecv1和chanrecv2内部调用// 都是同一个函数func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return}// chanrecv将通道c中接收到的数据写入到ep执行的地址内存中,ep指向的位置可能是一个堆空间也可能在栈空间上// 如果忽略接收通道值,ep将是nil值,如果传入的block为false并且通道中没有元素,将返回false,false// 如果通道c被关闭,ep将会填充类型的零值,返回true和false, 其他情况ep会填充通道中接收到的值,返回// 值为true,truefunc chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 调试输出 if debugChan { print('chanrecv: chan=', c, '\n') } // c为nil,即未初始化,调用gopark挂起当前的 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw('unreachable') } // 非阻塞接收且通道未被关闭,非缓冲通道且没有发送中G 直接返回false,false // 有缓冲通道且通道中没有元素,也直接返回false,false if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) // 通过已关闭且通道中没有元素了 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 等待发送的G队列非空,即有G在等待发送元素,如果是非缓冲队列,直接将数据从 // 发送者的sg中的elem拷贝到ep中;如果是缓冲队列,将队列头的元素添加到ep // 中,并将发送者sg中的元素拷贝到队列的尾部。 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 队列中有元素,将待拷贝的元素拷贝到ep中 if c.qcount > 0 { // Receive directly from queue // qp为环形队列中待拷贝的元素的位置 qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { // 将qp位置的元素拷贝到ep中 typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 接收位置的下标+1,如果接收位置到达队尾,重置为0 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 走到这里有2种情况,1是非缓冲通道发送者没有准备好,2是缓冲型通道但没有元素, //构造一个sudog加入到通道的等待接收的G队列,调用gopark挂起当前的G,即将状态 //标为waiting, 进入G0执行调度逻辑。 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 将mysg加入到等待接收的G队列 c.recvq.enqueue(mysg) // gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 执行唤醒后的逻辑 if mysg != gp.waiting { throw('G waiting list is corrupted') } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed}// 通道接收操作,包含2部分处理逻辑,1是将sg中发送者的元素拷贝到通道中,并唤醒发送者;2是将// 通道中待接收处理的元素拷贝到ep中。注意,走到recv只有2中情况,队列为空或满了,队列没// 满不会走到这里。func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 非缓冲型通道,直接将元素从sg拷贝到ep中,不经过环形队列作为中转 if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 直接将数据从sg拷贝到ep recvDirect(c.elemtype, sg, ep) } } else { // 队列满的情况,走到这里只可能是队列满的情况,对于队列不满的情况,不会走 // 到recv函数里c.recvx和c.sendx是在同一个位置,先将环形队列 // 中待接收处理位置qp地方的元素拷贝到ep中,然后将发送者sg.elem // 中的元素拷贝到队列的尾部,对于队列满的情况 qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // 将qp位置的数据拷贝到ep中 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将sg.elem中的元素拷贝到qp位置 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 对gp执行goread,将其状态从waiting修改为runnable状态 goready(gp, skip+1)}// 直接在两个栈上进行数据拷贝,dst是接收者栈G上待接收数据的地址,sg是发送Gfunc recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size)}
第一部分处理的是chan为nil的情况,从nil chan中接收数据,调用者会被永远阻塞。第二部分是对特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回,对于缓冲性通道,但通道满了,也直接返回。第三部分是chan已经被close的情况,如果chan已经被close了,并且队列中没有缓存的元素,直接返回true,false.第四部分是处理等待发送队列中有等待G的情况,这时,如果buf中有数据,先从buf中读取数据,否则直接从等待队列中获取一个发送者,把它的数据复制给这个receiver.第五部分是处理没有等待发送者G的情况,如果buf有元素,就取出一个元素给接收者。第六部分是处理buf中没有元素的情况,如果没有元素,阻塞当前的G,直到它从发送者中获取到了数据,或是chan被关闭了,才返回。
关闭操作实现原理
执行close(ch)关闭chan ch,最终会调用的是closechan方法。下面是closechan的源码解析。关闭nil chan会产生panic. 如果chan已经关闭再次关闭也会产生panic.否则将ch等待队列中的全部接收者和发送者G从队列中全部移除加入了glist.然后将glist中的所有G执行goready唤醒。
// 关闭通道func closechan(c *hchan) { // 通道为空进行关闭引发panic if c == nil { panic(plainError('close of nil channel')) } // 加锁,会对c进行修改 lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError('close of closed channel')) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } // closed修改为1表示,通道已关闭 c.closed = 1 var glist gList // 将所有的等待接收队列中的G出队,加入到glist中 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // 将所有等待发送队列中的G加入到glist中 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 将glist中所有的G唤醒,即他们的状态都变为可运行态runnable for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }}
其他操作实现原理
下面讲解chan接收和发送与select结合是如何实现的。先看一个发送的情景。select chan + default的语句会被编译器转换成if selectnbased(c,v){}else do{...}语句。selectnbased内部调用的也是chansend方法,只是在block参数传递上与前面的发送操作不同,这里传递的是false,就是不要阻塞在chansend, 不能发送的时候,要返回回来走default逻辑。同理,接收操作方法select中同时又有default的时候,会翻译成selectnbrecv或selectnbrecv2,两者的不同是接收操作有2种类型,一种是带bool的表示是否成功地从chan中读取到了一个值,另一种是不关心该参数。
// compiler implements//// select {// case c <- v:// ... foo// default:// ... bar// }//// as//// if selectnbsend(c, v) {// ... foo// } else {// ... bar// }//func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc())}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return}func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { // TODO(khr): just return 2 values from this function, now that it is in Go. selected, *received = chanrecv(c, elem, false) return}
channel使用容易犯的错误
- close一个nil型chan
- 往已关闭的chan中发送数据
- close已经关闭的chan
- goroutine泄露
close一个nil值的chan会产生panic.我们在关闭chan的时候要清楚chan是否是nil值,如果不确定可以判断一下。第2点和第3点涉及到chan的关闭,不当的操作会导致panic.如何优雅的关闭channel, 可以读一读小冰的这篇文章优雅关闭channel,里面有讲关闭思路。
下面将一个groutine泄露的例子,分析产生的原因。这个例子来自于go-zero作者,原文见这里一文搞懂如何实现 Go 超时控制。可以从这个例子中吸取经验,避免写出类似的bug.
func main() { const total = 1000 var wg sync.WaitGroup wg.Add(total) now := time.Now() for i := 0; i < total; i++ { go func() { defer wg.Done() requestWork(context.Background(), 'any') }() } wg.Wait() fmt.Println('elapsed:', time.Since(now)) time.Sleep(time.Minute * 2) fmt.Println('number of goroutines:', runtime.NumGoroutine())}func hardWork(job interface{}) error { time.Sleep(time.Minute) return nil}func requestWork(ctx context.Context, job interface{}) error { ctx, cancel := context.WithTimeout(ctx, time.Second*2) defer cancel() done := make(chan error) go func() { done <- hardWork(job) }() select { case err := <-done: return err case <-ctx.Done(): return ctx.Err() }}
上面的代码输出结果如下:
啥,打印1001个goroutine,goroutine泄露了。问题在于requestWork函数中,会执行到select <-ctx.Done()这句,因为ctx是一个超时取消context,这里超时时间设置的是2秒钟,hardWork是一个比较耗时的任务,这里模拟sleep 1分钟。所以2秒后,requestWork退出了, 当1分钟后执行 done<-hardWork(job)的时候会被卡住,因为没有接收者了,当前的G会执行gopark挂起了,并且永远得不到执行。所以main函数中的打印会输出1001(有1个是main groutine)。 那怎么修改呢?把done:=make(chan error,1)改成带有1个缓冲区的chan就可以了,因为有1个容量的缓冲区,所以当hardWork(job)执行完后,会将结果放在缓冲区中,然后协程就退出了,不会卡住。也许有读者会问题,这里协程退出之后,done怎么办,会不会存在泄漏。不会出现泄漏,当与chan绑定的的G都不存在的时候,chan会被gc回收掉。
总结
chan值和状态存在很多种情况,不同状态下执行发送、接收、关闭操作会产生不同的情况,下面将各种情况汇总,看完下表可以梳理清楚chan的各个知识点。