Skip to content

Commit

Permalink
fix: [AAP-23378] add PENDING_WORKERS_OFFLINE activation status
Browse files Browse the repository at this point in the history
This provides the agreed upon distinction between
- PENDING
    a normal occurrence of a request not yet running
- PENDING_WORKERS_OFFLINE
    an abnormal occurrence where the request has not yet run and cannot run due
    to lack of workers; note that this includes auto-start events where we
    definitively know the request's state
- WORKERS_OFFLINE
    an abnormal occurrence where we do not know the request's state

PENDING_WORKERS_OFFLINE maintains the fail-over functionality previously dependent
solely on WORKERS_OFFLINE.

For consistency with other log messages from the processing code, the content of the message indicating issues with the workers has been modified from that documented in AAP-23378. It is worded as follows:

    The workers available to process request type {request_type} for
    {process_parent_type} {process_parent_id} are failing liveness checks.
    There may be an issue with the node; please contact the administrator.

Orchestrator log messages were updated due to a previous change that specifically set
the request type if none was specified.
  • Loading branch information
jshimkus-rh committed May 6, 2024
1 parent c45ea01 commit 934ee20
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 31 deletions.
4 changes: 4 additions & 0 deletions src/aap_eda/core/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ActivationStatus(DjangoStrEnum):
STARTING = "starting"
RUNNING = "running"
PENDING = "pending"
PENDING_WORKERS_OFFLINE = "pending (workers offline)"
FAILED = "failed"
STOPPING = "stopping"
STOPPED = "stopped"
Expand Down Expand Up @@ -102,6 +103,9 @@ class DefaultCredentialType(DjangoStrEnum):
# TODO: rename to "RulebookProcessStatus" or "ParentProcessStatus"
ACTIVATION_STATUS_MESSAGE_MAP = {
ActivationStatus.PENDING: "Wait for a worker to be available to start activation", # noqa: E501
ActivationStatus.PENDING_WORKERS_OFFLINE: "Wait for a worker to be available to" # noqa: E501
" start activation (all workers in the" # noqa: E501
" node are offline)", # noqa: E501
ActivationStatus.STARTING: "Worker is starting activation",
ActivationStatus.RUNNING: "Container running activation",
ActivationStatus.STOPPING: "Activation is being disabled",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Generated by Django 4.2.7 on 2024-05-06 18:40

from django.db import migrations, models

import aap_eda.core.enums


# Allowed values for the ``status`` column, in declaration order.  Each value
# doubles as its own human-readable label in the generated choices.
_STATUS_VALUES = (
    "starting",
    "running",
    "pending",
    "pending (workers offline)",
    "failed",
    "stopping",
    "stopped",
    "deleting",
    "completed",
    "unresponsive",
    "error",
    "workers offline",
)

# Models whose ``status`` field is altered by this migration.
_STATUS_MODELS = ("activation", "eventstream", "rulebookprocess")


def _status_field():
    """Return a fresh ``status`` TextField definition.

    A new field instance is created on every call so that each AlterField
    operation owns its own field object.
    """
    return models.TextField(
        choices=[(value, value) for value in _STATUS_VALUES],
        default=aap_eda.core.enums.ActivationStatus["PENDING"],
    )


class Migration(migrations.Migration):
    """Alter the ``status`` field of the activation, eventstream and
    rulebookprocess models so that its choices include
    "pending (workers offline)".
    """

    dependencies = [
        ("core", "0037_alter_activation_extra_var_and_more"),
    ]

    # One AlterField per model; all three share the same field definition.
    operations = [
        migrations.AlterField(
            model_name=model_name,
            name="status",
            field=_status_field(),
        )
        for model_name in _STATUS_MODELS
    ]
76 changes: 45 additions & 31 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,15 @@ def dispatch(
process_parent_id: int,
request_type: Optional[ActivationRequest],
):
request_type_text = f" type {request_type}" if request_type else ""
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

job_id = _manage_process_job_id(process_parent_type, process_parent_id)
LOGGER.info(
"Dispatching request"
f"{request_type_text} for {process_parent_type} {process_parent_id}",
f"Dispatching request type {request_type} for"
f" {process_parent_type} {process_parent_id}",
)
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

queue_name = None
# new processes
Expand All @@ -174,8 +173,8 @@ def dispatch(
ActivationRequest.AUTO_START,
]:
LOGGER.info(
f"Dispatching {process_parent_type}"
f" {process_parent_id} as new process.",
f"Dispatching {process_parent_type} {process_parent_id}"
" as new process.",
)
else:
queue_name = get_queue_name_by_parent_id(
Expand All @@ -192,14 +191,14 @@ def dispatch(
if not queue_name:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
"; it is not currently associated with a queue.",
)
else:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
f"; its associated queue '{queue_name}' is from"
" previous configuation settings.",
Expand All @@ -218,7 +217,7 @@ def dispatch(
)
LOGGER.warning(
"Request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} will be run on the least busy queue."
)
queue_name = None
Expand All @@ -230,31 +229,45 @@ def dispatch(
# There are no healthy queues on which to run the request. For any
# request that isn't a restart set its status to "workers offline."
msg = (
"There are no healthy queues on which to run the request"
f"{request_type_text} for {process_parent_type}"
f" {process_parent_id}. There may be an issue with the node"
"; please contact the administrator."
f"The workers available to process request type {request_type}"
f" for {process_parent_type} {process_parent_id}"
" are failing liveness checks. There may be an issue with the"
" node; please contact the administrator."
)
LOGGER.warning(
msg,
)

if request_type not in [
ActivationRequest.RESTART,
]:
status_manager = StatusManager(
get_process_parent(process_parent_type, process_parent_id),
)
status_manager.set_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)
# There may not be a latest instance.
if status_manager.latest_instance:
status_manager.set_latest_instance_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)
status_manager = StatusManager(
get_process_parent(process_parent_type, process_parent_id),
)

status = ActivationStatus.WORKERS_OFFLINE
# If the request is start/auto-start we know the request isn't
# running.
# If the request is in one of the pending states that also
# indicates it's not running; this check is for previous
# failures to start that got dispatched from the monitor
# task.
if (
request_type
in [
ActivationRequest.START,
ActivationRequest.AUTO_START,
]
) or (
status_manager.db_instance.status
in [
ActivationStatus.PENDING,
ActivationStatus.PENDING_WORKERS_OFFLINE,
]
):
status = ActivationStatus.PENDING_WORKERS_OFFLINE

status_manager.set_status(status, msg)
# There may not be a latest instance.
if status_manager.latest_instance:
status_manager.set_latest_instance_status(status, msg)

if queue_name:
unique_enqueue(
Expand Down Expand Up @@ -436,6 +449,7 @@ def monitor_rulebook_processes() -> None:
for process in models.RulebookProcess.objects.filter(
status__in=[
ActivationStatus.RUNNING,
ActivationStatus.PENDING_WORKERS_OFFLINE,
ActivationStatus.WORKERS_OFFLINE,
]
):
Expand Down
1 change: 1 addition & 0 deletions tests/integration/api/test_activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ def test_enable_activation(
for state in [
enums.ActivationStatus.COMPLETED,
enums.ActivationStatus.PENDING,
enums.ActivationStatus.PENDING_WORKERS_OFFLINE,
enums.ActivationStatus.STOPPED,
enums.ActivationStatus.FAILED,
]:
Expand Down
1 change: 1 addition & 0 deletions tests/integration/core/test_rulebook_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def test_rulebook_process_save(init_data):
enums.ActivationStatus.UNRESPONSIVE,
enums.ActivationStatus.ERROR,
enums.ActivationStatus.PENDING,
enums.ActivationStatus.PENDING_WORKERS_OFFLINE,
]:
instance.status = status
instance.save(update_fields=["status"])
Expand Down

0 comments on commit 934ee20

Please sign in to comment.