diff --git a/docs/content/docs/source-reading/pipeline_model_graph_based.md b/docs/content/docs/source-reading/pipeline_model_graph_based.md index ddb7c66..8d02589 100644 --- a/docs/content/docs/source-reading/pipeline_model_graph_based.md +++ b/docs/content/docs/source-reading/pipeline_model_graph_based.md @@ -16,20 +16,22 @@ giscus = true 作者:[JackTan25](https://github.com/JackTan25) | Databend Contributor ## 一.基于图的初始化 -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/01-pipeline-arch.png) -```text - 上图便是databend的一条pipeline结构,通常对于每一个PipeItem,这里只会有一个input_port和output_port,一个Pipe当中的PipeItem的数量则通常代表着并行度.每一个PipeItem里面对应着一个算子(不过在有些情况下并不一定一个pipeItem就只有一对input_port和output_port,所以上图画的更加广泛一些),算子的推进由调度模型来触发 -``` -将pipeline初始化为graph:这里细致展示下生成的过程 -```text - databend采取的是采取的是StableGraph这个结构,我们最开始是得到了下面第一张图这样的Pipeline,我们最后生成的是下面第二张图的graph. -``` -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/02-pipeline-graph-build-01.jpg) -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/03-pipeline-graph-build-02.jpg) -```text - 上面第二张图的的连接只是一个物理上的单纯图的连接,但是node内部pipe_item对应的port没有对接起来.我们还需要关心的是具体如何把对应的port给连接起来的.在构建图的时候每一个PipeItem包装为一个node,包装的过程是以Pipe为顺序的.这样我们就为上面每一个PipeItem都加上了一个Node编号,后面我们需要按照为对应的input_port和output_port去加上edge,我们的连接是一个平行的连接. -``` + +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/1-pipeline-arch.png) + +上图便是 databend 的一条 pipeline 结构,通常对于每一个 PipeItem,这里只会有一个 input_port 和 output_port,一个 Pipe 当中的 PipeItem 的数量则通常代表着并行度.每一个 PipeItem 里面对应着一个算子(不过在有些情况下并不一定一个 pipeItem 就只有一对 input_port 和 output_port,所以上图画的更加广泛一些),算子的推进由调度模型来触发 + +将 pipeline 初始化为 graph:这里细致展示下生成的过程 + +databend 采取的是采取的是 StableGraph 这个结构,我们最开始是得到了下面第一张图这样的 Pipeline,我们最后生成的是下面第二张图的 graph. + +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/2-pipeline-graph-build-01.jpg) +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/3-pipeline-graph-build-02.jpg) + +上面第二张图的的连接只是一个物理上的单纯图的连接,但是 node 内部 pipe_item 对应的 port 没有对接起来.我们还需要关心的是具体如何把对应的 port 给连接起来的.在构建图的时候每一个 PipeItem 包装为一个 node,包装的过程是以 Pipe 为顺序的.这样我们就为上面每一个 PipeItem 都加上了一个 Node 编号,后面我们需要按照为对应的 input_port 和 output_port 去加上 edge,我们的连接是一个平行的连接. + 我们将构建过程当中需要使用到的结构做一个介绍: + ```rust // 一个Node对应一个PipeItem struct Node { @@ -38,7 +40,7 @@ struct Node { // 单元PipeItem的状态,一共有如下三种状态: // Idle,Processing,Finished, state: std::sync::Mutex, - + updated_list: Arc, // 一下是pipeItem的内容 inputs_port: Vec>, @@ -80,7 +82,9 @@ pub struct UpdateTrigger { } // 上面的例子最后我们得到的graph初始化后应该是下面这样 ``` -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/04-pipeline-graph-build-03.jpg) + +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/4-pipeline-graph-build-03.jpg) + ```rust // 而对于input_port和output_port的数据的传递,则是两者之间共享一个SharedData pub struct SharedStatus { @@ -91,14 +95,17 @@ pub struct SharedStatus { data: AtomicPtr, } ``` -## 二.基于work-steal与状态机的并发调度模型 -```text - 初始化调度是将我们的graph的所有出度为0的Node作为第一次任务调度节点,对应我们的例子就是Node4,Node5每一次调度都是抽取出graph当中的同步任务和异步任务,下图是pipeline的调度模型,用于抽取出当前graph当中可执行的同步processor和异步processor,调度模型的输入是最上面的graph,而输出则是sync_processor_queue和async_processor_queue,无论是在初始化时还是在后面继续执行的过程都是利用的下面的调度模型来进行调度.调度模型的执行终点是need_schedule_nodes和need_schedule_edges均为空 -``` -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/05-pipeline-model.jpg)) + +## 二.基于 work-steal 与状态机的并发调度模型 + +初始化调度是将我们的 graph 的所有出度为 0 的 Node 作为第一次任务调度节点,对应我们的例子就是 Node4,Node5 每一次调度都是抽取出 graph 当中的同步任务和异步任务,下图是 pipeline 的调度模型,用于抽取出当前 graph 当中可执行的同步 processor 和异步 processor,调度模型的输入是最上面的 graph,而输出则是 sync_processor_queue 和 async_processor_queue,无论是在初始化时还是在后面继续执行的过程都是利用的下面的调度模型来进行调度.调度模型的执行终点是 need_schedule_nodes 和 need_schedule_edges 均为空 + +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/5-parallel-pipeline-model.jpg) ## 三.执行模型 + 执行模型对应相关结构如下: + ```rust struct ExecutorTasks { // 记录当前还剩余的task的数量 @@ -151,11 +158,13 @@ struct WorkerCondvar { condvar: Condvar, } ``` + 执行模型的流程图如下: -![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/6-parallel-pipeline-model.jpg)) +![](https://psiace.github.io/databend-internals/source-reading/pipeline_model_graph/6-parallel-pipeline-model.jpg) + ## 四. 限时机制 -```text - 限时机制其实是比较简单的,其主要的作用就是限制sql的pipeline执行的时间在规定时间内完成, -如果超时则自动终止.这个机制底层实现就是用了一个异步任务来跟踪,一旦超时就通知整个执行模型结束,这里对应的就是执行模型流程图里面的finish. -``` -以上便是databend的机遇状态机和work-steal机制的并发调度模型实现. \ No newline at end of file + +限时机制其实是比较简单的,其主要的作用就是限制 sql 的 pipeline 执行的时间在规定时间内完成, +如果超时则自动终止.这个机制底层实现就是用了一个异步任务来跟踪,一旦超时就通知整个执行模型结束,这里对应的就是执行模型流程图里面的 finish。 + +以上便是 databend 的机遇状态机和 work-steal 机制的并发调度模型实现.