Skip to content

Commit

Permalink
Merge pull request #1481 from PrefectHQ/context-config
Browse files Browse the repository at this point in the history
Embedding config into context
  • Loading branch information
cicdw authored Sep 11, 2019
2 parents 3da4562 + 1fc541d commit 2ebf197
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Use `-U` option when installing `prefect` in Docker containers to override base image version - [#1461](https://github.com/PrefectHQ/prefect/pull/1461)
- Remove restriction that prevented `DotDict` classes from having keys that shadowed dict methods - [#1462](https://github.com/PrefectHQ/prefect/pull/1462)
- Added livenessProbe to Kubernetes Agent - [#1474](https://github.com/PrefectHQ/prefect/pull/1474)
- Ensure external Dask Clusters do not require Prefect Cloud environment variables to run Cloud flows - [#1481](https://github.com/PrefectHQ/prefect/pull/1481)

### Task Library

Expand Down
10 changes: 0 additions & 10 deletions docs/cloud/first-steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,6 @@ f = Flow("example-env")
f.environment = RemoteEnvironment(executor="prefect.engine.executors.LocalExecutor")
```

::: warning Remote Environment Configuration
Some remote environments will require configuration to be able to communicate successfully to Prefect's Cloud API. For example, when using a Remote Environment with the `DaskExecutor` (executor="`prefect.engine.executors.DaskExecutor`") you will need to manually configure your Dask workers with relevant environment variables:
- `PREFECT__CLOUD__AUTH_TOKEN` is required for Prefect to be able to update task state by calling the Prefect Cloud API. You can specify the same token value that you used with the Prefect Kubernetes Agent or another valid auth token.
- `PREFECT__CLOUD__API` should be set to https://api.prefect.io for use with Prefect Cloud
- `PREFECT__LOGGING__LOG_TO_CLOUD` can optionally be set to "true" to enable sending task logs to Prefect Cloud
- `PREFECT__LOGGING__LEVEL` can optionally be set to "DEBUG", "INFO", etc. to control the logging level

See [here](https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml) for example configuration of environment variable used by the Prefect Kubernetes Agent to launch a job to run flows. (You won't need all of these environment variables, but you can see how they are specified in a Kubernetes YAML file.)
:::

### Storage

The [Prefect Storage interface](https://docs.prefect.io/api/unreleased/environments/storage.html#docker) provides a way to specify (via metadata) _where_ your Flow code is actually stored. Currently the only supported Storage class in Prefect Cloud is [Docker storage](https://docs.prefect.io/api/unreleased/environments/storage.html#docker). The only _required_ piece of information to include when creating your Docker storage class is the `registry_url` of the Docker registry where your image will live. All other keyword arguments are optional and "smart" defaults will be inferred.
Expand Down
9 changes: 0 additions & 9 deletions docs/core/tutorials/dask-cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,5 @@ Notice that we didn't specify an executor in our call to `flow.run()`. This is b

This flow will now run every minute on your local Dask cluster until you kill this process.

::: warning Prefect Cloud Configuration
Your Dask cluster will require configuration to be able to communicate successfully to Prefect's Cloud API. For example, when using a Remote Environment with the `DaskExecutor`, you will need to manually configure your Dask workers with relevant environment variables:
- `PREFECT__CLOUD__AUTH_TOKEN` is required for Prefect to be able to update task state by calling the Prefect Cloud API. You can specify the same token value that you used with the Prefect Kubernetes Agent or another valid auth token.
- `PREFECT__LOGGING__LOG_TO_CLOUD` can optionally be set to "true" to enable sending task logs to Prefect Cloud
- `PREFECT__LOGGING__LEVEL` can optionally be set to "DEBUG", "INFO", etc. to control the logging level

See [here](https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/job_spec.yaml) for example configuration of environment variable used by the Prefect Kubernetes Agent to launch a job to run flows. (You won't need all of these environment variables, but you can see how they are specified in a Kubernetes YAML file.)
:::

## Further steps
Take this example to the next level by storing your flow in a Docker container and deploying it with Dask on Kubernetes using the excellent [dask-kubernetes](http://kubernetes.dask.org/en/latest/) project! Details are left as an exercise to the reader. ;)
8 changes: 5 additions & 3 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ def __init__(self, api_server: str = None, api_token: str = None):
self._active_tenant_id = None

# store api server
self.api_server = api_server or prefect.config.cloud.get("graphql")
self.api_server = api_server or prefect.context.config.cloud.get("graphql")

# store api token
self._api_token = api_token or prefect.config.cloud.get("auth_token", None)
self._api_token = api_token or prefect.context.config.cloud.get(
"auth_token", None
)

# if no api token was passed, attempt to load state from local storage
if not self._api_token:
Expand Down Expand Up @@ -289,7 +291,7 @@ def _local_settings_path(self) -> Path:
Returns the local settings directory corresponding to the current API servers
"""
path = "{home}/client/{server}".format(
home=prefect.config.home_dir,
home=prefect.context.config.home_dir,
server=slugify(self.api_server, regex_pattern=r"[^-\.a-z0-9]+"),
)
return Path(os.path.expanduser(path)) / "settings.toml"
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/client/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get(self) -> Optional[Any]:
"Secrets should only be retrieved during a Flow run, not while building a Flow."
)

if prefect.config.cloud.use_local_secrets is True:
if prefect.context.config.cloud.use_local_secrets is True:
secrets = prefect.context.get("secrets", {})
try:
value = secrets[self.name]
Expand Down
8 changes: 4 additions & 4 deletions src/prefect/engine/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from distributed import Client, Future, fire_and_forget, worker_client

from prefect import config, context
from prefect import context
from prefect.engine.executors.base import Executor


Expand Down Expand Up @@ -45,13 +45,13 @@ def __init__(
**kwargs: Any
):
if address is None:
address = config.engine.executor.dask.address
address = context.config.engine.executor.dask.address
if address == "local":
address = None
if local_processes is None:
local_processes = config.engine.executor.dask.local_processes
local_processes = context.config.engine.executor.dask.local_processes
if debug is None:
debug = config.debug
debug = context.config.debug
self.address = address
self.local_processes = local_processes
self.debug = debug
Expand Down
1 change: 0 additions & 1 deletion src/prefect/engine/flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import pendulum

import prefect
from prefect import config
from prefect.core import Edge, Flow, Task
from prefect.engine import signals
from prefect.engine.runner import ENDRUN, Runner, call_state_handlers
Expand Down
4 changes: 3 additions & 1 deletion src/prefect/utilities/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def set_temporary_config(temp_config: dict) -> Iterator:
for key, value in temp_config.items():
prefect.config.set_nested(key, value)

yield prefect.config
# ensure the new config is available in context
with prefect.context(config=prefect.config):
yield prefect.config
finally:
prefect.config.__dict__.clear()
prefect.config.__dict__.update(old_config)
15 changes: 12 additions & 3 deletions src/prefect/utilities/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import threading
from typing import Any, Iterator, MutableMapping

from prefect.configuration import config
from prefect.utilities.collections import DotDict
from prefect.configuration import config, Config
from prefect.utilities.collections import DotDict, as_nested_dict, merge_dicts


class Context(DotDict, threading.local):
Expand All @@ -77,6 +77,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
if "context" in config:
self.update(config.context)
if "config" in self:
new_config = merge_dicts(config, self["config"]) # order matters
self["config"] = as_nested_dict(new_config, dct_class=Config)
else:
self["config"] = config

def __repr__(self) -> str:
return "<Context>"
Expand All @@ -93,7 +98,11 @@ def __call__(self, *args: MutableMapping, **kwargs: Any) -> Iterator["Context"]:
"""
previous_context = self.copy()
try:
self.update(*args, **kwargs)
new_context = dict(*args, **kwargs)
if "config" in new_context:
new_config = merge_dicts(self.get("config", {}), new_context["config"])
new_context["config"] = as_nested_dict(new_config, dct_class=Config)
self.update(new_context) # type: ignore
yield self
finally:
self.clear()
Expand Down
18 changes: 10 additions & 8 deletions src/prefect/utilities/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pendulum

import prefect
from prefect.configuration import config
from prefect.utilities.context import context


class CloudHandler(logging.StreamHandler):
Expand All @@ -24,13 +24,17 @@ def __init__(self) -> None:
self.client = None
self.logger = logging.getLogger("CloudHandler")
handler = logging.StreamHandler()
formatter = logging.Formatter(config.logging.format)
formatter = logging.Formatter(context.config.logging.format)
formatter.converter = time.gmtime # type: ignore
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(config.logging.level)
self.logger.setLevel(context.config.logging.level)

def emit(self, record) -> None: # type: ignore
# if we shouldn't log to cloud, don't emit
if not prefect.context.config.logging.log_to_cloud:
return

try:
from prefect.client import Client

Expand Down Expand Up @@ -80,15 +84,13 @@ def configure_logging(testing: bool = False) -> logging.Logger:
name = "prefect-test-logger" if testing else "prefect"
logger = logging.getLogger(name)
handler = logging.StreamHandler()
formatter = logging.Formatter(config.logging.format)
formatter = logging.Formatter(context.config.logging.format)
formatter.converter = time.gmtime # type: ignore
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(config.logging.level)
logger.setLevel(context.config.logging.level)

# send logs to server
if config.logging.log_to_cloud:
logger.addHandler(CloudHandler())
logger.addHandler(CloudHandler())
return logger


Expand Down
8 changes: 8 additions & 0 deletions tests/client/test_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ def test_local_secrets_auto_load_json_strings():
secret.get()


def test_local_secrets_remain_plain_dictionaries():
secret = Secret(name="test")
with set_temporary_config({"cloud.use_local_secrets": True}):
with prefect.context(secrets=dict(test={"x": 42})):
assert isinstance(prefect.context.secrets["test"], dict)
assert secret.get() == {"x": 42}


def test_secrets_raise_if_in_flow_context():
secret = Secret(name="test")
with set_temporary_config({"cloud.use_local_secrets": True}):
Expand Down
83 changes: 83 additions & 0 deletions tests/engine/cloud/test_cloud_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,3 +427,86 @@ def downstream(l):
assert flow_state.is_successful()
assert flow_state.result[inter].result == 10
assert flow_state.result[final].result == 100


def test_cloud_flow_runner_can_successfully_initialize_cloud_task_runners():
"""
After the context refactor wherein config settings were pulled from context.config,
there were various errors related to `prefect.context(self.context.to_dict())`
caused by the Context object not creating a nested DotDict structure. The main
symptom of this was when a CloudFlowRunner submitted a job to a dask worker and an error
was raised: `dict has no attribute cloud`
"""
fr = CloudFlowRunner(flow=prefect.Flow(name="test"))
fr.run_task(
task=MagicMock(),
state=None,
upstream_states=dict(),
context=dict(),
task_runner_state_handlers=[],
executor=None,
)


def test_cloud_task_runners_submitted_to_remote_machines_respect_original_config(
monkeypatch
):
"""
This test is meant to simulate the behavior of running a Cloud Flow against an external
cluster which has _not_ been configured for Prefect. The idea is that the configuration
settings which were present on the original machine are respected in the remote job, reflected
here by having the CloudHandler called during logging and the special values present in context.
"""

class CustomFlowRunner(CloudFlowRunner):
def run_task(self, *args, **kwargs):
with prefect.utilities.configuration.set_temporary_config(
{
"logging.log_to_cloud": False,
"special_key": 99,
"cloud.auth_token": "",
}
):
return super().run_task(*args, **kwargs)

calls = []

class Client(MagicMock):
def write_run_log(self, *args, **kwargs):
calls.append(kwargs)

monkeypatch.setattr("prefect.client.Client", Client)
monkeypatch.setattr("prefect.engine.cloud.task_runner.Client", Client)
monkeypatch.setattr("prefect.engine.cloud.flow_runner.Client", Client)
prefect.utilities.logging.prefect_logger.handlers[-1].client = Client()

@prefect.task
def log_stuff():
logger = prefect.context.get("logger")
logger.critical("important log right here")
return (
prefect.context.config.special_key,
prefect.context.config.cloud.auth_token,
)

with prefect.utilities.configuration.set_temporary_config(
{
"logging.log_to_cloud": True,
"special_key": 42,
"cloud.auth_token": "original",
}
):
# captures config at init
runner = CustomFlowRunner(flow=prefect.Flow("test", tasks=[log_stuff]))
flow_state = runner.run(return_tasks=[log_stuff])

assert flow_state.is_successful()
assert flow_state.result[log_stuff].result == (42, "original")
assert len(calls) == 6

loggers = [c["name"] for c in calls]
assert set(loggers) == {
"prefect.CloudTaskRunner",
"prefect.CustomFlowRunner",
"prefect.Task: log_stuff",
}
60 changes: 60 additions & 0 deletions tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from prefect.engine.cache_validators import duration_only
from prefect.engine.executors import Executor, LocalExecutor
from prefect.engine.flow_runner import ENDRUN, FlowRunner, FlowRunnerInitializeResult
from prefect.engine.task_runner import TaskRunner
from prefect.engine.result import NoResult, Result
from prefect.engine.state import (
Cached,
Expand Down Expand Up @@ -1480,3 +1481,62 @@ def log_stuff():

assert res.is_successful()
assert len([r for r in caplog.records if r.levelname == "CRITICAL"]) == 1


def test_task_runners_submitted_to_remote_machines_respect_original_config(monkeypatch):
"""
This test is meant to simulate the behavior of running a Cloud Flow against an external
cluster which has _not_ been configured for Prefect. The idea is that the configuration
settings which were present on the original machine are respected in the remote job, reflected
here by having the CloudHandler called during logging and the special values present in context.
"""

class CustomFlowRunner(FlowRunner):
def run_task(self, *args, **kwargs):
with prefect.utilities.configuration.set_temporary_config(
{
"logging.log_to_cloud": False,
"special_key": 99,
"cloud.auth_token": "",
}
):
return super().run_task(*args, **kwargs)

calls = []

class Client:
def write_run_log(self, *args, **kwargs):
calls.append(kwargs)

monkeypatch.setattr("prefect.client.Client", Client)

@prefect.task
def log_stuff():
logger = prefect.context.get("logger")
logger.critical("important log right here")
return (
prefect.context.config.special_key,
prefect.context.config.cloud.auth_token,
)

with prefect.utilities.configuration.set_temporary_config(
{
"logging.log_to_cloud": True,
"special_key": 42,
"cloud.auth_token": "original",
}
):
# captures config at init
runner = CustomFlowRunner(flow=Flow("test", tasks=[log_stuff]))
flow_state = runner.run(return_tasks=[log_stuff])

assert flow_state.is_successful()
assert flow_state.result[log_stuff].result == (42, "original")
assert len(calls) == 6

loggers = [c["name"] for c in calls]
assert set(loggers) == {
"prefect.TaskRunner",
"prefect.CustomFlowRunner",
"prefect.Task: log_stuff",
}
2 changes: 1 addition & 1 deletion tests/engine/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def test_task_runner_puts_checkpointing_in_context(self):
assert "task_tags" not in ctx
with set_temporary_config({"flows.checkpointing": "FOO"}):
result = TaskRunner(Task()).initialize_run(state=None, context=ctx)
assert result.context.checkpointing == "FOO"
assert result.context.checkpointing == "FOO"

def test_task_runner_puts_task_slug_in_context(self):
with prefect.context() as ctx:
Expand Down
Loading

0 comments on commit 2ebf197

Please sign in to comment.