From e3545acdbd338827481bd9bfeae32d81fb47ae42 Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 19 Sep 2019 15:36:27 -0700 Subject: [PATCH 1/4] Update Daskk8s to be deserializable + allow for custom env serialization --- src/prefect/serialization/environment.py | 34 ++++++++++++++---- src/prefect/utilities/logging.py | 4 ++- tests/serialization/test_environments.py | 44 ++++++++++++++++++++++-- tests/utilities/test_logging.py | 26 ++++++++++++++ 4 files changed, 99 insertions(+), 9 deletions(-) diff --git a/src/prefect/serialization/environment.py b/src/prefect/serialization/environment.py index 85432bf60b66..d016e1eb35af 100644 --- a/src/prefect/serialization/environment.py +++ b/src/prefect/serialization/environment.py @@ -1,4 +1,5 @@ -from marshmallow import fields +from marshmallow import fields, post_load +from typing import Any from prefect.environments import ( DaskKubernetesEnvironment, @@ -6,7 +7,7 @@ LocalEnvironment, RemoteEnvironment, ) -from prefect.utilities.serialization import ObjectSchema, OneOfSchema +from prefect.utilities.serialization import ObjectSchema, OneOfSchema, to_qualified_name class BaseEnvironmentSchema(ObjectSchema): @@ -33,10 +34,6 @@ class Meta: min_workers = fields.Int() max_workers = fields.Int() - # Serialize original spec file locations, not the spec itself - scheduler_spec_file = fields.String(allow_none=True) - worker_spec_file = fields.String(allow_none=True) - class RemoteEnvironmentSchema(ObjectSchema): class Meta: @@ -47,6 +44,23 @@ class Meta: labels = fields.List(fields.String()) +class CustomEnvironmentSchema(ObjectSchema): + class Meta: + object_class = lambda: Environment + exclude_fields = ["type"] + + labels = fields.List(fields.String()) + + type = fields.Function( + lambda environment: to_qualified_name(type(environment)), lambda x: x + ) + + @post_load + def create_object(self, data: dict, **kwargs: Any) -> Environment: + """Because we cannot deserialize a custom class, just return `None`""" + return 
Environment(labels=data.get("labels")) + + class EnvironmentSchema(OneOfSchema): """ Field that chooses between several nested schemas @@ -58,4 +72,12 @@ class EnvironmentSchema(OneOfSchema): "Environment": BaseEnvironmentSchema, "LocalEnvironment": LocalEnvironmentSchema, "RemoteEnvironment": RemoteEnvironmentSchema, + "CustomEnvironment": CustomEnvironmentSchema, } + + def get_obj_type(self, obj: Any) -> str: + name = obj.__class__.__name__ + if name in self.type_schemas: + return name + else: + return "CustomEnvironment" diff --git a/src/prefect/utilities/logging.py b/src/prefect/utilities/logging.py index 5b87d7d8ebdb..c3070d4ff2a8 100644 --- a/src/prefect/utilities/logging.py +++ b/src/prefect/utilities/logging.py @@ -90,7 +90,9 @@ def configure_logging(testing: bool = False) -> logging.Logger: logger.addHandler(handler) logger.setLevel(context.config.logging.level) - logger.addHandler(CloudHandler()) + cloud_handler = CloudHandler() + cloud_handler.setLevel("DEBUG") + logger.addHandler(cloud_handler) return logger diff --git a/tests/serialization/test_environments.py b/tests/serialization/test_environments.py index 6a6f14318f5d..398e2d5ca58f 100644 --- a/tests/serialization/test_environments.py +++ b/tests/serialization/test_environments.py @@ -1,3 +1,6 @@ +import os +import tempfile + import prefect from prefect import environments from prefect.serialization.environment import ( @@ -38,8 +41,6 @@ def test_serialize_dask_environment(): assert serialized["min_workers"] == 1 assert serialized["max_workers"] == 2 assert serialized["labels"] == [] - assert serialized["scheduler_spec_file"] is None - assert serialized["worker_spec_file"] is None new = schema.load(serialized) assert new.private_registry is False @@ -51,6 +52,25 @@ def test_serialize_dask_environment(): assert new.worker_spec_file is None +def test_serialize_dask_env_with_custom_specs(): + with tempfile.TemporaryDirectory() as directory: + with open(os.path.join(directory, "scheduler.yaml"), "w+") 
as f: + f.write("scheduler") + with open(os.path.join(directory, "worker.yaml"), "w+") as f: + f.write("worker") + + env = environments.DaskKubernetesEnvironment( + scheduler_spec_file=os.path.join(directory, "scheduler.yaml"), + worker_spec_file=os.path.join(directory, "worker.yaml"), + ) + + schema = DaskKubernetesEnvironmentSchema() + serialized = schema.dump(env) + + deserialized = schema.load(serialized) + assert isinstance(deserialized, environments.DaskKubernetesEnvironment) + + def test_serialize_dask_environment_with_labels(): env = environments.DaskKubernetesEnvironment(labels=["a", "b", "c"]) @@ -162,3 +182,23 @@ def test_deserialize_old_env_payload(): obj = schema.load(old) assert isinstance(obj, environments.RemoteEnvironment) assert obj.labels == set() + + +def test_serialize_custom_environment(): + class MyEnv(environments.Environment): + def __init__(self, x=5): + self.x = 5 + super().__init__(labels=["foo", "bar"]) + + def custom_method(self): + pass + + env = MyEnv() + schema = EnvironmentSchema() + serialized = schema.dump(env) + assert serialized["type"] == "CustomEnvironment" + assert set(serialized["labels"]) == set(["foo", "bar"]) + + obj = schema.load(serialized) + assert isinstance(obj, environments.Environment) + assert obj.labels == set(["foo", "bar"]) diff --git a/tests/utilities/test_logging.py b/tests/utilities/test_logging.py index 759ddde3e785..c852ed2ec3d8 100644 --- a/tests/utilities/test_logging.py +++ b/tests/utilities/test_logging.py @@ -61,6 +61,32 @@ def test_remote_handler_captures_errors_and_logs_them(caplog): logger.handlers = [] +def test_remote_handler_is_always_debug_level(caplog, monkeypatch): + monkeypatch.setattr("prefect.client.Client", MagicMock) + client = MagicMock() + try: + with utilities.configuration.set_temporary_config( + {"logging.log_to_cloud": True, "logging.level": "INFO"} + ): + + logger = utilities.logging.configure_logging(testing=True) + child_logger = logger.getChild("sub-test") + 
child_logger.debug("debug me") + + debug_logs = [r for r in caplog.records if r.levelname == "DEBUG"] + assert len(debug_logs) == 1 + + logged_msg = client.write_run_log.call_args[1]["message"] + logged_level = client.write_run_log.call_args[1]["level"] + assert logged_msg == "debug me" + assert logged_level == "DEBUG" + + finally: + # reset root_logger + logger = utilities.logging.configure_logging(testing=True) + logger.handlers = [] + + def test_remote_handler_captures_tracebacks(caplog, monkeypatch): monkeypatch.setattr("prefect.client.Client", MagicMock) client = MagicMock() From e2830b1f719ea8ad08d9a317ae8b7eba01f5bd60 Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 19 Sep 2019 15:49:44 -0700 Subject: [PATCH 2/4] Remove accidentally committed test --- tests/tasks/test_shell.py | 9 ++++++--- tests/utilities/test_logging.py | 26 -------------------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py index cb06bdd3555b..a5d3f356f977 100644 --- a/tests/tasks/test_shell.py +++ b/tests/tasks/test_shell.py @@ -86,9 +86,12 @@ def test_shell_logs_error_on_non_zero_exit(caplog): task = ShellTask()(command="ls surely_a_dir_that_doesnt_exist") out = f.run() assert out.is_failed() - assert "ERROR prefect.Task: ShellTask:shell.py" in caplog.text - assert "surely_a_dir_that_doesnt_exist" in caplog.text - assert "No such file or directory" in caplog.text + + error_log = [c for c in caplog.records if c.levelname == "ERROR"] + assert len(error_log) == 1 + assert error_log[0].name == "prefect.Task: ShellTask" + assert "surely_a_dir_that_doesnt_exist" in error_log[0].message + assert "No such file or directory" in error_log[0].message def test_shell_initializes_and_runs_multiline_cmd(): diff --git a/tests/utilities/test_logging.py b/tests/utilities/test_logging.py index c852ed2ec3d8..759ddde3e785 100644 --- a/tests/utilities/test_logging.py +++ b/tests/utilities/test_logging.py @@ -61,32 +61,6 @@ def 
test_remote_handler_captures_errors_and_logs_them(caplog): logger.handlers = [] -def test_remote_handler_is_always_debug_level(caplog, monkeypatch): - monkeypatch.setattr("prefect.client.Client", MagicMock) - client = MagicMock() - try: - with utilities.configuration.set_temporary_config( - {"logging.log_to_cloud": True, "logging.level": "INFO"} - ): - - logger = utilities.logging.configure_logging(testing=True) - child_logger = logger.getChild("sub-test") - child_logger.debug("debug me") - - debug_logs = [r for r in caplog.records if r.levelname == "DEBUG"] - assert len(debug_logs) == 1 - - logged_msg = client.write_run_log.call_args[1]["message"] - logged_level = client.write_run_log.call_args[1]["level"] - assert logged_msg == "debug me" - assert logged_level == "DEBUG" - - finally: - # reset root_logger - logger = utilities.logging.configure_logging(testing=True) - logger.handlers = [] - - def test_remote_handler_captures_tracebacks(caplog, monkeypatch): monkeypatch.setattr("prefect.client.Client", MagicMock) client = MagicMock() From 64c0b2ede76e6d3cc9dc4760bb70fc8d1c1d9a8a Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 19 Sep 2019 15:51:58 -0700 Subject: [PATCH 3/4] Update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a947e91372e1..3bbb66408adb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ ### Features - Added Fargate agent - [#1521](https://github.com/PrefectHQ/prefect/pull/1521) +- Custom user-written environments can be deployed to Prefect Cloud - [#1534](https://github.com/PrefectHQ/prefect/pull/1534), [#1537](https://github.com/PrefectHQ/prefect/pull/1537) ### Enhancements @@ -17,7 +18,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - Agents now log platform errors to flow runs which cannot deploy - 
[#1528](https://github.com/PrefectHQ/prefect/pull/1528) - Updating `ShellTask` to work more like Airflow Bash Operator for streaming logs and returning values - [#1451](https://github.com/PrefectHQ/prefect/pull/1451) - Agents now have a verbose/debug logging option for granular output - [#1532](https://github.com/PrefectHQ/prefect/pull/1532) -- `DaskKubernetesEnvironment` now allows for custom scheduler and worker specs - [#1543](https://github.com/PrefectHQ/prefect/pull/1534) +- `DaskKubernetesEnvironment` now allows for custom scheduler and worker specs - [#1534](https://github.com/PrefectHQ/prefect/pull/1534), [#1537](https://github.com/PrefectHQ/prefect/pull/1537) ### Task Library From 873dcd371e25037b2850f8d5970324c3112616e5 Mon Sep 17 00:00:00 2001 From: Chris White Date: Thu, 19 Sep 2019 16:36:13 -0700 Subject: [PATCH 4/4] Update misleading docstring --- src/prefect/serialization/environment.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/prefect/serialization/environment.py b/src/prefect/serialization/environment.py index d016e1eb35af..c26de759f06f 100644 --- a/src/prefect/serialization/environment.py +++ b/src/prefect/serialization/environment.py @@ -57,7 +57,10 @@ class Meta: @post_load def create_object(self, data: dict, **kwargs: Any) -> Environment: - """Because we cannot deserialize a custom class, just return `None`""" + """ + Because we cannot deserialize a custom class, we return an empty + Base Environment with the appropriate labels. + """ return Environment(labels=data.get("labels"))