diff --git a/docs/03-concepts/04-actor-events.mdx b/docs/03-concepts/04-actor-events.mdx index 575c37e3..91ad0695 100644 --- a/docs/03-concepts/04-actor-events.mdx +++ b/docs/03-concepts/04-actor-events.mdx @@ -40,6 +40,8 @@ During its runtime, the Actor receives Actor events sent by the Apify platform o {' '}to another worker server soon.

You can use it to persist the state of the Actor so that once it is executed again on the new server, it doesn't have to start over from the beginning. + Once you have persisted the state of your Actor, you can call Actor.reboot() + to reboot the Actor and trigger the migration immediately, to speed up the process. diff --git a/poetry.lock b/poetry.lock index 11af6b24..443ee6e7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "annotated-types" @@ -3537,4 +3537,4 @@ scrapy = ["scrapy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "008371392c5d2baf886b2529e3227434280d1d37122f0fee6a19e53451682fbb" +content-hash = "5f0773c951bd13de37603ebfe9d55788c09c9b50d5c1ed2370abbafe19242e76" diff --git a/pyproject.toml b/pyproject.toml index e5638dcd..1d562d32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ cryptography = ">=42.0.0" # https://github.com/apify/apify-sdk-python/issues/348 httpx = "~0.27.0" lazy-object-proxy = ">=1.10.0" +more_itertools = ">=10.2.0" scrapy = { version = ">=2.11.0", optional = true } typing-extensions = ">=4.1.0" websockets = ">=10.0 <14.0.0" diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 4f076a7a..7c84e510 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -7,13 +7,14 @@ from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast from lazy_object_proxy import Proxy +from more_itertools import flatten from pydantic import AliasChoices from apify_client import ApifyClientAsync from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value from crawlee import service_container -from crawlee.events._types import Event, EventPersistStateData +from crawlee.events._types import Event, EventMigratingData, EventPersistStateData from apify._configuration import Configuration from apify._consts import EVENT_LISTENERS_TIMEOUT @@ -48,6 +49,7 @@ class _ActorType: _apify_client: ApifyClientAsync _configuration: Configuration _is_exiting = False + _is_rebooting = False def __init__( self, @@ -839,12 +841,32 @@ async def reboot( self.log.error('Actor.reboot() is only supported when running on the Apify platform.') return + if self._is_rebooting: + self.log.debug('Actor is already rebooting, skipping the additional reboot call.') + return + + self._is_rebooting = True + if not custom_after_sleep: custom_after_sleep = self._configuration.metamorph_after_sleep - self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True)) + # Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish. + # PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot. + # MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot. + # Typically, crawlers are listening for the MIIGRATING event to stop processing new requests. + # We can't just emit the events and wait for all listeners to finish, + # because this method might be called from an event listener itself, and we would deadlock. + persist_state_listeners = flatten( + (self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001 + ) + migrating_listeners = flatten( + (self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 + ) - await self._event_manager.__aexit__(None, None, None) + await asyncio.gather( + *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], + *[listener(EventMigratingData()) for listener in migrating_listeners], + ) if not self._configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')