Skip to content

Commit

Permalink
have iter conditional to be in optimization dag
Browse files Browse the repository at this point in the history
Reducing effectuation dag to endless loop only to be stopped once dag is
to be ended and cleaned up.
  • Loading branch information
cherusk committed Nov 20, 2023
1 parent 30f33f1 commit 590f0da
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
30 changes: 1 addition & 29 deletions breeder/linux_network_stack/effectuation.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,41 +106,13 @@ def run_reconnaissance():
)
{% endraw %}

@dag.task(task_id="run_iter_count_step")
def run_iter_count(ti=None):
last_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
current_iteration = last_iteration + 1 if last_iteration else 0
return current_iteration

run_iter_count_step = run_iter_count()

@task.branch(task_id="stopping_decision_step")
def stopping_decision(max_iterations, ti=None):
task_logger.debug("Entering")
current_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
def is_stop_criteria_reached(iteration):
if iteration >= max_iterations:
return True
else:
return False

task_logger.debug("Done")
if is_stop_criteria_reached(current_iteration):
return "stop_step"
else:
return "continue_step"

stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max'))

continue_step = TriggerDagRunOperator(
task_id='continue_step',
trigger_dag_id=interaction_dag.dag_id,
dag=interaction_dag
)

stop_step = EmptyOperator(task_id="stop_task", dag=interaction_dag)

pull_step >> effectuation_step >> recon_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]
pull_step >> effectuation_step >> recon_step >> push_step >> continue_step

return dag

36 changes: 35 additions & 1 deletion breeder/linux_network_stack/optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,40 @@ def run_optimization():

optimization_step = run_optimization()

noop >> optimization_step
@dag.task(task_id="run_iter_count_step")
def run_iter_count(ti=None):
last_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
current_iteration = last_iteration + 1 if last_iteration else 0
return current_iteration

run_iter_count_step = run_iter_count()

@task.branch(task_id="stopping_decision_step")
def stopping_decision(max_iterations, ti=None):
task_logger.debug("Entering")
current_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
def is_stop_criteria_reached(iteration):
if iteration >= max_iterations:
return True
else:
return False

task_logger.debug("Done")
if is_stop_criteria_reached(current_iteration):
return "stop_step"
else:
return "continue_step"

continue_step = TriggerDagRunOperator(
task_id='continue_step',
trigger_dag_id=optimization_dag.dag_id,
dag=optimization_dag
)

stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max'))

stop_step = EmptyOperator(task_id="stop_task", dag=optimization_dag)

optimization_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]

return dag

0 comments on commit 590f0da

Please sign in to comment.