[WIP]fix s3 slow down not being handled correctly #1795

Open · wants to merge 3 commits into base: master
3 changes: 2 additions & 1 deletion metaflow/plugins/datatools/s3/s3.py
@@ -1376,7 +1376,7 @@ def _one_boto_op(self, op, url, create_tmp_file=True):
                if S3_RETRY_COUNT > 0:
                    self._jitter_sleep(i)
        raise MetaflowS3Exception(
-            "S3 operation failed.\n" "Key requested: %s\n" "Error: %s" % (url, error)
+            "S3 operation failed.\n" "Key requested: %s\n" "Error: %s\n" "Error Code: %s" % (url, error, error_code)
        )

    # add some jitter to make sure retries are not synchronized
@@ -1558,6 +1558,7 @@ def _update_out_lines(out_lines, ok_lines, resize=False):
                # position as if transient retries did not exist. This
                # makes sure that order is respected even in the presence of
                # transient retries.
+                print("This is the idx %s" % idx.decode(encoding="utf-8"))
                out_lines[int(idx.decode(encoding="utf-8"))] = rest

    def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
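
The retry path in the first hunk relies on _jitter_sleep to stagger retries so that many concurrent workers do not hit S3 in lockstep after a SlowDown response. A minimal sketch of that idea, capped exponential backoff with full jitter (the multiplier, base, and cap defaults here are illustrative, not necessarily Metaflow's):

import random
import time

def jitter_sleep(trial, multiplier=2, base=2, cap=360):
    # Exponential backoff: base * multiplier**trial, capped so that a
    # long retry sequence never sleeps for more than `cap` seconds.
    interval = min(cap, base * multiplier**trial)
    # Full jitter: sleep a uniformly random fraction of the interval so
    # concurrent workers desynchronize instead of retrying together.
    time.sleep(random.uniform(0, interval))
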
19 changes: 16 additions & 3 deletions metaflow/plugins/datatools/s3/s3op.py
@@ -115,6 +115,8 @@ def format_result_line(idx, prefix, url="", local=""):
# with boto errors. This function can be replaced
# with better error handling code.
def normalize_client_error(err):
+    print("err before normalize:\n", err)
+    print("err code:\n", err.response["Error"]["Code"])
    error_code = err.response["Error"]["Code"]
    try:
        return int(error_code)
@@ -148,6 +150,7 @@ def normalize_client_error(err):
"RequestThrottled",
"EC2ThrottledException",
):
print("yesssss! error code recognize")
return 503
return error_code
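
This hunk is the crux of the fix: throttling responses such as SlowDown carry a symbolic string code rather than a numeric one, so int(error_code) raises ValueError and control falls through to the name-based check that maps them all to 503. A standalone sketch of the same normalization (the hunk shows only the tail of the name tuple; "SlowDown" and "Throttling" are assumed members here):

TRANSIENT_ERROR_CODES = (
    "SlowDown",
    "Throttling",
    "ThrottlingException",
    "RequestThrottled",
    "EC2ThrottledException",
)

def normalize_error_code(error_code):
    # S3 reports some errors with numeric codes ("404") and throttling
    # with symbolic names ("SlowDown"); fold the latter into 503 so
    # callers can treat all throttling as one transient condition.
    try:
        return int(error_code)
    except ValueError:
        return 503 if error_code in TRANSIENT_ERROR_CODES else error_code

For example, normalize_error_code("404") comes back as the integer 404, normalize_error_code("SlowDown") as 503, and an unrecognized name such as "AccessDenied" passes through unchanged.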

@@ -267,6 +270,10 @@ def op_info(url):
                        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                        continue
                    else:
+                        print(
+                            "s3op download failed with error code:\n%s"
+                            % error_code
+                        )
                        raise
                # TODO specific error message for out of disk space
                # If we need the metadata, get it and write it out
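
The result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT)) lines above are how a worker reports a transient failure back to the parent: every result line leads with the index of the URL it belongs to, and _update_out_lines in s3.py (first diff) slots the payload back into that position. A small sketch of that parsing convention, assuming the same space-separated byte format:

def update_out_lines(out_lines, ok_lines):
    # Each result line is b"<idx> <payload>"; the leading index names
    # the slot of out_lines the payload belongs to, so results keep
    # request order even when transient retries reorder completion.
    for line in ok_lines:
        idx, _, rest = line.partition(b" ")
        out_lines[int(idx.decode("utf-8"))] = rest

For instance, update_out_lines(out, [b"1 beta", b"0 alpha"]) fills out[1] before out[0], but both payloads land in their original slots.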
@@ -332,11 +339,15 @@ def op_info(url):
                        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
                        continue
                    else:
+                        print(
+                            "s3op upload failed with error code:\n%s"
+                            % error_code
+                        )
                        raise
        except:
            traceback.print_exc()
-            sys.exit(ERROR_WORKER_EXCEPTION)
+            # sys.exit(ERROR_WORKER_EXCEPTION)
+            sys.exit(error_code)

def start_workers(mode, urls, num_workers, inject_failure, s3config):
    # We start the minimum of len(urls) or num_workers to avoid starting
@@ -486,6 +497,7 @@ def get_info(self, url):
                return False, url, ERROR_URL_ACCESS_DENIED
            # Transient errors are going to be retried by the aws_retry decorator
            else:
+                print("s3op get_info failed with error code:\n%s" % error_code)
                raise

    @aws_retry
@@ -534,6 +546,7 @@ def list_prefix(self, prefix_url, delimiter=""):
                return False, prefix_url, ERROR_URL_ACCESS_DENIED
            # Transient errors are going to be retried by the aws_retry decorator
            else:
+                print("s3op list_prefix failed with error code:\n%s" % error_code)
                raise
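
Both get_info and list_prefix lean on the aws_retry decorator named in the context lines: re-raising a transient error hands it back to the decorator, which sleeps and retries the call. A minimal sketch of such a decorator (an illustration under assumed names and defaults, not Metaflow's actual aws_retry):

import functools
import random
import time

S3_RETRY_COUNT = 7  # assumed retry budget, for illustration only

def aws_retry(f):
    # Retry the wrapped method on any exception; sleep with jittered
    # exponential backoff between attempts and re-raise on the last one.
    @functools.wraps(f)
    def retry_wrapper(self, *args, **kwargs):
        for attempt in range(S3_RETRY_COUNT + 1):
            try:
                return f(self, *args, **kwargs)
            except Exception:
                if attempt == S3_RETRY_COUNT:
                    raise
                time.sleep(random.uniform(0, min(360, 2 * 2**attempt)))
    return retry_wrapper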


@@ -574,7 +587,7 @@ def exit(exit_code, url):
    elif exit_code == ERROR_TRANSIENT:
        msg = "Transient error for url: %s" % url
    else:
-        msg = "Unknown error"
+        msg = "Unknown error, exit code: %s" % exit_code
    print("s3op failed:\n%s" % msg, file=sys.stderr)
    sys.exit(exit_code)

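
The net effect of the worker-side change is that a throttled worker now exits with its normalized error code (e.g. 503 for SlowDown) rather than the catch-all ERROR_WORKER_EXCEPTION, so the supervising process can distinguish throttling from genuine failures. One caveat for this WIP: on POSIX, a child's exit status is truncated to 8 bits, so sys.exit(503) is observed by the parent as 503 % 256 == 247. A hedged sketch of a parent classifying a worker's exit status with that in mind (the ERROR_WORKER_EXCEPTION value is an assumption for illustration):

from multiprocessing import Process

ERROR_WORKER_EXCEPTION = 5  # assumed value, for illustration only

def classify_worker_exit(proc: Process) -> str:
    # Wait for the worker, then map its exit status to an action.
    # Exit statuses are 8-bit on POSIX, so a worker that called
    # sys.exit(503) shows up here as 503 % 256 == 247.
    proc.join()
    if proc.exitcode == 503 % 256:
        return "throttled: retry with backoff"
    if proc.exitcode == ERROR_WORKER_EXCEPTION:
        return "worker exception: abort"
    return "ok" if proc.exitcode == 0 else "failed with code %s" % proc.exitcode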