
[core][experimental] Fix bug when propagating DAG application exceptions #45237

Merged
merged 6 commits into ray-project:master from dag-errors on May 14, 2024

Conversation

stephanie-wang (Contributor)

Why are these changes needed?

If a task in the DAG raised an application-level exception, we would re-raise correctly if it was read directly by the driver, but not if it was read by another actor in the DAG. This PR fixes the issue by writing the exception to the next actor.
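For context, here is a minimal, framework-agnostic sketch of the propagate-and-skip behavior described above. All names (WrappedTaskError, run_dag_node, the queue-based channels) are hypothetical stand-ins, not Ray's actual internals: each node forwards an already-wrapped upstream exception instead of running its own task, so the error eventually reaches the driver.

import queue


class WrappedTaskError(Exception):
    """Stand-in for Ray's RayTaskError wrapper around user exceptions."""


def run_dag_node(func, input_channel: queue.Queue, output_channel: queue.Queue) -> None:
    value = input_channel.get()
    if isinstance(value, WrappedTaskError):
        # Upstream task raised an application-level exception.
        # Propagate it downstream and skip executing this node's task.
        output_channel.put(value)
        return
    try:
        output_channel.put(func(value))
    except Exception as exc:
        # Wrap once, at the node that actually failed.
        output_channel.put(WrappedTaskError(repr(exc)))


# Two-node chain: node A fails, node B skips its task and forwards the error,
# so the driver still sees the original failure at the end of the DAG.
a_in, a_out, b_out = queue.Queue(), queue.Queue(), queue.Queue()
a_in.put(42)
run_dag_node(lambda x: 1 / 0, a_in, a_out)
run_dag_node(lambda x: x + 1, a_out, b_out)
assert isinstance(b_out.get(), WrappedTaskError)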

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
@@ -251,7 +251,7 @@ def get_actor_id(self) -> Optional[str]:
         """
         # only worker mode has actor_id
         if self.worker.mode != ray._private.worker.WORKER_MODE:
-            logger.warning(
+            logger.debug(
stephanie-wang (Contributor Author)
Also changed this to DEBUG because it's very spammy when the driver calls this method. It seems fine to me if the driver calls it, not sure why this should be a warning.

Contributor

Looks good, thanks!

Signed-off-by: Stephanie Wang <[email protected]>

            return True
        except Exception as exc:
            # Previous task raised an application-level exception.
            # Propagate it and skip the actual task.
            output_writer.write(exc)
Contributor

Why does this not need _wrap_exception()?

stephanie-wang (Contributor Author)

I believe this is because the exception has already been wrapped by the original task that errored. Will add a comment!

Contributor

Ah, I see. It's wrapped in a RayTaskError, which is itself an exception. So exc is an instance of RayTaskError here.
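A small illustration of that point, using a hypothetical wrapper class in place of RayTaskError: because the wrapper subclasses Exception, it is caught by `except Exception as exc`, and forwarding it as-is avoids nesting a wrapper inside a wrapper.

class RayTaskErrorLike(Exception):
    """Hypothetical stand-in for RayTaskError, which is itself an Exception."""

    def __init__(self, cause: BaseException):
        super().__init__(repr(cause))
        self.cause = cause


wrapped = RayTaskErrorLike(ValueError("bad input"))  # wrapped by the task that failed

try:
    raise wrapped
except Exception as exc:
    # exc is already the wrapper, so it can be written downstream directly;
    # calling a _wrap_exception()-style helper again would only nest wrappers.
    assert isinstance(exc, RayTaskErrorLike)
    assert isinstance(exc.cause, ValueError)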

        self.count = 0

    def _fail_if_needed(self):
        if self.fail_after and self.count > self.fail_after:
Contributor

minor preference: self.count >= self.fail_after (because self.count starts at 0)
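A standalone sketch of that off-by-one, with a hypothetical, simplified version of the test helper: with `>` the helper tolerates one more call than fail_after suggests, while `>=` would fail exactly on call number fail_after.

class FailingWorker:
    """Hypothetical, simplified version of the test helper discussed above."""

    def __init__(self, fail_after: int):
        self.fail_after = fail_after
        self.count = 0

    def step(self) -> None:
        self.count += 1
        # With `>`, the first failure happens on call fail_after + 1;
        # with `>=`, it would happen on call fail_after.
        if self.fail_after and self.count > self.fail_after:
            raise RuntimeError(f"failed on call {self.count}")


worker = FailingWorker(fail_after=3)
for _ in range(3):
    worker.step()  # calls 1-3 succeed with `>`
try:
    worker.step()  # call 4 raises
except RuntimeError as exc:
    print(exc)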

    output_channels.end_read()

    with pytest.raises(RuntimeError):
        for i in range(99):
Contributor

Why loop 99 times? Shouldn't this already throw an exception on the first iteration because fail_after is set to 100?

stephanie-wang (Contributor Author)

Ah, the failures are randomized.
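In other words, any single execution may succeed; the loop gives the randomized failure enough chances to occur. A hedged sketch of the pattern (names and probabilities are illustrative, not the actual test):

import random

import pytest


def execute_dag_once(fail_probability: float = 0.05) -> int:
    # Stand-in for one DAG execution with a randomly injected failure.
    if random.random() < fail_probability:
        raise RuntimeError("injected application-level failure")
    return 1


def test_failure_eventually_surfaces():
    # Any single iteration may succeed, so the whole loop sits inside one
    # pytest.raises block; the test passes once any iteration raises. In a
    # real test the injection rate or seed is chosen so a failure is
    # effectively guaranteed within the loop.
    with pytest.raises(RuntimeError):
        for _ in range(99):
            execute_dag_once()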

        except ValueError as exc:
            # ValueError is raised if a type hint was set and the returned
            # type did not match the hint.
        except IOError:
Contributor

QQ: should we also string-match the error message, or are we certain that this IOError means the channel is closed in this case? (It could be wrong if we have a different close API.)

stephanie-wang (Contributor Author)

Hmm true, we should probably introduce a different Ray system error instead of using IOError. I'll add an issue to track this.
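For reference, one shape such a follow-up could take. This is a sketch only, with hypothetical names; the actual error type and API would come out of the tracking issue:

class ChannelClosedError(IOError):
    """Hypothetical dedicated error for reads/writes on a closed channel."""


def read_next(channel):
    try:
        return channel.begin_read()
    except ChannelClosedError:
        # Unambiguous: the channel was torn down, so stop the reader loop
        # instead of guessing what a generic IOError meant.
        return None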

stephanie-wang merged commit a3439ad into ray-project:master on May 14, 2024
6 checks passed
stephanie-wang deleted the dag-errors branch on May 14, 2024 17:56
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 6, 2024
[core][experimental] Fix bug when propagating DAG application exceptions (ray-project#45237)

If a task in the DAG raised an application-level exception, we would
re-raise correctly if it was read directly by the driver, but not if it
was read by another actor in the DAG. This PR fixes the issue by writing
the exception to the next actor.

---------

Signed-off-by: Stephanie Wang <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>