Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MSGPACK IDL] Gate feature by setting ENV #2894

Merged
merged 16 commits into from
Nov 6, 2024
Merged

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Nov 4, 2024

Tracking issue

flyteorg/flyte#5318

Why are the changes needed?

We want to ease the pain of generic IDL -> msgpack IDL when the user upgrades to flytekit 1.14.

What changes were proposed in this pull request?

  1. if os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true", then use old behavior to generate protobuf struct in the generic IDL.
  2. add back unit tests deleted in this PR:
    Binary IDL With MessagePack #2760
  3. Most of the old code that was removed in this PR will now be re-added:
    Binary IDL With MessagePack #2760

Note: Local execution for Protobuf structs is not as fully supported as local execution for msgpack IDL.
I've created an issue and guide my friend to solve it.
flyteorg/flyte#5959

How was this patch tested?

unit test and remote execution.

import typing
import os
from dataclasses import dataclass, field
from typing import Dict, List
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
from flytekit.types.structured import StructuredDataset
from flytekit.types.schema import FlyteSchema
from flytekit import task, workflow, ImageSpec
from enum import Enum
import pandas as pd

flytekit_hash = "d6e3cf6f09075e4bde4754a39fcdfc66cbf94fc1"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"
image = ImageSpec(
    packages=[flytekit,
              "pandas",
              "pyarrow"],
    apt_packages=["git"],
    registry="localhost:30000",
    env={"FLYTE_USE_OLD_DC_FORMAT": "true"},
)

class Status(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

@dataclass
class InnerDC:
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
    g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = field(default_factory=lambda: {
        0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        -1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
    })
    k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
    m: dict = field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
    o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    enum_status: Status = field(default=Status.PENDING)
    sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
    fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))

@dataclass
class DC:
    a: int = -1
    b: float = 2.1
    c: str = "Hello, Flyte"
    d: bool = False
    e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2])
    f: List[FlyteFile] = field(default_factory=lambda: [FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")])
    g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]])
    h: List[Dict[int, bool]] = field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}])
    i: Dict[int, bool] = field(default_factory=lambda: {0: False, 1: True, -1: False})
    j: Dict[int, FlyteFile] = field(default_factory=lambda: {
        0: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"),
        -1: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt")
    })
    k: Dict[int, List[int]] = field(default_factory=lambda: {0: [0, 1, -1]})
    l: Dict[int, Dict[int, int]] = field(default_factory=lambda: {1: {-1: 0}})
    m: dict = field(default_factory=lambda: {"key": "value"})
    n: FlyteFile = field(default_factory=lambda: FlyteFile("s3://my-s3-bucket/s3_flyte_dir/example.txt"))
    o: FlyteDirectory = field(default_factory=lambda: FlyteDirectory("s3://my-s3-bucket/s3_flyte_dir"))
    inner_dc: InnerDC = field(default_factory=lambda: InnerDC())
    enum_status: Status = field(default=Status.PENDING)
    sd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", file_format="parquet"))
    fsc: FlyteSchema = field(default_factory=lambda: FlyteSchema(remote_path="s3://my-s3-bucket/s3_flyte_dir/df.parquet"))

@task(container_image=image)
def t_dc(dc: DC) -> DC:
    return dc

@task(container_image=image)
def t_inner(inner_dc: InnerDC) -> InnerDC:
    assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC"

    expected_file_content = "Default content"

    # f: List[FlyteFile]
    for ff in inner_dc.f:
        assert isinstance(ff, FlyteFile), "Expected FlyteFile"
        with open(ff, "r") as f:
            assert f.read() == expected_file_content, "File content mismatch in f"

    # j: Dict[int, FlyteFile]
    for _, ff in inner_dc.j.items():
        assert isinstance(ff, FlyteFile), "Expected FlyteFile in j"
        with open(ff, "r") as f:
            assert f.read() == expected_file_content, "File content mismatch in j"

    # n: FlyteFile
    assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile"
    with open(inner_dc.n, "r") as f:
        assert f.read() == expected_file_content, "File content mismatch in n"

    # o: FlyteDirectory
    assert isinstance(inner_dc.o, FlyteDirectory), "o is not FlyteDirectory"
    assert not inner_dc.o.downloaded, "o should not be downloaded initially"
    with open(os.path.join(inner_dc.o, "example.txt"), "r") as fh:
        assert fh.read() == expected_file_content, "File content mismatch in o"
    assert inner_dc.o.downloaded, "o should be marked as downloaded after access"

    assert inner_dc.enum_status == Status.PENDING, "enum_status does not match"
    assert isinstance(inner_dc.sd, StructuredDataset), "sd is not StructuredDataset"
    assert isinstance(inner_dc.fsc, FlyteSchema), "fsc is not FlyteSchema"
    print("All checks in InnerDC passed")

    return inner_dc

@task(container_image=image)
def t_test_all_attributes(a: int, b: float, c: str, d: bool, e: List[int], f: List[FlyteFile], g: List[List[int]],
                          h: List[Dict[int, bool]], i: Dict[int, bool], j: Dict[int, FlyteFile],
                          k: Dict[int, List[int]], l: Dict[int, Dict[int, int]], m: dict,
                          n: FlyteFile, o: FlyteDirectory, enum_status: Status, sd: StructuredDataset,
                          fsc: FlyteSchema
                          ):

    # Strict type checks for simple types
    assert isinstance(a, int), f"a is not int, it's {type(a)}"
    assert a == -1
    assert isinstance(b, float), f"b is not float, it's {type(b)}"
    assert isinstance(c, str), f"c is not str, it's {type(c)}"
    assert isinstance(d, bool), f"d is not bool, it's {type(d)}"

    # Strict type checks for List[int]
    assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]"

    # Strict type checks for List[FlyteFile]
    assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]"

    # Strict type checks for List[List[int]]
    assert isinstance(g, list) and all(
        isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g), "g is not List[List[int]]"

    # Strict type checks for List[Dict[int, bool]]
    assert isinstance(h, list) and all(
        isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h
    ), "h is not List[Dict[int, bool]]"

    # Strict type checks for Dict[int, bool]
    assert isinstance(i, dict) and all(
        isinstance(k, int) and isinstance(v, bool) for k, v in i.items()), "i is not Dict[int, bool]"

    # Strict type checks for Dict[int, FlyteFile]
    assert isinstance(j, dict) and all(
        isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items()), "j is not Dict[int, FlyteFile]"

    # Strict type checks for Dict[int, List[int]]
    assert isinstance(k, dict) and all(
        isinstance(k, int) and isinstance(v, list) and all(isinstance(i, int) for i in v) for k, v in
        k.items()), "k is not Dict[int, List[int]]"

    # Strict type checks for Dict[int, Dict[int, int]]
    assert isinstance(l, dict) and all(
        isinstance(k, int) and isinstance(v, dict) and all(
            isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items())
        for k, v in l.items()), "l is not Dict[int, Dict[int, int]]"

    # Strict type check for a generic dict
    assert isinstance(m, dict), "m is not dict"

    # Strict type check for FlyteFile
    assert isinstance(n, FlyteFile), "n is not FlyteFile"

    # Strict type check for FlyteDirectory
    assert isinstance(o, FlyteDirectory), "o is not FlyteDirectory"

    # # Strict type check for Enum
    assert isinstance(enum_status, Status), "enum_status is not Status"

    assert isinstance(sd, StructuredDataset), "sd is not StructuredDataset"
    print("sd:", sd.open(pd.DataFrame).all())

    assert isinstance(fsc, FlyteSchema), "fsc is not FlyteSchema"
    print("fsc: ", fsc.open().all())

    print("All attributes passed strict type checks.")


@workflow
def wf(dc: DC):
    new_dc = t_dc(dc=dc)
    t_inner(new_dc.inner_dc)
    t_test_all_attributes(a=new_dc.a, b=new_dc.b, c=new_dc.c,
                          d=new_dc.d, e=new_dc.e, f=new_dc.f,
                          g=new_dc.g, h=new_dc.h, i=new_dc.i,
                          j=new_dc.j, k=new_dc.k, l=new_dc.l,
                          m=new_dc.m, n=new_dc.n, o=new_dc.o,
                          enum_status=new_dc.enum_status,
                          sd=new_dc.sd,
                          fsc=new_dc.fsc
                          )
    t_test_all_attributes(a=new_dc.inner_dc.a, b=new_dc.inner_dc.b, c=new_dc.inner_dc.c,
                          d=new_dc.inner_dc.d, e=new_dc.inner_dc.e, f=new_dc.inner_dc.f,
                          g=new_dc.inner_dc.g, h=new_dc.inner_dc.h, i=new_dc.inner_dc.i,
                          j=new_dc.inner_dc.j, k=new_dc.inner_dc.k, l=new_dc.inner_dc.l,
                          m=new_dc.inner_dc.m, n=new_dc.inner_dc.n, o=new_dc.inner_dc.o,
                          enum_status=new_dc.inner_dc.enum_status,
                          sd=new_dc.inner_dc.sd, fsc=new_dc.inner_dc.fsc)


if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner
    import os
    # FLYTE_USE_OLD_DC_FORMAT": "true"
    os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true"

    runner = CliRunner()
    path = os.path.realpath(__file__)
    input_val = '{"a": -1, "b": 3.14}'
    # result = runner.invoke(pyflyte.main, ["run", path, "wf", "--dc", input_val])
    # print("Local Execution: ", result.output)
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "wf", "--dc", input_val])
    print("Remote Execution: ", result.output)

Setup process

use single binary.

python: 3.12
flyte: master branch

Screenshots

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

@Future-Outlier Future-Outlier changed the title Gate MSGPACK IDL feature by setting ENV [WIP] Gate MSGPACK IDL feature by setting ENV Nov 4, 2024
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title [WIP] Gate MSGPACK IDL feature by setting ENV [WIP][MSGPACK IDL] Gate feature by setting ENV Nov 4, 2024
Signed-off-by: Future-Outlier <[email protected]>
Copy link

codecov bot commented Nov 4, 2024

Codecov Report

Attention: Patch coverage is 46.77419% with 33 lines in your changes missing coverage. Please review.

Project coverage is 46.97%. Comparing base (5bf0acd) to head (3472815).
Report is 12 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/core/type_engine.py 38.77% 26 Missing and 4 partials ⚠️
flytekit/extras/pydantic_transformer/__init__.py 0.00% 2 Missing ⚠️
flytekit/extras/pydantic_transformer/decorator.py 0.00% 1 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (5bf0acd) and HEAD (3472815). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (5bf0acd) HEAD (3472815)
3 2
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2894       +/-   ##
===========================================
- Coverage   75.12%   46.97%   -28.16%     
===========================================
  Files         199      199               
  Lines       20781    20753       -28     
  Branches     2671     2672        +1     
===========================================
- Hits        15612     9748     -5864     
- Misses       4400    10521     +6121     
+ Partials      769      484      -285     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title [WIP][MSGPACK IDL] Gate feature by setting ENV [MSGPACK IDL] Gate feature by setting ENV Nov 5, 2024
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Future-Outlier and others added 4 commits November 5, 2024 23:18
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw  <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier
Copy link
Member Author

@pingsutw
this can be merged.
just tested this again.

image image

@Future-Outlier Future-Outlier enabled auto-merge (squash) November 6, 2024 01:39
@Future-Outlier Future-Outlier enabled auto-merge (squash) November 6, 2024 01:39
@Future-Outlier Future-Outlier merged commit 2fbdc63 into master Nov 6, 2024
31 of 33 checks passed
kumare3 pushed a commit that referenced this pull request Nov 8, 2024
* Gate MSGPACK IDL feature by setting ENV

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

* Add async to def dict_to_old_generic_idl

Signed-off-by: Future-Outlier <[email protected]>

* print

Signed-off-by: Future-Outlier <[email protected]>

* Fix structured dataset bug

Signed-off-by: Future-Outlier <[email protected]>

* dict update

Signed-off-by: Future-Outlier <[email protected]>

* remvoe breakpoint

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* remove promise.py

Signed-off-by: Future-Outlier <[email protected]>

* ad tests

Signed-off-by: Future-Outlier <[email protected]>

* Add Comments

Signed-off-by: Future-Outlier <[email protected]>

* add commetns

Signed-off-by: Future-Outlier <[email protected]>

* apply naming suggestion from Kevin

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw  <[email protected]>

* nit

Signed-off-by: Future-Outlier <[email protected]>

* use flyte_use_old_dc_format as constant

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw <[email protected]>
katrogan pushed a commit that referenced this pull request Nov 15, 2024
* Gate MSGPACK IDL feature by setting ENV

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

* Add async to def dict_to_old_generic_idl

Signed-off-by: Future-Outlier <[email protected]>

* print

Signed-off-by: Future-Outlier <[email protected]>

* Fix structured dataset bug

Signed-off-by: Future-Outlier <[email protected]>

* dict update

Signed-off-by: Future-Outlier <[email protected]>

* remvoe breakpoint

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* remove promise.py

Signed-off-by: Future-Outlier <[email protected]>

* ad tests

Signed-off-by: Future-Outlier <[email protected]>

* Add Comments

Signed-off-by: Future-Outlier <[email protected]>

* add commetns

Signed-off-by: Future-Outlier <[email protected]>

* apply naming suggestion from Kevin

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw  <[email protected]>

* nit

Signed-off-by: Future-Outlier <[email protected]>

* use flyte_use_old_dc_format as constant

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
400Ping pushed a commit to 400Ping/flytekit that referenced this pull request Nov 22, 2024
* Gate MSGPACK IDL feature by setting ENV

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

* Add async to def dict_to_old_generic_idl

Signed-off-by: Future-Outlier <[email protected]>

* print

Signed-off-by: Future-Outlier <[email protected]>

* Fix structured dataset bug

Signed-off-by: Future-Outlier <[email protected]>

* dict update

Signed-off-by: Future-Outlier <[email protected]>

* remvoe breakpoint

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* remove promise.py

Signed-off-by: Future-Outlier <[email protected]>

* ad tests

Signed-off-by: Future-Outlier <[email protected]>

* Add Comments

Signed-off-by: Future-Outlier <[email protected]>

* add commetns

Signed-off-by: Future-Outlier <[email protected]>

* apply naming suggestion from Kevin

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw  <[email protected]>

* nit

Signed-off-by: Future-Outlier <[email protected]>

* use flyte_use_old_dc_format as constant

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: pingsutw <[email protected]>
Signed-off-by: 400Ping <[email protected]>
@honnix
Copy link
Member

honnix commented Dec 19, 2024

if os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true", then use old behavior to generate protobuf struct in the generic IDL.

Is this supposed to be a breaking change? One of our integration test failed because of converting dict to binary instead of protobuf struct.

@honnix
Copy link
Member

honnix commented Dec 19, 2024

if os.environ["FLYTE_USE_OLD_DC_FORMAT"] = "true", then use old behavior to generate protobuf struct in the generic IDL.

Is this supposed to be a breaking change? One of our integration test failed because of converting dict to binary instead of protobuf struct.

Sorry now I read the release note more carefully and it is not the problem of this PR (which basically improves backward compatibility).

Since we need python <-> java/scala interoperability, what would you suggest in order to support msgpack on java/scala side? https://github.com/msgpack/msgpack-java is it the right lib to use? Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants