diff --git a/docs/content/docs/source-reading/pipeline-execution.md b/docs/content/docs/source-reading/pipeline-execution.md index e4419f7..63807eb 100644 --- a/docs/content/docs/source-reading/pipeline-execution.md +++ b/docs/content/docs/source-reading/pipeline-execution.md @@ -200,9 +200,9 @@ schedule_queue 的工作过程: 1. 首先初始化两个 VecDeque: `need_schedule_nodes: VecDeque` 和 `need_schedule_edges: VecDeque` 分别用来存放需要进行 schedule 的 NodeIndex 和 DirectedEdge,然后将 `executor_pid` push `need_schedule_nodes` 中。 2. 只要这两个 VecDeque 任意一个不为空,我们就需要不断地进行 schedule。 -3. 每次 schedule 时,首先我们会判断 `need_schedule_nodes` 是否为空,如果它为空,那 `need_schedule_edges` 一定不为空,此时我们从 `need_schedule_edges` 中 pop 出一条 `DirectedEdge` edge,然后获得这条 edge 的 target node(注意这个 target node 不是 edge 的指向,`DirectedEdge` 有两种类型:`Source` 和 `Target`,当 Processor 的 input 改变时,会在 triger 的 update_list 中 push 一条 `DirectedEdge::Target(self_.index)`,而如果是 Processor 的 output 改变,则 push 一条 `DirectedEdge::Source(self_.index)`),如果 target node 的状态为 `State::Idle`,表示它在上一次调用 `event` 时返回的 Event 状态为 `Event::NeedData` 或 `Event::NeedConsume`,即它上次 `event` 时 input 需要数据或 output 数据需要被消费,而它现在的状态可能是 input 的数据已经来了或者 output 的数据被消费了,因此我们需要将其 push 到 `need_schedule_nodes` 中来再次调用 `event` 看看是否可以推动这个 Processor。 +3. 每次 schedule 时,首先我们会判断 `need_schedule_nodes` 是否为空,如果它为空,那 `need_schedule_edges` 一定不为空,此时我们从 `need_schedule_edges` 中 pop 出一条 `DirectedEdge` edge,然后获得这条 edge 的 target node(注意这个 target node 不是 edge 的指向,`DirectedEdge` 有两种类型:`Source` 和 `Target`,当 Processor 的 input 改变时,会在 trigger 的 update_list 中 push 一条 `DirectedEdge::Target(self_.index)`,而如果是 Processor 的 output 改变,则 push 一条 `DirectedEdge::Source(self_.index)`),如果 target node 的状态为 `State::Idle`,表示它在上一次调用 `event` 时返回的 Event 状态为 `Event::NeedData` 或 `Event::NeedConsume`,即它上次 `event` 时 input 需要数据或 output 数据需要被消费,而它现在的状态可能是 input 的数据已经来了或者 output 的数据被消费了,因此我们需要将其 push 到 `need_schedule_nodes` 中来再次调用 `event` 看看是否可以推动这个 Processor。 4. 然后我们尝试从 `need_schedule_nodes` pop 出一个 NodeIndex,并从 `ExecutingGraph` 中得到这个 Node,然后调用它的 Processor 的 `event`,然后根据返回的 `Event` 状态来进行下一步工作(如开头描述)。 -5. 最后根据将 triger 中的 updated_list 中的 `DirectedEdge` 都 push 到 `need_schedule_edges` 中。 +5. 最后调用这个 Node 的 trigger 函数,将 updated_list 中的 `DirectedEdge` 都 push 到 `need_schedule_edges` 中。 6. 如果 `need_schedule_nodes` 或 `need_schedule_edges` 不为空则开始下一次 schedule。 7. schedule 结束,将 `schedule_queue` 返回。 diff --git a/docs/content/docs/source-reading/pipeline_model_graph_based.md b/docs/content/docs/source-reading/pipeline_model_graph_based.md index 4197f9d..13b1a57 100644 --- a/docs/content/docs/source-reading/pipeline_model_graph_based.md +++ b/docs/content/docs/source-reading/pipeline_model_graph_based.md @@ -42,7 +42,7 @@ struct Node { state: std::sync::Mutex, updated_list: Arc, - // 一下是pipeItem的内容 + // 以下是pipeItem的内容 inputs_port: Vec>, outputs_port: Vec>, processor: ProcessorPtr, @@ -98,7 +98,7 @@ pub struct SharedStatus { ## 二.基于 work-steal 与状态机的并发调度模型 -初始化调度是将我们的 graph 的所有出度为 0 的 Node 作为第一次任务调度节点,对应我们的例子就是 Node4,Node5 每一次调度都是抽取出 graph 当中的同步任务和异步任务,下图是 pipeline 的调度模型,用于抽取出当前 graph 当中可执行的同步 processor 和异步 processor,调度模型的输入是最上面的 graph,而输出则是 sync_processor_queue 和 async_processor_queue,无论是在初始化时还是在后面继续执行的过程都是利用的下面的调度模型来进行调度.调度模型的执行终点是 need_schedule_nodes 和 need_schedule_edges 均为空 +初始化调度是将我们的 graph 的所有出度为 0 的 Node 作为第一次任务调度节点,对应我们的例子就是 Node4,Node5。 每一次调度都是抽取出 graph 当中的同步任务和异步任务,下图是 pipeline 的调度模型,用于抽取出当前 graph 当中可执行的同步 processor 和异步 processor,调度模型的输入是最上面的 graph,而输出则是 sync_processor_queue 和 async_processor_queue,无论是在初始化时还是在后面继续执行的过程都是利用的下面的调度模型来进行调度。 调度模型的执行终点是 need_schedule_nodes 和 need_schedule_edges 均为空。 ![](https://databend-internals.psiace.me/source-reading/pipeline_model_graph/5-pipeline-model.jpg) @@ -167,4 +167,4 @@ struct WorkerCondvar { 限时机制其实是比较简单的,其主要的作用就是限制 sql 的 pipeline 执行的时间在规定时间内完成, 如果超时则自动终止.这个机制底层实现就是用了一个异步任务来跟踪,一旦超时就通知整个执行模型结束,这里对应的就是执行模型流程图里面的 finish。 -以上便是 databend 的机遇状态机和 work-steal 机制的并发调度模型实现. +以上便是 databend 的基于状态机和 work-steal 机制的并发调度模型实现.