Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add C APIs for transition_* functions #4650

Closed
wants to merge 32 commits into from
Closed
Changes from 10 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9ddf362
Add C APIs for all `transition_*` functions
jakirkham Mar 30, 2021
52450c1
Drop `**kwargs` from `transition_*`
jakirkham Mar 30, 2021
2f077b9
Extract `status` in `_transition`
jakirkham Mar 30, 2021
aedd565
Extract `thread` in `_transition`
jakirkham Mar 30, 2021
9f554ee
Extract `metadata` in `_transition`
jakirkham Mar 30, 2021
dc40037
Extract `worker` in `transition_processing_erred`
jakirkham Mar 30, 2021
b6a67fe
Extract `startstops` in `transition_processing_erred`
jakirkham Mar 30, 2021
333ec79
Extract `text` in `transition_processing_erred`
jakirkham Mar 30, 2021
934996f
Move `transition_processing_erred` kwargs
jakirkham Apr 10, 2021
015af69
Pass excess kwargs to plugins
jakirkham Apr 10, 2021
85b729f
Skip extracting `worker` argument
jakirkham Apr 13, 2021
f2c3e34
Add `worker` arg to `transition_processing_erred`
jakirkham Apr 13, 2021
5eeffde
Pass `startstops` through
jakirkham Apr 13, 2021
fea65f7
Set `startstops` only if the key is in data
jakirkham Apr 13, 2021
4766207
`worker: str` in `transition_processing_memory`
jakirkham Apr 13, 2021
6712923
Move other `assert`s under `self._validate`
jakirkham Apr 13, 2021
45b5c94
Drop unneeded `worker` argument
jakirkham Apr 13, 2021
ab951c6
Pick up `text` in `transition_processing_erred`
jakirkham Apr 13, 2021
fbdf587
Pack arguments back into `kwargs`
jakirkham Apr 13, 2021
d15d82e
Revert "Pack arguments back into `kwargs`"
jakirkham Apr 13, 2021
33fc4af
Keep passing `startstops` in error case
jakirkham Apr 13, 2021
653f53d
Readd `worker` argument
jakirkham Apr 14, 2021
feb5285
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 14, 2021
1d3025f
Grab `func` for `finish` case only if used
jakirkham Apr 14, 2021
7ae22ad
Add & use `_transition_dispatch` function
jakirkham Apr 14, 2021
1c55db6
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 14, 2021
08ed07f
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 15, 2021
eec1849
Disable flake8 on Cython `intern` call
jakirkham Apr 15, 2021
6c92b6c
Flatten dispatch table to cutdown on overhead
jakirkham Apr 15, 2021
57a949b
Use `_` instead of `-` in `str`s
jakirkham Apr 15, 2021
328d7fc
Merge dask/main into jakirkham/cy_transitions
jakirkham Apr 26, 2021
bb3e5cf
Merge dask/main into jakirkham/cy_transitions
jakirkham May 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 74 additions & 19 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1814,7 +1814,19 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
# State Transitions #
#####################

def _transition(self, key, finish: str, *args, **kwargs):
def _transition(
self,
key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't key have a type?

finish: str,
*args,
status: str = None,
thread: Py_ssize_t = -1,
metadata: dict = None,
worker=None,
startstops=None,
text=None,
**kwargs,
):
"""Transition a key from its current state to the finish state

Examples
Expand Down Expand Up @@ -1931,7 +1943,19 @@ def _transition(self, key, finish: str, *args, **kwargs):
parent._tasks[ts._key] = ts
for plugin in list(self.plugins):
try:
plugin.transition(key, start, finish2, *args, **kwargs)
plugin.transition(
key,
start,
finish2,
*args,
status=status,
thread=thread,
metadata=metadata,
worker=worker,
startstops=startstops,
text=text,
**kwargs,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a dealbreaker but that could be perceived as a breaking change, couldn't it? At least this is a subtle change in the required signature. So far we document this as *args, **kwargs, see https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin.transition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. These are already passed as keyword arguments and we are continuing to pass them that way

That said, Idk if this is how things will shake out yet. Things are still changing here. IOW I'm not sure this is worth reviewing yet

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what John said. I think that this is a no-op if you consider the extraction of these from kwargs above.

except Exception:
logger.info("Plugin failed with exception", exc_info=True)
if ts._state == "forgotten":
Expand Down Expand Up @@ -1998,7 +2022,9 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di
for key in keys:
self.validate_key(key)

def transition_released_waiting(self, key):
@ccall
@exceptval(check=False)
def transition_released_waiting(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2053,7 +2079,9 @@ def transition_released_waiting(self, key):
pdb.set_trace()
raise

def transition_no_worker_waiting(self, key):
@ccall
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why all ccall rather than cfunc - can they be called from anywhere but _transition?

@exceptval(check=False)
def transition_no_worker_waiting(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2163,7 +2191,9 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double:
ws._processing[ts] = total_duration
return total_duration

def transition_waiting_processing(self, key):
@ccall
@exceptval(check=False)
def transition_waiting_processing(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2210,9 +2240,11 @@ def transition_waiting_processing(self, key):
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_waiting_memory(
self, key, nbytes=None, type=None, typename: str = None, worker=None, **kwargs
):
self, key, nbytes=None, type=None, typename: str = None, worker=None
) -> tuple:
try:
ws: WorkerState = self._workers_dv[worker]
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2250,6 +2282,8 @@ def transition_waiting_memory(
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_processing_memory(
self,
key,
Expand All @@ -2258,8 +2292,7 @@ def transition_processing_memory(
typename: str = None,
worker=None,
startstops=None,
**kwargs,
):
) -> tuple:
ws: WorkerState
wws: WorkerState
recommendations: dict = {}
Expand Down Expand Up @@ -2367,7 +2400,9 @@ def transition_processing_memory(
pdb.set_trace()
raise

def transition_memory_released(self, key, safe: bint = False):
@ccall
@exceptval(check=False)
def transition_memory_released(self, key, safe: bint = False) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2442,7 +2477,9 @@ def transition_memory_released(self, key, safe: bint = False):
pdb.set_trace()
raise

def transition_released_erred(self, key):
@ccall
@exceptval(check=False)
def transition_released_erred(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2487,7 +2524,9 @@ def transition_released_erred(self, key):
pdb.set_trace()
raise

def transition_erred_released(self, key):
@ccall
@exceptval(check=False)
def transition_erred_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2527,7 +2566,9 @@ def transition_erred_released(self, key):
pdb.set_trace()
raise

def transition_waiting_released(self, key):
@ccall
@exceptval(check=False)
def transition_waiting_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down Expand Up @@ -2565,7 +2606,9 @@ def transition_waiting_released(self, key):
pdb.set_trace()
raise

def transition_processing_released(self, key):
@ccall
@exceptval(check=False)
def transition_processing_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2611,9 +2654,15 @@ def transition_processing_released(self, key):
pdb.set_trace()
raise

@ccall
@exceptval(check=False)
def transition_processing_erred(
self, key, cause=None, exception=None, traceback=None, **kwargs
):
self,
key,
cause=None,
exception=None,
traceback=None,
) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2690,7 +2739,9 @@ def transition_processing_erred(
pdb.set_trace()
raise

def transition_no_worker_released(self, key):
@ccall
@exceptval(check=False)
def transition_no_worker_released(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
dts: TaskState
Expand Down Expand Up @@ -2733,7 +2784,9 @@ def remove_key(self, key):
ts._exception_blame = ts._exception = ts._traceback = None
self._task_metadata.pop(key, None)

def transition_memory_forgotten(self, key):
@ccall
@exceptval(check=False)
def transition_memory_forgotten(self, key) -> tuple:
ws: WorkerState
try:
ts: TaskState = self._tasks[key]
Expand Down Expand Up @@ -2775,7 +2828,9 @@ def transition_memory_forgotten(self, key):
pdb.set_trace()
raise

def transition_released_forgotten(self, key):
@ccall
@exceptval(check=False)
def transition_released_forgotten(self, key) -> tuple:
try:
ts: TaskState = self._tasks[key]
recommendations: dict = {}
Expand Down