fix: allow calling Actor.reboot() from migrating handler, align reboot behavior with JS SDK (#361)

This fixes several issues with the `Actor.reboot()` behavior:
- `Actor.reboot()` waits for all event handlers to finish, but if it was
itself called from an event handler, it would wait for itself and
deadlock
- `Actor.reboot()` in the JS SDK triggers event handlers for the
`migrating` and `persistState` events, but in the Python SDK it was
triggering only the `persistState` handlers

This aligns the behavior with the JS SDK, and prevents reboot from
getting into an infinite loop by allowing it to be called only once.

Related PR in JS SDK: apify/apify-sdk-js#345
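
The deadlock and the fix described above can be illustrated with a minimal, self-contained sketch. It does not use the Apify SDK: `MiniActor`, its `on()` method, and the string event names are hypothetical stand-ins, and only the `_is_rebooting` flag and the `asyncio.gather` over directly called listeners mirror what this commit does.

```python
import asyncio


class MiniActor:
    """Hypothetical stand-in for the Actor class; not the Apify SDK."""

    def __init__(self):
        self._is_rebooting = False
        self.listeners = {"persistState": [], "migrating": []}
        self.calls = []
        self.reboots = 0

    def on(self, event, listener):
        self.listeners[event].append(listener)

    async def reboot(self):
        # Guard: a second call (for example from a migrating listener)
        # returns immediately instead of starting another reboot.
        if self._is_rebooting:
            self.calls.append("skipped")
            return
        self._is_rebooting = True

        # Call the listeners directly and await them, rather than emitting
        # events and then waiting for every running handler to finish,
        # which could include the handler that called reboot().
        await asyncio.gather(
            *[listener({"is_migrating": True}) for listener in self.listeners["persistState"]],
            *[listener({}) for listener in self.listeners["migrating"]],
        )
        self.reboots += 1


async def main():
    actor = MiniActor()

    async def persist_state(data):
        actor.calls.append("persistState")

    async def on_migrating(data):
        actor.calls.append("migrating")
        await actor.reboot()  # re-entrant call, skipped by the guard

    actor.on("persistState", persist_state)
    actor.on("migrating", on_migrating)

    # The timeout would fire if reboot() ended up waiting on itself.
    await asyncio.wait_for(actor.reboot(), timeout=1)
    return actor


actor = asyncio.run(main())
```

In this sketch the nested `reboot()` call inside the `migrating` listener returns at once, so the outer call completes and the reboot happens exactly once.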
fnesveda authored Dec 20, 2024
1 parent 2d4b8d0 commit 7ba0221
Showing 4 changed files with 30 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docs/03-concepts/04-actor-events.mdx
@@ -40,6 +40,8 @@ During its runtime, the Actor receives Actor events sent by the Apify platform o
{' '}to another worker server soon.</p>
You can use it to persist the state of the Actor so that once it is executed again on the new server,
it doesn't have to start over from the beginning.
Once you have persisted the state of your Actor, you can call <a href="../../reference/class/Actor#reboot"><code>Actor.reboot()</code></a>
to reboot the Actor and trigger the migration immediately, to speed up the process.
</td>
</tr>
<tr>
4 changes: 2 additions & 2 deletions poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -51,6 +51,7 @@ cryptography = ">=42.0.0"
# https://github.com/apify/apify-sdk-python/issues/348
httpx = "~0.27.0"
lazy-object-proxy = ">=1.10.0"
more_itertools = ">=10.2.0"
scrapy = { version = ">=2.11.0", optional = true }
typing-extensions = ">=4.1.0"
websockets = ">=10.0 <14.0.0"
28 changes: 25 additions & 3 deletions src/apify/_actor.py
@@ -7,13 +7,14 @@
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

from lazy_object_proxy import Proxy
from more_itertools import flatten
from pydantic import AliasChoices

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee.events._types import Event, EventPersistStateData
from crawlee.events._types import Event, EventMigratingData, EventPersistStateData

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
@@ -48,6 +49,7 @@ class _ActorType:
_apify_client: ApifyClientAsync
_configuration: Configuration
_is_exiting = False
_is_rebooting = False

def __init__(
self,
@@ -839,12 +841,32 @@ async def reboot(
self.log.error('Actor.reboot() is only supported when running on the Apify platform.')
return

if self._is_rebooting:
self.log.debug('Actor is already rebooting, skipping the additional reboot call.')
return

self._is_rebooting = True

if not custom_after_sleep:
custom_after_sleep = self._configuration.metamorph_after_sleep

self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True))
# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
# MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot.
# Typically, crawlers are listening for the MIGRATING event to stop processing new requests.
# We can't just emit the events and wait for all listeners to finish,
# because this method might be called from an event listener itself, and we would deadlock.
persist_state_listeners = flatten(
(self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
)
migrating_listeners = flatten(
(self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

await self._event_manager.__aexit__(None, None, None)
await asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
)

if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
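
The listener lookup in `reboot()` flattens a per-event mapping into a single sequence of callables. A small sketch of that step follows, assuming the mapping has the shape event -> {listener: [wrappers]}; the real structure is private to the event manager and may differ. The registry contents and `wrappers_for` are hypothetical, and the standard library's `itertools.chain.from_iterable` is used here as a stand-in for `more_itertools.flatten`, which also flattens one level of nesting.

```python
from itertools import chain

# Hypothetical registry: event name -> {listener name: [wrapper, ...]}.
listeners_to_wrappers = {
    "persistState": {
        "save_progress": ["wrapper_a"],
        "save_stats": ["wrapper_b", "wrapper_c"],
    },
    "migrating": {},
}


def wrappers_for(event):
    # `or {}` mirrors the diff; .get() additionally tolerates a missing key.
    per_listener = listeners_to_wrappers.get(event) or {}
    # Flatten one level: a collection of lists becomes one flat list.
    return list(chain.from_iterable(per_listener.values()))


persist_wrappers = wrappers_for("persistState")
migrating_wrappers = wrappers_for("migrating")
```

An event with no registered listeners yields an empty list, so the later `asyncio.gather` call simply has nothing to await for it.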