Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: refactor: reduce code duplication in request processing #892

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions src/aap_eda/core/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ActivationStatus(DjangoStrEnum):
STARTING = "starting"
RUNNING = "running"
PENDING = "pending"
PENDING_WORKERS_OFFLINE = "pending (workers offline)"
jshimkus-rh marked this conversation as resolved.
Show resolved Hide resolved
FAILED = "failed"
STOPPING = "stopping"
STOPPED = "stopped"
Expand Down Expand Up @@ -102,6 +103,9 @@ class DefaultCredentialType(DjangoStrEnum):
# TODO: rename to "RulebookProcessStatus" or "ParentProcessStatus"
ACTIVATION_STATUS_MESSAGE_MAP = {
ActivationStatus.PENDING: "Wait for a worker to be available to start activation", # noqa: E501
ActivationStatus.PENDING_WORKERS_OFFLINE: "Wait for a worker to be available to" # noqa: E501
" start activation (all workers in the" # noqa: E501
" node are offline)", # noqa: E501
ActivationStatus.STARTING: "Worker is starting activation",
ActivationStatus.RUNNING: "Container running activation",
ActivationStatus.STOPPING: "Activation is being disabled",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Generated by Django 4.2.7 on 2024-05-06 18:40

from django.db import migrations, models

import aap_eda.core.enums


class Migration(migrations.Migration):
    """Refresh the ``status`` field choices on the process-related models.

    The same ``AlterField`` is applied to the ``activation``,
    ``eventstream`` and ``rulebookprocess`` models so that their ``status``
    choices include "pending (workers offline)".
    """

    dependencies = [
        ("core", "0037_alter_activation_extra_var_and_more"),
    ]

    # The three operations differ only in the model they target, so they
    # are generated from the list of model names.  The choices literal is
    # kept inside the comprehension because names bound in a class body are
    # not visible from within a comprehension's own scope.
    operations = [
        migrations.AlterField(
            model_name=target_model,
            name="status",
            field=models.TextField(
                choices=[
                    ("starting", "starting"),
                    ("running", "running"),
                    ("pending", "pending"),
                    (
                        "pending (workers offline)",
                        "pending (workers offline)",
                    ),
                    ("failed", "failed"),
                    ("stopping", "stopping"),
                    ("stopped", "stopped"),
                    ("deleting", "deleting"),
                    ("completed", "completed"),
                    ("unresponsive", "unresponsive"),
                    ("error", "error"),
                    ("workers offline", "workers offline"),
                ],
                default=aap_eda.core.enums.ActivationStatus["PENDING"],
            ),
        )
        for target_model in ("activation", "eventstream", "rulebookprocess")
    ]
97 changes: 66 additions & 31 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,66 @@ def _run_request(
return True


def _set_request_cannot_be_run_status(
    request_type: ActivationRequest,
    process_parent_type: ProcessParentType,
    process_parent_id: int,
    status_text: str,
) -> None:
    """Flag a request as not runnable on this node.

    Functional issues detected on this node prevent it from running the
    request, although another functional node may still be able to run it.
    The status written here lets other nodes identify the request as
    something to be run, if possible; that identification is performed as
    part of monitor_rulebook_processes.
    """
    manager = StatusManager(
        get_process_parent(process_parent_type, process_parent_id),
    )

    # A start/auto-start request is known not to be running.  A parent
    # already in one of the pending states is not running either; that
    # second check covers previous failures to start that got dispatched
    # from the monitor task.
    not_running = request_type in (
        ActivationRequest.START,
        ActivationRequest.AUTO_START,
    ) or manager.db_instance.status in (
        ActivationStatus.PENDING,
        ActivationStatus.PENDING_WORKERS_OFFLINE,
    )
    new_status = (
        ActivationStatus.PENDING_WORKERS_OFFLINE
        if not_running
        else ActivationStatus.WORKERS_OFFLINE
    )

    manager.set_status(new_status, status_text)
    # There may not be a latest instance.
    if manager.latest_instance:
        manager.set_latest_instance_status(new_status, status_text)


def dispatch(
process_parent_type: ProcessParentType,
process_parent_id: int,
request_type: Optional[ActivationRequest],
):
request_type_text = f" type {request_type}" if request_type else ""
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

job_id = _manage_process_job_id(process_parent_type, process_parent_id)
LOGGER.info(
"Dispatching request"
f"{request_type_text} for {process_parent_type} {process_parent_id}",
f"Dispatching request type {request_type} for"
f" {process_parent_type} {process_parent_id}",
)
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

queue_name = None
# new processes
Expand All @@ -174,8 +219,8 @@ def dispatch(
ActivationRequest.AUTO_START,
]:
LOGGER.info(
f"Dispatching {process_parent_type}"
f" {process_parent_id} as new process.",
f"Dispatching {process_parent_type} {process_parent_id}"
" as new process.",
)
else:
queue_name = get_queue_name_by_parent_id(
Expand All @@ -192,14 +237,14 @@ def dispatch(
if not queue_name:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
"; it is not currently associated with a queue.",
)
else:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
f"; its associated queue '{queue_name}' is from"
" previous configuation settings.",
Expand All @@ -218,7 +263,7 @@ def dispatch(
)
LOGGER.warning(
"Request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} will be run on the least busy queue."
)
queue_name = None
Expand All @@ -230,31 +275,20 @@ def dispatch(
# There are no healthy queues on which to run the request. For any
# request that isn't a restart set its status to "workers offline."
msg = (
"There are no healthy queues on which to run the request"
f"{request_type_text} for {process_parent_type}"
f" {process_parent_id}. There may be an issue with the node"
"; please contact the administrator."
f"The workers for request type {request_type}"
f" for {process_parent_type} {process_parent_id}"
" are failing liveness checks. There may be an"
" issue with the node; please contact the administrator."
)
LOGGER.warning(
msg,
)

if request_type not in [
ActivationRequest.RESTART,
]:
status_manager = StatusManager(
get_process_parent(process_parent_type, process_parent_id),
)
status_manager.set_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)
# There may not be a latest instance.
if status_manager.latest_instance:
status_manager.set_latest_instance_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)
# Set the request status such that, if possible, a functional
# node can pick it up and run it.
_set_request_cannot_be_run_status(
request_type, process_parent_type, process_parent_id, msg
)

if queue_name:
unique_enqueue(
Expand Down Expand Up @@ -436,6 +470,7 @@ def monitor_rulebook_processes() -> None:
for process in models.RulebookProcess.objects.filter(
status__in=[
ActivationStatus.RUNNING,
ActivationStatus.PENDING_WORKERS_OFFLINE,
ActivationStatus.WORKERS_OFFLINE,
]
):
Expand Down
1 change: 1 addition & 0 deletions tests/integration/api/test_activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ def test_enable_activation(
for state in [
enums.ActivationStatus.COMPLETED,
enums.ActivationStatus.PENDING,
enums.ActivationStatus.PENDING_WORKERS_OFFLINE,
enums.ActivationStatus.STOPPED,
enums.ActivationStatus.FAILED,
]:
Expand Down
1 change: 1 addition & 0 deletions tests/integration/core/test_rulebook_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def test_rulebook_process_save(init_data):
enums.ActivationStatus.UNRESPONSIVE,
enums.ActivationStatus.ERROR,
enums.ActivationStatus.PENDING,
enums.ActivationStatus.PENDING_WORKERS_OFFLINE,
]:
instance.status = status
instance.save(update_fields=["status"])
Expand Down