Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Dynamic Task Graph / Task Checkpointing #3811

Closed
madsbk opened this issue May 19, 2020 · 6 comments
Closed

[FEA] Dynamic Task Graph / Task Checkpointing #3811

madsbk opened this issue May 19, 2020 · 6 comments

Comments

@madsbk
Copy link
Contributor

madsbk commented May 19, 2020

TL;DR: By introducing task checkpointing where a running task can update its state on the scheduler, it is possible to reduce scheduler overhead, support long running tasks, and uses explicit worker-to-worker communication while maintaining resilient.

Motivation

As discussed in many issues and PRs (e.g. #3783, #854, #3139, dask/dask#6163), the scheduler overhead of Dask/Distribued can be a problem as the number of tasks increases. Many proposals involves optimizing the Python code through PyPy, Cython, Rust, or some other tool/language.

This PR propose an orthogonal approach that reduces the number of tasks and make it possible to encapsulate domain knowledge of specific operations into tasks -- such as minimizing memory use, overlapping computation and communication, etc.

Related Approaches

Current Task Workflow

All tasks go through the follow flow:

**Client**  
  1. Graph creation  
  2. Graph optimization 
  3. Serialize graph 
  4. Send graph to scheduler 
**Scheduler** 
  5. Update graph 
  6. Send tasks, one at a time, to workers 
**Worker**  
  7. Execute tasks

Task Fusion

All tasks go through steps 1 to 4 but by fusing tasks (potential into SubgraphCallable) only a reduced graph goes through step 5 and 6, which can significantly easy the load on the scheduler. However, fusing tasks also limits the available parallelism thus it has its limits.

Task Generation

At graph creation, we use task generators to reduce the size of the graph. Particularly, in operations such as shuffle() that consist of up to n**2 number of tasks. This means that only steps 3 to 7 encounter all tasks. And if we allow the scheduler to execute python code, we can extend this to steps 5 to 7.

Submit Tasks from Tasks

Instead of implementing expensive operations such as shuffle() in a task graph, we can use few long running jobs that use direct worker-to-worker communicate to bypass the scheduler altogether. This approach is very performance efficient but also has two major drawbacks:

  • It provides no resilient, if a worker disconnects unexpected the states of the long running jobs are all lost.
  • In cases such as shuffle(), this approach requires extra memory because the inputs to the long running jobs must be in-memory until the jobs completes. Something that can be an absolute deal breaker [HACK] Ordering to priorities "shuffle-split" dask#6051.

Proposed Approach

Dynamic Task Graph / Task Checkpointing

At graph creation, we use dynamic tasks to reduce the size of the graph and encapsulate domain knowledge of specific operations. This means that only step 7 encounters all tasks.

Dynamic tasks are regular tasks that are optimized, scheduled, and executed on workers as regular tasks. It is only when they use checkpointing that they differ. The following is the logic flow when a running task calls checkpointing:

  1. A task running on a worker sends a task update to the scheduler that contains:
    • New keys that is now in-memory on the worker
    • New keys that the task now depend on
    • Existing keys that the task doesn’t depend on anymore
    • A new task (function & key/literal arguments) that replaces the existing task.
  2. The scheduler updates relevant TaskStates and release keys that no one depend on anymore.
  3. If all dependencies are satisfied, the task can now be rescheduled from its new state. If not, the task transits to the waiting state.

Any thoughts? Is it something I should begin implementing?

cc. @mrocklin, @quasiben, @rjzamora, @jakirkham

@jacobtomlinson
Copy link
Member

This sounds great. I'd been keen to help out with this.

@sjperkins
Copy link
Member

+1 on supporting this as a good idea, because I think it would make possible features that I wish to implement at a higher level of abstraction: The ability to support (1) while loops and (2) if-else statements as graph tasks.

In @madsbk's terminology, these would be checkpoint tasks that submit new work to the scheduler depending on whether their respective logic conditions are satisfied.

@madsbk's description of checkpointing:

  • A task running on a worker sends a task update to the scheduler that contains:
    • New keys that is now in-memory on the worker
    • New keys that the task now depend on
    • Existing keys that the task doesn’t depend on anymore
    • A new task (function & key/literal arguments) that replaces the existing task.

sounds conceptually similar to the way tensorflow handles it's while_loop construct:

Note that while_loop calls cond and body exactly once (inside the call to while_loop, and not at all during Session.run()). while_loop stitches together the graph fragments created during the cond and body calls with some additional graph nodes to create the graph flow that repeats body until cond returns false.

In the dask realm, I was thinking along the following lines:

class WhileLoop(object):
    def __init__(self, condition, body):
        self.condition = condition
        self.body = body

    def __call__(self, *args):
        while self.condition(*args).compute():
            args = self.body(*args)

        return args

def cond(array):
    return array.sum() < 100

def body(array):
    return (array + 1,)


while_loop = WhileLoop(cond, body)
out = while_loop(da.zeros(10))

@mrocklin
Copy link
Member

Thank you for writing this up @madsbk ! My apologies for the delayed response. Larger issues like this are hard for me to prioritize during the work-week these days.

Any thoughts? Is it something I should begin implementing?

I'm curious to learn more about the semantics here, and about how users would interact with this. I think that before we create an implementation (which I imagine would take several weeks to get right) we start first with some simple code examples of how this would work in a few concrete cases that are pain points for users today. I think that figuring out those semantics are probably going to be useful to constrain the design of the implementation. I think that without doing this you're likely to build something that works for your use case, but breaks several other constraints and so wouldn't get in, which would be frustrating.

Also, if your main goal is to accelerate shuffle operations then my gut says that something like dask/dask#6164 is still going to be faster both at runtime and faster to design/implement (because it can be built orthgonally to the scheduler). That's just a gut reaction though.

@madsbk
Copy link
Contributor Author

madsbk commented May 30, 2020

Also sorry for the late reply, I have been busy studying how the scheduling works in detail :)

I'm curious to learn more about the semantics here, and about how users would interact with this.

To start the discussion, I am working on a minimal example that make use of as many existing features as possible. Hopefully, we only need one new scheduler function; the rest can be implementing using existing primitives.

Also, if your main goal is to accelerate shuffle operations then my gut says that something like dask/dask#6164 is still going to be faster both at runtime and faster to design/implement

I agree and that is definitely an approach I would be interested in working on as well. But right now I have the time to look at the broader issue and see how far that gets us.

@asford
Copy link
Contributor

asford commented Jan 30, 2021

Just saw this, and it reminded me of an issue from many years ago at: #1663

The semantics proposed here are very similar to the "promise-like" semantics discussed there. @madsbk did you ever investigate an MVP or prototype for this feature?

@madsbk
Copy link
Contributor Author

madsbk commented Feb 1, 2021

@madsbk did you ever investigate an MVP or prototype for this feature?

No, I haven't worked any further on this idea :/

@madsbk madsbk closed this as completed May 12, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants