[Discussion] Client/Scheduler Performance #3783

quasiben opened this issue May 7, 2020 · 11 comments

@quasiben
Member

quasiben commented May 7, 2020

Many of us are experimenting with scheduler changes in the hope of improving performance. As graph size increases, the scheduler and its processing of the graph can become a bottleneck. However, we should not limit our attention to the scheduler alone: graph construction in the client can also become slow as graphs grow very large, and is worth improving as well.

We've also seen some experiments/discussions around scheduler performance, notably:

In thinking about changes to the scheduler and client, we should develop some workflow-based benchmarks which can be executed in CI (fast execution) but can also be tuned into something more realistic. A sketch of one such tunable benchmark follows the list below.

Benchmarks

  • tunable dataframe benchmark
    • a shuffle
    • task which only targets update_graph
    • slow client graph creation
    • full data frame workflow (filter/aggregation/merge -- something representative of common work)
    • Dask Array workflow
    • Dask Bag workflow
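
As a starting point, here is a rough sketch of what a tunable dataframe benchmark might look like. The knobs (date range, partition frequency) and the split between graph-construction time and execution time are illustrative only, not an agreed-upon design:

import time

from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle
from distributed import Client


def shuffle_benchmark(client, end="2000-02-01", partition_freq="1h"):
    # Small by default so it can run in CI; widen `end` or shrink
    # `partition_freq` for a more realistic workload.
    ddf = timeseries(start="2000-01-01", end=end, partition_freq=partition_freq)

    t0 = time.time()
    shuffled = shuffle(ddf, "id", shuffle="tasks")  # client-side graph construction only
    t1 = time.time()
    shuffled.size.compute()                         # update_graph + scheduling + execution
    t2 = time.time()
    return {"graph_construction": t1 - t0, "execution": t2 - t1}


if __name__ == "__main__":
    client = Client()  # small local cluster
    print(shuffle_benchmark(client))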

We also need to better understand the scheduler/client/graph internals, and we should document them (though I don't yet know where this documentation should live or how to organize it). I think we need the following:

Documentation:

  • Document the Scheduler
  • Document protocol for messages
  • Background on Communication Protocol
  • Detail the individual messages (from the Rust folks)
  • Document Graph specification
    • Developing a better understanding of the graph spec might also allow us to rewrite parts of the client in native languages to increase performance (a minimal sketch of the graph format follows this list)
  • In doing the above, we should be able to outline how to separate the scheduler into two pieces. Currently, the scheduler is a mix of comms and state machine. This separation would allow us to more easily swap scheduler experiments in and out of Dask workloads while also minimizing the requirements that new schedulers must adhere to.
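
To ground the graph-specification item above, here is the basic form of a Dask graph as it exists today: a plain dict mapping keys either to literal data or to tasks, where a task is a tuple with a callable in first position. The tiny workload below is only illustrative:

from operator import add

from dask.threaded import get

# Keys map to data ("x", "y") or to tasks ("z", "w"); a task is a tuple whose
# first element is callable and whose remaining elements are its arguments
# (keys are substituted with their computed values).
dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),          # z = add(x, y)
    "w": (sum, ["x", "y", "z"]),   # w = sum([x, y, z])
}

assert get(dsk, "w") == 6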

Evaluate Schedulers

  • Run Rust scheduler on reasonable workflow and document breakages/performance
  • Document Rust Scheduler

This list is probably far from complete, and I'm happy to amend/change/update it as we proceed.

@Kobzol
Contributor

Kobzol commented May 18, 2020

Thank you for this umbrella issue! We have accrued a lot of information about the Dask protocol and I'd be glad if we could publish it. However, I'm not sure what the best format for that would be. It's a protocol specification, so some API/type language is probably ideal, but there are also edge cases that need a manual comment. So far I have used TypeScript (https://github.com/spirali/rsds/blob/0513f5c83d42d34cfda01febf7b87482ec250d0f/dask/message-gallery.ts), as it has a pretty powerful yet terse type syntax and allows comments, but I'm not sure whether that's ideal.

If you have any other ideas, let me know.

@quasiben
Member Author

@Kobzol thank you for posting the link to the TypeScript file and for the work that went into generating it. Dask, as of late, has been using jsonschema, but in a YAML representation:

It might be nice to adopt this, though I am unsure whether jsonschema is best for a protocol spec. The other widely used technology I am aware of here would be Swagger.
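
For concreteness, validating a message against a jsonschema-in-YAML definition could look roughly like the following. The message type and schema here are made up for illustration; they are not the actual Dask protocol:

import yaml
import jsonschema

# Illustrative schema for a single (hypothetical) message type, written as
# jsonschema in a YAML representation.
schema = yaml.safe_load("""
type: object
required: [op, key]
properties:
  op: {const: task-erred}
  key: {type: string}
  exception: {type: object}
additionalProperties: true
""")

msg = {"op": "task-erred", "key": "x-123", "exception": {}}
jsonschema.validate(instance=msg, schema=schema)  # raises ValidationError on mismatch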

@Kobzol
Contributor

Kobzol commented May 18, 2020

I originally wanted to use jsonschema and/or Swagger, but avoided them because they seemed very verbose. I suppose it's a better solution than TS, though, and it could be rendered into something more readable.

@mrocklin
Member

mrocklin commented May 18, 2020 via email

@Kobzol
Contributor

Kobzol commented May 19, 2020

Right, it might be more important now to provide some discussion of the protocol rather than a rigid type definition. @quasiben I can rewrite the TS file into markdown and write up some of the quirks that we hit in our Rust implementation. Is there anything specific that would be of value to you? For example, are there any messages missing from our message gallery that you are interested in?

@Kobzol
Contributor

Kobzol commented May 19, 2020

Regarding the benchmarks: I noticed that lately there have been quite a few issues and PRs that try to optimize parts of the scheduler (#3423, #3793, #3770, #3760). It seems to me that it is currently difficult to properly evaluate the effect of such changes, both to avoid potential regressions and to avoid adding needless complexity in cases where the performance gains are minimal. Dask has a lot of users with a lot of varying use cases, and it is very easy to inadvertently introduce a performance regression (doubly so in Python). On some of our Rust benchmarks, I noticed that Dask 2.8.1 was in some cases up to 20% faster than master (this is anecdotal though; it might have been caused by something else).

The Dask benchmarks repository (https://github.com/dask/dask-benchmarks) is a pretty good starting point to evaluate performance, but it currently does not have realistic distributed pipelines and it's not automated and integrated into CI (AFAIK). I can add e.g. our Rust benchmarks to the repository, but for it to be really useful, it should be CI-integrated. I don't have any experience with multi-node CI jobs, but even on a single node we could run some reasonable benchmarks (if the node has at least a few cores).

As an example, the Rust project uses CI infrastructure that lets you ask a bot on a PR to run the PR's commit against master (or a specified tag) on a set of benchmarks and display the relative change in performance. This is probably a bit far-fetched for now, but it would be nice if something like that existed for Dask.

As a more realistic goal, is there a way to run the programs from the Dask benchmarks repository using ASV on CI (either Travis or GitHub Actions)? Even without ASV, just running the benchmarks and comparing the numbers manually might be useful for a PR. Is there any appetite for managing separate CI infrastructure (i.e. a small cluster) to run more demanding benchmarks? A sketch of what such a benchmark could look like is below.
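
For concreteness, an asv-style benchmark for a distributed task shuffle might look roughly like this; the class, parameters, and cluster size are hypothetical rather than existing dask-benchmarks code:

from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle
from distributed import Client


class TaskShuffleSuite:
    # asv runs each `time_*` method once per parameter value
    params = ["1d", "1h"]
    param_names = ["partition_freq"]
    timeout = 300

    def setup(self, partition_freq):
        self.client = Client(n_workers=2, threads_per_worker=1)
        self.ddf = timeseries(
            start="2000-01-01", end="2000-02-01", partition_freq=partition_freq
        )

    def teardown(self, partition_freq):
        self.client.close()

    def time_task_shuffle(self, partition_freq):
        shuffle(self.ddf, "id", shuffle="tasks").size.compute()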

@TomAugspurger
Member

TomAugspurger commented May 19, 2020

it's not automated and integrated into CI (AFAIK)

It's run nightly (on a dedicated machine in my basement) and the results are uploaded to https://pandas.pydata.org/speed/distributed/. In the past we've found running benchmarks on public CI providers too noisy, at least for the micro-benchmarks that pandas tends to care about.

Pandas recently won a NumFOCUS small development grant, part of which is to fund development of a way to integrate benchmarks into a CI workflow. This might be notifications when regressions are detected after a PR is merged, or a bot that responds to requests from maintainers to pull some branch, run (some subset of) the benchmarks, and post the results. That will take a bit of time, however.

We don't have the budget to do all of our goals, so any volunteer effort would be welcome. This will primarily be developed at https://github.com/asv-runner/, and I can add anyone interested in helping to the organization.

@Kobzol
Contributor

Kobzol commented May 19, 2020

That is nice! That said, testing performance before a PR is merged is a much better approach than just being notified about regressions in retrospect. If you only learn about a regression later, the motivation to revert the commit (and potentially other work built on top of it) might be hard to find :-)

Could this be modified so that you could ask for a benchmark between two specific revisions? It would be useful even if you had to wait ~24h for the result. If there are any specific tasks that would get us closer to this goal, I'd be glad to help, although at least some basic documentation about the setup (what the hardware looks like, how to run a benchmark, where to add new benchmarks, etc.) would be nice so that others can join in and help improve it. We could, for example, create a GitHub bot that watches PRs for benchmark requests and posts the results back to the PR.

@TomAugspurger
Member

TomAugspurger commented May 19, 2020

FWIW, you can also run asv locally. I'll sometimes remember to do that when writing code that's likely to impact performance.

Change the repo setting to point at your local git repo (this diff is for dask, but the same works for distributed):

diff --git a/dask/asv.conf.json b/dask/asv.conf.json
index 147e999..2b3f673 100644
--- a/dask/asv.conf.json
+++ b/dask/asv.conf.json
@@ -11,7 +11,7 @@

     // The URL or local path of the source code repository for the
     // project being benchmarked
-    "repo": "https://github.com/dask/dask.git",
+    "repo": "../../dask",

     // List of branches to benchmark. If not provided, defaults to "master"
     // (for git) or "default" (for mercurial).

and then compare the two commits:

$ asv continuous upstream/master <your branch>

That will print out any regressions detected.
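
If the full suite takes too long, asv can also restrict the run to a subset of benchmarks; I believe the selection flag is -b/--bench, which takes a regex, e.g.

$ asv continuous -b shuffle upstream/master <your branch>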

Yes, documentation is indeed a blocker :) I wrote up a summary somewhere; I'll see if I can find it. I'll invite you to the org. I'd also recommend waiting a week or so before digging into this: we had one person do some initial work, and we're waiting for a report back.

@quasiben
Member Author

For those interested in client performance, the following demonstrates lengthy graph creation times:

from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle

# Ten years of data with daily vs. hourly partitions (~3,650 vs ~87,600 partitions)
ddf_d = timeseries(start='2000-01-01', end='2010-01-01', partition_freq='1d')
ddf_h = timeseries(start='2000-01-01', end='2010-01-01', partition_freq='1h')

# Only client-side graph construction is timed here; nothing is computed
%timeit ddf_d_2 = shuffle(ddf_d, "id", shuffle="tasks")
%timeit ddf_h_2 = shuffle(ddf_h, "id", shuffle="tasks")

Code like this is invoked during a repartition. In its current form, no work is done while the client is creating the graph. For a more complete example, I would recommend looking at q08 from the TPCx-BB repo. One idea is to kick off work earlier during graph creation, but this may come at a cost in stability, since it may involve coordinating multiple threads for graph construction and submission. A rough sketch of approximating this today by persisting intermediate stages follows.
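
This isn't the multi-threaded idea itself, but a low-tech approximation that works today: persist an intermediate stage so the workers start running while the client keeps building the rest of the graph. The workflow and column names below (defaults from timeseries) are illustrative:

from distributed import Client
from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle

client = Client()

ddf = timeseries(start='2000-01-01', end='2001-01-01', partition_freq='1h')

# On a distributed client, `persist` returns immediately and the shuffle
# starts executing on the workers right away.
shuffled = shuffle(ddf, "id", shuffle="tasks").persist()

# Graph construction for the next stage now overlaps with the shuffle
# running on the cluster.
result = shuffled.groupby("name").x.mean().compute()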

@mrocklin
Member

One idea is to kick off work earlier during graph creation, but this may come at a cost in stability, since it may involve coordinating multiple threads for graph construction and submission.

My understanding is that this gets us at most 2x. That would be great, but it also seems fairly complex. My guess is that it sounds simple in principle but will run up against a few consistency issues. It may still be a great idea, but we should go in expecting to run into complexity.
