Skip to content

Commit

Permalink
Merge pull request #46 from msokoloff1/ms/kfp-outputs
Browse files Browse the repository at this point in the history
add kfp outputs
  • Loading branch information
msokoloff1 authored Jan 9, 2024
2 parents 378391c + 97e0173 commit 1b934f5
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 9 deletions.
39 changes: 38 additions & 1 deletion google_cloud_automlops/orchestration/kfp/scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def create_component_scaffold(func: Optional[Callable] = None,
component_spec['name'] = name
if description:
component_spec['description'] = description
outputs = get_function_return_types(func)
if outputs:
component_spec['outputs'] = outputs
component_spec['inputs'] = get_function_parameters(func)
component_spec['implementation'] = {}
component_spec['implementation']['container'] = {}
Expand Down Expand Up @@ -106,7 +109,40 @@ def get_packages_to_install_command(func: Optional[Callable] = None,
return ['sh', '-c', install_python_packages_script, src_code]


def get_function_parameters(func: Callable) -> dict:
def get_function_return_types(func: Callable) -> list:
"""Returns a formatted list of function return types.
Args:
func: The python function to create a component from. The function
can optionally have type annotations for its return values.
Returns:
list: return value list with types converted to kubeflow spec.
Raises:
Exception: If return type is provided and not a NamedTuple.
"""
annotation = inspect.signature(func).return_annotation
if maybe_strip_optional_from_annotation(annotation) is not annotation:
raise TypeError('Return type cannot be Optional.')

# No annotations provided
# pylint: disable=protected-access
if annotation == inspect._empty:
return None

if not (hasattr(annotation,'__annotations__') and isinstance(annotation.__annotations__, dict)):
raise TypeError(f'''Return type hint for function "{func.__name__}" must be a NamedTuple.''')

outputs = []
for name, type_ in annotation.__annotations__.items():
metadata = {}
metadata['name'] = name
metadata['type'] = type_
metadata['description'] = None
outputs.append(metadata)
return update_params(outputs)


def get_function_parameters(func: Callable) -> list:
"""Returns a formatted list of parameters.
Args:
Expand Down Expand Up @@ -214,3 +250,4 @@ def get_compile_step(func_name: str):
f' package_path=pipeline_job_spec_path)\n'
f'\n'
)

77 changes: 69 additions & 8 deletions tests/unit/orchestration/kfp/scaffold_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

from contextlib import nullcontext as does_not_raise
import os
from typing import Callable, List
from typing import Callable, List, NamedTuple
from typing import Optional

import pytest

Expand All @@ -31,13 +32,14 @@
get_compile_step,
get_function_parameters,
get_pipeline_decorator,
get_function_return_types,
)
from google_cloud_automlops.utils.constants import DEFAULT_PIPELINE_NAME
import google_cloud_automlops.utils.utils
from google_cloud_automlops.utils.utils import get_function_source_definition, read_yaml_file


def add(a: int, b: int):
def add(a: int, b: int) -> NamedTuple('output', [('sum', int)]):
"""Testing
Args:
Expand Down Expand Up @@ -65,14 +67,14 @@ def div(a: float, b: float):


@pytest.mark.parametrize(
'func, packages_to_install, expectation',
'func, packages_to_install, expectation, has_return_type',
[
(add, None, does_not_raise()),
(add, ['pandas', 'pytest'], does_not_raise()),
(sub, None, pytest.raises(TypeError))
(add, None, does_not_raise(), True),
(add, ['pandas', 'pytest'], does_not_raise(), True),
(sub, None, pytest.raises(TypeError), False)
]
)
def test_create_component_scaffold(func: Callable, packages_to_install: list, expectation):
def test_create_component_scaffold(func: Callable, packages_to_install: list, expectation, has_return_type: bool):
"""Tests create_component_scaffold, which creates a tmp component scaffold
which will be used by the formalize function. Code is temporarily stored in
component_spec['implementation']['container']['command'].
Expand All @@ -86,6 +88,8 @@ def test_create_component_scaffold(func: Callable, packages_to_install: list, ex
executing func. These will always be installed at component runtime.
expectation: Any corresponding expected errors for each
set of parameters.
has_return_type: boolean indicating if the function has a return type hint.
This is used to determine if an 'outputs' key should exist in the component scaffold.
"""
with expectation:
create_component_scaffold(func=func,
Expand All @@ -97,7 +101,8 @@ def test_create_component_scaffold(func: Callable, packages_to_install: list, ex

# Assert yaml contains correct keys
component_spec = read_yaml_file(func_path)
assert list(component_spec.keys()) == ['name', 'description', 'inputs', 'implementation']
outputs_key = ['outputs'] if has_return_type else []
assert set(component_spec.keys()) == set(['name', 'description', 'inputs', 'implementation', *outputs_key])
assert list(component_spec['implementation'].keys()) == ['container']
assert list(component_spec['implementation']['container'].keys()) == ['image', 'command', 'args']

Expand Down Expand Up @@ -255,3 +260,59 @@ def test_get_compile_step(func_name: str):
f' package_path=pipeline_job_spec_path)\n'
f'\n'
)


@pytest.mark.parametrize(
'return_annotation, return_types, expectation',
[
(
NamedTuple('output', [('sum', int)]),
[{'description': None, 'name': 'sum', 'type': 'Integer'},],
does_not_raise()
),
(
NamedTuple('output', [('first', str), ('last', str)]),
[{'description': None, 'name': 'first', 'type': 'String'},
{'description': None, 'name': 'last', 'type': 'String'},],
does_not_raise()
),
(
Optional[NamedTuple('output', [('count', int)])],
None,
pytest.raises(TypeError)
),
(
int,
None,
pytest.raises(TypeError)
),(
None,
None,
pytest.raises(TypeError)
),
(
'NO_ANNOTATION',
None,
does_not_raise()
)
]
)
def test_get_function_return_types(return_annotation, return_types: List[dict], expectation):
"""Tests get_function_outputs, which returns a formatted list of
return types.
Args:
annotation (Any): The return type to test.
return_types (List[dict]): The return type converted into the kubeflow output spec.
expectation: Any corresponding expected errors for each
set of parameters.
"""

def func():
...

if return_annotation != 'NO_ANNOTATION':
func.__annotations__ = {'return' : return_annotation}

with expectation:
assert return_types == get_function_return_types(func=func)

0 comments on commit 1b934f5

Please sign in to comment.