[flytekit][2][untyped dict] Binary IDL With MessagePack #2757

Closed
Future-Outlier wants to merge 10 commits

Future-Outlier (Member) commented Sep 18, 2024

Tracking issue

flyteorg/flyte#5318

Why are the changes needed?

We want to support untyped dicts with 100% type correctness. Serializing the dict to msgpack bytes ensures that the types are preserved.

What changes were proposed in this pull request?

  1. Rename the method dict_to_generic_literal to dict_to_binary_literal.
  2. Add a call to self.from_binary_idl in to_python_val.
  3. Fix the related tests.
  4. Add test_untyped_dict to test 8 cases of untyped dicts.

How was this patch tested?

Unit tests, local execution, and remote execution.

Setup process

from flytekit import task, workflow, ImageSpec

flytekit_hash = "c05a905b241fa825b1a8d78d57aa3e4b9d27becd"

flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit],
    apt_packages=["git"],
    registry="localhost:30000",
)

@task(container_image=image)
def dict_task(input: dict) -> dict:
    return input


# Generate more complex dict inputs with lists and dicts as keys and values
dict_inputs = [
    # Basic key-value combinations with int, str, bool, float
    # (True == 1 in Python, so the True key below overwrites the value stored under 1)
    {1: "a", "key": 2.5, True: False, 3.14: 100},
    {"a": 1, 2: "b", 3.5: True, False: 3.1415},

    # Lists as values, mixed types
    {
        1: [1, "a", 2.5, False],
        "key_list": ["str", 3.14, True, 42],
        True: [False, 2.718, "test"],
    },

    # Nested dicts with basic types
    {
        "nested_dict": {1: 2, "key": "value", True: 3.14, False: "string"},
        3.14: {"pi": 3.14, "e": 2.718, 42: True},
    },

    # Nested lists and dicts as values
    {
        "list_in_dict": [
            {"inner_dict_1": [1, 2.5, "a"], "inner_dict_2": [True, False, 3.14]},
            [1, 2, 3, {"nested_list_dict": [False, "test"]}],
        ]
    },

    # More complex nested structures
    {
        "complex_nested": {
            1: {"nested_dict": {True: [1, "a", 2.5]}},  # Nested dict with list as value
            "string_key": {False: {3.14: {"deep": [1, "deep_value"]}}},  # Deep nesting
        }
    },

    # Lists of dicts as values (lists are unhashable in Python, so they cannot be dict keys)
    {
        "list_of_dicts": [{"a": 1, "b": 2}, {"key1": "value1", "key2": "value2"}],
        10: [{"nested_list": [1, "value", 3.14]}, {"another_list": [True, False]}],
    },

    # More nested combinations of list and dict
    {
        "outer_list": [
            [1, 2, 3],
            {"inner_dict": {"key1": [True, "string", 3.14], "key2": [1, 2.5]}},  # Dict inside list
        ],
        "another_dict": {"key1": {"subkey": [1, 2, "str"]}, "key2": [False, 3.14, "test"]},
    },
]



if __name__ == "__main__":
    import os
    import json
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = os.path.realpath(__file__)
    for i, input_dict in enumerate(dict_inputs):
        print(f"\n=== Running test {i + 1} ===")
        json_input = json.dumps(input_dict)  # Convert input to a JSON string (non-string keys become strings)
        result = runner.invoke(pyflyte.main,
                               ["run",
                                path,
                                "dict_task",
                                "--input",
                                json_input])
        print(f"Local Execution {i}: ", result.output)

    for i, input_dict in enumerate(dict_inputs):
        print(f"\n=== Running test {i + 1} ===")
        json_input = json.dumps(input_dict)  # Convert input to a JSON string (non-string keys become strings)
        result = runner.invoke(pyflyte.main,
                               ["run",
                                "--remote",
                                path,
                                "dict_task",
                                "--input",
                                json_input])
        print("Remote Execution: ", result.output)

Screenshots

There are 8 test cases.

/Users/future-outlier/miniconda3/envs/dev/bin/python /Users/future-outlier/code/dev/flytekit/build/PR/JSON/stacked_PRs/untyped_dict_list_nested.py 

=== Running test 1 ===
Local Execution 0:  Running Execution on local.
{'1': False, 'key': 2.5, '3.14': 100}


=== Running test 2 ===
Local Execution 1:  Running Execution on local.
{'a': 1, '2': 'b', '3.5': True, 'false': 3.1415}


=== Running test 3 ===
Local Execution 2:  Running Execution on local.
{'1': [False, 2.718, 'test'], 'key_list': ['str', 3.14, True, 42]}


=== Running test 4 ===
Local Execution 3:  Running Execution on local.
{'nested_dict': {'1': 3.14, 'key': 'value', 'false': 'string'}, '3.14': {'pi': 3.14, 'e': 2.718, '42': True}}


=== Running test 5 ===
Local Execution 4:  Running Execution on local.
{'list_in_dict': [{'inner_dict_1': [1, 2.5, 'a'], 'inner_dict_2': [True, False, 3.14]}, [1, 2, 3, {'nested_list_dict': [False, 'test']}]]}


=== Running test 6 ===
Local Execution 5:  Running Execution on local.
{'complex_nested': {'1': {'nested_dict': {'true': [1, 'a', 2.5]}}, 'string_key': {'false': {'3.14': {'deep': [1, 'deep_value']}}}}}


=== Running test 7 ===
Local Execution 6:  Running Execution on local.
{'list_of_dicts': [{'a': 1, 'b': 2}, {'key1': 'value1', 'key2': 'value2'}], '10': [{'nested_list': [1, 'value', 3.14]}, {'another_list': [True, False]}]}


=== Running test 8 ===
Local Execution 7:  Running Execution on local.
{'outer_list': [[1, 2, 3], {'inner_dict': {'key1': [True, 'string', 3.14], 'key2': [1, 2.5]}}], 'another_dict': {'key1': {'subkey': [1, 2, 'str']}, 'key2': [False, 3.14, 'test']}}


=== Running test 1 ===
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1726674651.422437 1735447 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
I0000 00:00:1726674651.476439 1735447 work_stealing_thread_pool.cc:320] WorkStealingThreadPoolImpl::PrepareFork
Remote Execution:  Running Execution on Remote.
Image localhost:30000/flytekit:3R9iB9xupVwhApb5oEhS1g found. Skip building.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/a444fpwpcj2h5sz879wc to see execution in the console.


=== Running test 2 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/actnrqkjgnv5xr52c4qn to see execution in the console.


=== Running test 3 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/ad6hdw2czhj9r5x28pqc to see execution in the console.


=== Running test 4 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/a7mdpbghr8k9xl7hd52b to see execution in the console.


=== Running test 5 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/ajmwfrclxtfmsx9lrz95 to see execution in the console.


=== Running test 6 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/azqfxtjxs9rv4n8r4cc9 to see execution in the console.


=== Running test 7 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/af2jrclxrfnx8xhpzqrp to see execution in the console.


=== Running test 8 ===
Remote Execution:  Running Execution on Remote.

[✔] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/aqp6dzw54ncfdsdmmswn to see execution in the console.


Process finished with exit code 0
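
The string keys in the local output above (for example '1' and '3.14') and the missing True key in test 1 can be reproduced without Flyte. The following is a minimal sketch, assuming both effects come from Python dict semantics and from the json.dumps call in the script, not from the MessagePack path itself:

```python
import json

# True == 1 and hash(True) == hash(1), so both literals name the same key.
# The first key object (1) is kept and the later value (False) wins.
first_case = {1: "a", "key": 2.5, True: False, 3.14: 100}

# json.dumps turns every non-string key into a JSON string,
# so anything parsed back from that JSON has string keys.
round_tripped = json.loads(json.dumps(first_case))

print(first_case)
print(round_tripped)
```

Under this reading, the dict that reaches the task already has string keys, so the printed results say little about how non-string keys behave once they are inside the binary literal.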

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: Future-Outlier <[email protected]>

codecov bot commented Sep 18, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 89.07%. Comparing base (11c3a18) to head (dd5e1c9).
Report is 13 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2757       +/-   ##
===========================================
+ Coverage   66.44%   89.07%   +22.62%     
===========================================
  Files           9        8        -1     
  Lines         453      357       -96     
===========================================
+ Hits          301      318       +17     
+ Misses        152       39      -113     


eapolinario (Collaborator)

@Future-Outlier, why did you close this PR?

# In Mashumaro, the default encoder uses strict_map_key=False, while the default decoder uses strict_map_key=True.
# This is relevant for cases like Dict[int, str].
# If strict_map_key=False is not used, the decoder will raise an error when trying to decode keys that are not strictly typed.
def _default_flytekit_decoder(data: bytes) -> Any:
Member

nit: could we call this _default_msgpack_decoder?

Member

We may add other decoders in the future, such as _default_utf8_decoder.

Member Author

Yes, good advice, thank you!
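
The strict_map_key comment in the snippet under review can be illustrated with the msgpack package on its own. This is a minimal sketch, not the flytekit decoder; the variable names are made up for illustration, and strict_map_key is passed explicitly because its default differs between msgpack versions:

```python
import msgpack

typed_value = {1: "a", 2: "b"}  # e.g. a value declared as Dict[int, str]
packed = msgpack.packb(typed_value)

# With strict_map_key=True, only str or bytes map keys are accepted.
try:
    msgpack.unpackb(packed, strict_map_key=True)
    strict_error = None
except ValueError as exc:
    strict_error = exc

# Relaxing the check lets the integer keys round-trip unchanged.
restored = msgpack.unpackb(packed, strict_map_key=False)

print(strict_error)
print(restored)
```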

self._msgpack_decoder[expected_python_type] = decoder
return decoder.decode(binary_idl_object.value)
else:
raise TypeTransformerFailedError(f"Unsupported binary format {binary_idl_object.tag}")
Member

Suggested change
- raise TypeTransformerFailedError(f"Unsupported binary format {binary_idl_object.tag}")
+ raise TypeTransformerFailedError(f"Unsupported binary format `{binary_idl_object.tag}`")

Member Author

Thank you!

@@ -1697,17 +1714,15 @@ def extract_types_or_metadata(t: Optional[Type[dict]]) -> typing.Tuple:
return None, None

@staticmethod
- def dict_to_generic_literal(ctx: FlyteContext, v: dict, allow_pickle: bool) -> Literal:
+ def dict_to_binary_literal(ctx: FlyteContext, v: dict, allow_pickle: bool) -> Literal:
Member

But we still use the generic literal to store the pickle file, right?

Member Author

Yes, I didn't change the pickle logic.

@@ -1717,7 +1732,7 @@ def dict_to_generic_literal(ctx: FlyteContext, v: dict, allow_pickle: bool) -> L
),
metadata={"format": "pickle"},
)
- raise e
+ raise TypeTransformerFailedError(f"Cannot convert {v} to Flyte Literal.\n" f"Error Message: {e}")
Member

Suggested change
- raise TypeTransformerFailedError(f"Cannot convert {v} to Flyte Literal.\n" f"Error Message: {e}")
+ raise TypeTransformerFailedError(f"Cannot convert `{v}` to Flyte Literal.\n" f"Error Message: {e}")

Member Author

No problem, thank you.

(typing.Dict[str, int], {"a": 1, "b": 2, "c": 3}),
(typing.Dict[str, typing.List[int]], {"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}),
(typing.Dict[str, typing.Dict[int, str]], {"a": {"1": "a", "2": "b", "3": "c"}, "b": {"4": "d", "5": "e", "6": "f"}}),
(typing.Dict[str, typing.Dict[int, str]], {"a": {1: "a", 2: "b", 3: "c"}, "b": {4: "d", 5: "e", 6: "f"}}),
Member

Could we add one more example for Union[dict, str]?

Member Author

Yes, no problem.

eapolinario (Collaborator) left a comment

Thank you for the great set of unit tests.

@@ -161,6 +163,7 @@ async def test_agent(mock_boto_call, mock_return_value):
if "pickle_check" in mock_return_value[0][0]:
assert "pickle_file" in outputs["result"]
else:
outputs["result"] = msgpack.loads(base64.b64decode(outputs["result"]))
Collaborator

Why do we need this?

Member Author

if scalar.binary:
    return base64.b64encode(scalar.binary.value)

Because existing code (quoted above) already base64-encodes the bytes of a binary scalar, the test has to decode them before calling msgpack.loads.
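
The decode step in the test mirrors the base64.b64encode(scalar.binary.value) line quoted in the reply. Here is a minimal sketch of that round trip using only base64 and msgpack; the sample dict and variable names are illustrative and are not taken from the test suite:

```python
import base64

import msgpack

expected = {"result": [1, 2.5, "a"], "ok": True}

# What a binary scalar would carry: the MessagePack encoding of the dict.
binary_value = msgpack.dumps(expected)

# What the string representation returns for that scalar (per the quoted snippet).
encoded = base64.b64encode(binary_value)

# What the test has to do before comparing against the original dict.
decoded = msgpack.loads(base64.b64decode(encoded))

print(decoded == expected)
```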

@@ -159,7 +160,7 @@ async def test_openai_batch_agent(mock_retrieve, mock_create, mock_context):
outputs = literal_map_string_repr(resource.outputs)
result = outputs["result"]

- assert result == batch_retrieve_result.to_dict()
+ assert msgpack.loads(base64.b64decode(result)) == batch_retrieve_result.to_dict()
Collaborator

ditto.

Member Author

ditto

def test_untyped_dict():
ctx = FlyteContextManager.current_context()

dict_inputs = [
Collaborator

Can we have tests that include datetime and possibly other, more complex objects too?

Member Author

No problem.

Member Author

Enum and datetime are not supported in an untyped dict. I think this is OK, since no one uses them that way.

Future-Outlier (Member Author), Sep 25, 2024

Other complex objects such as dataclasses will not be supported either, since we don't have type hints.
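
The limitation described in this thread can be seen with msgpack alone. This is a minimal sketch, assuming the untyped dict is handed to plain msgpack.dumps with no custom default hook; whether flytekit does exactly that is not shown in this conversation, and the Color enum is hypothetical:

```python
import datetime
import enum

import msgpack


class Color(enum.Enum):  # hypothetical enum, for illustration only
    RED = "red"


unsupported = {
    "datetime": {"when": datetime.datetime(2024, 9, 25)},
    "enum": {"color": Color.RED},
}

# Collect the error raised for each value that msgpack cannot pack as-is.
failures = {}
for name, value in unsupported.items():
    try:
        msgpack.dumps(value)
    except TypeError as exc:
        failures[name] = str(exc)

print(sorted(failures))
```

Without a type hint there is nothing to tell a decoder which class to rebuild, which matches the reasoning given above for leaving these types out.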
