Skip to content

Commit

Permalink
Persist failed data at random locations (#15991)
Browse files Browse the repository at this point in the history
  • Loading branch information
cicdw authored Nov 12, 2024
1 parent a876770 commit 02b99f0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
7 changes: 5 additions & 2 deletions src/prefect/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import sys
import traceback
import uuid
import warnings
from collections import Counter
from types import GeneratorType, TracebackType
Expand Down Expand Up @@ -220,7 +221,8 @@ async def exception_to_crashed_state(
)

if result_store:
data = result_store.create_result_record(exc)
key = uuid.uuid4().hex
data = result_store.create_result_record(exc, key=key)
else:
# Attach the exception for local usage, will not be available when retrieved
# from the API
Expand Down Expand Up @@ -253,7 +255,8 @@ async def exception_to_failed_state(
pass

if result_store:
data = result_store.create_result_record(exc)
key = uuid.uuid4().hex
data = result_store.create_result_record(exc, key=key)
if write_result:
try:
await result_store.apersist_result_record(data)
Expand Down
37 changes: 37 additions & 0 deletions tests/results/test_task_results.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from uuid import UUID

import pytest

Expand Down Expand Up @@ -262,6 +263,42 @@ def bar():
assert task_state.data.metadata.storage_key == "test"


async def test_task_failure_is_persisted_randomly(
prefect_client, tmp_path, events_pipeline
):
"""
If we use the result storage key it can interfere with proper caching.
"""
storage = LocalFileSystem(basepath=tmp_path / "test-storage")
await storage.save("tmp-test-storage")

@flow
def foo():
return bar(return_state=True)

@task(result_storage=storage, persist_result=True, result_storage_key="not-a-uuid")
def bar():
raise ValueError("oops I messed up")

flow_state = foo(return_state=True)
task_state = await flow_state.result(raise_on_failure=False)
assert task_state.is_failed()
with pytest.raises(ValueError, match="oops I messed up"):
await task_state.result()

assert UUID(task_state.data.metadata.storage_key)

await events_pipeline.process_events()

api_state = (
await prefect_client.read_task_run(task_state.state_details.task_run_id)
).state
with pytest.raises(ValueError, match="oops I messed up"):
await api_state.result()

assert UUID(task_state.data.metadata.storage_key)


async def test_task_result_parameter_formatted_storage_key(
prefect_client, tmp_path, events_pipeline
):
Expand Down

0 comments on commit 02b99f0

Please sign in to comment.