Skip to content

Commit

Permalink
Merge pull request #1537 from PrefectHQ/serialize-all-envs
Browse files Browse the repository at this point in the history
Serialize all environments
  • Loading branch information
cicdw authored Sep 19, 2019
2 parents a6ce029 + 873dcd3 commit b7107a2
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Features

- Added Fargate agent - [#1521](https://github.com/PrefectHQ/prefect/pull/1521)
- Custom user-written environments can be deployed to Prefect Cloud - [#1534](https://github.com/PrefectHQ/prefect/pull/1534), [#1537](https://github.com/PrefectHQ/prefect/pull/1537)

### Enhancements

Expand All @@ -17,7 +18,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Agents now log platform errors to flow runs which cannot deploy - [#1528](https://github.com/PrefectHQ/prefect/pull/1528)
- Updating `ShellTask` to work more like Airflow Bash Operator for streaming logs and returning values - [#1451](https://github.com/PrefectHQ/prefect/pull/1451)
- Agents now have a verbose/debug logging option for granular output - [#1532](https://github.com/PrefectHQ/prefect/pull/1532)
- `DaskKubernetesEnvironment` now allows for custom scheduler and worker specs - [#1534](https://github.com/PrefectHQ/prefect/pull/1534)
- `DaskKubernetesEnvironment` now allows for custom scheduler and worker specs - [#1534](https://github.com/PrefectHQ/prefect/pull/1534), [#1537](https://github.com/PrefectHQ/prefect/pull/1537)

### Task Library

Expand Down
37 changes: 31 additions & 6 deletions src/prefect/serialization/environment.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from marshmallow import fields
from marshmallow import fields, post_load
from typing import Any

from prefect.environments import (
DaskKubernetesEnvironment,
Environment,
LocalEnvironment,
RemoteEnvironment,
)
from prefect.utilities.serialization import ObjectSchema, OneOfSchema
from prefect.utilities.serialization import ObjectSchema, OneOfSchema, to_qualified_name


class BaseEnvironmentSchema(ObjectSchema):
Expand All @@ -33,10 +34,6 @@ class Meta:
min_workers = fields.Int()
max_workers = fields.Int()

# Serialize original spec file locations, not the spec itself
scheduler_spec_file = fields.String(allow_none=True)
worker_spec_file = fields.String(allow_none=True)


class RemoteEnvironmentSchema(ObjectSchema):
class Meta:
Expand All @@ -47,6 +44,26 @@ class Meta:
labels = fields.List(fields.String())


class CustomEnvironmentSchema(ObjectSchema):
    """
    Schema used for `Environment` subclasses that have no dedicated schema.

    Dumping records the environment's labels together with the qualified name
    of its concrete class. Loading produces a plain `Environment` that carries
    the serialized labels.
    """

    class Meta:
        object_class = lambda: Environment
        # NOTE(review): presumably keeps "type" out of the data handed to the
        # object constructor -- confirm against ObjectSchema.
        exclude_fields = ["type"]

    labels = fields.List(fields.String())

    # On dump: qualified name of the environment's class.
    # On load: the incoming value is passed through unchanged.
    type = fields.Function(
        serialize=lambda env: to_qualified_name(type(env)),
        deserialize=lambda value: value,
    )

    @post_load
    def create_object(self, data: dict, **kwargs: Any) -> Environment:
        """
        Build the deserialized object.

        A custom class cannot be deserialized here, so an otherwise empty
        base `Environment` holding the serialized labels is returned.
        """
        labels = data.get("labels")
        return Environment(labels=labels)


class EnvironmentSchema(OneOfSchema):
"""
Field that chooses between several nested schemas
Expand All @@ -58,4 +75,12 @@ class EnvironmentSchema(OneOfSchema):
"Environment": BaseEnvironmentSchema,
"LocalEnvironment": LocalEnvironmentSchema,
"RemoteEnvironment": RemoteEnvironmentSchema,
"CustomEnvironment": CustomEnvironmentSchema,
}

def get_obj_type(self, obj: Any) -> str:
    """
    Choose the `type_schemas` key used for `obj`.

    The object's class name is returned when a schema is registered under
    that name; every other class falls back to "CustomEnvironment".
    """
    class_name = obj.__class__.__name__
    return class_name if class_name in self.type_schemas else "CustomEnvironment"
4 changes: 3 additions & 1 deletion src/prefect/utilities/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def configure_logging(testing: bool = False) -> logging.Logger:
logger.addHandler(handler)
logger.setLevel(context.config.logging.level)

logger.addHandler(CloudHandler())
cloud_handler = CloudHandler()
cloud_handler.setLevel("DEBUG")
logger.addHandler(cloud_handler)
return logger


Expand Down
44 changes: 42 additions & 2 deletions tests/serialization/test_environments.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
import tempfile

import prefect
from prefect import environments
from prefect.serialization.environment import (
Expand Down Expand Up @@ -38,8 +41,6 @@ def test_serialize_dask_environment():
assert serialized["min_workers"] == 1
assert serialized["max_workers"] == 2
assert serialized["labels"] == []
assert serialized["scheduler_spec_file"] is None
assert serialized["worker_spec_file"] is None

new = schema.load(serialized)
assert new.private_registry is False
Expand All @@ -51,6 +52,25 @@ def test_serialize_dask_environment():
assert new.worker_spec_file is None


def test_serialize_dask_env_with_custom_specs():
    """
    A `DaskKubernetesEnvironment` created from custom scheduler and worker
    spec files can be dumped and loaded back into the same environment type.
    """
    with tempfile.TemporaryDirectory() as directory:
        scheduler_path = os.path.join(directory, "scheduler.yaml")
        worker_path = os.path.join(directory, "worker.yaml")

        # Write placeholder spec files for the environment to point at.
        for path, contents in ((scheduler_path, "scheduler"), (worker_path, "worker")):
            with open(path, "w+") as f:
                f.write(contents)

        env = environments.DaskKubernetesEnvironment(
            scheduler_spec_file=scheduler_path,
            worker_spec_file=worker_path,
        )

        # NOTE(review): the original nesting was lost in extraction; the round
        # trip is kept inside the temporary directory so the spec files exist
        # for its whole duration -- confirm against the repository.
        schema = DaskKubernetesEnvironmentSchema()
        round_tripped = schema.load(schema.dump(env))
        assert isinstance(round_tripped, environments.DaskKubernetesEnvironment)


def test_serialize_dask_environment_with_labels():
env = environments.DaskKubernetesEnvironment(labels=["a", "b", "c"])

Expand Down Expand Up @@ -162,3 +182,23 @@ def test_deserialize_old_env_payload():
obj = schema.load(old)
assert isinstance(obj, environments.RemoteEnvironment)
assert obj.labels == set()


def test_serialize_custom_environment():
    """
    A user-defined `Environment` subclass is dumped with type
    "CustomEnvironment" and loads back as a base `Environment` that keeps
    the original labels.
    """

    class MyEnv(environments.Environment):
        def __init__(self, x=5):
            # Store the argument itself; the previous code assigned a
            # hard-coded 5 and silently ignored `x`.
            self.x = x
            super().__init__(labels=["foo", "bar"])

        def custom_method(self):
            pass

    env = MyEnv()
    schema = EnvironmentSchema()
    serialized = schema.dump(env)
    assert serialized["type"] == "CustomEnvironment"
    # Label order is not asserted, so compare as sets.
    assert set(serialized["labels"]) == {"foo", "bar"}

    obj = schema.load(serialized)
    assert isinstance(obj, environments.Environment)
    assert obj.labels == {"foo", "bar"}
9 changes: 6 additions & 3 deletions tests/tasks/test_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ def test_shell_logs_error_on_non_zero_exit(caplog):
task = ShellTask()(command="ls surely_a_dir_that_doesnt_exist")
out = f.run()
assert out.is_failed()
assert "ERROR prefect.Task: ShellTask:shell.py" in caplog.text
assert "surely_a_dir_that_doesnt_exist" in caplog.text
assert "No such file or directory" in caplog.text

error_log = [c for c in caplog.records if c.levelname == "ERROR"]
assert len(error_log) == 1
assert error_log[0].name == "prefect.Task: ShellTask"
assert "surely_a_dir_that_doesnt_exist" in error_log[0].message
assert "No such file or directory" in error_log[0].message


def test_shell_initializes_and_runs_multiline_cmd():
Expand Down

0 comments on commit b7107a2

Please sign in to comment.