Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow calling Actor.reboot() from migrating handler, align reboot behavior with JS SDK #361

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/03-concepts/04-actor-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ During its runtime, the Actor receives Actor events sent by the Apify platform o
{' '}to another worker server soon.</p>
You can use it to persist the state of the Actor so that once it is executed again on the new server,
it doesn't have to start over from the beginning.
Once you have persisted the state of your Actor, you can call <a href="../../reference/class/Actor#reboot"><code>Actor.reboot()</code></a>
to reboot the Actor and trigger the migration immediately, to speed up the process.
</td>
</tr>
<tr>
Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ cryptography = ">=42.0.0"
# https://github.com/apify/apify-sdk-python/issues/348
httpx = "~0.27.0"
lazy-object-proxy = ">=1.10.0"
more_itertools = ">=10.2.0"
scrapy = { version = ">=2.11.0", optional = true }
typing-extensions = ">=4.1.0"
websockets = ">=10.0 <14.0.0"
Expand Down
28 changes: 25 additions & 3 deletions src/apify/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

from lazy_object_proxy import Proxy
from more_itertools import flatten
from pydantic import AliasChoices

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee.events._types import Event, EventPersistStateData
from crawlee.events._types import Event, EventMigratingData, EventPersistStateData

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
Expand Down Expand Up @@ -48,6 +49,7 @@ class _ActorType:
_apify_client: ApifyClientAsync
_configuration: Configuration
_is_exiting = False
_is_rebooting = False

def __init__(
self,
Expand Down Expand Up @@ -839,12 +841,32 @@ async def reboot(
self.log.error('Actor.reboot() is only supported when running on the Apify platform.')
return

if self._is_rebooting:
self.log.debug('Actor is already rebooting, skipping the additional reboot call.')
return

self._is_rebooting = True

if not custom_after_sleep:
custom_after_sleep = self._configuration.metamorph_after_sleep

self._event_manager.emit(event=Event.PERSIST_STATE, event_data=EventPersistStateData(is_migrating=True))
# Call all the listeners for the PERSIST_STATE and MIGRATING events, and wait for them to finish.
# PERSIST_STATE listeners are called to allow the Actor to persist its state before the reboot.
# MIGRATING listeners are called to allow the Actor to gracefully stop in-progress tasks before the reboot.
# Typically, crawlers are listening for the MIIGRATING event to stop processing new requests.
# We can't just emit the events and wait for all listeners to finish,
# because this method might be called from an event listener itself, and we would deadlock.
persist_state_listeners = flatten(
(self._event_manager._listeners_to_wrappers[Event.PERSIST_STATE] or {}).values() # noqa: SLF001
)
migrating_listeners = flatten(
(self._event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
)

await self._event_manager.__aexit__(None, None, None)
await asyncio.gather(
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
*[listener(EventMigratingData()) for listener in migrating_listeners],
fnesveda marked this conversation as resolved.
Show resolved Hide resolved
)

if not self._configuration.actor_run_id:
raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.')
Expand Down
Loading