From 590f0da5d3b751f7d94b5b9f4af4f523faeedb79 Mon Sep 17 00:00:00 2001 From: Matthias Tafelmeier Date: Mon, 20 Nov 2023 13:46:33 +0100 Subject: [PATCH] have iter conditional to be in optimization dag Reducing effectuation dag to endless loop only to be stopped once dag is to be ended and cleaned up. --- breeder/linux_network_stack/effectuation.py | 30 +---------------- breeder/linux_network_stack/optimization.py | 36 ++++++++++++++++++++- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py index be46b878..7cd2a5a3 100644 --- a/breeder/linux_network_stack/effectuation.py +++ b/breeder/linux_network_stack/effectuation.py @@ -106,41 +106,13 @@ def run_reconnaissance(): ) {% endraw %} - @dag.task(task_id="run_iter_count_step") - def run_iter_count(ti=None): - last_iteration = ti.xcom_pull(task_ids="run_iter_count_step") - current_iteration = last_iteration + 1 if last_iteration else 0 - return current_iteration - - run_iter_count_step = run_iter_count() - - @task.branch(task_id="stopping_decision_step") - def stopping_decision(max_iterations, ti=None): - task_logger.debug("Entering") - current_iteration = ti.xcom_pull(task_ids="run_iter_count_step") - def is_stop_criteria_reached(iteration): - if iteration >= max_iterations: - return True - else: - return False - - task_logger.debug("Done") - if is_stop_criteria_reached(current_iteration): - return "stop_step" - else: - return "continue_step" - - stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max')) - continue_step = TriggerDagRunOperator( task_id='continue_step', trigger_dag_id=interaction_dag.dag_id, dag=interaction_dag ) - stop_step = EmptyOperator(task_id="stop_task", dag=interaction_dag) - - pull_step >> effectuation_step >> recon_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step] + pull_step >> effectuation_step >> recon_step >> push_step >> continue_step return dag diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py index 1b81bae4..2ae865d2 100644 --- a/breeder/linux_network_stack/optimization.py +++ b/breeder/linux_network_stack/optimization.py @@ -47,6 +47,40 @@ def run_optimization(): optimization_step = run_optimization() - noop >> optimization_step + @dag.task(task_id="run_iter_count_step") + def run_iter_count(ti=None): + last_iteration = ti.xcom_pull(task_ids="run_iter_count_step") + current_iteration = last_iteration + 1 if last_iteration else 0 + return current_iteration + + run_iter_count_step = run_iter_count() + + @task.branch(task_id="stopping_decision_step") + def stopping_decision(max_iterations, ti=None): + task_logger.debug("Entering") + current_iteration = ti.xcom_pull(task_ids="run_iter_count_step") + def is_stop_criteria_reached(iteration): + if iteration >= max_iterations: + return True + else: + return False + + task_logger.debug("Done") + if is_stop_criteria_reached(current_iteration): + return "stop_step" + else: + return "continue_step" + + continue_step = TriggerDagRunOperator( + task_id='continue_step', + trigger_dag_id=optimization_dag.dag_id, + dag=optimization_dag + ) + + stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max')) + + stop_step = EmptyOperator(task_id="stop_task", dag=optimization_dag) + + optimization_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step] return dag