[FEA] Dynamic Task Graph / Task Checkpointing #3811
Comments
This sounds great. I'd be keen to help out with this.
+1 on supporting this as a good idea, because I think it would make possible features that I wish to implement at a higher level of abstraction: the ability to support (1) while loops and (2) if-else statements as graph tasks. In @madsbk's terminology, these would be checkpoint tasks that submit new work to the scheduler depending on whether their respective logic conditions are satisfied. @madsbk's description of checkpointing sounds conceptually similar to the way TensorFlow handles its while_loop construct.
In the dask realm, I was thinking along the following lines:

    import dask.array as da

    class WhileLoop(object):
        def __init__(self, condition, body):
            self.condition = condition
            self.body = body

        def __call__(self, *args):
            # Re-evaluate the condition on the client between iterations;
            # each check triggers a blocking compute().
            while self.condition(*args).compute():
                args = self.body(*args)
            return args

    def cond(array):
        return array.sum() < 100

    def body(array):
        return (array + 1,)

    while_loop = WhileLoop(cond, body)
    out = while_loop(da.zeros(10))
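For completeness, the if-else case from point (2) above could follow the same client-side pattern. This is only a sketch with made-up names (IfElse, pred, and the two branch callables are not an existing dask API), and it has the same limitation as WhileLoop: the branch decision forces a blocking compute on the client, which is exactly what checkpoint tasks would avoid.

```python
import dask.array as da

class IfElse(object):
    """Hypothetical counterpart to WhileLoop for the if-else case."""

    def __init__(self, condition, then_branch, else_branch):
        self.condition = condition
        self.then_branch = then_branch
        self.else_branch = else_branch

    def __call__(self, *args):
        # Like WhileLoop, deciding which branch to take requires a
        # blocking compute() on the client before building more graph.
        if self.condition(*args).compute():
            return self.then_branch(*args)
        return self.else_branch(*args)

def pred(array):
    return array.sum() > 0

branch = IfElse(
    pred,
    then_branch=lambda array: array - array.mean(),
    else_branch=lambda array: array + 1,
)
out = branch(da.ones(10))
```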
Thank you for writing this up @madsbk ! My apologies for the delayed response. Larger issues like this are hard for me to prioritize during the work-week these days.
I'm curious to learn more about the semantics here, and about how users would interact with this. I think that before we create an implementation (which I imagine would take several weeks to get right) we should start first with some simple code examples of how this would work in a few concrete cases that are pain points for users today. I think that figuring out those semantics is probably going to be useful to constrain the design of the implementation. I think that without doing this you're likely to build something that works for your use case, but breaks several other constraints and so wouldn't get in, which would be frustrating. Also, if your main goal is to accelerate shuffle operations then my gut says that something like dask/dask#6164 is still going to be faster both at runtime and faster to design/implement (because it can be built orthogonally to the scheduler). That's just a gut reaction though.
Also sorry for the late reply, I have been busy studying how the scheduling works in detail :)
To start the discussion, I am working on a minimal example that makes use of as many existing features as possible. Hopefully, we only need one new scheduler function; the rest can be implemented using existing primitives.
I agree and that is definitely an approach I would be interested in working on as well. But right now I have the time to look at the broader issue and see how far that gets us.
No, I haven't worked any further on this idea :/
TL;DR: By introducing task checkpointing, where a running task can update its state on the scheduler, it is possible to reduce scheduler overhead, support long running tasks, and use explicit worker-to-worker communication while maintaining resilience.
Motivation
As discussed in many issues and PRs (e.g. #3783, #854, #3139, dask/dask#6163), the scheduler overhead of Dask/Distributed can be a problem as the number of tasks increases. Many proposals involve optimizing the Python code through PyPy, Cython, Rust, or some other tool/language.
This issue proposes an orthogonal approach that reduces the number of tasks and makes it possible to encapsulate domain knowledge of specific operations into tasks -- such as minimizing memory use, overlapping computation and communication, etc.
Related Approaches
Current Task Workflow
All tasks go through the following flow:
Task Fusion
All tasks go through steps 1 to 4, but by fusing tasks (potentially into a SubgraphCallable) only a reduced graph goes through steps 5 and 6, which can significantly ease the load on the scheduler. However, fusing tasks also limits the available parallelism, so it has its limits.
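As a toy illustration of what fusion buys, here is a sketch using dask's existing dask.optimization.fuse on a hand-written graph; the exact keys of the fused output depend on dask's fusion heuristics, but the point is that the scheduler only ever sees the reduced graph:

```python
from dask.optimization import fuse

def inc(x):
    return x + 1

def double(x):
    return 2 * x

# A linear chain of three tasks depending on the literal input "a".
dsk = {
    "a": 1,
    "b": (inc, "a"),
    "c": (double, "b"),
    "d": (inc, "c"),
}

# Fusing collapses the chain into a single task, so fewer tasks need
# to be scheduled and tracked.
fused, dependencies = fuse(dsk, keys=["d"], rename_keys=False)
print(len(dsk), "->", len(fused))
```

(Passing fuse_subgraphs=True is what can wrap such fused chains in a SubgraphCallable.)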
Task Generation
At graph creation, we use task generators to reduce the size of the graph, particularly in operations such as shuffle() that consist of up to n**2 tasks. This means that only steps 3 to 7 encounter all tasks. And if we allow the scheduler to execute Python code, we can extend this to steps 5 to 7.
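To make the n**2 scaling concrete, here is a sketch contrasting an eagerly materialized shuffle layer with a generator-based description. The helper names (split, eager_shuffle_layer, lazy_shuffle_layer) are made up for illustration; only the eager dict roughly corresponds to what gets built today:

```python
def split(partition, i):
    # Stand-in for "extract the part of `partition` destined for output i".
    return partition

def eager_shuffle_layer(inputs):
    # Materialize all n**2 split tasks up front; this whole dict has to be
    # built, optimized, serialized, and shipped around.
    n = len(inputs)
    return {
        ("split", i, j): (split, inputs[j], i)
        for i in range(n)
        for j in range(n)
    }

def lazy_shuffle_layer(inputs):
    # Generator-based description of the same tasks; nothing is
    # materialized until something actually iterates over it.
    n = len(inputs)
    for i in range(n):
        for j in range(n):
            yield ("split", i, j), (split, inputs[j], i)

inputs = ["partition-%d" % j for j in range(1000)]
print(len(eager_shuffle_layer(inputs)))  # one million entries built eagerly
```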
Submit Tasks from Tasks
Instead of implementing expensive operations such as shuffle() in a task graph, we can use a few long-running jobs that use direct worker-to-worker communication to bypass the scheduler altogether. This approach is very performance efficient but also has two major drawbacks:
- The work done inside the long-running jobs is invisible to the scheduler, so resilience is lost.
- In the case of shuffle(), this approach requires extra memory because the inputs to the long-running jobs must stay in memory until the jobs complete -- something that can be an absolute deal breaker ([HACK] Ordering to priorities "shuffle-split" dask#6051).
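For reference, this is roughly what the "tasks from tasks" pattern looks like today with distributed's worker_client (a minimal sketch, not an actual shuffle implementation):

```python
from distributed import Client, worker_client

def inc(x):
    return x + 1

def long_running_job(data):
    # Runs on a worker. worker_client() lets the task talk to the scheduler
    # directly and secedes from the worker's thread pool so the nested
    # submissions below do not deadlock it.
    with worker_client() as client:
        futures = client.map(inc, data)
        return sum(client.gather(futures))

if __name__ == "__main__":
    client = Client()
    print(client.submit(long_running_job, [1, 2, 3]).result())  # 9
```

Everything that happens inside long_running_job is invisible to the scheduler, which is why the resilience and memory drawbacks above apply.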
Dynamic Task Graph / Task Checkpointing
At graph creation, we use dynamic tasks to reduce the size of the graph and encapsulate domain knowledge of specific operations. This means that only step 7 encounters all tasks.
Dynamic tasks are regular tasks: they are optimized, scheduled, and executed on workers like any other task. They only differ when they use checkpointing. When a running task calls checkpointing, it updates its own state on the scheduler: it submits new tasks and is itself put back into the waiting state.
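To make the proposed semantics concrete, here is a rough sketch of how a checkpointing task might look from user code. Everything here is hypothetical: checkpoint(), split_partition(), and merge_splits() do not exist, and designing the real API is exactly the open question.

```python
def shuffle_stage(partitions):
    # Runs on a worker as an ordinary task.
    splits = [split_partition(p) for p in partitions]  # hypothetical helper

    # Hypothetical call: hand a new task graph to the scheduler and put this
    # task back into the waiting state. Because the scheduler now owns the
    # remaining work, it stays resilient to worker failures, and this task
    # no longer has to keep its original inputs pinned in memory.
    return checkpoint(
        graph={("merge", i): (merge_splits, s) for i, s in enumerate(splits)},
        keys=[("merge", i) for i in range(len(splits))],
    )
```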
Any thoughts? Is it something I should begin implementing?
cc. @mrocklin, @quasiben, @rjzamora, @jakirkham