Accelerate intra-node IPC with shared memory #6267
Comments
What is the workload? Have you tried the dask.distributed scheduler? You can set up a system with sensible defaults by running the following:

```python
from dask.distributed import Client

client = Client()
# then run your normal Dask code
```

https://docs.dask.org/en/latest/scheduling.html#dask-distributed-local
In general, a system like Plasma will be useful when you want to do a lot of random-access changes to a large data structure and you have to use many processes for some reason. In my experience, the number of cases where this is true is very low. Unless you're doing something like a deep-learning parameter server on one machine and can't use threads for some reason, there is almost always a simpler solution.
A data loading pipeline shouldn't really require any communication, and certainly not high-speed random-access modifications to a large data structure. It sounds like you just want a bunch of processes (because you have code that holds the GIL) and want to minimize data movement between those processes. The dask.distributed scheduler should have you covered there; you might want to add the …
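A minimal sketch of the process-based setup suggested above (the file names, worker counts, and the load_and_preprocess helper are illustrative, not from the thread): GIL-holding Python code runs in separate single-threaded worker processes, so tasks don't contend for the GIL and data stays on the worker that produced it.

```python
from dask.distributed import Client
import dask.bag as db

def load_and_preprocess(path):
    # stand-in for pure-Python parsing that holds the GIL
    return {"path": path, "checksum": sum(ord(c) for c in path)}

if __name__ == "__main__":
    # one single-threaded worker process per task slot avoids GIL contention
    client = Client(processes=True, n_workers=8, threads_per_worker=1)

    paths = [f"data/part-{i:03d}.json" for i in range(100)]
    results = db.from_sequence(paths, npartitions=8).map(load_and_preprocess).compute()
    print(len(results))
```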
In addition to what Matt said, we have tended to keep Dask's dependencies pretty lightweight when possible. My guess is if we were to implement shared memory it would either involve …

That said, if serialization is really a bottleneck for you, I would suggest you take a closer look at what is being serialized. If it's not something that Dask serializes efficiently (like NumPy arrays), then it might just be that you need to implement Dask serialization for it. If you have some simple Python classes consisting of things Dask already knows how to serialize efficiently, you might be able to just register those classes with Dask. It will then recurse through them and serialize them efficiently.

Additionally, if you are using a Python with pickle protocol 5 support and a recent version of Dask, you can get efficient serialization with plain pickle thanks to out-of-band pickling ( dask/distributed#3784 ). Though you would have to check and make sure you are meeting those requirements. This may also require some work on your end to ensure your objects use things that can be handled out-of-band, by either wrapping them in …
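A small sketch of the two approaches mentioned above: registering a plain Python class for Dask serialization, and relying on pickle protocol 5 out-of-band buffers. The `Record` class is hypothetical; `register_generic`, `serialize`, and `deserialize` come from `distributed.protocol`, though the exact API may differ between versions.

```python
import pickle
import numpy as np
from distributed.protocol import register_generic, serialize, deserialize

class Record:
    """Hypothetical container whose attributes Dask already handles well."""
    def __init__(self, data, label):
        self.data = data      # large NumPy buffer
        self.label = label    # small metadata

# Let Dask recurse through Record's attributes instead of pickling it whole.
register_generic(Record)

rec = Record(np.arange(1_000_000, dtype="float64"), "example")
header, frames = serialize(rec)               # frames carry the raw array memory
restored = deserialize(header, frames)
assert np.array_equal(restored.data, rec.data)

# Pickle protocol 5: large buffers travel out-of-band instead of being copied
# into the pickle byte stream.
buffers = []
payload = pickle.dumps(rec.data, protocol=5, buffer_callback=buffers.append)
again = pickle.loads(payload, buffers=buffers)
assert np.array_equal(again, rec.data)
```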
Maybe. We're not really bound by bandwidth there yet. Even if we were, the people who are concerned about performance for dataframe shuffle operations are only really concerned when we start talking about very large datasets, for which single-node systems wouldn't be appropriate.
On Thu, Jun 11, 2020 at 5:11 PM Dave Hirschfeld wrote:

> plasma might be ideally suited for e.g. shuffling operations, #6164
Though if you have thoughts on how …
In the context of distributed you could have a plasma store per node and, instead of having workers communicate data directly, have them send the data to the plasma store on the receiving node and only send the guid / unique reference to the worker. All workers on that node would then have access to that data (by passing around the guid) without having to copy or deserialize it.

I think that could have pretty big performance benefits for a number of workloads. IIUC that's basically what ray (https://ray-project.github.io/2017/08/08/plasma-in-memory-object-store.html) does:

> To illustrate the benefits of Plasma, we demonstrate an 11x speedup (on a machine with 20 physical cores) for sorting a large pandas DataFrame (one billion entries). The baseline is the built-in pandas sort function, which sorts the DataFrame in 477 seconds. To leverage multiple cores, we implement the following standard distributed sorting scheme...

Anyway, it might be a very big piece of work, so not something I could invest time in. I thought I'd mention it as an option though if people are considering big changes to improve performance.
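A rough sketch of the per-node store idea, using the pyarrow.plasma bindings that shipped with pyarrow at the time (the plasma module has since been removed from recent pyarrow releases); the store path and sizes are illustrative.

```python
# Start a store on the node first, e.g.:
#   plasma_store -m 1000000000 -s /tmp/plasma
import numpy as np
import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma")

# Producer: put a large array into the node-local store and keep only the ID.
data = np.random.random(10_000_000)
object_id = client.put(data)

# Any other process on the same node can connect to the same store and fetch
# the object; only the small object ID needs to travel between workers, and
# the returned array is a read-only view backed by the store's shared memory.
view = client.get(object_id)
print(view.sum())
```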
Yeah, I think that having some sort of shuffling service makes sense (this is also what Spark does). I'm not sure that we need all of the machinery that comes along with Plasma though, which is a bit of a bear. My guess is that a system that just stores data in normal vanilla RAM on each process would do the trick.
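A very rough sketch of the "plain RAM on each process" idea: each worker process keeps an ordinary dict of shards, and a later task scheduled on the same worker picks them up by key. All names here are hypothetical, and routing tasks to the right worker (e.g. via worker restrictions on submit) is left out.

```python
# This module would live on each worker; the dict is just process-local memory.
_SHUFFLE_BUFFER = {}

def stash_shard(partition_id, shard):
    """Called by producer tasks running on this worker."""
    _SHUFFLE_BUFFER.setdefault(partition_id, []).append(shard)
    return partition_id

def collect_partition(partition_id):
    """Called by a consumer task pinned to the same worker."""
    return _SHUFFLE_BUFFER.pop(partition_id, [])
```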
I could totally be wrong though. It would be great if people wanted to run experiments here and report back.
Has there been any further discussion on the multiprocessing shared memory implementation? I also run dask on single machines with high core counts and have read-only data structures that I want shared.
@alexis-intellegens the Ray developers created a Dask scheduler for this called …

```python
# don't do this:
dask.compute(dask_fn(large_object))

# instead do this:
large_object_ref = ray.put(large_object)
dask.compute(dask_fn(large_object_ref))
```

Ray will automatically de-reference the object for you.
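For context, a minimal sketch of wiring Dask to Ray, assuming the comment above refers to the Dask-on-Ray scheduler exposed as ray.util.dask.ray_dask_get (check the Ray docs for the exact entry point in your version):

```python
import ray
import dask.array as da
from ray.util.dask import ray_dask_get

ray.init()

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
# Execute the Dask graph on Ray workers, which share Ray's plasma object store.
result = x.sum().compute(scheduler=ray_dask_get)
print(result)
```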
Very interesting! I'll give it a go. Thanks @Hoeze
Out of curiosity, what would happen if I made a shared memory object (via Python 3.8 multiprocessing) and tried to access it in dask workers? I'll try it later today.
That should work; they'd pickle as references to the shared memory buffer and be remapped in the receiving process (provided all your workers are running on the same machine, otherwise you'd get an error). In general I think we're unlikely to add direct shared memory support in dask itself, but users are free to make use of it in custom workloads using e.g. …

As stated above, shared memory would make the most sense if you have objects that can be mapped to shared memory without copying (meaning they contain large buffers, like a numpy array) but also still hold the GIL. In practice this is rare: if you're using large buffers you probably are also doing something numeric (like numpy), in which case you release the GIL and threads work fine.

Closing.
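A small sketch of the experiment described above, under the assumption that all workers run on one machine; the array size and the mean_of_shared helper are illustrative.

```python
import numpy as np
from multiprocessing import shared_memory
from dask.distributed import Client

def mean_of_shared(name, shape, dtype):
    # Re-attach to the same shared block inside the worker process.
    block = shared_memory.SharedMemory(name=name)
    view = np.ndarray(shape, dtype=dtype, buffer=block.buf)
    result = float(view.mean())
    del view            # release the exported buffer before closing
    block.close()
    return result

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=8 * 1_000_000)
    arr = np.ndarray((1_000_000,), dtype="float64", buffer=shm.buf)
    arr[:] = np.random.random(1_000_000)

    client = Client(processes=True)   # all workers on this one machine
    print(client.submit(mean_of_shared, shm.name, arr.shape, "float64").result())

    client.close()
    del arr
    shm.close()
    shm.unlink()
```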
When implementing e.g. a data loading pipeline for machine learning with Dask, I can choose either the threaded scheduler or the process-based (forking) scheduler.
I often face the issue that the threaded scheduler effectively uses only 150% CPU, no matter how many cores it gets, because of Python code that does not parallelize.
The forking scheduler sometimes works better, but only if the data loading is very CPU-intensive.
Recently I tried Ray, and for some reason it sped up some of my prediction models about 5-fold.
I'm not 100% up to date with the latest developments in Dask, but AFAIK Dask serializes all data when sending it between workers. That's why I assume the huge speed difference is due to Plasma, the shared-memory object store, which allows zero-copy transfers of Arrow arrays from the worker to TensorFlow.
=> I'd like to share two ideas for how Plasma or Ray could be helpful for Dask:
Allow producer to calculate data and consumer to read it without (de)serialization or copying
Related issues: