查看原文
其他

写了一段“高端”C语言代码

程序喵大人 程序喵大人 2022-08-22

之前写过一篇文章叫《写了一段高端C++代码》,这篇文章的背景和它完全相同,我这里再复述一遍。

背景:

在音视频方向中,线程分为普通线程和GL线程(OpenGL线程),GL线程中可以执行OpenGL相关的语句,做一些图像渲染的工作,也可以理解为所有GL语句都要在GL线程中执行;而在普通线程中,只能执行那些我们平时经常接触的普通语句。


在具体项目开发中会有些需求:在普通线程中突然想要执行某些必须要在GL线程下执行的任务(比如某些初始化工作,释放某些GL相关的对象),执行完此任务后又继续执行自己的任务,像在同一个线程执行一样:

void func() { task1(); task2(); // 需要在GL线程执行 task3();}


分析:

这里有个关键点:task3()一定要等到task2()执行完毕后才可执行,但是由于task2()是被抛到了其他线程运行,没有起到阻塞执行的效果。


怎么能达到目的呢?可以这样使用条件变量:

void task2() { ... notify();}
void func() { task1(); task2(); // 需要在GL线程执行 wait(); task3();}

普通线程在task2()后使用wait()阻塞线程,待GL线程中的任务执行完后使用notity()打断普通线程的阻塞,可达到顺序执行的目的。


但这样非常麻烦,而且不通用,代码还相当难看。

在之前的文章里我使用C++的future封装了一套函数,可以方便的跨线程阻塞调度某个任务执行,然而我还有个项目是使用纯C语言开发的,没有了C++的future,要完成类似需求就比较困难,但是,困难也得搞阿,于是有了下面的代码。


先看下如何函数的设计:

#ifndef __GL_DISPATCH_H__#define __GL_DISPATCH_H__#include <stdbool.h>
typedef struct Dispatcher Dispatcher;typedef void (*DispatcherFunc)(void* arg);/** * @brief 创建一个实例,内部会常驻一个GL线程,外部可以将某些任务丢到此线程里执行,可以选择是否阻塞执行 */Dispatcher* Dispatcher_create();/** * @brief 销毁实例 */void Dispatcher_destroy(Dispatcher** dispatcher_p);/** * @brief 利用此函数将任务丢到线程里执行 * * @param dispatcher 实例 * @param func 要执行的函数 * @param arg 函数参数 * @param block 选择是否阻塞执行 */void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block);
#endif // __GL_DISPATCH_H__

主要功能就在最后一个函数,该函数可以选择是否阻塞执行,这里大家可以思考一下,如何实现阻塞的需求?条件变量是肯定的,但是如何保证条件变量等待的是当前事件呢?还是直接看代码实现吧:

#include <stdatomic.h>#include "libctools.h"#include "gldispatch.h"
typedef struct Task { DispatcherFunc func; void* arg; Mutex* mutex; Cond* cond; bool is_continue; bool is_block;} Task;
struct Dispatcher { Thread* thread_id; Thread _thread_id; Mutex* mutex; Cond* cond; List* task_queue; atomic_bool is_interrupt;};
static void taskDestroy(Task* task) { if (task->mutex) { mutex_destroyp(&task->mutex); } if (task->cond) { cond_destroyp(&task->cond); } free((void*)task);}
static Task* taskCreate(DispatcherFunc func, void* arg, bool is_block) { Task* task = (Task*)calloc(1, sizeof(Task)); if (!task) return NULL; task->func = func; task->arg = arg; task->is_block = is_block; if (is_block) { task->mutex = mutex_create(); task->cond = cond_create(); } return task;}
static int Dispatcher_process(void* arg) { log_info("%s start", __func__); Dispatcher* self = (Dispatcher*)arg; while (!atomic_load(&self->is_interrupt)) { mutex_lock(self->mutex); while (self->task_queue->len == 0 && !atomic_load(&self->is_interrupt)) { cond_wait(self->cond, self->mutex); } ListNode* node = list_pop_front(self->task_queue); mutex_unlock(self->mutex); if (atomic_load(&self->is_interrupt)) { break; } if (node) { Task* task = (Task*)node->val; task->func(task->arg); if (task->is_block) { mutex_lock(task->mutex); task->is_continue = true; cond_signal(task->cond); mutex_unlock(task->mutex); } else { taskDestroy(task); } free(node); } } log_info("%s stop", __func__); return 0;}
static void Dispatcher_start(Dispatcher* dispatcher) { dispatcher->thread_id = thread_create_with_name(&dispatcher->_thread_id, Dispatcher_process, dispatcher, "Dispatcher_process");}
static void Dispatcher_stop(Dispatcher* dispatcher) { atomic_store(&dispatcher->is_interrupt, true); cond_signal(dispatcher->cond); list_clear(dispatcher->task_queue, true); if (dispatcher->thread_id) { thread_wait(dispatcher->thread_id, NULL); }}
Dispatcher* Dispatcher_create() { Dispatcher* self = (Dispatcher*)calloc(1, sizeof(Dispatcher)); if (!self) return NULL; self->mutex = mutex_create(); self->cond = cond_create(); self->task_queue = list_create(); self->task_queue->free_func = (void (*)(int64_t))taskDestroy; Dispatcher_start(self); return self;}
void Dispatcher_destroy(Dispatcher** dispatcher_p) { if (NULL == dispatcher_p || NULL == *dispatcher_p) return; Dispatcher* self = *dispatcher_p; Dispatcher_stop(self); mutex_destroyp(&self->mutex); cond_destroyp(&self->cond); list_destroy(self->task_queue); freep((void**)dispatcher_p);}
void Dispatcher_run(Dispatcher* dispatcher, DispatcherFunc func, void* arg, bool block) { if (!dispatcher) return; Task* task = taskCreate(func, arg, block); if (task) { ListNode* node = list_node_new((int64_t)task); mutex_lock(dispatcher->mutex); list_push_back(dispatcher->task_queue, node); cond_signal(dispatcher->cond); mutex_unlock(dispatcher->mutex); if (task->is_block) { mutex_lock(task->mutex); while (!task->is_continue) { cond_wait(task->cond, task->mutex); } mutex_unlock(task->mutex); taskDestroy(task); } }}

使用方式:

void func1(void* arg) { print("hello func1");}
void func2(void* arg) { print("hello func2");}
int main() { Dispatcher* dispatcher = Dispatcher_create(); Dispatcher_run(dispatcher, func1, NULL, true); Dispatcher_run(dispatcher, func2, NULL, true); Dispatcher_destroy(&dispatcher); return 0;}

tips:

代码中的log、mutex、cond、thread、list都是二次封装的函数,功能无非就是log、加解锁、条件变量、创建线程以及C语言的链表。这里就不贴出他们的实现了,大家可以自己实现一套,当作个小练习,不难。

代码我也就不过多介绍了,相信有点水平的朋友都能看懂,有问题可以留言!


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存