Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RetryHandlerSkeleton #152

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class NonRetryableExc(RuntimeError):
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
"""
Class represents a non-retryable error
"""

def __init__(self, *args:object) --> None:
super().__init__(*args)
7 changes: 7 additions & 0 deletions deltacat/utils/ray_utils/RetryHandler/RetryableExc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class RetryableEXc(RuntimeError):
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
"""
class for errors that can be retried
"""

def __init__(self, *args: object) --> None:
super().__init__(*args)
8 changes: 8 additions & 0 deletions deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Class TaskInfoObject:
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, task_callable: Callable, task_input, num_retries: int, retry_delay: int): #what inputs do I need here
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
self.task_callable = task_callable
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
self.task_input = task_input
#self.remote_task_options = ray_remote_task_options
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
self.num_retries = num_retries
self.retry_delay = retry_delay
self.attempt_count = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is attempt_count here?

63 changes: 63 additions & 0 deletions deltacat/utils/ray_utils/RetryHandler/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import ray
import time
import logging
from typing import List, Callable
from ray.types import ObjectRef
from RetryExceptions.retryable_exception import RetryableException
from RetryExceptions.non_retryable_exception import NonRetryableException
from RetryExceptions.TaskInfoObject import TaskInfoObject

#inputs: task_callable, task_input, ray_remote_task_options, exception_retry_strategy_configs
#include a seperate class for errors: break down into retryable and non-retryable
#seperate class to put info in a way that the retry class can handle: ray retry task info

#This is what specifically retries a single task
@ray.remote
def submit_single_task(taskObj: TaskInfoObject) -> Any:
try:
taskObj.attempt_count += 1
curr_attempt = taskObj.attempt_count
return tackObj.task_callable(taskObj.task_input)
except (Exception) as exception:
# if exception is detected we want to figure out how to handle it
#pass to a new method that handles exception strategy
#retry_strat = ...exception_retry_strategy_configs
retry_config = get_retry_strategy() #need to come up with fields needed for this
if retry_config is not None:
return the exception that retry_config detected



class RetryHandler:
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
#given a list of tasks that are failing, we want to classify the error messages and redirect the task
#depending on the exception type using a wrapper
#wrapper function that before execution, checks what exception is being thrown and go to second method to
#commence retrying
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved

def get_task_results(self, num_of_results: int) -> List[Any]:
#implement wrapper here that before execution will try catch an exception
#get what tasks we need to run our execution on
finished, unfinished = ray.wait(unfinished, num_of_results)
#assuming we have the tasks we want to get results of
for finished in finished:
finished_result = None
try:
finished_result = ray.get(finished)
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
except (Exception) as exception:
#if exception send to method handle_ray_exception to determine what to do and assign the corresp error
finished_result = #evaluate the exception and return the error

if finished_result == RetryableException:
#feed into submit_single_task
else:
#Non-retryable Exception


def handle_ray_exception(self, exception: Exception, TaskInfo: TaskInfoObject) -> Error:
#will compare the exception with known exceptions and determine way to handle it based off that
#if RayOOM Error then: raise that error
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved

def get_retry_strategy(self, exception: Exception, TaskInfo: TaskInfoObject) --> Any:
ekaschaw marked this conversation as resolved.
Show resolved Hide resolved
"""
Given the exception and task info it will check what retry to execute
"""