Dask Futures performance as the number of tasks increases #5715

Open
mrogowski opened this issue Jan 27, 2022 · 9 comments

@mrogowski

I have been running a performance comparison of Dask Futures against other solutions and found unexpected throughput behavior as I increase the number of tasks.

In this test, I am running on 6 nodes (client, scheduler and 4 workers, 1 thread per worker); however, the behavior is consistent for 2-2048 workers. Each task is a dummy function returning its argument. I am timing this:

futures = client.map(dummy, data, key=labels)
wait(futures, return_when='ALL_COMPLETED')

where data is a list of None values, labels is a list of unique keys, and the dummy function simply returns its argument. Complete code is attached.

Based on 30 repetitions for each configuration and after discarding the first timing to account for any lazy initialization, I am getting the following results:

[plot: timings per number of tasks (bars with 95% confidence intervals)]

Is this characteristic something you would expect given such a trivial task? Is the scheduler not able to keep up with this many tasks?

Thank you for any insights!

Environment:

  • dask==2021.12.0, dask_mpi==2021.11.0, distributed==2021.12.0
  • Python 3.10.1
  • Cray XC-40, each node has 32 cores and 128 GB of RAM
  • reproduced on another cluster with dask 2021.11

Complete code:
latency.py.txt

@jakirkham
Member

Thanks Marcin! 😄

I've simplified the script a bit in the process of playing with it locally on my laptop. I also picked an ntasks that corresponds to where task processing drops off.

Script:
import time

from distributed import Client, wait


# params
ntasks = 10_000
nreps = 10

# initialize data
item = None
data = [item] * ntasks
labels = list(map(str, range(ntasks)))

# run workload
with Client() as c:
    for irep in range(nreps):
        tic = time.time()
        futures = c.map(lambda e: e, data, key=labels)
        wait(futures, return_when="ALL_COMPLETED")
        print("time in seconds", time.time() - tic)
        del futures

As an interesting note, when I run this locally (on my MacBook Pro (15-inch, 2018)), I see the following:

Results:
time in seconds 8.590115070343018
time in seconds 53.51888108253479
time in seconds 0.7998080253601074
time in seconds 37.951663970947266
time in seconds 0.8645291328430176
time in seconds 31.745975971221924
time in seconds 31.907670974731445
time in seconds 32.492034912109375
time in seconds 32.979432821273804
time in seconds 0.969944953918457

Interestingly, it looks like the runtime here is variable. Admittedly, I also have a whole bunch of browser tabs open and other applications running in the background, so this might not be the cleanest result, but the variability of the runtimes is interesting to note. It looks like you are not seeing this, though, assuming the black lines on each bar represent the range.

@jakirkham
Member

Generally, over the past year or so, we have been working to optimize the scheduler. See this blogpost for an overview.

One of the key observations is that Dask graphs track a lot of small details as users build them up. These then add overhead throughout, since every task is handled in this detailed way. To address this, a proposal was made to use High Level Graphs (or HLGs for short), which effectively aggregate many nodes into one. So data loading or creation could be a single node, independent of how many operations it contains. Similarly, other operations performed could be represented with HLGs. The result is that an HLG can be built on the client, shipped to the scheduler, and converted to tasks for workers on demand. Going down this path has involved a fair bit of engineering to transform existing operations into HLG-based ones. Here's a good blogpost, though work is ongoing.
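
For illustration, here is a minimal sketch of the idea, assuming dask.array is available; it just inspects the graph a collection builds and is not the exact code path the work above changes:

import dask.array as da

# A collection with as many chunks as the workload above has tasks.
x = da.zeros((10_000,), chunks=(1,))
y = x + 1

hlg = y.__dask_graph__()
print(type(hlg).__name__)   # HighLevelGraph
print(len(hlg.layers))      # a handful of layers, independent of the chunk count
print(len(dict(hlg)))       # many low-level tasks once materialized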

As moving to HLGs would increase the workload on the Scheduler, we spent time benchmarking the Scheduler and looking for slow spots, profiling workloads and optimizing it in particular by using Cython annotations (for example #4302). Once sufficiently optimized on that front, we determined communication was the primary bottleneck (#4443). Work on this has continued on a few fronts: first, aggregating communication into fewer, larger messages (#4526); second, optimizations in serialization to traverse complex structures more efficiently (#4699, ongoing); third, optimizations in communication itself by moving to asyncio (#5450), with the option to use uvloop (#5531). Outside of that work, UCX-Py can be combined with Dask to leverage high-throughput communication channels (note that setup of clusters and workers is streamlined in the upcoming 22.02 release).
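
For reference, a rough sketch of opting into some of these options; the event-loop config key and the ucx:// address are assumptions to check against the installed version's documentation:

import dask
from distributed import Client

# Assumed config key for selecting the event loop used by the asyncio comms
# work; verify against the distributed configuration reference for your version.
dask.config.set({"distributed.admin.event-loop": "uvloop"})

client = Client()

# With ucx-py installed, connecting over UCX is typically done by pointing the
# client at a ucx:// scheduler address, e.g.:
# client = Client("ucx://scheduler-address:8786")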

As noted, some of this work is more recent or ongoing; a good recap is in this presentation. At this point I think we are in a good place to benchmark this work a bit more. I think a benchmark that leverages HLGs in one of the built-in collections like Arrays or DataFrames would be a good starting place.
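
One possible shape for such a benchmark, as a hedged sketch (the task count and chunk size are arbitrary; the point is many tiny tasks built through a collection rather than through client.map):

import time

import dask.array as da
from distributed import Client

# params
ntasks = 10_000
nreps = 10

# run workload
with Client() as client:
    for irep in range(nreps):
        tic = time.time()
        # one element per chunk so the graph expands to roughly ntasks tiny tasks
        x = da.zeros((ntasks,), chunks=(1,))
        x.sum().compute()
        print("time in seconds", time.time() - tic)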

@mrogowski
Author

Interestingly, it looks like the runtime here is variable. Admittedly, I also have a whole bunch of browser tabs open and other applications running in the background, so this might not be the cleanest result, but the variability of the runtimes is interesting to note. It looks like you are not seeing this, though, assuming the black lines on each bar represent the range.

Black lines are 95% confidence intervals. Looking at the raw timings, I see similar behavior. It is rare for 2048 tasks (between 0.07s and 0.47s) but quite common for 4096 tasks (0.16s - 1.08s). I have the scheduler running on a dedicated node of a Cray XC-40 here.

Results for 4096 tasks:
time in seconds 0.16305017471313477
time in seconds 0.2077953815460205
time in seconds 0.1598653793334961
time in seconds 0.18428874015808105
time in seconds 0.847132682800293
time in seconds 0.18993926048278809
time in seconds 0.6985807418823242
time in seconds 0.7729971408843994
time in seconds 0.776334285736084
time in seconds 0.18843913078308105
time in seconds 0.6464235782623291
time in seconds 0.16218113899230957
time in seconds 0.22185921669006348
time in seconds 0.8570101261138916
time in seconds 0.18695449829101562
time in seconds 0.18707060813903809
time in seconds 0.6919937133789062
time in seconds 1.0757660865783691
time in seconds 0.18456482887268066
time in seconds 0.15624499320983887
time in seconds 0.18169879913330078
time in seconds 0.18533682823181152
time in seconds 0.9453377723693848
time in seconds 0.6190814971923828
time in seconds 0.18610930442810059
time in seconds 0.15897655487060547
time in seconds 0.1836533546447754
time in seconds 0.18377447128295898
time in seconds 0.346935510635376
time in seconds 0.1618971824645996

@jakirkham
Member

Something else worth exploring here may be using a custom Executor (like MPIPoolExecutor) with Dask. This would leave all the communication/execution to mpi4py (however that is done with the MPIPoolExecutor) while leaving Dask to handle the graph building and submission.
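
For comparison, a minimal sketch of driving the same dummy workload through mpi4py's MPIPoolExecutor on its own; this only illustrates the mpi4py execution side (the launch command and file name are placeholders), not the wiring of such an executor into Dask, which would take more setup:

# Run with e.g.: mpiexec -n 5 python -m mpi4py.futures mpi_baseline.py
import time

from mpi4py.futures import MPIPoolExecutor

ntasks = 10_000
data = [None] * ntasks


def dummy(e):
    return e


if __name__ == "__main__":
    with MPIPoolExecutor() as executor:
        tic = time.time()
        futures = [executor.submit(dummy, e) for e in data]
        results = [f.result() for f in futures]
        print("time in seconds", time.time() - tic)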

@gjoseph92
Collaborator

Anecdotally, this high variability seems pretty familiar to me. See some similarly noisy results in #5036 (comment).

Have you tried profiling these? I'd recommend https://github.com/gjoseph92/scheduler-profilers using py-spy with native=True; you can learn some interesting things from that.

from scheduler_profilers import pyspy_on_scheduler, pyspy

with pyspy_on_scheduler("scheduler.json", native=True), pyspy("workers", native=True):
    futures = client.map(dummy, data, key=labels)
    wait(futures, return_when='ALL_COMPLETED')

@jakirkham
Member

Curious if you have had a chance to try any of these things or if you have other questions, @mrogowski 🙂 No worries if not. Just wanted to check in.

@mrogowski
Author

Thanks, @jakirkham and @gjoseph92, and sorry for the late reply! I profiled as @gjoseph92 suggested, and it seems like the variability originates from update_graph_hlg. It varies from 80 to 440ms when running 10 repetitions of 4096 tasks on 32 local workers. Do you know what might be happening here?

@gjoseph92
Collaborator

@mrogowski if you used pyspy, can you share the full profile?

@mrogowski
Author

Sure, here it is: pyspy.json.zip
