Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add C APIs for transition_* functions #4650

Closed
wants to merge 32 commits into from

Conversation

jakirkham
Copy link
Member

@jakirkham jakirkham commented Mar 30, 2021

To cutdown on the overhead of calling transition_* functions, add C APIs for these functions to allow Cython to call these with C.

  • Closes #xxxx
  • Tests added / passed
  • Passes black distributed / flake8 distributed

As C APIs cannot be generated for functions that use `**kwargs` and
`**kwargs` are not used in these functions, just drop them.
@jakirkham
Copy link
Member Author

Seeing this on CI

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4828, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4214, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1864, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'status'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224608533

@jakirkham
Copy link
Member Author

Seeing this on CI

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4828, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4214, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1864, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'thread'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224796235

@jakirkham
Copy link
Member Author

Seeing this on CI

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4836, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4222, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1872, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'metadata'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224960448

@jakirkham
Copy link
Member Author

Seeing this on CI

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 668, in log_errors
    yield
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 3873, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4959, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4848, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4271, in stimulus_task_erred
    **kwargs,
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'worker'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225283910

@jakirkham
Copy link
Member Author

Seeing this on CI

distributed.core - ERROR - transition_processing_erred() got an unexpected keyword argument 'startstops'
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4848, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4264, in stimulus_task_erred
    r = parent._transition(
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'startstops'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225419119

@jakirkham
Copy link
Member Author

Seeing this on CI

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4854, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4270, in stimulus_task_erred
    r = parent._transition(
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'text'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225540455

@jakirkham
Copy link
Member Author

jakirkham commented Mar 30, 2021

@mrocklin @quasiben above are some examples of the extra arguments that are being passed to transition_ functions that are not needed. Also would welcome better suggestions on how to handle these arguments to avoid them being passed to functions that don't use them

@jakirkham
Copy link
Member Author

@mrocklin could you please weigh in on how we should handle these extra arguments?

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

In some of these cases I suspect that we do actually use these keyword arguments, but we use them in plugins. For example the thread= keyword is used by the task stream plot. https://github.com/dask/distributed/blob/main/distributed/diagnostics/task_stream.py

Maybe we need to send an extras dict?

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

Also, thanks to @jrbourbeau for pointing me to this.

For reference @jakirkham my e-mail is no longer reliably read. If you're blocked on me specifically then you're probably better served by trying to connect out-of-band. My apologies for the lack of responsiveness.

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

Maybe we need to send an extras dict?

Or, maybe easier would be to just include them as explicit keywords even if they aren't used in these particular methods.

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

I'm enthusiastic about this change by the way. I look forward to seeing if it has an impact.

@jakirkham
Copy link
Member Author

If these are meant for plugins called in _transition, would it make sense to extract these at that level?

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

Oh yeah, maybe. That makes sense. Presumably the SchedulerState class is intended to be stripped down anyway. That sounds like a great solution.

@jakirkham
Copy link
Member Author

jakirkham commented Apr 9, 2021

Yeah I think that is next thing is making a clean break between Scheduler and SchedulerState (instead of Scheduler inheriting from SchedulerState)

@mrocklin
Copy link
Member

mrocklin commented Apr 9, 2021

To be clear, my understanding is that that change would not be necessary in order to implement the changes here though. That's more future-looking. Is that correct?

@jakirkham
Copy link
Member Author

Yeah that's future looking and unrelated to this

@douglasdavis
Copy link
Member

Yeah I think that is next thing is making a clean break between Scheduler and SchedulerState (instead of Scheduler inheriting from SchedulerState)

@jakirkham chiming in to say I'd be interested in helping in this area (if there's room for it!). Would be happy to have a quick chat about status of/plans for the effort if you think it would be worthwhile

@jakirkham
Copy link
Member Author

Communication is one area we could improve that would help. Shared some thoughts in this comment ( #4513 (comment) )

Comment on lines 1946 to 1958
plugin.transition(
key,
start,
finish2,
*args,
status=status,
thread=thread,
metadata=metadata,
worker=worker,
startstops=startstops,
text=text,
**kwargs,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a dealbreaker but that could be perceived as a breaking change, couldn't it? At least this is a subtle change in the required signature. So far we document this as *args, **kwargs, see https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin.transition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. These are already passed as keyword arguments and we are continuing to pass them that way

That said, Idk if this is how things will shake out yet. Things are still changing here. IOW I'm not sure this is worth reviewing yet

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what John said. I think that this is a no-op if you consider the extraction of these from kwargs above.

This seems to be used by plugins even though it is not used in this
transition. So just catch the argument anyways.
Needed by some Scheduler plugins.
@jakirkham
Copy link
Member Author

Think the other issues are fixed. Though started seeing test_dashboard failing. Saw there were some changes in main to this test. So tried to merge those in, but it looks like that might not work either. Appears test_dashboard has been having issues elsewhere though ( #4697 ). So maybe it's not related to these changes

The transition `func` that was being grabbed depended on whether `key`
was found in the recommendations from the last transition. If `key` was
found, one function based on `v` was called. If not, `finish` was used
in place of `v`. To make this a bit more explicit, simple, and
efficient, assign `finish` to `v` if `key` is not found. This way `v`
can be used in all cases with one call to retrieve the `func`.
@jakirkham jakirkham force-pushed the cy_transitions branch 3 times, most recently from c978bc0 to b75dc23 Compare April 14, 2021 06:05
@fjetter
Copy link
Member

fjetter commented Apr 14, 2021

I see errors connecting to the transitioning code in the ubu-py3.7 tests

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1963, in _transition
    raise RuntimeError(f"Impossible transition from {start} to {finish}")
RuntimeError: Impossible transition from no-worker to released
distributed.scheduler - ERROR - Error transitioning 'lambda-df0fe8d7604e475821f9b929791d1a43' from 'no-worker' to 'waiting'

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2340272649

@jakirkham
Copy link
Member Author

Yeah I pushed some more changes last night after fixing some errors. So there may be some new ones

if finish is "released": # noqa: F632
return self.transition_erred_released(*args, **kwargs)
elif finish is "forgotten": # noqa: F632
return self.transition_released_forgotten(*args, **kwargs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From @mrocklin's comment ( 8ff10d9#r49507747 ).

Does this change result in a meaningful speedup? This seems significantly more complex to me. I agree that it's worth doing, but only if it has an impact. If possible I think that we should try to measure the impact of this PR before merging. Thoughts?

(pasted here as the comment wasn't in the PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the goal of this change is to try and cutdown on time spent calling the transition_* functions themselves by moving those calls to C. To benefit from adding C APIs to those functions we need to call them directly. Doing it indirectly will just go through Python so won't have any benefit. IOW something like this will be needed

We could flatten the ifs by comparing a single str (maybe like "{start} {finish}"?) to make this a bit simpler/more readable. May play with this a bit. Am open to other suggestions to improve readability/maintainability here

It's worth noting that today this lives in a dict that is roughly as long as this code block

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add currently Cython is passing *args and **kwargs through the Python API atm. So there are probably additional tweaks needed to get Cython to move those function calls to C

The comparisons are already pointer comparisons so that part is being handled efficiently

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I understand why, from first principles, this is a good idea. I'm curious what impact is has on benchmarks and performance.

If this improves benchmarks by a percent or two then awesome, let's good for it. If it's only shaving off a couple of nanoseconds then maybe we shouldn't do this.

I think that right now our development flow is to make some changes, merge them in, and then see how benchmarks respond. I'm suggesting that we may want to roll those benchmarks in earlier in the process so that we make changes, look at how they change benchmarks/profiling, and then based on those improvements we decide to merge.

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that is fine. Was just trying to provide more context as well as make sure we are comfortable with this kind of change before diving deeper. Sound like we are ok with it in principle, but would like to make sure it is delivering value, which makes sense to me

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jakirkham I've noticed you active on github again. Welcome back from vacation? I hope that you had a relaxing time.

I'm curious to learn how much impact this PR had. I suspect that this is already on your queue of work coming back (which I imagine is long) but I thought I'd throw a 👍 on it anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I had another idea on how to improve things here, but as guessed other things are at the forefront of the queue atm 😅

While this function is not defined in Python, it is defined in Cython.
Even though flake8 complains about it not being defined, flake8 doesn't
know anything. So just disable this flake8 error.
Avoids needing to `intern` 2 `str`s using one instead. Also performs
only 1 `str` comparison before making a function call instead of 2.
A `str` literal with `-` does not automatically get `intern`ed in
Python. Though `str`s with `_` do. So replace `-` with `_` to leverage
`intern`ing of literal `str`s.
@jakirkham
Copy link
Member Author

Regarding the "no-worker" issue noted above. Python literals are normally interned automatically in pure Python. Cython also will intern and manage these globally for the module. For example...

In [1]: s1 = "processing"

In [2]: s2 = "processing"

In [3]: s1 is s2
Out[3]: True

However it seems strs with a "-" don't get interned automatically.

In [1]: s1 = "no-worker"

In [2]: s2 = "no-worker"

In [3]: s1 is s2
Out[3]: False

This is a non-issue when using "_".

In [1]: s1 = "no_worker"

In [2]: s2 = "no_worker"

In [3]: s1 is s2
Out[3]: True

So replaced "-" with "_" in the dispatch table to fix this issue

@jakirkham jakirkham marked this pull request as draft April 15, 2021 08:32
Copy link
Member

@martindurant martindurant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgive some naive questions here.

From a new perspective, I think this is largely good. I wonder if we can prove that the string concatenation is actually faster.

@@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
# State Transitions #
#####################

def _transition(self, key, finish: str, *args, **kwargs):
def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@optimize.use_switch(True) is default, I believe, but maybe worth declaring explicitly.

@@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
# State Transitions #
#####################

def _transition(self, key, finish: str, *args, **kwargs):
def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
start_finish: str = sys_intern(f"{start}_{finish}".replace("-", "_"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be interested in a speed test of this approach versus hash/dict lookup or actual strcmp (which is what I assume cython does when both operands are char*). For the latter, you don't need the string manipulation to make the compound, and the if statements can be grouped.

b_recs: dict
b_cmsgs: dict
b_wmsgs: dict
b: tuple = func(key)
b: tuple = self._transition_dispatch(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need b here, can combine with next line.

@@ -2070,7 +2102,7 @@ def _transition(self, key, finish: str, *args, **kwargs):

start = "released"
else:
raise RuntimeError("Impossible transition from %r to %r" % start_finish)
raise RuntimeError(f"Impossible transition from {start} to {finish}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would change the logic a little, but would make sense to me to have _transition_dispatch raise this error if none of the cases match.


def _transition(
self,
key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't key have a type?

@@ -2213,7 +2256,9 @@ def transition_released_waiting(self, key):
pdb.set_trace()
raise

def transition_no_worker_waiting(self, key):
@ccall
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why all ccall rather than cfunc - can they be called from anywhere but _transition?


if self._validate:
assert worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good move

@jakirkham jakirkham mentioned this pull request Apr 12, 2022
3 tasks
@jakirkham jakirkham closed this Apr 29, 2022
@jakirkham jakirkham deleted the cy_transitions branch April 29, 2022 22:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants