协程库 libtask 源码分析

本文在公司内网有不错的反响,但不同于传统的前端技术文章,所以阅读起来可能有点晦涩。

假设读者已经了解了协程的概念、实现协程的底层技术支持,基于底层基础,我们来看看如何实现协程以及协程的应用。

什么是 libtask

libtask 是 google 大佬 Russ Cox(Go 的核心开发者)所写。libtask 非常有意思,为数不多的代码就可以让人了解和理解协程的具体应用,很值得学习,我感兴趣的点在于如何在服务器中使用协程,传统的服务器,基本都是多进程、多线程、池化、单线程/多线程多路复用等等,而 libtask 使用少量的代码就让我看到了如何使用协程写一个服务器,非常赞(源码分析)。

libtask 源码解析

我们从 libtask 的 main 函数开始,这个main函数就是我们在c语言中使用的c函数,libtask 本身实现了 main 这个函数,用户使用 libtask时,要实现的是 taskmain 函数。taskmain 和 main 的函数声明是一样的。下面我们看一下 main 函数。

int main(int argc, char **argv)
{
 struct sigaction sa, osa;
 // 注册SIGQUIT信号处理函数
 memset(&sa, 0, sizeof sa);
 sa.sa_handler = taskinfo;
 sa.sa_flags = SA_RESTART;
 sigaction(SIGQUIT, &sa, &osa);

// 保存命令行参数
 argv0 = argv[0];
 taskargc = argc;
 taskargv = argv;

if(mainstacksize == 0)
  mainstacksize = 256*1024;
 // 创建第一个协程
 taskcreate(taskmainstart, nil, mainstacksize);
 // 开始调度
 taskscheduler();
 fprint(2, 'taskscheduler returned in main!\n');
 abort();
 return 0;
}

main 函数主要的两个逻辑是 taskcreate 和 taskscheduler 函数。我们先来看 taskcreate。

int taskcreate(void (*fn)(void*), void *arg, uint stack){ int id; Task *t;

 t = taskalloc(fn, arg, stack); taskcount++; id = t->id; // 记录位置 t->alltaskslot = nalltask; // 保存到alltask中 alltask[nalltask++] = t; // 修改状态为就绪,可以被调度,并且加入到就绪队列 taskready(t); return id;}

taskcreate 首先调用 taskalloc 分配一个表示协程的结构体 Task。我们看看这个结构体的定义。

struct Task
{
 char name[256]; // offset known to acid
 char state[256];
 // 前后指针
 Task *next;
 Task *prev;
 Task *allnext;
 Task *allprev;
 // 执行上下文
 Context context;
 // 睡眠时间
 uvlong alarmtime;
 uint id;
 // 栈信息
 uchar *stk;
 uint stksize;
 //是否退出了
 int exiting;
 // 在alltask的索引
 int alltaskslot;
 // 是否是系统协程
 int system;
 // 是否就绪状态
 int ready;
 // 入口函数
 void (*startfn)(void*);
 // 入口参数
 void *startarg;
 // 自定义数据
 void *udata;
};

接着看看 taskalloc 的实现。

// 分配一个协程所需要的内存和初始化某些字段static Task*taskalloc(void (*fn)(void*), void *arg, uint stack){ Task *t; sigset_t zero; uint x, y; ulong z;

 /* allocate the task and stack together */ // 结构体本身的大小+栈大小 t = malloc(sizeof *t+stack); memset(t, 0, sizeof *t); // 栈的内存位置 t->stk = (uchar*)(t+1); // 栈大小 t->stksize = stack; // 协程id t->id = ++taskidgen; // 协程工作函数和参数 t->startfn = fn; t->startarg = arg;

 /* do a reasonable initialization */ memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); // 初始化uc_sigmask字段为空,即不阻塞信号 sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);

 /* must initialize with current context */ // 初始化uc字段 getcontext(&t->context.uc)  // 设置协程执行时的栈位置和大小 t->context.uc.uc_stack.ss_sp = t->stk+8; t->context.uc.uc_stack.ss_size = t->stksize-64; z = (ulong)t; y = z; z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */ x = z>>16; // 保存信息到uc字段 makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);

 return t;}

taskalloc 函数代码看起来很多,但是逻辑不算复杂,就是申请 Task 结构体所需的内存和执行时栈的内存,然后初始化各个字段。这样,一个协程就诞生了。接着执行 taskready 把协程加入就绪队列。

// 修改协程的状态为就绪并加入就绪队列
void taskready(Task *t)
{
 t->ready = 1;
 addtask(&taskrunqueue, t);
}

// 把协程插入队列中,如果之前在其他队列,则会被移除
void addtask(Tasklist *l, Task *t)
{
 if(l->tail){
  l->tail->next = t;
  t->prev = l->tail;
 }else{
  l->head = t;
  t->prev = nil;
 }
 l->tail = t;
 t->next = nil;
}

taskrunqueue 记录了所有就绪的协程。创建了协程并加入队列后,协程还没有开始执行,就像操作系统的进程和线程一样,需要有一个调度器来调度执行。下面我们看看调度器的实现。

// 协程调度中心static void taskscheduler(void){ int i; Task *t; for(;;){  // 没有用户协程了,则退出  if(taskcount == 0)   exit(taskexitval);  // 从就绪队列拿出一个协程  t = taskrunqueue.head;  if(t == nil){   fprint(2, 'no runnable tasks! %d tasks stalled\n', taskcount);   exit(1);  }  // 从就绪队列删除该协程  deltask(&taskrunqueue, t);  t->ready = 0;  // 保存正在执行的协程  taskrunning = t;  // 切换次数加一  tasknswitch++;  // 切换到t执行,并且保存当前上下文到taskschedcontext(即下面要执行的代码)  contextswitch(&taskschedcontext, &t->context);  // 执行到这说明没有协程在执行(t切换回来的),置空  taskrunning = nil;  // 刚才执行的协程t退出了  if(t->exiting){   // 不是系统协程,则个数减一   if(!t->system)    taskcount--;   // 当前协程在alltask的索引   i = t->alltaskslot;   // 把最后一个协程换到当前协程的位置,因为他要退出了   alltask[i] = alltask[--nalltask];   // 更新被置换协程的索引   alltask[i]->alltaskslot = i;   // 释放堆内存   free(t);  } }}

调度器的代码看起来很多,但是核心逻辑就三个

  1. 从就绪队列中拿出一个协程 t,并把 t 移出就绪队列
  2. 通过 contextswitch 切换到协程 t 中执行
  3. 协程 t 切换回调度中心,如果t已经退出,则修改数据结构,然后回收他占据的内存。继续调度其他协程执行。

至此,协程就开始跑起来了。并且也有了调度系统。这里的调度机制是比较简单的,就是按着先进先出的方式就绪调度,并且是非抢占的。即没有按时间片调度的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用 taskyield 让出 cpu。

// 协程主动让出cpu
int taskyield(void)
{
 int n;
 // 当前切换协程的次数
 n = tasknswitch;
 // 插入就绪队列,等待后续的调度
 taskready(taskrunning);
 taskstate('yield');
 // 切换协程
 taskswitch();
 // 等于0说明当前只有自己一个协程,调度的时候tasknswitch加一,所以这里减一
 return tasknswitch - n - 1;
}

/*
 切换协程,taskrunning是正在执行的协程,taskschedcontext是调度协程(主线程)的上下文,
 切换到调度中心,并保持当前上下文到taskrunning->context
*/
void taskswitch(void)
{
 needstack(0);
 contextswitch(&taskrunning->context, &taskschedcontext);
}

// 真正切换协程的逻辑
static void contextswitch(Context *from, Context *to)
{
 if(swapcontext(&from->uc, &to->uc) < 0){
  fprint(2, 'swapcontext failed: %r\n');
  assert(0);
 }
}

yield 的逻辑也很简单,因为协程在执行的时候,是不处于就绪队列的,当协程准备让出 cpu 时,协程首先把自己重新加入到就绪队列,等待下次被调度执行。当然我们也可以直接调度 contextswitch 切换到其他协程。重点在于什么时候应该让出 cpu,又什么时候应该被调度执行。接下来会详细讲解。至此,我们已经有了支持协程所需要的底层基础。我们看到这个实现的思路也不是很复杂,首先有一个队列表示待执行的的协程,每一个协程对应一个 Task 结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出 cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。接下来我们看看,基于这些底层基础,如果实现一个基于协程的服务器。下面我们通过一个例子进行讲解。

void taskmain(int argc, char **argv){ // 启动一个tcp服务器 if((fd = netannounce(TCP, 0, atoi(argv[1]))) < 0){  // ... } // 改为非阻塞模式 fdnoblock(fd); // accept成功后创建一个客户端协程 while((cfd = netaccept(fd, remote, &rport)) >= 0){  taskcreate(proxytask, (void*)cfd, STACK); }}

我们刚才讲过 taskmain 是我们需要实现的函数,首先通过 netannounce 建立一个 tcp 服务器。接着把 fd 改成非阻塞的,这个非常重要,因为在后面调用 accept 的时候,如果是阻塞的文件描述符,那么就会引起进程挂起,而非阻塞模式下,操作系统会返回 EAGAIN 的错误码,通过这个错误码我们可以决定下一步做什么。我们看看 netaccept 的实现。

// 处理(摘下)连接
int
netaccept(int fd, char *server, int *port)
{
 int cfd, one;
 struct sockaddr_in sa;
 uchar *ip;
 socklen_t len;
 // 注册事件到epoll,等待事件触发
 fdwait(fd, 'r');
 len = sizeof sa;
 // 触发后说明有连接了,则执行accept
 if((cfd = accept(fd, (void*)&sa, &len)) < 0){
  return -1;
 }
 // 和客户端通信的fd也改成非阻塞模式
 fdnoblock(cfd);
 one = 1;
 setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof one);
 return cfd;
}

netaccept 就是通过调用 accept 逐个处理 tcp 连接,但是在 accept 之前,有一个非常重要的操作 fdwait。

// 协程因为等待io需要切换void fdwait(int fd, int rw){  // 是否已经初始化epoll if(!startedfdtask){  startedfdtask = 1;        epfd = epoll_create(1);  // 没有初始化则创建一个协程,做io管理  taskcreate(fdtask, 0, 32768); }    struct epoll_event ev = {0}; // 记录事件对应的协程和感兴趣的事件    ev.data.ptr = taskrunning; switch(rw){ case 'r':  ev.events |= EPOLLIN | EPOLLPRI;  break; case 'w':  ev.events |= EPOLLOUT;  break; }

    int r = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); // 切换到其他协程,等待被唤醒 taskswitch(); // 唤醒后函数刚才注册的事件    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);}

fdwait 首先把 fd 注册到 epoll 中,然后把协程切换到下一个待执行的协程。这里有个细节,当协程 X 被调度执行的时候,他是脱离了就绪队列的,而 taskswitch 函数只是实现了切换上下文到调度中心,调度中心会从就绪队列从选择下一个协程执行,那么这时候,脱离就绪队列的协程X就处于孤岛状态,看起来再也无法给调度中心选中执行,这个问题的处理方式是,把协程、fd和感兴趣的事件信息一起注册到 epoll 中,当 epoll 监听到某个 fd 的事件发生时,就会把对应的协程加入就绪队列,这样协程就可以被调度执行了。在 fdwait 函数一开始那里处理了 epoll 相关的逻辑。epoll 的逻辑也是在一个协程中执行的,但是 epoll 所在协程和一般协程不一样,类似于操作系统的内核线程一样,epoll 所在的协程成为系统协程,即不是用户定义的,而是系统定义的。我们看一下实现:

void fdtask(void *v)
{
 int i, ms;
 Task *t;
 uvlong now;
 // 变成系统协程
 tasksystem();
    struct epoll_event events[1000];
 for(;;){
  /* let everyone else run */
  // 大于0说明还有其他就绪协程可执行,则先让给他们执行,否则往下执行
  while(taskyield() > 0)
   ;
  /* we're the only one runnable - poll for i/o */
  errno = 0;
  // 没有定时事件则一直阻塞
  if((t=sleeping.head) == nil)
   ms = -1;
  else{
   /* sleep at most 5s */
   now = nsec();
   if(now >= t->alarmtime)
    ms = 0;
   else if(now+5*1000*1000*1000LL >= t->alarmtime)
    ms = (t->alarmtime - now)/1000000;
   else
    ms = 5000;
  }
        int nevents;
  // 等待事件发生,ms是等待的超时时间
  if((nevents = epoll_wait(epfd, events, 1000, ms)) < 0){
   if(errno == EINTR)
    continue;
   fprint(2, 'epoll: %s\n', strerror(errno));
   taskexitall(0);
  }

/* wake up the guys who deserve it */
  // 事件触发,把对应协程插入就绪队列
  for(i=0; i<nevents; i++){
            taskready((Task *)events[i].data.ptr);
  }

now = nsec();
  // 处理超时事件
  while((t=sleeping.head) && now >= t->alarmtime){
   deltask(&sleeping, t);
   if(!t->system && --sleepingcounted == 0)
    taskcount--;
   taskready(t);
  }
 }
}

我们看到 epoll 的处理逻辑和一般服务器的类似,通过 epoll_wait 阻塞,然后 epoll_wait 返回时,处理每一个发生的事件,而且 libtask 还支持超时事件。另外 libtask 中当还有其他就绪协程的时候,是不会进入 epoll_wait 的,它会把 cpu 让给就绪的协程(通过 taskyield 函数),当就绪队列只有 epoll 所在的协程时才会进入 epoll 的逻辑。至此,我们看到了 libtask 中如何把异步变成同步的。当用户要调用一个可能会引起进程挂起的接口时,就可以调用 libtask 提供的一个相应的 API,比如我们想读一个文件,我们可以调用 libtask 的 fdread。

int fdread(int fd, void *buf, int n){ int m; // 非阻塞读,如果不满足则再注册到epoll,参考fdread1 while((m=read(fd, buf, n)) < 0 && errno == EAGAIN)  fdwait(fd, 'r'); return m;}

这样就不需要担心进程被挂起,同时也不需要处理 epoll 相关的逻辑(注册事件,事件触发时的处理等等)。异步转同步,libtask 的方式就是通过提供对应的 API,先把用户的 fd 注册到 epoll 中,然后切换到其他协程,等 epoll 监听到事件触发时,就会把对应的协程插入就绪队列,当该协程被调度中心选中执行时,就会继续执行剩下的逻辑而不会引起进程挂起,因为这时候所等待的条件已经满足。

总结

libtask 的设计思想就是把业务逻辑封装到一个个协程中,由 libtask 实现协程的调度,在各个业务逻辑中进行切换,从而驱动着系统的运行。另外 libtask 也提供了一个网络和文件 io 异步变同步的解决方案。使得我们使用起来更加方便,高效。

紧追技术前沿,深挖专业领域
(0)

相关推荐