
Rust implementation of dask-scheduler #3139

Open · spirali opened this issue Oct 11, 2019 · 32 comments
Labels: discussion (Discussing a topic with no specific actions yet)

Comments

@spirali (Contributor) commented Oct 11, 2019

Hello,

We (I and @Kobzol) are working on a Rust implementation of dask-scheduler as an experimental drop-in replacement that requires no modification on the worker/client side. It is an experiment to (a) evaluate the performance gain of a non-Python scheduler and (b) allow experimentation with different schedulers. Here I would like to report preliminary results for (a).

I am sorry for abusing GitHub issues; if there is a better place for contacting the community, please redirect us.

Repository: https://github.com/spirali/rsds/tree/master/src
Project status:

  • The server is able to accept client and worker connections and distribute simple task graphs.
  • rsds distinguishes the "runtime" (= the part that communicates with workers/clients and maintains service information) from the "scheduler" (= the part that decides where tasks will run). The scheduler is asynchronous and offloaded into a separate thread. It communicates with the runtime via a simple protocol. The protocol is serializable, so in the future the scheduler could be written in a language other than Rust (a minimal sketch of this split follows the list).
  • The current version has only a random scheduler that randomly assigns workers to tasks.
  • Propagating exceptions from workers to a client is implemented.
  • Failure of a client/worker is not yet handled correctly.
  • Many parts of the protocol are not implemented.
  • We did not actively profile or optimize the code; we are reporting the first running version.
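
A minimal sketch of the runtime/scheduler split and the random scheduler described above (the message names, fields, and the RandomScheduler class here are illustrative assumptions, not the actual rsds protocol): the runtime forwards serializable task-graph events to the scheduler and sends out whatever assignment messages the scheduler returns.

    import json
    import random

    # Hypothetical event/decision messages -- illustrative only, not the real
    # rsds protocol. The runtime forwards serializable events to the scheduler
    # and sends out whatever assignment messages the scheduler returns.
    def new_task_event(task_id, inputs):
        return {"op": "new-task", "id": task_id, "inputs": inputs}

    class RandomScheduler:
        """Assigns every new task to a randomly chosen worker."""
        def __init__(self, workers):
            self.workers = list(workers)

        def handle(self, event):
            if event["op"] == "new-task":
                worker = random.choice(self.workers)
                return [{"op": "assign", "task": event["id"], "worker": worker}]
            return []

    # Because both directions are plain serializable data, the scheduler could
    # live in another thread, process, or even another language.
    scheduler = RandomScheduler(["worker-1", "worker-2"])
    event = json.loads(json.dumps(new_task_event("t1", [])))
    print(scheduler.handle(event))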

Benchmark

We were running the following simple code as a benchmark of the server runtime overhead.

    from dask.distributed import Client
    from dask import delayed

    client = Client("tcp://localhost:7070")
    print("CLIENT", client)

    @delayed
    def do_something(x):
        return x * 10

    @delayed
    def merge(*args):
        return sum(args)

    xs = [do_something(x) for x in range(80000)]
    result = merge(*xs)
    print(result.compute())

Results

Times (in seconds) were obtained with "time -p python test.py".

1 node / 23 workers
rsds          : 19.09 +/- 0.17
dask-scheduler: 39.19 +/- 1.01


8 nodes / 191 workers (7x24 + 23)
rsds          : 20.74 +/- 2.46
dask-scheduler: 215.19 +/- 20.07

We are aware that the benchmark is far from ideal in many respects; we would be happy if you could point us to code that does a better job.

@spirali (Contributor, Author) commented Oct 11, 2019

I forgot to note that distributed version 2.3.2 was used.

@mrocklin (Member)

This is very exciting. I'm glad that you and @Kobzol are working on this. We use GitHub issues for all sorts of communication, so I think that this is the ideal place to have this conversation.

Some thoughts / initial reactions below:

  1. I'm glad to see such nice speedups
  2. I like the idea of separating out the administrative/communication side of the scheduler from the actual decision-making, and placing these into separate threads
  3. I wonder if it makes sense to do just the decision-making/scheduling logic in Rust, but keep the communication in Python. Long-term there might not be much benefit to this, but it might allow us to solve half of the problem first. This is probably more complex than it's worth though.
  4. I'm surprised that the Python implementation was so much slower as you added more workers. We should look into that.
  5. At some point (maybe now even) you'll be bottlenecked by the Client side and you'll have to write a lot of that in Rust too :)

@spirali (Contributor, Author) commented Oct 12, 2019

Thank you for your response. We were originally thinking about separating the scheduler from the Dask runtime as a first step, but we did not find an easy way to integrate an asynchronous scheduler, i.e. one where task placement decisions may be made later, or before input tasks are finished.

@mrocklin (Member)

I think that it would still be synchronous. The scheduler logic would be in rust. This would receive a sequence of events from the Python side. Based on the events that it received it would change its internal state and emit a sequence of messages for the Python side to send out.
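
A rough sketch of what such a synchronous, event-driven scheduling core might look like (names and message shapes are illustrative assumptions, not an existing API): the Python side feeds in a batch of events, the core updates its state, and it returns the messages for the Python side to send.

    # Illustrative only: a synchronous scheduling core driven entirely by the
    # Python side. The Rust logic would sit behind transition(); all I/O and
    # networking would remain in Python.
    class SchedulingCore:
        def __init__(self):
            self.ready = []          # tasks whose inputs are all finished
            self.idle_workers = []   # workers currently waiting for work

        def transition(self, events):
            """Consume a batch of events, update internal state, and return
            the messages the Python side should send out."""
            outgoing = []
            for event in events:
                if event["op"] == "task-ready":
                    self.ready.append(event["key"])
                elif event["op"] == "worker-idle":
                    self.idle_workers.append(event["worker"])
            while self.ready and self.idle_workers:
                outgoing.append({
                    "op": "compute-task",
                    "key": self.ready.pop(),
                    "worker": self.idle_workers.pop(),
                })
            return outgoing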

@Kobzol (Contributor) commented Oct 12, 2019

In the long run, we want to use a more general solution, i.e. an asynchronous scheduler. You're right that we could also create a synchronous scheduler in Rust and call it from the Python code, but we see two obstacles with this approach:

  1. We looked into the Python scheduler source code and we found that the scheduling decisions and the task state machine changes are tightly integrated into the rest of the scheduler code. We weren't able to find a good place where we could replace the scheduling logic with a call into an external scheduler while making only minimal changes to the Python server code at the same time.
    Actually if this were possible, we wouldn't even need to do it in Rust, we could try other scheduling heuristics directly in Python.
  2. If the benchmark above is correct, it demonstrates that at least for the benchmarked program, the overhead is in fact in the Python code that handles the client<->worker messages and not in the scheduling logic. The example is in fact designed to test the overhead of the central server, not the scheduling heuristics, because for this simple task graph the scheduling decisions are not that important.
    The Rust implementation uses a random schedule, which is of course quick to generate, but on the other hand it is a much simpler heuristic than work-stealing, so the schedules may be worse (although we have some results which show that random schedules are often surprisingly competitive). This will have to be dissected and benchmarked more to find the source of the speedup of course, right now we assume that it is simply caused by Python overhead of creating a lot of temporary objects. But it might also be some weird work-stealing artefact.

So there are two (separate) directions:

  1. Optimize the scheduling decisions using a different heuristic (this is what we aim to do)
  2. Optimize the whole central server part by writing it in Rust (this is what we have started to implement). So far it seems that this alone could potentially boost the performance significantly for task graphs that are bottlenecked by the current Python central server (lots of tasks and/or too short tasks).

@mrocklin (Member)

We looked into the Python scheduler source code and we found that the scheduling decisions and the task state machine changes are tightly integrated into the rest of the scheduler code

That's correct today. But that could also be changed.

If the benchmark above is correct, it demonstrates that at least for the benchmarked program, the overhead is in fact in the Python code that handles the client<->worker messages and not in the scheduling logic

Yes. I was surprised by this. My hope is that this is some performance regression, and can be resolved on the Python side. Most of our code is independent of the number of workers, at least in theory.

We should do more profiling at scale here.

The Rust implementation uses a random schedule, which is of course quick to generate, but on the other hand it is a much simpler heuristic than work-stealing, so the schedules may be worse

Agreed. Just to make sure that we're on the same page though, you shouldn't think of Dask as a work-stealing system. Work-stealing is a small part of Dask's scheduling. Most of our decision making has to do with determining earliest finish time quickly. Most of the scheduler is about maintaining data structures with information about expected task times, worker loads, bandwidths, data sizes, and so on, so that we can make the earliest finish time query in constant time.
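
As a rough illustration of the earliest-finish-time idea (a sketch only, not Dask's actual decide_worker/worker_objective code; the task/worker attributes used below are assumptions): the estimate combines the cost of transferring missing dependencies, the worker's current backlog, and the expected task duration, and the scheduler picks the worker that minimizes it.

    # Hypothetical sketch of an earliest-finish-time objective; the attribute
    # names (nbytes, has_what, occupancy, expected_duration) are assumptions
    # and the real scheduler logic is more involved.
    def estimated_finish_time(task, worker, bandwidth=100e6):
        transfer = sum(dep.nbytes for dep in task.dependencies
                       if dep not in worker.has_what) / bandwidth
        backlog = worker.occupancy          # expected runtime already queued
        return transfer + backlog + task.expected_duration

    def pick_worker(task, workers):
        return min(workers, key=lambda w: estimated_finish_time(task, w))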

I agree with the two directions that you lay out.

@Kobzol (Contributor) commented Oct 12, 2019

That's correct today. But that could also be changed.

Indeed; it's just that we also wanted to test whether writing the central part in Rust helps performance by itself, so we started with that, and now it seems to us that it will be less effort to test various scheduling algorithms directly on top of the Rust server.

Work-stealing is a small part of Dask's scheduling.

Thanks for the clarification. Since all the statistics gathering (and also the work-stealing) is embedded very deeply in the current Python server, and it generates a lot of data traffic and also consumes CPU time, it is unclear whether we can directly compare the two solutions and say that the difference between Rust and Python is X. For that we would need to reimplement the exact same heuristics, statistics, and work-stealing of the Python solution in Rust, which is both a lot of work and not our goal right now. Our initial benchmark confirmed that there is a non-trivial difference between the two solutions (even though we do a lot less work). If using Rust, a very simple heuristic, and a task graph with a trivial structure had resulted in only a 5% improvement, it would not make sense to continue.

We will try to benchmark both our server and the Python server to find out what causes the difference and also experiment with different scheduling algorithms on more complex task graphs.

It would be great if you could point us to some benchmarks that represent real user task graphs to test our solution (and add missing features step-by-step to execute these graphs). Originally we wanted to pass the Dask scheduler test suite, but we noticed that there is a lot of additional logic in the scheduler. Right now we mostly want to find a performance baseline and compare scheduling heuristics; if we find a good solution and the Rust version is significantly faster, it might make sense to add these additional features in the future, but it is not a priority for us at this moment.

@mrocklin (Member)

For that we would need to reimplement the exact same heuristics, statistics and work-stealing of the Python solution in Rust, and that is both a lot of work and not our goal now.

OK, fair enough. However, I do encourage you to learn from the existing statistics and heuristics. They have been critical to adopting users. Many more people are concerned about making sure that we can run graphs in a good order than that we can run many tasks quickly. Optimizing strictly for task throughput may not have much impact on real-world problems.

It would be great if you could point us to some benchmarks that represent real user task graphs to test our solution

I would use dask.dataframe/array/xarray to generate these graphs. Groupby-aggregations, joins, matrix multiplication, and so on. These should generate plenty of moderately complex task graphs. You might also look through examples.dask.org.

I'm curious, what additional features are you talking about? If you're just thinking about scheduling task graphs then it's kind of all-or-nothing. I agree that you probably shouldn't think about Actors, Queues, or Pub/Sub.

@Kobzol (Contributor) commented Oct 12, 2019

OK, fair enough. However, I do encourage you to learn from the existing statistics and heuristics. They have been critical to adopting users. Many more people are concerned about making sure that we can run graphs in a good order than that we can run many tasks quickly. Optimizing strictly for task throughput may not have much impact on real-world problems.

That is good to know. We have been using Dask mainly on HPC architectures for running large scale experiments, training many ML models with cross-validation and similar things. I imagine that low latency might be worthwhile if you construct the task graph in a very dynamic way, but is there also a use case where latency matters if you just build the graph and then you wait for the final result? Or in general, could you point us to some use cases for Dask where latency matters so that we have a better idea about this? We focus mainly on throughput right now, but we want to design the Rust server in such a way that changing the scheduling heuristics would be very simple (for example to use a heuristic that tries to minimize latency), by separating the scheduling and data movement layers.

I'm curious, what additional features are you talking about? If you're just thinking about scheduling task graphs then it's kind of all-or-nothing. I agree that you probably shouldn't think about Actors, Queues, or Pub/Sub.

We have started with the simple task graph above, ran it repeatedly, and gradually implemented support for the minimal set of Dask messages needed to fully execute this task graph, plus some additional error handling. More complicated task graphs may fail right now if they use any additional messages. Things like compression, the mentioned actors/queues/pubsub, statistics gathering, etc. are missing, and I'm sure there is a lot of other metadata collected by the Python scheduler right now that we don't use or gather. To be honest, we don't fully know the whole interface of the scheduler (all of the messages it can receive and send and all the possible message frame variants), so we decided to start by designing a simple use case and making it work.

@Kobzol (Contributor) commented Dec 3, 2019

Hi @mrocklin! We have tried to extend the Rust implementation to support more advanced use cases like distributed pandas or numpy computations, but we have hit a problem with the Dask low-level serialization protocol (https://github.com/dask/distributed/blob/master/distributed/protocol/core.py#L22).

For complicated task graphs the protocol creates messages which are quite complex. Individual keys/values in the main message might be missing and need to be "patched in" via a series of (dynamic accessor, header pointing to frame(s)) structures that are stored under a headers key in a dictionary in a subsequent frame (if we understand it correctly). This is relatively simple to do in Python, but it's a real PITA in Rust (or any other statically typed language). We have tried to change the protocol to make it easier to parse in Rust: Kobzol@f6ec885

Instead of storing accessors, we build the actual message structure with the final keys, but replace the serialized payloads with a header which then points into the additional frames. As the message is received, we recursively traverse the message and replace the placeholders with the actual data from the frames. So instead of

(frame 1) [{"tasks": {}}]
(frame 2) {"headers": {(0, "tasks", "a"): <header>}, "keys": [(0, "tasks", "a")], "bytestrings": ...}

we do

(frame 1) [{"tasks": {"a": {"$": <header>}}}]

and during deserialization we read the header and replace it with the content of the corresponding frame (and deserialize it if necessary). The main benefit of this approach for statically typed languages is that we can simply replace the inner content of an individual key, without having to manipulate the overall structure (insert a value into the zeroth element of the first element of a key 'foo', etc.).
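
A sketch of the deserialization step under this encoding (simplified; "frame_indices" is an illustrative header field, and the real change in Kobzol@f6ec885 also handles compression and payload deserialization): recursively walk the message and, whenever a placeholder is found, swap in the payload rebuilt from the referenced frames.

    # Simplified sketch of the proposed decoding step; "frame_indices" is an
    # illustrative header field, not the exact header layout.
    def fill_placeholders(obj, frames):
        if isinstance(obj, dict):
            if set(obj) == {"$"}:            # placeholder leaf
                header = obj["$"]
                return b"".join(frames[i] for i in header["frame_indices"])
            return {k: fill_placeholders(v, frames) for k, v in obj.items()}
        if isinstance(obj, list):
            return [fill_placeholders(v, frames) for v in obj]
        return obj

    msg = [{"tasks": {"a": {"$": {"frame_indices": [0]}}}}]
    print(fill_placeholders(msg, [b"serialized-task-payload"]))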

With this approach you have to keep all the frames in memory, but that was also the case in the previous version. Headers are not de-duplicated in the current version, but that could be changed easily by adding another meta frame.

I tried to make as few changes to Dask as possible to keep the implementation compatible with all the features it currently supports. I ran the test suite and it seems to pass with the change. Are there any memory/performance benchmarks that we could run out of the box to spot any potential performance regressions?

Our goal now is to find out if we can simplify the serialization/protocol structure to make it easier to implement the scheduler in a statically typed language. We can put up a PR with our change or with additional changes if you think that this is a good approach. Otherwise we would probably create some private patch to distributed and work on top of that. What do you think about our change?

We are also unsure about some additional things:

  • Why are the frames split (frame_split_size) when they are then merged again at the receiving side?
  • Why do tasks have multiple possible structures? i.e.
    • {"tasks": {"a": {"args": <direct args>}}}
    • {"tasks": {"a": {"args": <serialized>}}}
    • {"tasks": {"a": <serialized>}}
    • There are probably some others, could this possibly be unified? This is another big source of complexity for statically typed languages.
  • Why are there three separate concepts of bytestrings, Serialize and Serialized?
  • During the test distributed/tests/test_client.py:traceback_sync, there are duplicate op values in the message that is serialized:
    [{'op': 'task-erred', 'key': 'inc-58d4edbbd08494a7b896c8773fd474fb'}, {'op': 'task-erred', 'key': 'inc-58d4edbbd08494a7b896c8773fd474fb'}]
    and also the headers for the individual serialize(d) items are shared, i.e. they point to the same dictionary and that dictionary is overwritten in the loop (https://github.com/dask/distributed/blob/master/distributed/protocol/core.py#L52 - head points to the same dictionary for multiple items in data). Is this valid or is it a bug?

@mrocklin (Member)

I apologise for the delay in response. It has been a busy month.

Answers to specific questions:

Why are the frames split (frame_split_size) when they are then merged again at the receiving side?

I believe that the original motivation here was because some compression libraries wouldn't accept frames larger than 2GB. I don't remember perfectly though.

Why do tasks have multiple possible structures? i.e.

I don't recall

Why are there three separate concepts of bytestrings, Serialize and Serialized?

Python includes a variety of kinds of bytestrings, including bytes, bytearrays, and memoryviews. They have different uses in various libraries, but for our purposes are all valid.

I'm not sure what you're asking about with Serialize and Serialized. These are necessary, for example, to refer to messages that we've chosen not to deserialize, but want to pass on. This is common on the scheduler, which intentionally never unpacks user data.

During the test distributed/tests/test_client.py:traceback_sync, there are duplicate op values in the message that is serialized:

This appears to be a list of small messages. Dask batches small messages on high-volume connections. See the batched.py file for more information.
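
For reference, a toy illustration of the batching idea (not the actual BatchedSend implementation in batched.py): small messages are buffered and flushed together after a short interval, so many logical messages share one physical send.

    import asyncio

    # Toy illustration of interval-based batching; distributed/batched.py is
    # the real implementation and differs in its details.
    class TinyBatchedSend:
        def __init__(self, send_many, interval=0.005):
            self.buffer = []
            self.send_many = send_many
            self.interval = interval

        def send(self, msg):
            self.buffer.append(msg)          # cheap: just queue the message

        async def run(self):
            while True:
                await asyncio.sleep(self.interval)
                if self.buffer:
                    batch, self.buffer = self.buffer, []
                    self.send_many(batch)    # one payload, many messages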

and also the headers for the individual serialize(d) items are shared, i.e. they point to the same dictionary and that dictionary is overwritten in the loop (https://github.com/dask/distributed/blob/master/distributed/protocol/core.py#L52 - head points to the same dictionary for multiple items in data). Is this valid or is it a bug?

I'm sorry, I haven't yet had time to dive into this.

For complicated task graphs the protocol creates messages which are quite complex. Individual key/values in the main message might be missing and they need to be "patched in" via a series of (dynamic accessor, header pointing to frame(s)) structures that are stored in a headers key in dictionary in a subsequent frame (if we understand it correctly). This is relatively simple to do in Python, but it's a real PITA in Rust (or any other statically typed language). We have tried to change the protocol to make it easier to parse in Rust: Kobzol/distributed@f6ec885

I'm not sure I fully understand what you mean by "patched in" and accessors here. However, I'm not surprised to learn that the way(s) that we represent task graphs is overly complex. It has grown organically over time. I recommend raising a new issue where we discuss the current system in Python on its own and see what we can do to clean it up. We can probably involve some other maintainers in that discussion that have more time than I do.

My apologies again for my late response here.

@Kobzol (Contributor) commented Jan 2, 2020

Thanks for your response :) Just to make sure that we understood the current system - the "patching in" - correctly, I want to make an example, please feel free to correct me if I state something wrongly.

At protocol/core.py:140, there's a line msg = put_in(key, msg, value). The put_in function takes a list of keys that describe either string attributes or numeric indices and uses them to put a value inside a given holder (a part of a message).
For example with a list of keys [0, "attr1", "attr2", 1], it would access the first element of the message, then access its attr1 field, then access the attr2 field and finally store something inside its second element.
This is used to put frame contents (which were previously extracted from a message during serialization), back inside the message.
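
To make the accessor mechanics concrete, here is a simplified put_in-like helper (an illustration, not the exact distributed implementation):

    # Walk the nested message following the keys/indices and set the value at
    # the final position (simplified; assumes all intermediate containers exist).
    def put_in(keys, msg, value):
        holder = msg
        for key in keys[:-1]:
            holder = holder[key]
        holder[keys[-1]] = value
        return msg

    msg = [{"attr1": {"attr2": [None, None]}}]
    put_in([0, "attr1", "attr2", 1], msg, b"frame-content")
    print(msg)   # [{'attr1': {'attr2': [None, b'frame-content']}}]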

While this is pretty elegant in Python, it's frustrating to implement something like this in a statically typed language (especially in Rust, which has a pretty strict type system), and even more so if we want to have proper statically typed messages and not just opaque blobs containing arbitrary dictionary/list data. Because of this we suggested a different approach, where the structure of the message would be fixed, but only the leaf attributes which were moved into a frame would be replaced by a placeholder.

@Kobzol (Contributor) commented Feb 17, 2020

Hi, we have some benchmark results to share.
The benchmarks were performed on the Salomon supercomputer (https://docs.it4i.cz/salomon/hardware-overview/) with 24 CPU cores per node.
Individual boxes represent a single Dask function parametrized by a specific value (their implementations can be found here: https://github.com/spirali/rsds/blob/master/benchmarks/usecases.py). They are mostly adapted from examples.dask.org. Here we would like to ask you if you could point us to more interesting Dask programs to be benchmarked! The merge function generates a lot of tasks that are very short, so it serves as a pure scheduler overhead benchmark.

Each column is a single cluster (scheduler-number of worker nodes-number of workers per node). Each Dask worker was executed with --nthreads=1 --nprocs=24 --no-dashboard with OMP_NUM_THREADS=1, Dask scheduler was executed with --no-dashboard. The client code was executed on the same node as the scheduler, the workers were executed on the rest of the nodes. rsds-ws is Rust scheduler with workstealing, rsds-random is Rust scheduler with a purely random scheduler.

The Dask version that we use is from this branch: https://github.com/Kobzol/distributed/tree/simple-frame. It uses a slightly different frame encoding, as discussed here before. We have also benchmarked this modified Dask against unmodified Dask and there do not seem to be any differences.

We used 3 different PBS allocations to run all the benchmarks, so it's possible that the network connections between the nodes were slightly different in each allocation, but in practice we haven't seen large effects caused by this.

All benchmarks were executed 5 times (the Dask cluster was started, the benchmark executed, and the cluster killed; this cycle was repeated 5 times in a row to avoid any stale state lingering in the cluster). We will probably relax this for future benchmarks and run all X iterations of the same benchmark with the same cluster to make the whole benchmark run finish faster.

Here are the results:
[figure: benchmark results]

And the comparison of modified vs unmodified (daskorig) Dask:
[figure: modified vs. unmodified Dask comparison]

Our workstealing algorithm is very simple and it seems to have rough edges on some graphs. We would be glad if you could point us to some more interesting benchmarks.

@jakirkham (Member)

First I want to say this is a very exciting piece of work. Thanks for pushing on it! 😄

Why are the frames split (frame_split_size) when they are then merged again at the receiving side?

I believe that the original motivation here was because some compression libraries wouldn't accept frames larger than 2GB. I don't remember perfectly though.

I think LZ4 is one of the compressors that runs into this issue.

With some of your suggested serialization changes, it would be interesting to see a draft PR here to get a sense of what might change in distributed. That might help make sense of these other questions you are asking.

@Kobzol (Contributor) commented Apr 17, 2020

Thanks for your answer. In the meantime, we ran a lot more benchmarks and improved our implementation to handle more Dask programs. We are now finishing writing an article about our experiments, so we're a bit busy, but once we finish writing it, I'll try to write about our modifications more (and send some more benchmark results here). The modifications are a series of WIP commits right now, so I don't want to create a PR until it's cleaned up a bit. But the gist of the changes can be seen here: Kobzol@f6ec885.

Instead of extracting values out of messages during serialization and then putting them back during deserialization (which is a bit painful to do in Rust), we keep the original message structure and replace the previously extracted values with placeholders (which can be modelled using enumerations/abstract data types in a statically typed language). During deserialization we then load the correct values into the placeholders.
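
A sketch of the encoding side of this idea (simplified; "frame_indices" is an illustrative field, and the real change also records headers with lengths, compression, and serializer information): values selected for out-of-band transfer are moved into frames and replaced in place with placeholders.

    # Simplified sketch of the encoding side; the real change records richer
    # headers describing lengths, compression and the serializer used.
    def extract_frames(obj, frames):
        if isinstance(obj, dict):
            return {k: extract_frames(v, frames) for k, v in obj.items()}
        if isinstance(obj, list):
            return [extract_frames(v, frames) for v in obj]
        if isinstance(obj, (bytes, bytearray, memoryview)):
            frames.append(bytes(obj))        # move the payload out-of-band
            return {"$": {"frame_indices": [len(frames) - 1]}}
        return obj

    frames = []
    msg = extract_frames([{"tasks": {"a": b"serialized-task"}}], frames)
    print(msg, frames)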

Here's a simple drawing that tries to describe the change:
[figure: diagram of the frame-encoding change]

@jakirkham (Member)

Thanks for illustrating. Agree that makes sense. For my 2 cents, it seems like a reasonable change to add to Dask proper (following some profiling).

Would be interested in reading your article whenever it is available 🙂

@Kobzol (Contributor) commented Apr 21, 2020

We have profiled the change and found no significant performance difference between the two frame encodings on the benchmark set that we use in our article. Nonetheless, it should be investigated more, because although it's just a few lines of code, they affect pretty much every Dask packet sent by the client/server/worker.

We plan to continue our work and publish it in a more documented and usable way, but right now we wanted to write an article about our efforts to gather feedback. I have sent a draft of the article to your and @mrocklin's e-mail addresses (the ones used in your git commits).

@mrocklin (Member)

Thank you for sending the article @Kobzol . I really enjoyed reading it. I had a few comments and questions that I wanted to bring up here. I'll focus here on broader questions and save nitpicky comments for e-mail :)

Reactor/Scheduler split

I liked the split between reactor (networking/events) and scheduler (task placement logic). I also think that this would be a useful split to think about in terms of performance (which side is the bottleneck?) and in terms of allowing other groups like yours to more easily experiment with different scheduling options. Decoupling these two within Dask would, I think, unlock a lot of creativity. (Also, my apologies that you had to do this in order to experiment)

What is your current performance bottleneck?

It looks like in the best-case scenario rsds is able to get about 4x faster than Dask's scheduler. This is from a combination of rewriting in Rust and dropping most of the task placement heuristics. I was actually a little surprised that it wasn't higher. Ideally we would want to learn that there was some way to get a 50x speedup or something similar. This would give us hope for future improvement.

What is your current bottleneck? What is slowing down performance?

Is it that the scheduler is operating as fast as a CPU can go? This seems unlikely given the ~10000 task / second task placement that you have today with a random policy.

Is it the reactor? Perhaps it's hard to manage lots of concurrent signals from different sockets? If so, is there a way around this?

Is it in the network latency of having to go back and forth between the worker and scheduler every time? If so, maybe we try to estimate size outputs ahead of time so that we can speculatively send some tasks to a worker before its dependencies are finished.

Is it in the client? Maybe that's taking up all of the time, and the scheduler is no longer the bottleneck.

@Kobzol (Contributor) commented Apr 24, 2020

Regarding the reactor/scheduler split: it's a win in terms of experimenting with different schedulers and probably also testability, since the two parts can be tested in isolation. But it can also be a loss in performance, because the isolation means that you probably have to duplicate some data structures or synchronize access to them if using multiple threads, so memory usage is increased. In Rust you can run the two parts concurrently, but that's not really an option in Python. And actually even in Rust it wasn't a big win, because we already use async I/O, so if you run the scheduler in the same thread, I/O will still be progressing in the background; the scheduler was not a bottleneck in most cases.

Regarding performance: our implementation is actually very simple, without any heavy optimizations. We didn't use TCP connection reuse (although we have it implemented), we recompute some scheduler state, such as task priorities, from scratch on every task graph change instead of caching it, and we allocate all over the place. Although we want to improve this in the future, it was also partly intentional, to see whether a relatively simple algorithm plus a really basic implementation is able to outperform Dask just by paying smaller runtime costs.

The idea about 50x speedup is actually very interesting and it ties in to our implementation being relatively simple. I think that the Rust server is actually faster than the Dask server by an order of magnitude at least. For example, I have tried an optimization which allocates tasks linearly in memory, which is a huge performance win. In microbenchmarks, we got almost 100 % faster performance than with normal allocation. But when I tried it on the benchmarks from the paper, we saw no improvement at all, so we did not include this optimization in the rsds version used in the paper. Or a different case: we run the scheduler after every new task event, which is a bit wasteful. We implemented some batching, which helped in microbenchmarks, but again we saw almost no improvement for real task graphs. If we microbenchmarked the rsds and Dask, I have no doubt that rsds would strictly dominate even if it is not heavily optimized, but sadly that alone doesn't translate to X times faster workflows end-to-end.

This observation led us to the implementation of the zero worker - if there are other overheads in the distributed runtime other than the server, then even if you make the server 100x faster, it will not help as much. There are fundamental limitations associated with the centralized server architecture, but we cannot really change that in Dask. But we can lower the overheads of the individual parts. Rsds tries to lower the overhead of the server. The client is usually not a bottleneck, at least not from our experience. The worker is I think the next thing that could be interesting to accelerate.

As we saw with the zero worker, if the overhead of the worker could be reduced, the performance gap between rsds and Dask could be widened further. Rewriting part of the worker in Rust would surely help, but there is also another approach that we think could help. In our usage of Dask (both for our purposes and in the paper), we never saw much benefit from using one C-thread worker (replace C by the number of cores on your machine) and we always used C one-thread workers. This gives us more computational power overall for single-threaded Python tasks, but it has severe downsides: the data on each node is kept in multiple copies, there are more network transfers, and there is also more scheduler overhead. We think that some kind of "super worker" could potentially provide significant performance gains. Here's how it could look:

  • it would have C processes for computation instead of C threads
  • it would use only a single permanent connection to the server instead of C connections
  • it could share data objects between all the processes through shared memory if possible (probably a very ambitious hope in general)
  • mainly, it could perform lightweight local scheduling among these C local workers to reduce task assignment latency. In a way, this would bring Dask closer to a more distributed scheduler, like Ray, and it could reduce pressure on the central scheduler (a toy sketch of this local dispatch follows the list).
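
Purely to illustrate the last point, here is a toy sketch of local dispatch inside such a "super worker" (entirely hypothetical, not an existing Dask or rsds component): tasks assigned by the central scheduler over a single connection are handed to local processes without further scheduler round-trips.

    from multiprocessing import Pool

    # Entirely hypothetical "super worker": one connection to the scheduler,
    # C local processes, and a local dispatch step that hands tasks to
    # whichever process is free, with no extra scheduler round-trip.
    class SuperWorker:
        def __init__(self, nprocs):
            self.pool = Pool(processes=nprocs)
            self.pending = []

        def task_assigned(self, fn, *args):
            # Called when the central scheduler assigns a task over the single
            # worker connection; dispatch is purely local from here on.
            self.pending.append(self.pool.apply_async(fn, args))

        def finished_results(self):
            return [p.get() for p in self.pending]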

Of course this is all just an ambitious idea; we don't know whether it's possible to integrate something like this into Dask or how much (if at all) it would help. But our impression from our experiments is that the large number of workers required to run single-threaded workloads (a consequence of the GIL) is a significant source of overhead. This is of course not a problem for workloads that are fine with multi-threaded Python workers.

I'm sorry that I do not have more specific answers to your other questions, as we didn't have time for a really deep dive. We are not CPU bound on the server and we examined our handling of sockets and async I/O and that also didn't seem to slow anything down. Network latency definitely hurts performance on some task graphs, but we don't have specific numbers. We could probably derive some based on the zero worker experiments, but they would be very specific to our topology I guess. I wanted to include UCX in the benchmarks, but I had some trouble setting it up. I think that I have managed to use it in the end (it was a few months ago), but we didn't see any large differences. It's possible that I have set it up wrong though.

@mrocklin (Member)

But it can also be a loss in performance, because the isolation means that you probably have to duplicate some data structures or synchronize access to them if using multiple threads, so memory usage is increased

I would only expect to pay double for the amount of information that is in-flight. And even then this might not be necessary. These don't necessarily have to be in separate processes. I may not fully understand your separation between handling network events and managing the state machine of the scheduler though.

The worker is I think the next thing that could be interesting to accelerate.

This would surprise me. I think that if our goal is rapid round-trips then yes, maybe it makes sense to accelerate the workers. But if our goal is task throughput then I don't think that the workers matter that much. This will depend a lot on workload. On something like a merge with an all-to-all shuffle then yes, this might matter because we'll need to send lots of small messages back and forth to coordinate who does what. But even then a centralized approach only costs you 2x (which may be large in your mind) over a distributed system if the scheduler is fast enough. For high throughput situations, like @rabernat 's climate workloads, I don't think this will matter at all. We can always add more workers if the overhead here is high.

If we are limited by lots of coordination (as in the merge workload) then I think that there are a few things to do:

  1. Make the scheduler faster. An infinitely fast scheduler gets us to within two network hops and whatever overhead the worker has (which I think might not be much?)
  2. Speculatively assign tasks based on historical memory usage of similar tasks. This means more complex workers (they have to do more coordination) but they're pretty simple today. I think that this would let us overlap scheduling and computation a bit.
  3. See if we can reduce network costs

Building on that a little, I'm actually quite curious how much time we spend in each of the following stages:

  1. Doing logic on the scheduler, determining where to send the task (my guess is 100us)
  2. Wrapping up the message, sending it over the network, and receiving it on the worker side (my guess is 500 us - 2ms depending on network)
  3. Handling logic on the worker side and starting the task (my guess is 100 us)

For workloads like merge, where there is a lot of co-decision-making on the workers and schedulers, my guess is that the network communication is going to be the biggest bottleneck for small workloads, and scheduling logic the biggest bottleneck for larger workloads. Because people mostly feel a pain around larger workloads I was hoping that reducing scheduling costs would be the most helpful here.
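
A quick back-of-envelope calculation with the guesses above (illustrative arithmetic only): a single assignment round trip would cost roughly 0.7–2.2 ms, which bounds purely sequential coordination at roughly 450–1400 round trips per second; this is why the network term dominates for small workloads.

    # Illustrative arithmetic only, using the rough per-stage guesses above.
    scheduler_logic = 100e-6            # s, deciding where the task goes
    network = (500e-6, 2e-3)            # s, wrap + network + unwrap
    worker_logic = 100e-6               # s, worker-side handling

    for net in network:
        round_trip = scheduler_logic + net + worker_logic
        print(f"round trip ~ {round_trip * 1e3:.1f} ms, "
              f"~ {1 / round_trip:.0f} sequential round trips per second")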

I think that the Rust server is actually faster than the Dask server by an order of magnitude at least.

I would love to know more about this. I'm also curious about your thoughts on how much faster the Rust scheduler would be if it implemented the same heuristics that the Python scheduler implemented. Still 10x? (I hope so)

This observation led us to the implementation of the zero worker - if there are other overheads in the distributed runtime other than the server, then even if you make the server 100x faster, it will not help as much

In your experiments with your zero-worker, do you track communication time of data? I would expect this to be the main reason why a random placement scheduler would be slow.

In our usage of Dask (both for our purposes and in the paper), we never saw much benefit from using one C-thread worker (replace C by the number of cores on your machine) and we always used C one-thread workers.

In practice the benefit is typically around reducing communication and memory use.

Super-worker

In practice I've seen multi-threaded workers perform pretty well. Before diving into a more complicated structure I recommend that you investigate why things weren't performing well for you with multiple local threads. My opinion so far is that one multi-threaded process per node is the simplest solution. If there is some reason why it doesn't perform well (like some library doesn't release the GIL), then let's resolve that underlying issue.

I wanted to include UCX in the benchmarks, but I had some trouble setting it up

The NVIDIA RAPIDS folks (who I imagine are now tracking this thread) may be able to help here.

@Kobzol (Contributor) commented Apr 25, 2020

We are probably a bit biased in our experiences, because for our purposes we have usually been running single-threaded pure-Python tasks which don't benefit from multi-threaded workers. I'm sure that there are a lot of users who are fine with multi-threaded workers and who have long enough tasks that the scheduler is never a bottleneck. You are definitely right that multi-threaded workers save memory and bandwidth; it's just that for our past use cases they were not very effective computationally.

Your question about how much time the individual parts take in the server is what also interests us and this is the next thing that we want to focus on. We have some very basic tracing infrastructure in rsds and we have also added it to Dask to compare some metrics (like the number of performed task steals), but some things are difficult to compare directly, because e.g. the scheduler is "spread" over more places in Dask, whereas in rsds it's basically a single function call. This warrants additional investigation, as I don't feel like saying "Rust is faster" is enough. In the paper we were usually faster, but maybe we just chose task graphs that were good for us (although we tried to not be biased here). Once I get to it I'll try to run the experiments with the tracing to provide you with some more numbers (including the comm. time with zero worker).

@mrocklin (Member)

because for our purposes we have been usually running single-threaded pure-Python tasks which don't benefit from multi-threaded workers

Right, multi-threaded workers are especially valuable when using other PyData libraries like Numpy, Pandas, and Scikit-Learn, where Python code takes up 1% of the runtime, and tasks are usually hundreds of milliseconds long.

We have some very basic tracing infrastructure in rsds and we have also added it to Dask to compare some metrics

+1 on measurement

In the paper we were usually faster, but maybe we just chose task graphs that were good for us (although we tried to not be biased here)

My guess is that a random scheduler will fail hard for some situations that care a lot about communication or memory use. Dask's scheduler has probably evolved to be fairly robust about minimizing memory use, which tends to be the most common complaint we've heard about over the years from data science workloads.

@Kobzol (Contributor) commented Sep 16, 2021

Just a random note: I ran a quick experiment on the most trivial benchmarks that show runtime overhead, to test how Dask behaves these days. It is quite a bit faster than the Dask/Distributed ~2.19 that we benchmarked previously. There is still some gap compared to RSDS:
[figure: benchmark results]

@jakirkham (Member) commented Sep 16, 2021

Thanks for the update Jakub! 😄

It might be worth trying the Cythonized scheduler ( #4442 (comment) ) as well

Edit: Also, what do the 1n and 7n mean?

@Kobzol (Contributor) commented Sep 17, 2021

We're running the benchmarks on an HPC cluster. 1n = 1 worker node, 7n = 7 worker nodes. Each node uses 24 Dask worker processes (one per core), each with a single thread. This configuration is chosen both to stress the scheduler and runtime and because our use cases didn't benefit from multiple worker threads.

I have tried the tag 2021.03.0 with a Cythonized scheduler (well, I hope that it's Cythonized: the build succeeded, but the COMPILED flag is not there; maybe it was added in a later version).

Here are the results with the Cythonized scheduler:
[figure: benchmark scatterplot]

It's much better, although there's still some slowdown when more workers are added.

@jakirkham (Member)

Ah ok. Was wondering if it was something like that. Thanks for clarifying 🙂

Thanks for running that comparison, Jakub! 😄 That is nice to see.

Yeah the flag wasn't there yet, but one could check this. Might be worth trying a more recent version (if that is an option). There were some improvements to worker selection, serialization, memory handling, etc. (though those are not really related to Cythonization of the scheduler in particular)

Am curious if you would be willing to share the code you used in this experiment? No worries if not. Just thinking it might be fun to play with 🙂

@Kobzol (Contributor) commented Sep 17, 2021

I wanted to try a newer version, but it required Python 3.7. It's not a problem to upgrade, but the other benchmarks were executed with Python 3.6, so to keep it more fair I would probably need to upgrade all of them. I will do it eventually, I just haven't gotten to it yet :) I need to modify the benchmark suite a bit to make it easier to execute multiple benchmarks with different Python/Python library versions.

The code is open source here: https://github.com/It4innovations/rsds/blob/master/scripts/benchmark.py
The benchmarks can be run locally using python benchmark.py benchmark <input-file (see reference.json)> <directory> or on a PBS cluster using benchmark.py submit. It's not exactly super documented though 😅

The benchmarked task workflows can be found here.

@jakirkham (Member) commented Sep 17, 2021

Ah ok. That's fair 🙂

It's possible things work with Python 3.6. There weren't too many changes when dropping it ( #4390 ). The main differences were dropping time measurement utilities for Windows, which seems irrelevant here, and some async bits Python 3.7 simplified

Great, thank you Jakub! 😄 It seems like a lot of these are similar to things we've been benchmarking ourselves (groupbys, joins, reductions).

Not sure if you have read any of the blogposts recently. No worries if not. Think this one on speeding up the scheduler and this one on HighLevelGraphs (HLGs) may be of interest. Also there's a presentation on all of this from the Dask Summit, which has a good overview of the work.

Maybe it would be worthwhile to compare notes at some point?

@Kobzol (Contributor) commented Sep 17, 2021

Thanks, I have read the articles, but haven't seen the video, I will go through it. HLGs are something that we have also been experimenting with (#3872), it's cool to see that Dask has progressed so much on this front.

Maybe it would be worthwhile to compare notes at some point?

Sure :) We can chat on the nvcollab Slack if you want. I'm trying to "revive" the work on my PhD 😄 And I'll probably start with analyzing Dask/RSDS to see what the bottlenecks are for certain workflows.

@jakirkham (Member) commented Sep 17, 2021

Ah ok. Yeah that should provide a good overview.

After moving a bunch of things to HLGs, we've wound up spending time on other things related to serialization ( #4923 ), comms ( #4443 ), IO/creation ( dask/dask#6791 ), etc. Also there is more recent work on operation reordering ( dask/dask#7933 ). Probably forgetting other things here 😅

Sure. Oh gotcha 😄 Well it would be great to have another set of eyes. Would be curious to see what other issues you see when hammering on things 🙂

@GenevieveBuckley added the "discussion" label on Oct 22, 2021
@Am3ra commented Feb 1, 2023

any updates @Kobzol ?

@Kobzol (Contributor) commented Feb 1, 2023

Hi. Some of the ideas and insights from RSDS (the Rust scheduler) were used to slightly optimize Dask, but we are no longer working on the Rust scheduler. It is my understanding that since RSDS was created, Dask has moved in the direction of moving much more logic from the client to the server (and this is a Good Thing™); however, it also means that any (re)implementation of the server would now be much more complicated, because it would need to cover a much larger API surface.
