Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: refactor: reduce code duplication in request processing #892

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
145 changes: 92 additions & 53 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,31 +151,77 @@ def _run_request(
return True


def _set_request_cannot_be_run_status(
    request_type: ActivationRequest,
    process_parent: Union[Activation, EventStream],
    status_text: str,
) -> None:
    """Record that a request cannot be run on this node.

    The node has detected functional issues, so the request is not
    enqueued here; another functional node may still be able to run it.

    The status written to ``process_parent`` depends on ``request_type``:

    * START, AUTO_START and RESTART requests are put into PENDING.
    * Every other request type is put into WORKERS_OFFLINE.

    When the target status is WORKERS_OFFLINE and ``process_parent``
    already has that status, nothing is written. The latest instance is
    updated only when one exists and the target status is
    WORKERS_OFFLINE.
    """
    # Start/auto-start requests are known not to be running, and a
    # restart can likewise go back to pending.
    goes_to_pending = request_type in (
        ActivationRequest.START,
        ActivationRequest.AUTO_START,
        ActivationRequest.RESTART,
    )
    target_status = (
        ActivationStatus.PENDING
        if goes_to_pending
        else ActivationStatus.WORKERS_OFFLINE
    )

    # Already flagged as workers-offline: there is nothing to update.
    if (target_status == ActivationStatus.WORKERS_OFFLINE) and (
        process_parent.status == ActivationStatus.WORKERS_OFFLINE
    ):
        return

    # In every other situation the status text must be (re)written.
    manager = StatusManager(process_parent)
    manager.set_status(target_status, status_text)

    # There may not be a latest instance; it only mirrors the
    # workers-offline status.
    if manager.latest_instance and (
        target_status == ActivationStatus.WORKERS_OFFLINE
    ):
        manager.set_latest_instance_status(target_status, status_text)


def dispatch(
process_parent_type: ProcessParentType,
process_parent_id: int,
request_type: Optional[ActivationRequest],
):
request_type_text = f" type {request_type}" if request_type else ""
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

job_id = _manage_process_job_id(process_parent_type, process_parent_id)
LOGGER.info(
"Dispatching request"
f"{request_type_text} for {process_parent_type} {process_parent_id}",
f"Dispatching request type {request_type} for"
f" {process_parent_type} {process_parent_id}",
)
# TODO: add "monitor" type to ActivationRequestQueue
if request_type is None:
request_type = "Monitor"

queue_name = None
enqueable = True

# new processes
if request_type in [
ActivationRequest.START,
ActivationRequest.AUTO_START,
]:
LOGGER.info(
f"Dispatching {process_parent_type}"
f" {process_parent_id} as new process.",
f"Dispatching {process_parent_type} {process_parent_id}"
" as new process.",
)
else:
queue_name = get_queue_name_by_parent_id(
Expand All @@ -192,71 +238,64 @@ def dispatch(
if not queue_name:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
"; it is not currently associated with a queue.",
)
else:
LOGGER.info(
"Scheduling request"
f"{request_type_text} for {process_parent_type}"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} to the least busy queue"
f"; its associated queue '{queue_name}' is from"
" previous configuation settings.",
)
queue_name = None

status_msg = (
f"{process_parent_type} {process_parent_id} is in an "
"unknown state. The workers of its associated queue "
f"'{queue_name}' are failing liveness checks. "
"There may be an issue with the node; please contact "
"the administrator."
)
if queue_name and (not check_rulebook_queue_health(queue_name)):
# The queue is unhealthy; log this fact.
# We'll try to find another queue on which to run.
msg = (
f"{process_parent_type} {process_parent_id} is in an"
" unknown state. The workers of its associated queue"
f" '{queue_name}' are failing liveness checks."
" There may be an issue with the node; please contact"
" the administrator."
)
LOGGER.warning(
"Request"
f"{request_type_text} for {process_parent_type}"
f" {process_parent_id} will be run on the least busy queue."
)
queue_name = None
# If it's a restart we'll try a different queue below.
LOGGER.warning(status_msg)

if not queue_name:
if request_type == ActivationRequest.RESTART:
queue_name = None

LOGGER.warning(
"Request"
f" type {request_type} for {process_parent_type}"
f" {process_parent_id} will be run on the least busy queue."
)
else:
enqueable = False

if enqueable and (not queue_name):
try:
queue_name = get_least_busy_queue_name()
except HealthyQueueNotFoundError:
# There are no healthy queues on which to run the request. For any
# request that isn't a restart set its status to "workers offline."
msg = (
"There are no healthy queues on which to run the request"
f"{request_type_text} for {process_parent_type}"
f" {process_parent_id}. There may be an issue with the node"
"; please contact the administrator."
)
LOGGER.warning(
msg,
enqueable = False
status_msg = (
f"The workers for request type {request_type}"
f" for {process_parent_type} {process_parent_id}"
" are failing liveness checks. There may be an"
" issue with the node; please contact the administrator."
)

if request_type not in [
ActivationRequest.RESTART,
]:
status_manager = StatusManager(
get_process_parent(process_parent_type, process_parent_id),
)
status_manager.set_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)
# There may not be a latest instance.
if status_manager.latest_instance:
status_manager.set_latest_instance_status(
ActivationStatus.WORKERS_OFFLINE,
msg,
)

if queue_name:
if not enqueable:
# The request is not enqueable. Set the request status such that, if
# possible, a functional node can pick it up and run it.
_set_request_cannot_be_run_status(
request_type,
get_process_parent(process_parent_type, process_parent_id),
status_msg,
)
else:
unique_enqueue(
queue_name,
job_id,
Expand Down
4 changes: 2 additions & 2 deletions tools/docker/docker-compose-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ x-environment: &common-env
EDA_SECRET_KEY: ${EDA_SECRET_KEY:-'insecure'}
EDA_DEBUG: ${EDA_DEBUG:-True}
EDA_DB_PASSWORD: ${EDA_DB_PASSWORD:-'secret'}
EDA_RULEBOOK_WORKER_QUEUES: "activation-node1,activation-node2"
EDA_RULEBOOK_WORKER_QUEUES: "jbs-node1,jbs-node2"
EDA_ANSIBLE_BASE_JWT_VALIDATE_CERT: ${EDA_ANSIBLE_BASE_JWT_VALIDATE_CERT:-False}
EDA_ANSIBLE_BASE_JWT_KEY: ${EDA_ANSIBLE_BASE_JWT_KEY:-'https://localhost'}

Expand Down Expand Up @@ -197,7 +197,7 @@ services:
image: "${EDA_IMAGE:-localhost/aap-eda}"
environment:
<<: *common-env
EDA_RULEBOOK_QUEUE_NAME: 'activation-node2'
EDA_RULEBOOK_QUEUE_NAME: 'jbs-node2'
EDA_PODMAN_SOCKET_URL: tcp://podman-node2:8888
command:
- aap-eda-manage
Expand Down