go编程:说说channel哪些事

channel是什么

channel中文翻译为通道,它是Go语言内置的数据类型,使用channel不需要导入任何包,像int/float一样直接使用。它主要用于goroutine之间的消息传递和事件通知。 在Go语言中流传着一句话,就是说不要通过共享内存来通信,而是应该通过通信来共享内存。

Don't communicate by sharing memory, share memory by communicating.

上面这句话也包含了通信的两种方式:1是通过共享内存 2是通过通信。channel来源灵感要追溯到CSP(communicating sequential process)模型,CSP是用来描述并发系统中进行交互的一种模式。CSP概念最早由Tony Hoare提出,对这个人我们可能很陌生。但说到快速排序算法(Quicksort),你一定不陌生,快速排序太经典了,对Tony Hoare就是快速排序算法的作者。CSP允许使用进程组件来描述系统,各个组件独立运行,它们之间通过消息传递的方式进行通信。 Go语言中使用channel实现CSP思想,整个channel实现只有短短的700行代码,非常精炼,非常值得一读。channel和goroutine的结合,为并发编程提供了优雅的、便利的、 与传统并发控制不同的方案,并产生了在Go中特有的并发模式。

为什么需要channel

channel不是必需的,不用channel也可以完成goroutine之间的消息传递和事件通知,比如通过共享变量的方式。但使用了channel,会大大提升开发的效率,因为channel是并发安全的,channel的设计与goroutine之间完美配合,降低了并发编程的难度,减少了data race产生,大幅提升了生产力,嗯,程序员的福音。

channel基本用法

channel创建

channel有3种类型,分别为只能接收、只能发送、既能接收又能发送。定义方法如下:

func main() { // readChan只能接收 var readChan <-chan int // writeChan只能发送 var writeChan chan<- int // rwChan既能接收又能发送 var rwChan chan int fmt.Println(readChan == nil)  //true fmt.Println(writeChan == nil) //true fmt.Println(rwChan == nil)    //true}

channel中的元素对类型没有限制,任意类型都可以,所以元素的类型也可以是chan类型。那怎么判断<-属于哪个chan呢? <-的匹配规则是总是尽量与它左边的chan结合(the <-operator associates with the leftmost chan possible)。所以writeIntChan是一个只能发送的chan,发送的元素为chan int(双向的int型chan), readIntChan是一个只能接收的chan,发送的元素为chan int, rwIntChan是一个双向的chan, 发送的元素为chan int.

func elemIsChan(){ // chan元素类型也可以是channel var writeIntCahn chan<- chan int var readIntChan <- chan chan int var rwIntChan chan chan int}

channel定义完成之后,并不能直接使用,需要初始化。上面readChan/writeChan/rwChan都是未被初始化的,它们的值都是nil. channel用make初始化,不能用new方法。make创建的时候可以传一个数字,表示创建有缓冲区的chan, 如果没有设置,它是无缓冲区的。

func makeChan(){ // 无缓冲区的chan unbufferedCh:=make(chan int) // 有缓冲区的chan,可以缓存10个int数据 bufferedCh:=make(chan int,10)}

向channel中发送数据

往chan中发送一个数据使用“ch<-”,下面的操作往ch发送一个int数据200. ch是一个双向的chan,可以往里面发送数据。 ch2是一个只能发送的ch,也可以往里面发送数据。

func sendDataToChan() { ch := make(chan int, 1) ch <- 200 ch2 := make(chan<- int, 1) ch2 <- 100}

从channel中取数据

使用<-ch从chan中接收一条数据,ch是一个双向chan或者只读chan.下面的操作从ch读取数据,ch2是一个只读chan,也可以进行读取数据的操作。 chan接收操作,可以返回一个值可以可以返回两个值,第一个值是返回chan中的元素,第二个值是个bool类型,表示是否成功地从chan中读取到了一个值。如果chan已经被关闭而且所有的数据都已经读完,第一个值将是零值。需要注意的是,如果从chan读取到一个零值,可能是sender真实的在发送零值,也有可能是chan被关闭且没有缓存的元素了产生的零值。

func recvDataFromChan() { // 双向chan ch := make(chan int, 1) <-ch // 只读chan ch2 := make(<-chan int, 1) <-ch2  // 只读取数据 _=<-ch2  // 读取数据并想知道ch2是否已关闭 _,_=<-ch2}

关闭channel

close(ch)直接将一个chan关闭,需要注意的是,如果一个chan未被初始化,也就是没有执行make操作,是不能close的,否则引发panic.还有就是不能重复关闭一个chan,重复关闭一个chan也会产生panic.还有就是不能往一个关闭的chan中发送数据,也会产生panic. 最后一个需要注意的是不能close一个只读的chan,直接编译不会通过。

func closeNilChan() { var ch chan int close(ch)  //panic:close of nil channel}func closeOnlyReadChan() { var ch <-chan int ch = make(<-chan int) close(ch) // invalid operation: close(ch) (cannot close receive-only channel)}

其他操作

Go内置的cap、len都可以操作chan,cap返回chan的容量,len返回chan中缓存的还未被取走的元素数量。还可以在select case中向chan发送数据或从chan中接收数据。 for-range操作chan,从chan读取数据。当ch被close之后,for-rang循环都会结束。

func forRangeChan() { ch := make(chan int, 1) wg := sync.WaitGroup{} wg.Add(1) go func() {  defer wg.Done()  // ch被close后,for-range会结束循环  for v := range ch {   fmt.Println(v)  }  fmt.Println('for-range end') }() ch <- 1 close(ch) wg.Wait()}func forRangeChan2() { ch := make(chan int, 1) wg := sync.WaitGroup{} wg.Add(1) go func() {  defer wg.Done()  // ch被close后,for-range结束循环  for range ch {  }  fmt.Println('for-range end') }() close(ch) wg.Wait()}

channel实现原理

下面介绍channel底层是怎么实现的,源码在runtime/chan.go文件中。chan分配的是一个hchan的数据结构,定义如下:

const ( maxAlign = 8 // hchanSize为8的倍数,如果不是调整到下一个最小的8的倍数,假如hchan大小9,hchanSize为16 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) debugChan = false)type hchan struct { // 队列中元素的个数 qcount uint // 环形队列的大小 dataqsiz uint // 指向含有dataqsiz个元素的数组的地址 buf unsafe.Pointer // 元素的大小 elemsize uint16 // 通道是否关闭标识 closed uint32 // 元素的类型 elemtype *_type // 发送下标,即发送元素在环形队列的位置 sendx uint // 接收下标,即接收元素在环形队列的位置 recvx uint // 等待接收元素的goroutine队列 recvq waitq // 等待发送元素的goroutine队列 sendq waitq // 互斥锁,保护所有字段,包括sudogs中的字段,当持有锁的时候,不要修改其他G的状态 // 因为这可能导致堆栈在收缩时产生死锁。 lock mutex}

qcount:代表chan中已经接收但还没被取走的元素的个数,使用len函数可以返回qcount的值

dataqsiz:队列的大小,chan中用一个循环队列来存放数据。

buf:存放元素的循环队列的buffer地址。

elemtype和elemsize:表示chan中元素的类型和size。chan一旦申明,它的元素类型是固定的,即普通类型或指针类型,所以元素大小也是固定的。

sendx:处理发送数据的指针在buf中的位置,一旦接收了新数据,指针就会加上elemsize,移动到下一个位置。buf的总大小是elemsize的整数倍,而且buf是一个循环列表。

recvx:处理接收请求时的指针在buf中的位置,一旦取出数据,该指针会移动到下一个位置。

recvq:chan是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到recq队列中。

sendq: 如果生产者因为buf满了而阻塞,会被加入到sendq队列中。

上述的字段概括起来分为3部分,第一个部分是存储元素相关的,元素的类型,元素的大小,第二部分是buf相关的,存储数据buf的地址,buf的大小,buf中数据元素的个数,收发下标在buf中的索引位置,第三部分是调用相关的,recvq和sendq等待被调度的G队列。

channel创建实现

在编译的时候,编译器会根据容量的大小选择调用makechan64还是makechan。 makechan64只是做了size检查,后面调用的还是makechan函数,makechan函数的目的就是产生hchan结构体对象。下面对源码做了比较详细的解析。在分配buf的时候,根据存放元素是否含有指针做了不同的分配策略,如果通道中存放的元素中不含有指针类型,一次性分配hchanSize+mem大小的空间,如果含有指针,需要进行两次分配,第一次分配通道结构头hchan,第二次为通道中元素分配存储空间buf。需要注意的时,产生的hchan对象肯定是8字节的倍数,不够8字节时候,分配时候会调整。

func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size {  panic(plainError('makechan: size out of range')) } return makechan(t, int(size))}// 创建一个chan类型的结构体指针*hchan,传入参数有2个:t表示// 通道中放的元素类型,size表示通道的大小func makechan(t *chantype, size int) *hchan { elem := t.elem // 检查通道中元素大小是否小于2^16(65536),如果大于这个将抛异常 if elem.size >= 1<<16 {  throw('makechan: invalid channel element type') } // 检查hchanSize是否为8的倍数,如果不是将抛出异常 if hchanSize%maxAlign != 0 || elem.align > maxAlign {  throw('makechan: bad alignment') } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 如果要申请的空间大小溢出或者超过最大分配值(maxAlloc)-hchanSize的值,或者申请通道 // 的大小为负数,将引发panic if overflow || mem > maxAlloc-hchanSize || size < 0 {  panic(plainError('makechan: size out of range')) }  var c *hchan switch { case mem == 0:  // 申请的是无缓冲区的channel  // 直接申请hchanSize字节大小的空间,hchanSize大小就是hchan结构体的大小  // 如果不是8的倍数,调整到8的倍数  c = (*hchan)(mallocgc(hchanSize, nil, true))  // c.buf指向自己(即buf的位置)  c.buf = c.raceaddr() case elem.ptrdata == 0:  // 通道中存放的元素中不含有指针类型,一次性分配hchanSize+mem大小的空间  // 存放元素占用的空间是一个连续的数组,跟hchan分配在一起  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))  // c.buf指向连续数组的起始位置  c.buf = add(unsafe.Pointer(c), hchanSize) default:  // 通道中存放的元素中含有指针,需要进行两次分配,第一次分配通道结构头hchan  // 第二次为通道中元素分配存储空间,调用mallocgc分配mem个字节空间,并将分配  // 的地址位置赋值给c.buf  c = new(hchan)  c.buf = mallocgc(mem, elem, true) } //填充hchan元素大小、类型和环形队列大小字段 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) // 调试打印 if debugChan {  print('makechan: chan=', c, '; elemsize=', elem.size, '; dataqsiz=', size, '\n') } return c}const MaxUintptr = ^uintptr(0)// MulUintPtr返回a*b的值和一个bool类型,bool类型表示a*b是否溢出func MulUintptr(a, b uintptr) (uintptr, bool) { if a|b < 1<<(4*sys.PtrSize) || a == 0 {  return a * b, false } overflow := b > MaxUintptr/a return a * b, overflow}

发送操作实现原理

在编译发送数据给chan的时候,会把ch<-语句转换成chansend1函数,chansend1函数会调用chansend。chansend源码解析如下,整个chansend函数处理逻辑比较复杂,可以分为6部分处理逻辑。第一部分是对chan进行判空操作,如果chan是nil,调用者就永远被阻塞住了。第二部分是向一个缓存队列满的chan对象发送数据的时候,并且想不阻塞当前调用,这里的处理方式是直接返回。这里多说一点,什么时候不想阻塞呢?看chansend1调用chansend的时候block传递的是true。什么时候传递false呢?在处理select case + default时候,如果chan被阻塞,需要执行default逻辑,像这种情况下传递的就是false.第三部分是如果chan已经被关闭,在向里面发送数据的话会panic.第四部分,如果等待队列中有等待的receiver,直接将数据拷贝的目标位置,不需要经过buf进行中转,只有1次拷贝。第五部分是说当前没有receiver,需要将数据放到buf,然后成功返回。第六部分是处理buf满的情况,如果buf满了,发送者的goroutine会被加入到发送者的等待队列中,直到被唤醒。

// 发送操作c<-x调用入口func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc())}// 往通道发送元素处理逻辑,func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // c为nil表示通道还未初始化,如果不阻塞即block传false,会直接返回false // chansend1中调用block参数传的是true,chansend1是发送操作的入口, // 所以这里可知,往未初始化的通道发送数据,会走到下面的gopark逻辑里。 if c == nil {  if !block {   return false  }  // 挂起当前G  gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)  throw('unreachable') } if debugChan {  print('chansend: chan=', c, '\n') } if raceenabled {  racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // 特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回, // 对于缓冲性通道,但通道满了,也直接返回 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||  (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {  return false } var t0 int64 if blockprofilerate > 0 {  t0 = cputicks() } lock(&c.lock) // 通过已经关闭,不能再发送元素,直接报panic if c.closed != 0 {  unlock(&c.lock)  panic(plainError('send on closed channel')) } // 找到第一个等待的G, 直接将数据ep拷贝给它,存在等待的G有两种情况 // 情况1:通道非缓冲,receiver先被运行,被gopark了 // 情况2:缓冲通道,通道中ring buffer是空的,receiver先被运行,被gopark了 if sg := c.recvq.dequeue(); sg != nil {  // Found a waiting receiver. We pass the value we want to send  // directly to the receiver, bypassing the channel buffer (if any).  send(c, sg, ep, func() { unlock(&c.lock) }, 3)  return true } // 有缓冲的通道,通道还没满,将元素加入到缓冲区 if c.qcount < c.dataqsiz {  // qp是当前元素应该放入在环形队列的位置  qp := chanbuf(c, c.sendx)  if raceenabled {   raceacquire(qp)   racerelease(qp)  }  // 将ep的内容拷贝到qp的位置,即将发送元素拷贝到环形队列中  typedmemmove(c.elemtype, qp, ep)  // [c.recex,c.sendx]位置已填充带接收得元素,发送缓存区的位置c.sendx加1  // 如果c.sendx加1操作之后已到达数组的尾部,即数组满了,回到起始位置0  c.sendx++  if c.sendx == c.dataqsiz {   c.sendx = 0  }  c.qcount++  unlock(&c.lock)  return true } if !block {  unlock(&c.lock)  return false } // 走到这里表示,缓冲性通道已经满了,要挂起当前的G,构造一个sudog, 加入到sendq队列 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 {  mysg.releasetime = -1 } // 在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 将mysg加入到sendq c.sendq.enqueue(mysg) // gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 确保要发送的数据已经被接收者copy获取到,因为ep是一个栈上的对象,mysg.elem指向的 // 是一个栈上的对象 KeepAlive(ep) // 被唤醒后要执行的逻辑 if mysg != gp.waiting {  throw('G waiting list is corrupted') } gp.waiting = nil gp.activeStackChans = false if gp.param == nil {  if c.closed == 0 {   throw('chansend: spurious wakeup')  }  panic(plainError('send on closed channel')) } gp.param = nil if mysg.releasetime > 0 {  blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 数据竞争检查,执行 go run -race会进入 if raceenabled {  if c.dataqsiz == 0 {   racesync(c, sg)  } else {   // 修改缓冲区中发送者和接收者的下标位置   qp := chanbuf(c, c.recvx)   raceacquire(qp)   racerelease(qp)   raceacquireg(sg.g, qp)   racereleaseg(sg.g, qp)   c.recvx++   if c.recvx == c.dataqsiz {    c.recvx = 0   }   c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz  } } // 发送的数据不为空,直接调用sendDirect将数据ep拷贝到sg的elem中 if sg.elem != nil {  sendDirect(c.elemtype, sg, ep)  sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 {  sg.releasetime = cputicks() } // 切换gp的状态为runnable goready(gp, skip+1)}// 直接在两个栈上进行数据拷贝,src是发送者栈G上待发送的数据,sg是接收Gfunc sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // 一旦对sg.elem进行了读操作,如果发送栈扩容,它将永不会被更新 dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // 从src拷贝t.size字节到dst memmove(dst, src, t.size)}

接收操作实现原理

在编译器处理<-ch操作时,会将其转成chanrecv1函数,如果要返回两个返回值,会转成chanrecv2, chanrecv1和chanrecv2都会调用chanrecv。chanrecv1和chanrecv2传入的block参数都是true。分析的时候先不考虑block为false的情况。接收操作源码解析如下

// 接收操作入口<-c, 有2个入参,c表示通道结构体指针// elem是接收通道元素的变量地址,即<-c左边的接收者func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true)}// _,_:=<-c chanrecv2比chanrecv1会返回一个bool类型// 该bool表示是否成功地从chan中读取到了一个值,chanrecv1和chanrecv2内部调用// 都是同一个函数func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return}// chanrecv将通道c中接收到的数据写入到ep执行的地址内存中,ep指向的位置可能是一个堆空间也可能在栈空间上// 如果忽略接收通道值,ep将是nil值,如果传入的block为false并且通道中没有元素,将返回false,false// 如果通道c被关闭,ep将会填充类型的零值,返回true和false, 其他情况ep会填充通道中接收到的值,返回// 值为true,truefunc chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 调试输出 if debugChan {  print('chanrecv: chan=', c, '\n') } // c为nil,即未初始化,调用gopark挂起当前的 if c == nil {  if !block {   return  }  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)  throw('unreachable') } // 非阻塞接收且通道未被关闭,非缓冲通道且没有发送中G 直接返回false,false // 有缓冲通道且通道中没有元素,也直接返回false,false if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||  c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&  atomic.Load(&c.closed) == 0 {  return } var t0 int64 if blockprofilerate > 0 {  t0 = cputicks() } lock(&c.lock) // 通过已关闭且通道中没有元素了 if c.closed != 0 && c.qcount == 0 {  if raceenabled {   raceacquire(c.raceaddr())  }  unlock(&c.lock)  if ep != nil {   typedmemclr(c.elemtype, ep)  }  return true, false } // 等待发送的G队列非空,即有G在等待发送元素,如果是非缓冲队列,直接将数据从 // 发送者的sg中的elem拷贝到ep中;如果是缓冲队列,将队列头的元素添加到ep // 中,并将发送者sg中的元素拷贝到队列的尾部。 if sg := c.sendq.dequeue(); sg != nil {  recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  return true, true } // 队列中有元素,将待拷贝的元素拷贝到ep中 if c.qcount > 0 {  // Receive directly from queue  // qp为环形队列中待拷贝的元素的位置  qp := chanbuf(c, c.recvx)  if raceenabled {   raceacquire(qp)   racerelease(qp)  }  if ep != nil {   // 将qp位置的元素拷贝到ep中   typedmemmove(c.elemtype, ep, qp)  }  typedmemclr(c.elemtype, qp)  // 接收位置的下标+1,如果接收位置到达队尾,重置为0  c.recvx++  if c.recvx == c.dataqsiz {   c.recvx = 0  }  c.qcount--  unlock(&c.lock)  return true, true } if !block {  unlock(&c.lock)  return false, false } // 走到这里有2种情况,1是非缓冲通道发送者没有准备好,2是缓冲型通道但没有元素, //构造一个sudog加入到通道的等待接收的G队列,调用gopark挂起当前的G,即将状态 //标为waiting, 进入G0执行调度逻辑。 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 {  mysg.releasetime = -1 } // 在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 将mysg加入到等待接收的G队列 c.recvq.enqueue(mysg) // gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 执行唤醒后的逻辑 if mysg != gp.waiting {  throw('G waiting list is corrupted') } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 {  blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed}// 通道接收操作,包含2部分处理逻辑,1是将sg中发送者的元素拷贝到通道中,并唤醒发送者;2是将// 通道中待接收处理的元素拷贝到ep中。注意,走到recv只有2中情况,队列为空或满了,队列没// 满不会走到这里。func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 非缓冲型通道,直接将元素从sg拷贝到ep中,不经过环形队列作为中转 if c.dataqsiz == 0 {  if raceenabled {   racesync(c, sg)  }  if ep != nil {   // 直接将数据从sg拷贝到ep   recvDirect(c.elemtype, sg, ep)  } } else {  // 队列满的情况,走到这里只可能是队列满的情况,对于队列不满的情况,不会走  // 到recv函数里c.recvx和c.sendx是在同一个位置,先将环形队列  // 中待接收处理位置qp地方的元素拷贝到ep中,然后将发送者sg.elem  // 中的元素拷贝到队列的尾部,对于队列满的情况  qp := chanbuf(c, c.recvx)  if raceenabled {   raceacquire(qp)   racerelease(qp)   raceacquireg(sg.g, qp)   racereleaseg(sg.g, qp)  }  // 将qp位置的数据拷贝到ep中  if ep != nil {   typedmemmove(c.elemtype, ep, qp)  }  // 将sg.elem中的元素拷贝到qp位置  typedmemmove(c.elemtype, qp, sg.elem)  c.recvx++  if c.recvx == c.dataqsiz {   c.recvx = 0  }  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 {  sg.releasetime = cputicks() } // 对gp执行goread,将其状态从waiting修改为runnable状态 goready(gp, skip+1)}// 直接在两个栈上进行数据拷贝,dst是接收者栈G上待接收数据的地址,sg是发送Gfunc recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size)}

第一部分处理的是chan为nil的情况,从nil chan中接收数据,调用者会被永远阻塞。第二部分是对特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回,对于缓冲性通道,但通道满了,也直接返回。第三部分是chan已经被close的情况,如果chan已经被close了,并且队列中没有缓存的元素,直接返回true,false.第四部分是处理等待发送队列中有等待G的情况,这时,如果buf中有数据,先从buf中读取数据,否则直接从等待队列中获取一个发送者,把它的数据复制给这个receiver.第五部分是处理没有等待发送者G的情况,如果buf有元素,就取出一个元素给接收者。第六部分是处理buf中没有元素的情况,如果没有元素,阻塞当前的G,直到它从发送者中获取到了数据,或是chan被关闭了,才返回。

关闭操作实现原理

执行close(ch)关闭chan ch,最终会调用的是closechan方法。下面是closechan的源码解析。关闭nil chan会产生panic. 如果chan已经关闭再次关闭也会产生panic.否则将ch等待队列中的全部接收者和发送者G从队列中全部移除加入了glist.然后将glist中的所有G执行goready唤醒。

// 关闭通道func closechan(c *hchan) { // 通道为空进行关闭引发panic if c == nil {  panic(plainError('close of nil channel')) } // 加锁,会对c进行修改 lock(&c.lock) if c.closed != 0 {  unlock(&c.lock)  panic(plainError('close of closed channel')) } if raceenabled {  callerpc := getcallerpc()  racewritepc(c.raceaddr(), callerpc, funcPC(closechan))  racerelease(c.raceaddr()) } // closed修改为1表示,通道已关闭 c.closed = 1 var glist gList // 将所有的等待接收队列中的G出队,加入到glist中 for {  sg := c.recvq.dequeue()  if sg == nil {   break  }  if sg.elem != nil {   typedmemclr(c.elemtype, sg.elem)   sg.elem = nil  }  if sg.releasetime != 0 {   sg.releasetime = cputicks()  }  gp := sg.g  gp.param = nil  if raceenabled {   raceacquireg(gp, c.raceaddr())  }  glist.push(gp) } // 将所有等待发送队列中的G加入到glist中 for {  sg := c.sendq.dequeue()  if sg == nil {   break  }  sg.elem = nil  if sg.releasetime != 0 {   sg.releasetime = cputicks()  }  gp := sg.g  gp.param = nil  if raceenabled {   raceacquireg(gp, c.raceaddr())  }  glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 将glist中所有的G唤醒,即他们的状态都变为可运行态runnable for !glist.empty() {  gp := glist.pop()  gp.schedlink = 0  goready(gp, 3) }}

其他操作实现原理

下面讲解chan接收和发送与select结合是如何实现的。先看一个发送的情景。select chan + default的语句会被编译器转换成if selectnbased(c,v){}else do{...}语句。selectnbased内部调用的也是chansend方法,只是在block参数传递上与前面的发送操作不同,这里传递的是false,就是不要阻塞在chansend, 不能发送的时候,要返回回来走default逻辑。同理,接收操作方法select中同时又有default的时候,会翻译成selectnbrecv或selectnbrecv2,两者的不同是接收操作有2种类型,一种是带bool的表示是否成功地从chan中读取到了一个值,另一种是不关心该参数。

// compiler implements//// select {// case c <- v://  ... foo// default://  ... bar// }//// as//// if selectnbsend(c, v) {//  ... foo// } else {//  ... bar// }//func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc())}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return}func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { // TODO(khr): just return 2 values from this function, now that it is in Go. selected, *received = chanrecv(c, elem, false) return}

channel使用容易犯的错误

  • close一个nil型chan
  • 往已关闭的chan中发送数据
  • close已经关闭的chan
  • goroutine泄露

close一个nil值的chan会产生panic.我们在关闭chan的时候要清楚chan是否是nil值,如果不确定可以判断一下。第2点和第3点涉及到chan的关闭,不当的操作会导致panic.如何优雅的关闭channel, 可以读一读小冰的这篇文章优雅关闭channel,里面有讲关闭思路。

下面将一个groutine泄露的例子,分析产生的原因。这个例子来自于go-zero作者,原文见这里一文搞懂如何实现 Go 超时控制。可以从这个例子中吸取经验,避免写出类似的bug.

func main() { const total = 1000 var wg sync.WaitGroup wg.Add(total) now := time.Now() for i := 0; i < total; i++ {  go func() {   defer wg.Done()   requestWork(context.Background(), 'any')  }() } wg.Wait() fmt.Println('elapsed:', time.Since(now)) time.Sleep(time.Minute * 2) fmt.Println('number of goroutines:', runtime.NumGoroutine())}func hardWork(job interface{}) error { time.Sleep(time.Minute) return nil}func requestWork(ctx context.Context, job interface{}) error { ctx, cancel := context.WithTimeout(ctx, time.Second*2) defer cancel() done := make(chan error) go func() {  done <- hardWork(job) }() select { case err := <-done:  return err case <-ctx.Done():  return ctx.Err() }}

上面的代码输出结果如下:

啥,打印1001个goroutine,goroutine泄露了。问题在于requestWork函数中,会执行到select <-ctx.Done()这句,因为ctx是一个超时取消context,这里超时时间设置的是2秒钟,hardWork是一个比较耗时的任务,这里模拟sleep 1分钟。所以2秒后,requestWork退出了, 当1分钟后执行 done<-hardWork(job)的时候会被卡住,因为没有接收者了,当前的G会执行gopark挂起了,并且永远得不到执行。所以main函数中的打印会输出1001(有1个是main groutine)。 那怎么修改呢?把done:=make(chan error,1)改成带有1个缓冲区的chan就可以了,因为有1个容量的缓冲区,所以当hardWork(job)执行完后,会将结果放在缓冲区中,然后协程就退出了,不会卡住。也许有读者会问题,这里协程退出之后,done怎么办,会不会存在泄漏。不会出现泄漏,当与chan绑定的的G都不存在的时候,chan会被gc回收掉。

总结

chan值和状态存在很多种情况,不同状态下执行发送、接收、关闭操作会产生不同的情况,下面将各种情况汇总,看完下表可以梳理清楚chan的各个知识点。

(0)

相关推荐

  • Go 中如何让消息队列达到最大吞吐量?

    kevwan Go语言中文网 今天 你在使用消息队列的时候关注过吞吐量吗? 思考过吞吐量的影响因素吗? 考虑过怎么提高吗? 总结过最佳实践吗? 本文带你一起探讨下消息队列消费端高吞吐的 Go 框架实现 ...

  • Go 最细节篇 — chan 为啥没有判断 close 的接口 ?

    大纲 Go 为什么没有判断 close 的接口? Go 关闭 channel 究竟做了什么? `closechan` 一个判断 chan 是否 close 的函数 思考方法一:通过"写&qu ...

  • Go并发处理

    写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池. 代码从网上理解了之后写的.代码实例 简单的介绍: 首先实现一个Job接口,只要有方法实现了Do方法即可 定义个 ...

  • 多图详解Go中的Channel源码

    chan介绍 package mainimport "fmt" func main() { c := make(chan int) go func() { c <- 1 // ...

  • Go:如何优雅地实现并发编排任务

    Go语言中文网 昨天 以下文章来源于吴亲强的深夜食堂 ,作者吴亲库里 业务场景 在做任务开发的时候,你们一定会碰到以下场景: 场景1:调用第三方接口的时候, 一个需求你需要调用不同的接口,做数据组装. ...

  • 在Go中,你犯过这些错误吗

    Go语言中文网 今天 以下文章来源于吴亲强的深夜食堂 ,作者吴亲库里 吴亲强的深夜食堂关注一些奇奇怪怪的设计,分享一些有有趣趣的生活 迭代器变量上使用 goroutine 这算高频吧. package ...

  • RabbitMQ Golang教程(三)

    RabbitMQ Golang教程(三) 什么是发布/订阅? 创建一个队列每个任务只传递给一个工人,做些不同的事,向多个消费者传递一个消息.这就是所谓的"订阅/发布模式". 构建一 ...

  • 从 bug 中学:六大开源项目告诉你 go 并发编程的那些坑

    作者:richardyao,腾讯 CSIG 后台开发工程师 并发编程中,go不仅仅支持传统的通过共享内存的方式来通信,更推崇通过channel来传递消息,这种新的并发编程模型会出现不同于以往的bug. ...

  • 手把手教姐姐写消息队列

    前言 这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧.因为要用go语言写,这可给姐姐愁坏了.赶紧来求助我,我这么坚贞不 ...

  • RabbitMQ Golang教程(二)

    RabbitMQ Golang教程(二) 任务队列 什么是任务队列 ? 把要执行的任务放在队列中.使用较多的任务队列有machiney.Celery.goWorker.YTask.每一个任务队列都有自 ...

  • 再次探讨 Go 的无限缓冲的channel

    Go语言中文网 今天 以下文章来源于吴亲强的深夜食堂 ,作者吴亲库里 chanx 上篇文章无限缓冲的channel(1)我们提到,当我们创建一个有缓冲的通道并指定了容量,那么在这个通道的生命周期内,我 ...