-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add C APIs for transition_*
functions
#4650
Changes from 10 commits
9ddf362
52450c1
2f077b9
aedd565
9f554ee
dc40037
b6a67fe
333ec79
934996f
015af69
85b729f
f2c3e34
5eeffde
fea65f7
4766207
6712923
45b5c94
ab951c6
fbdf587
d15d82e
33fc4af
653f53d
feb5285
1d3025f
7ae22ad
1c55db6
08ed07f
eec1849
6c92b6c
57a949b
328d7fc
bb3e5cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1814,7 +1814,19 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState: | |
# State Transitions # | ||
##################### | ||
|
||
def _transition(self, key, finish: str, *args, **kwargs): | ||
def _transition( | ||
self, | ||
key, | ||
finish: str, | ||
*args, | ||
status: str = None, | ||
thread: Py_ssize_t = -1, | ||
metadata: dict = None, | ||
worker=None, | ||
startstops=None, | ||
text=None, | ||
**kwargs, | ||
): | ||
"""Transition a key from its current state to the finish state | ||
|
||
Examples | ||
|
@@ -1931,7 +1943,19 @@ def _transition(self, key, finish: str, *args, **kwargs): | |
parent._tasks[ts._key] = ts | ||
for plugin in list(self.plugins): | ||
try: | ||
plugin.transition(key, start, finish2, *args, **kwargs) | ||
plugin.transition( | ||
key, | ||
start, | ||
finish2, | ||
*args, | ||
status=status, | ||
thread=thread, | ||
metadata=metadata, | ||
worker=worker, | ||
startstops=startstops, | ||
text=text, | ||
**kwargs, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is a dealbreaker but that could be perceived as a breaking change, couldn't it? At least this is a subtle change in the required signature. So far we document this as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. These are already passed as keyword arguments and we are continuing to pass them that way That said, Idk if this is how things will shake out yet. Things are still changing here. IOW I'm not sure this is worth reviewing yet There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to what John said. I think that this is a no-op if you consider the extraction of these from kwargs above. |
||
except Exception: | ||
logger.info("Plugin failed with exception", exc_info=True) | ||
if ts._state == "forgotten": | ||
|
@@ -1998,7 +2022,9 @@ def _transitions(self, recommendations: dict, client_msgs: dict, worker_msgs: di | |
for key in keys: | ||
self.validate_key(key) | ||
|
||
def transition_released_waiting(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_waiting(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2053,7 +2079,9 @@ def transition_released_waiting(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_no_worker_waiting(self, key): | ||
@ccall | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why all |
||
@exceptval(check=False) | ||
def transition_no_worker_waiting(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2163,7 +2191,9 @@ def set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> double: | |
ws._processing[ts] = total_duration | ||
return total_duration | ||
|
||
def transition_waiting_processing(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_processing(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2210,9 +2240,11 @@ def transition_waiting_processing(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_memory( | ||
self, key, nbytes=None, type=None, typename: str = None, worker=None, **kwargs | ||
): | ||
self, key, nbytes=None, type=None, typename: str = None, worker=None | ||
) -> tuple: | ||
try: | ||
ws: WorkerState = self._workers_dv[worker] | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2250,6 +2282,8 @@ def transition_waiting_memory( | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_memory( | ||
self, | ||
key, | ||
|
@@ -2258,8 +2292,7 @@ def transition_processing_memory( | |
typename: str = None, | ||
worker=None, | ||
startstops=None, | ||
**kwargs, | ||
): | ||
) -> tuple: | ||
ws: WorkerState | ||
wws: WorkerState | ||
recommendations: dict = {} | ||
|
@@ -2367,7 +2400,9 @@ def transition_processing_memory( | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_memory_released(self, key, safe: bint = False): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_memory_released(self, key, safe: bint = False) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2442,7 +2477,9 @@ def transition_memory_released(self, key, safe: bint = False): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_released_erred(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_erred(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2487,7 +2524,9 @@ def transition_released_erred(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_erred_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_erred_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2527,7 +2566,9 @@ def transition_erred_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_waiting_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_waiting_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
recommendations: dict = {} | ||
|
@@ -2565,7 +2606,9 @@ def transition_waiting_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_processing_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2611,9 +2654,15 @@ def transition_processing_released(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
@ccall | ||
@exceptval(check=False) | ||
def transition_processing_erred( | ||
self, key, cause=None, exception=None, traceback=None, **kwargs | ||
): | ||
self, | ||
key, | ||
cause=None, | ||
exception=None, | ||
traceback=None, | ||
) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2690,7 +2739,9 @@ def transition_processing_erred( | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_no_worker_released(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_no_worker_released(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
dts: TaskState | ||
|
@@ -2733,7 +2784,9 @@ def remove_key(self, key): | |
ts._exception_blame = ts._exception = ts._traceback = None | ||
self._task_metadata.pop(key, None) | ||
|
||
def transition_memory_forgotten(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_memory_forgotten(self, key) -> tuple: | ||
ws: WorkerState | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
|
@@ -2775,7 +2828,9 @@ def transition_memory_forgotten(self, key): | |
pdb.set_trace() | ||
raise | ||
|
||
def transition_released_forgotten(self, key): | ||
@ccall | ||
@exceptval(check=False) | ||
def transition_released_forgotten(self, key) -> tuple: | ||
try: | ||
ts: TaskState = self._tasks[key] | ||
recommendations: dict = {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't
key
have a type?