Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Open FlyteFile from remote path #2991

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions flytekit/types/file/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,45 @@ def __init__(
self._remote_source: typing.Optional[str] = None

def __fspath__(self):
# This is where a delayed downloading of the file will happen
"""
Define the file path protocol for opening FlyteFile with the context manager,
following show two common use cases:

1. Directly open a FlyteFile with a local path:

ff = FlyteFile(path=local_path)
with open(ff, "r") as f:
# Read your local file here
# ...

There's no need to handle downloading of the file because it's on the local file system.
In this case, a dummy downloading will be done.

2. Directly open a FlyteFile with a remote path:

ff = FlyteFile(path=remote_path)
with open(ff, "r") as f:
# Read your remote file here
# ...

We now support directly opening a FlyteFile with a file from the remote data storage.
In this case, a delayed downloading of the remote file will be done.
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/6090.
"""
ctx = FlyteContextManager.current_context()

if ctx.file_access.is_remote(self.path) and self._remote_source is None:
# Setup remote file source and local file destination
self._remote_source = self.path
local_path = ctx.file_access.get_random_local_path(self._remote_source)
self._downloader = lambda: FlyteFilePathTransformer.downloader(
remote_path=self._remote_source, local_path=local_path
)
self.path = local_path
Comment on lines +330 to +337
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to move these code to __init__ ?


if not self._downloaded:
# Download data from remote to local or
# run dummy downloading for input local path
self._downloader()
self._downloaded = True
return self.path
Expand Down Expand Up @@ -693,16 +730,25 @@ async def async_to_python_value(

# For the remote case, return an FlyteFile object that can download
local_path = ctx.file_access.get_random_local_path(uri)

def _downloader():
return ctx.file_access.get_data(uri, local_path, is_multipart=False)

expected_format = FlyteFilePathTransformer.get_format(expected_python_type)
ff = FlyteFile.__class_getitem__(expected_format)(local_path, _downloader)
ff = FlyteFile.__class_getitem__(expected_format)(
path=local_path, downloader=lambda: self.downloader(remote_path=uri, local_path=local_path)
)
ff._remote_source = uri

return ff

@staticmethod
def downloader(remote_path: str, local_path: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the context that we pass to async_to_python_value?

Suggested change
def downloader(remote_path: str, local_path: str) -> None:
def downloader(ctx: FlyteContext, remote_path: str, local_path: str) -> None:

"""
Download data from remote_path to local_path.

We design the downloader as a static method because its behavior is logically
related to this class but don't need to interact with class or instance data.
"""
ctx = FlyteContextManager.current_context()
ctx.file_access.get_data(remote_path, local_path, is_multipart=False)

def guess_python_type(self, literal_type: LiteralType) -> typing.Type[FlyteFile[typing.Any]]:
if (
literal_type.blob is not None
Expand Down
98 changes: 98 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig
from botocore.client import BaseClient

MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -804,3 +805,100 @@ def test_get_control_plane_version():
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
version = client.get_control_plane_version()
assert version == "unknown" or version.startswith("v")


class SimpleFileTransfer:
"""Utilities for file transfer to minio s3 bucket.

Mainly support single file uploading and automatic teardown.
"""

def __init__(self) -> None:
self._remote = FlyteRemote(
config=Config.auto(config_file=CONFIG),
default_project=PROJECT,
default_domain=DOMAIN
)
self._s3_client = self._get_minio_s3_client(self._remote)

def _get_minio_s3_client(self, remote: FlyteRemote) -> BaseClient:
"""Creat a botocore client."""
minio_s3_config = remote.file_access.data_config.s3
sess = botocore.session.get_session()

return sess.create_client(
"s3",
endpoint_url=minio_s3_config.endpoint,
aws_access_key_id=minio_s3_config.access_key_id,
aws_secret_access_key=minio_s3_config.secret_access_key,
)

def upload_file(self, file_type: str) -> str:
"""Upload a single file to minio s3 bucket.

Args:
file_type: File type. Support "txt" and "json".

Return:
remote_file_path: Remote file path.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file_path = self._dump_tmp_file(file_type, tmp_dir)

# Upload to minio s3 bucket
_, remote_file_path = self._remote.upload_file(
to_upload=tmp_file_path,
project=PROJECT,
domain=DOMAIN,
)

return remote_file_path

def _dump_tmp_file(self, file_type: str, tmp_dir: str) -> str:
"""Generate and dump a temporary file locally."""
if file_type == "txt":
tmp_file_path = pathlib.Path(tmp_dir) / "test.txt"
with open(tmp_file_path, "w") as f:
f.write("Hello World!")
elif file_type == "json":
d = {"name": "jiawei", "height": 171}
tmp_file_path = pathlib.Path(tmp_dir) / "test.json"
with open(tmp_file_path, "w") as f:
json.dump(d, f)

return tmp_file_path

def delete_file(self, bucket: str, key: str) -> None:
"""Delete the remote file from minio s3 bucket to free the space.

Args:
bucket: s3 bucket name.
key: Key name of the object.
"""
res = self._s3_client.delete_object(Bucket=bucket, Key=key)
assert res["ResponseMetadata"]["HTTPStatusCode"] == 204


def test_open_ff():
"""Test opening FlyteFile from a remote path."""
# Set environment variables for interacting with minio
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:30002"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"

# Upload a file to minio s3 bucket
file_transfer = SimpleFileTransfer()
remote_file_path = file_transfer.upload_file(file_type="json")
print(remote_file_path)

execution_id = run("flytefile.py", "wf", "--remote_file_path", remote_file_path)
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
execution = remote.fetch_execution(name=execution_id)
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
print("Execution Error:", execution.error)
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}"

# Delete the remote file to free the space
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)
29 changes: 29 additions & 0 deletions tests/flytekit/integration/remote/workflows/basic/flytefile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@task
def open_ff_from_remote(remote_file_path: str) -> FlyteFile:
"""Open FlyteFile from a remote file path.

Args:
remote_file_path: Remote file path.

Returns:
ff: FlyteFile object.
"""
ff = FlyteFile(path=remote_file_path)
with open(ff, "r") as f:
content = f.read()
print(f"FILE CONTENT | {content}")

return ff


@workflow
def wf(remote_file_path: str) -> None:
remote_ff = open_ff_from_remote(remote_file_path=remote_file_path)


if __name__ == "__main__":
wf()
Loading