Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add C APIs for transition_* functions #4650

Closed
wants to merge 32 commits into from
Closed
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9ddf362
Add C APIs for all `transition_*` functions
jakirkham Mar 30, 2021
52450c1
Drop `**kwargs` from `transition_*`
jakirkham Mar 30, 2021
2f077b9
Extract `status` in `_transition`
jakirkham Mar 30, 2021
aedd565
Extract `thread` in `_transition`
jakirkham Mar 30, 2021
9f554ee
Extract `metadata` in `_transition`
jakirkham Mar 30, 2021
dc40037
Extract `worker` in `transition_processing_erred`
jakirkham Mar 30, 2021
b6a67fe
Extract `startstops` in `transition_processing_erred`
jakirkham Mar 30, 2021
333ec79
Extract `text` in `transition_processing_erred`
jakirkham Mar 30, 2021
934996f
Move `transition_processing_erred` kwargs
jakirkham Apr 10, 2021
015af69
Pass excess kwargs to plugins
jakirkham Apr 10, 2021
85b729f
Skip extracting `worker` argument
jakirkham Apr 13, 2021
f2c3e34
Add `worker` arg to `transition_processing_erred`
jakirkham Apr 13, 2021
5eeffde
Pass `startstops` through
jakirkham Apr 13, 2021
fea65f7
Set `startstops` only if the key is in data
jakirkham Apr 13, 2021
4766207
`worker: str` in `transition_processing_memory`
jakirkham Apr 13, 2021
6712923
Move other `assert`s under `self._validate`
jakirkham Apr 13, 2021
45b5c94
Drop unneeded `worker` argument
jakirkham Apr 13, 2021
ab951c6
Pick up `text` in `transition_processing_erred`
jakirkham Apr 13, 2021
fbdf587
Pack arguments back into `kwargs`
jakirkham Apr 13, 2021
d15d82e
Revert "Pack arguments back into `kwargs`"
jakirkham Apr 13, 2021
33fc4af
Keep passing `startstops` in error case
jakirkham Apr 13, 2021
653f53d
Readd `worker` argument
jakirkham Apr 14, 2021
feb5285
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 14, 2021
1d3025f
Grab `func` for `finish` case only if used
jakirkham Apr 14, 2021
7ae22ad
Add & use `_transition_dispatch` function
jakirkham Apr 14, 2021
1c55db6
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 14, 2021
08ed07f
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 15, 2021
eec1849
Disable flake8 on Cython `intern` call
jakirkham Apr 15, 2021
6c92b6c
Flatten dispatch table to cutdown on overhead
jakirkham Apr 15, 2021
57a949b
Use `_` instead of `-` in `str`s
jakirkham Apr 15, 2021
328d7fc
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 26, 2021
bb3e5cf
Merge dask/main into jakirkham/cy_transitions
jakirkham May 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 124 additions & 49 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ def nogil(func):
return func


@cfunc
@inline
def sys_intern(s):
if compiled:
return intern(s) # noqa: F821
else:
return sys.intern(s)


if sys.version_info < (3, 8):
try:
import pickle5 as pickle
Expand Down Expand Up @@ -1751,7 +1760,6 @@ class SchedulerState:
_task_metadata: dict
_total_nthreads: Py_ssize_t
_total_occupancy: double
_transitions_table: dict
_unknown_durations: dict
_unrunnable: set
_validate: bint
Expand Down Expand Up @@ -1804,23 +1812,6 @@ def __init__(
self._task_metadata = dict()
self._total_nthreads = 0
self._total_occupancy = 0
self._transitions_table = {
("released", "waiting"): self.transition_released_waiting,
("waiting", "released"): self.transition_waiting_released,
("waiting", "processing"): self.transition_waiting_processing,
("waiting", "memory"): self.transition_waiting_memory,
("processing", "released"): self.transition_processing_released,
("processing", "memory"): self.transition_processing_memory,
("processing", "erred"): self.transition_processing_erred,
("no-worker", "released"): self.transition_no_worker_released,
("no-worker", "waiting"): self.transition_no_worker_waiting,
("released", "forgotten"): self.transition_released_forgotten,
("memory", "forgotten"): self.transition_memory_forgotten,
("erred", "forgotten"): self.transition_released_forgotten,
("erred", "released"): self.transition_erred_released,
("memory", "released"): self.transition_memory_released,
("released", "erred"): self.transition_released_erred,
}
self._unknown_durations = dict()
if unrunnable is not None:
self._unrunnable = unrunnable
Expand Down Expand Up @@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
# State Transitions #
#####################

def _transition(self, key, finish: str, *args, **kwargs):
def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@optimize.use_switch(True) is default, I believe, but maybe worth declaring explicitly.

start_finish: str = sys_intern(f"{start}_{finish}".replace("-", "_"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be interested in a speed test of this approach versus hash/dict lookup or actual strcmp (which is what I assume cython does when both operands are char*). For the latter, you don't need the string manipulation to make the compound, and the if statements can be grouped.


if start_finish is "released_waiting": # noqa: F632
return self.transition_released_waiting(*args, **kwargs)
elif start_finish is "released_forgotten": # noqa: F632
return self.transition_released_forgotten(*args, **kwargs)
elif start_finish is "released_erred": # noqa: F632
return self.transition_released_erred(*args, **kwargs)
elif start_finish is "waiting_released": # noqa: F632
return self.transition_waiting_released(*args, **kwargs)
elif start_finish is "waiting_processing": # noqa: F632
return self.transition_waiting_processing(*args, **kwargs)
elif start_finish is "waiting_memory": # noqa: F632
return self.transition_waiting_memory(*args, **kwargs)
elif start_finish is "processing_released": # noqa: F632
return self.transition_processing_released(*args, **kwargs)
elif start_finish is "processing_memory": # noqa: F632
return self.transition_processing_memory(*args, **kwargs)
elif start_finish is "processing_erred": # noqa: F632
return self.transition_processing_erred(*args, **kwargs)
elif start_finish is "no_worker_released": # noqa: F632
return self.transition_no_worker_released(*args, **kwargs)
elif start_finish is "no_worker_waiting": # noqa: F632
return self.transition_no_worker_waiting(*args, **kwargs)
elif start_finish is "memory_released": # noqa: F632
return self.transition_memory_released(*args, **kwargs)
elif start_finish is "memory_forgotten": # noqa: F632
return self.transition_memory_forgotten(*args, **kwargs)
elif start_finish is "erred_released": # noqa: F632
return self.transition_erred_released(*args, **kwargs)
elif start_finish is "erred_forgotten": # noqa: F632
return self.transition_released_forgotten(*args, **kwargs)

def _transition(
self,
key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't key have a type?

finish: str,
*args,
status: str = None,
thread: Py_ssize_t = -1,
metadata: dict = None,
**kwargs,
):
"""Transition a key from its current state to the finish state

Examples
Expand All @@ -1994,7 +2028,6 @@ def _transition(self, key, finish: str, *args, **kwargs):
parent: SchedulerState = cast(SchedulerState, self)
ts: TaskState
start: str
start_finish: tuple
finish2: str
recommendations: dict
worker_msgs: dict
Expand All @@ -2019,12 +2052,10 @@ def _transition(self, key, finish: str, *args, **kwargs):
dependents = set(ts._dependents)
dependencies = set(ts._dependencies)

start_finish = (start, finish)
func = self._transitions_table.get(start_finish)
if func is not None:
a: tuple = func(key, *args, **kwargs)
a: tuple = self._transition_dispatch(start, finish, key, *args, **kwargs)
if a is not None:
recommendations, client_msgs, worker_msgs = a
elif "released" not in start_finish:
elif start != "released" and finish != "released":
assert not args and not kwargs
a_recs: dict
a_cmsgs: dict
Expand All @@ -2033,11 +2064,12 @@ def _transition(self, key, finish: str, *args, **kwargs):
a_recs, a_cmsgs, a_wmsgs = a

v = a_recs.get(key, finish)
func = self._transitions_table["released", v]
b_recs: dict
b_cmsgs: dict
b_wmsgs: dict
b: tuple = func(key)
b: tuple = self._transition_dispatch(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need b here, can combine with next line.

"released", v, key, *args, **kwargs
)
b_recs, b_cmsgs, b_wmsgs = b

recommendations.update(a_recs)
Expand Down Expand Up @@ -2070,7 +2102,7 @@ def _transition(self, key, finish: str, *args, **kwargs):

start = "released"
else:
raise RuntimeError("Impossible transition from %r to %r" % start_finish)
raise RuntimeError(f"Impossible transition from {start} to {finish}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would change the logic a little, but would make sense to me to have _transition_dispatch raise this error if none of the cases match.


finish2 = ts._state
self.transition_log.append((key, start, finish2, recommendations, time()))
Expand All @@ -2091,7 +2123,16 @@ def _transition(self, key, finish: str, *args, **kwargs):
parent._tasks[ts._key] = ts
for plugin in list(self.plugins):
try:
plugin.transition(key, start, finish2, *args, **kwargs)
plugin.transition(
key,
start,
finish2,
*args,
status=status,
thread=thread,
metadata=metadata,
**kwargs,
)
except Exception:
logger.info("Plugin failed with exception", exc_info=True)
if ts._state == "forgotten":
Expand Down Expand Up @@ -2158,7 +2199,9 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di
for key in keys:
self.validate_key(key)

def transition_released_waiting(self, key):
@ccall
@exceptval(check=False)
def transition_released_waiting(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2213,7 +2256,9 @@ def transition_released_waiting(self, key):
pdb.set_trace()
raise

def transition_no_worker_waiting(self, key):
@ccall
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why all ccall rather than cfunc - can they be called from anywhere but _transition?

@exceptval(check=False)
def transition_no_worker_waiting(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2336,7 +2381,9 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double:
ws._processing[ts] = total_duration
return total_duration

def transition_waiting_processing(self, key):
@ccall
@exceptval(check=False)
def transition_waiting_processing(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2383,9 +2430,11 @@ def transition_waiting_processing(self, key):
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_waiting_memory(
self, key, nbytes=None, type=None, typename: str = None, worker=None, **kwargs
):
self, key, nbytes=None, type=None, typename: str = None, worker=None
) -> tuple:
try:
ws: WorkerState = self._workers_dv[worker]
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2423,27 +2472,28 @@ def transition_waiting_memory(
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_processing_memory(
self,
key,
nbytes=None,
type=None,
typename: str = None,
worker=None,
worker: str = None,
startstops=None,
**kwargs,
):
) -> tuple:
ws: WorkerState
wws: WorkerState
recommendations: dict = {}
client_msgs: dict = {}
worker_msgs: dict = {}
try:
ts: TaskState = self._tasks[key]
assert worker
assert isinstance(worker, str)

if self._validate:
assert worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good move

assert isinstance(worker, str)
assert ts._processing_on
ws = ts._processing_on
assert ts in ws._processing
Expand Down Expand Up @@ -2544,7 +2594,9 @@ def transition_processing_memory(
pdb.set_trace()
raise

def transition_memory_released(self, key, safe: bint = False):
@ccall
@exceptval(check=False)
def transition_memory_released(self, key, safe: bint = False) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2619,7 +2671,9 @@ def transition_memory_released(self, key, safe: bint = False):
pdb.set_trace()
raise

def transition_released_erred(self, key):
@ccall
@exceptval(check=False)
def transition_released_erred(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2664,7 +2718,9 @@ def transition_released_erred(self, key):
pdb.set_trace()
raise

def transition_erred_released(self, key):
@ccall
@exceptval(check=False)
def transition_erred_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2704,7 +2760,9 @@ def transition_erred_released(self, key):
pdb.set_trace()
raise

def transition_waiting_released(self, key):
@ccall
@exceptval(check=False)
def transition_waiting_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down Expand Up @@ -2741,7 +2799,9 @@ def transition_waiting_released(self, key):
pdb.set_trace()
raise

def transition_processing_released(self, key):
@ccall
@exceptval(check=False)
def transition_processing_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2786,9 +2846,18 @@ def transition_processing_released(self, key):
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_processing_erred(
self, key, cause=None, exception=None, traceback=None, **kwargs
):
self,
key,
cause=None,
exception=None,
traceback=None,
worker=None,
text=None,
startstops=None,
) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2864,7 +2933,9 @@ def transition_processing_erred(
pdb.set_trace()
raise

def transition_no_worker_released(self, key):
@ccall
@exceptval(check=False)
def transition_no_worker_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2907,7 +2978,9 @@ def remove_key(self, key):
ts._exception_blame = ts._exception = ts._traceback = None
self._task_metadata.pop(key, None)

def transition_memory_forgotten(self, key):
@ccall
@exceptval(check=False)
def transition_memory_forgotten(self, key) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2949,7 +3022,9 @@ def transition_memory_forgotten(self, key):
pdb.set_trace()
raise

def transition_released_forgotten(self, key):
@ccall
@exceptval(check=False)
def transition_released_forgotten(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down