
Document communication protocol #3357

Kobzol opened this issue Jan 8, 2020 · 5 comments

Comments

@Kobzol
Contributor

Kobzol commented Jan 8, 2020

Hi! Together with @spirali, we are attempting to rewrite the Dask scheduler in Rust (#3139). We had some initial success, but when we moved to more advanced Dask programs (for example, distributed pandas), the communication protocol between the clients/workers and the scheduler became very difficult to handle.

Right now we are facing two issues:

  1. The serialization format of the protocol is rather unfriendly to statically typed languages, as described here.
  2. We haven't found proper, complete documentation of the Dask communication protocol (which messages are exchanged between client/scheduler and worker/scheduler, what their parameters are, which parameters are optional, etc.). It is difficult to reimplement the scheduler without the protocol being documented. There is some information at https://distributed.dask.org/en/latest/protocol.html, but it covers only a very small part of the protocol.

For example, we thought that the compute-task message, which is sent from the scheduler to workers, needs to have function and args parameters containing the necessary code and data to run on the worker. However, when running the following Python script:

import dask
from dask.distributed import Client

client = Client("tcp://localhost:8786")
df = dask.datasets.timeseries(start="2020-01-30", end="2020-01-31")
print(len(df))

df.groupby("name")["x"].mean().compute()
df[(df["x"] > 0) | (df["y"] < 0)].compute()

the scheduler sends some compute-task messages to workers which do not contain the function and args keys:

{
    'duration': 0.5,
    'key': "('series-groupby-count-chunk-series-groupby-count-agg-24c2448278b10581d563ee8a2bb5c45b', '0)'",
    'nbytes': {"('make-timeseries-11817facbee00a90e53448d7973e9de2', 0)": 8205680},
    'op': 'compute-task',
    'priority': (0, 1, 1),
    'who_has': {"('make-timeseries-11817facbee00a90e53448d7973e9de2', 0)": [   'tcp://127.0.0.1:45605']}
}

Another problem is that the definition of the task itself (inside the tasks dictionary in update-graph messages) may itself be serialized. Even if it only uses msgpack, the serialized format can be quite complex, and we are not sure how to reimplement the (de)serialization in Rust. Without proper documentation of the format, we can only guess how to implement the scheduler correctly.
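To make the ambiguity concrete, here is a minimal sketch (hypothetical shapes based on this thread, not Dask's actual wire format) of the two forms a task definition can apparently take, and a helper that tells them apart:

```python
# Hypothetical sketch: in our experience a task definition arrives either as a
# plain dict carrying the callable and its arguments, or as an opaque,
# recursively serialized blob (msgpack framing, sub-headers, maybe compression).
# The names "func"/"args" follow the issue text, not a documented schema.
def classify_task(task):
    """Return 'plain' for a dict-style task, 'serialized' for an opaque blob."""
    if isinstance(task, dict) and "func" in task and "args" in task:
        return "plain"
    if isinstance(task, (bytes, bytearray)):
        return "serialized"
    raise ValueError(f"unrecognized task definition: {type(task).__name__}")

print(classify_task({"func": b"<pickled fn>", "args": b"<pickled args>"}))  # plain
print(classify_task(b"\x82\xc4..."))                                        # serialized
```

A reimplementation has to branch on shapes like these everywhere, which is exactly why a documented schema would help.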

I suspect that there may be some legacy cruft hidden inside the Dask communication protocol (both the serialization format and the message API itself), and it may be worthwhile to document it properly and possibly make some simplifications. If this documentation already exists somewhere, please let us know. What do you think?

@TomAugspurger
Member

cc @mrocklin if you have time to comment.

My understanding is that the protocol developed organically as we needed things.

Another problem is that the definition of the task itself (inside tasks dictionary in update-graph messages) may be serialized. Even if it only uses msgpack, the serialized format can be quite complex and we are not sure how to reimplement the (de)serialization in Rust.

Is deserialization necessary? I (possibly incorrectly) thought that the scheduler didn't deserialize things, but I may be thinking of something else.

The documents in docs/ are the extent of whatever documentation we have on this. And the usual disclaimer that they may be out of date.

I suspect we'd be happy to take simplifications to the protocol as long as they don't harm the current implementation.

@Kobzol
Contributor Author

Kobzol commented Jan 15, 2020

Right, (de)serialization is an overloaded term in this scenario. I'll describe a specific use case.
We wanted to add the simplest possible functionality: receive a task from the client (update-graph) and then send it to a worker (compute-task).

The update-graph message sends tasks in a dictionary keyed by the task id/key. The definition of a task is a dict containing the attributes func and args. The compute-task message also has func and args attributes, which we passed on from the task definition. This worked fine for simple Dask scripts.
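As an illustration only (simplified field names taken from this thread, not a verified schema), the naive pass-through described above looks roughly like this:

```python
# Illustrative sketch of the naive forwarding described above. The message
# shapes are simplified guesses based on this thread; real Dask messages
# carry additional fields (priority, who_has, nbytes, ...).
update_graph = {
    "op": "update-graph",
    "tasks": {
        "task-key-1": {"func": b"<serialized fn>", "args": b"<serialized args>"},
    },
}

def to_compute_task(key, task):
    # Forward func/args verbatim -- works only for the plain-dict task form,
    # and breaks once the task definition is an opaque serialized blob.
    return {
        "op": "compute-task",
        "key": key,
        "func": task["func"],
        "args": task["args"],
    }

msg = to_compute_task("task-key-1", update_graph["tasks"]["task-key-1"])
```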

However, with more complex pipelines (distributed pandas), we have observed that the definition of a task may also be a msgpack object that is recursively serialized, with sub-headers, possible compression and other complications. We thought that since the scheduler should be language agnostic, we could simply take this msgpack blob and pass it on to the worker. But the compute-task message expects a dict with func and args, and we didn't know how to extract these attributes from the serialized task definition (later we also found out that sometimes the compute-task message is sent by the Dask scheduler without the func/args attributes at all, but we have no idea how that works).

I suspect that the language-agnostic property of the Dask scheduler might not be strictly enforced. The low-level loads and dumps methods in protocol/core.py call a deserialize function from distributed/protocol/serialize.py, which transforms the complex serialized msgpack values into Python objects; that is something we cannot do in Rust. It's possible the scheduler is implementable without this deserialization, but without proper documentation of the protocol it's pretty difficult to find out what the scheduler should do.
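The two policies under discussion can be sketched like this (pickle stands in for Dask's deserialize machinery here; this is a simplified illustration, not the real protocol/core.py code):

```python
import pickle  # stand-in for distributed/protocol/serialize.py; illustration only

def loads(frames, deserialize=True):
    """Sketch of the policy choice: materialize Python objects, or pass bytes through."""
    header, payload = frames
    if deserialize:
        # What a Python scheduler can do, but a Rust one cannot:
        return pickle.loads(payload)
    # Language-agnostic alternative: treat the value as an opaque blob.
    return {"header": header, "payload": payload}

frames = ({"serializer": "pickle"}, pickle.dumps(("add", 1, 2)))
print(loads(frames))                     # ('add', 1, 2): a live Python object
print(loads(frames, deserialize=False))  # header plus untouched bytes
```

A scheduler that only ever takes the second branch could, in principle, stay language agnostic; the question is whether the rest of the protocol allows it.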

So to go forward, we would need to either document or simplify the protocol. If there is no documentation because the protocol grew organically, we will probably have to find a simple subset of the protocol and change the complex client code (for example, the Dask messages generated from pandas tables) to use this simpler version. Without documentation I'm not sure we could reimplement the existing protocol in Rust, since there will probably be a ton of edge cases, intended or not (Python is super dynamic, so what is an edge case in Rust might not be a problem in Python).

@TomAugspurger
Member

Thanks. I suspect that improvements to the docs would be wholeheartedly welcome. And changes to the protocol / the implementation that make things easier in other languages would be welcome as long as they don't overly harm the current implementation's performance and readability.

@mrocklin
Member

mrocklin commented Jan 18, 2020 via email

@Kobzol
Contributor Author

Kobzol commented Jan 29, 2020

FYI, I created work-in-progress documentation of the Dask messages. I couldn't find a better (terser) schema format, so I just used TypeScript.

The documentation contains a subset of Dask messages (more or less exactly the subset that our Rust scheduler currently attempts to support).

Apart from some inconsistent naming conventions and the Task definition structure (which is pretty... polymorphic :-) ), there are some small quirks that we had to replicate. For example, the {"op": "stream-start"} message, which is sent from the scheduler to the client after it registers, needs to be inside a message list of size one, because the client asserts this (https://github.com/dask/distributed/blob/master/distributed/client.py#L1067).
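The quirk can be stated as a tiny sketch (first_batch_for_client is a hypothetical helper, not a Dask API; only the length-one constraint comes from the linked assertion):

```python
# Sketch of the quirk: the client asserts that the very first batch it receives
# contains exactly one message, and that message must be stream-start.
# first_batch_for_client is a hypothetical helper name, not a Dask API.
def first_batch_for_client():
    return [{"op": "stream-start"}]  # must be a list of size one

batch = first_batch_for_client()
# Mirrors the client-side check linked above:
assert len(batch) == 1 and batch[0]["op"] == "stream-start"
```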
