Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Correct polling infrequent/periodic_sequence usecase #125

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions polling/infrequent/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ class ComposeGreetingInput:
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await test_service.get_service_result(input)
class ComposeGreeting:
def __init__(self):
self.test_service = TestService()

@activity.defn
async def compose_greeting(self, input: ComposeGreetingInput) -> str:
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await self.test_service.get_service_result(input)
6 changes: 3 additions & 3 deletions polling/infrequent/run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
from temporalio.client import Client
from temporalio.worker import Worker

from polling.infrequent.activities import compose_greeting
from polling.infrequent.activities import ComposeGreeting
from polling.infrequent.workflows import GreetingWorkflow


async def main():
client = await Client.connect("localhost:7233")

activities = ComposeGreeting()
worker = Worker(
client,
task_queue="infrequent-activity-retry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activities=[activities.compose_greeting],
)
await worker.run()

Expand Down
6 changes: 3 additions & 3 deletions polling/infrequent/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
from polling.infrequent.activities import ComposeGreetingInput, compose_greeting
from polling.infrequent.activities import ComposeGreeting, ComposeGreetingInput


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
compose_greeting,
return await workflow.execute_activity_method(
ComposeGreeting.compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=2),
retry_policy=RetryPolicy(
Expand Down
14 changes: 11 additions & 3 deletions polling/periodic_sequence/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

from temporalio import activity

from polling.test_service import TestService


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
raise RuntimeError("Service is down")
class ComposeGreeting:
def __init__(self):
self.test_service = TestService(error_attempts=23)

@activity.defn
async def compose_greeting(self, input: ComposeGreetingInput) -> str:
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await self.test_service.get_service_result(input)
6 changes: 3 additions & 3 deletions polling/periodic_sequence/run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
from temporalio.client import Client
from temporalio.worker import Worker

from polling.periodic_sequence.activities import compose_greeting
from polling.periodic_sequence.activities import ComposeGreeting
from polling.periodic_sequence.workflows import ChildWorkflow, GreetingWorkflow


async def main():
client = await Client.connect("localhost:7233")

activities = ComposeGreeting()
worker = Worker(
client,
task_queue="periodic-retry-task-queue",
workflows=[GreetingWorkflow, ChildWorkflow],
activities=[compose_greeting],
activities=[activities.compose_greeting],
)
await worker.run()

Expand Down
16 changes: 10 additions & 6 deletions polling/periodic_sequence/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

with workflow.unsafe.imports_passed_through():
from polling.periodic_sequence.activities import (
ComposeGreeting,
ComposeGreetingInput,
compose_greeting,
)

MAX_RETRY_PER_CHILD_FLOW = 10


@workflow.defn
class GreetingWorkflow:
Expand All @@ -26,10 +28,10 @@ async def run(self, name: str) -> str:
class ChildWorkflow:
@workflow.run
async def run(self, name: str) -> str:
for i in range(10):
for i in range(MAX_RETRY_PER_CHILD_FLOW):
try:
return await workflow.execute_activity(
compose_greeting,
return await workflow.execute_activity_method(
ComposeGreeting.compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=4),
retry_policy=RetryPolicy(
Expand All @@ -38,8 +40,10 @@ async def run(self, name: str) -> str:
)

except ActivityError:
workflow.logger.error("Activity failed, retrying in 1 seconds")
workflow.logger.error(
f"Activity failed ({i}/{MAX_RETRY_PER_CHILD_FLOW}), retrying in 1 seconds"
)
await asyncio.sleep(1)
workflow.continue_as_new(name)
workflow.continue_as_new(name)

raise Exception("Polling failed after all attempts")
4 changes: 2 additions & 2 deletions polling/test_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class TestService:
def __init__(self):
def __init__(self, error_attempts: int = 5):
self.try_attempts = 0
self.error_attempts = 5
self.error_attempts = error_attempts

async def get_service_result(self, input):
print(
Expand Down