查看原文
其他

bthread源码剖析(二): 工作窃取与TaskGroup的run_main_task()

果冻虾仁 编程往事 2022-08-22

上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。

TG的主要成员

讲到TG先看TG的主要成员:

    size_t _steal_seed;
    size_t _steal_offset;
    ContextualStack* _main_stack;
    bthread_t _main_tid;
    WorkStealingQueue<bthread_t> _rq;
    RemoteTaskQueue _remote_rq;

每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。

每个TG有两个TM的队列:rq和remote_rq,它们之间有啥区别呢?

rq 与 remote_rq

通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。

再看一下ready_to_run_remote注释:

    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);

在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。

再看下ready_to_run_remote()的调用的地方。

在butex_wake()中:

    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }

在start_background()中:

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) 
{
...
...
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

start_background<>()模板参数为true的时候被调用ready_to_run_remote()。而在start_from_non_worker()中,会调用start_background<true>()

好了,言归正传。

TaskGroup::run_main_task()

run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。

void TaskGroup::run_main_task() {
    ...
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        ...
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task()的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);

由此观之,我们发现三个关键函数:wait_task()sched_to()task_runner()

简述一下他们的基本功能:

  1. wait_task():找到一个任务。其中会涉及工作窃取(work stealing)。
  2. sched_to():进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。
  3. task_runner():执行任务。

现在我们的观察视角终于可以切入到“work stealing”了。

工作窃取(work stealing)

首先声明,work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡说的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。

20世纪90年代,MIT的Charles E. Leiserson 教授发起并指导了CILK项目。该项目发表了许多论文,启发了各种使用“工作窃取”的基于任务的调度器。

TaskGroup::wait_task()

看源码,这里简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关

bool TaskGroup::wait_task(bthread_t* tid) {
    do {
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
    } while (true);
}

_pl是ParkingLot*类型,_last_plstate是pl中的state。关于它俩的更多介绍,后面会有其他文章。

_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。

TaskGoup::steal_task()
    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
        _last_pl_state = _pl->get_state();
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。

视角从TG跳出,来看一看TC的steal_task()

TaskControl::steal_task

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。

通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。

那么当前TG的rq是什么时候被消费的呢?

在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。

TaskGroup::task_runner()

void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
     ...
     }

在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):

    do {
        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;
        ... 
        // 执行TM(bthread)中的回调函数
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        ... 日志

        // 清理 线程局部变量(下面是原注释)
        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL// optional
        }

        // 累加版本号,且版本号不能为0(下面是原注释)
        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        // 唤醒joiner
        butex_wake_except(m->version_butex, 0);

        // _nbthreads减1(注意_nbthreads不是整型)
        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);

        // 查找下一个任务,并切换到其对应的运行时上下文
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid); 

do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。

在ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。

然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。

好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,下一篇文章,我来带大家解读sched_to()。之所以把它单独成篇,是因为会涉及一些汇编的知识,读起来可能晦涩艰深。大家做好准备!


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存