breeder shift locking to objective #148

Merged · 2 commits · Nov 20, 2023
61 changes: 1 addition & 60 deletions breeder/linux_network_stack/effectuation.py
@@ -25,37 +25,6 @@ def run_pull_optimization():

pull_step = run_pull_optimization()


@dag.task(task_id="aquire_lock_step")
def run_aquire_lock():
task_logger.debug("Entering")


locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)

dlm_lock = locker.lock(target)

if not dlm_lock.acquire(acquire_timeout=600):
task_logger.debug("Could not aquire lock for {target}")

return dlm_lock

aquire_lock_step = run_aquire_lock()


@dag.task(task_id="release_lock_step")
def run_release_lock():
task_logger.debug("Entering")

dlm_lock = ti.xcom_pull(task_ids="aquire_lock_step")

dlm_lock.release()

return dlm_lock

release_lock_step = run_release_lock()


@dag.task(task_id="push_optimization_step")
def run_push_optimization(ti=None):

@@ -137,41 +106,13 @@ def run_reconnaissance():
)
{% endraw %}

@dag.task(task_id="run_iter_count_step")
def run_iter_count(ti=None):
last_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
current_iteration = last_iteration + 1 if last_iteration else 0
return current_iteration

run_iter_count_step = run_iter_count()

@task.branch(task_id="stopping_decision_step")
def stopping_decision(max_iterations, ti=None):
task_logger.debug("Entering")
current_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
def is_stop_criteria_reached(iteration):
if iteration >= max_iterations:
return True
else:
return False

task_logger.debug("Done")
if is_stop_criteria_reached(current_iteration):
return "stop_step"
else:
return "continue_step"

stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max'))

continue_step = TriggerDagRunOperator(
task_id='continue_step',
trigger_dag_id=interaction_dag.dag_id,
dag=interaction_dag
)

stop_step = EmptyOperator(task_id="stop_task", dag=interaction_dag)

pull_step >> aquire_lock_step >> effectuation_step >> recon_step >> release_lock_step >> push_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]
pull_step >> effectuation_step >> recon_step >> push_step >> continue_step

return dag
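
The two tasks deleted above handled distributed locking with pals (PostgreSQL advisory locks) around the effectuation and reconnaissance steps. Below is a minimal, self-contained sketch of that acquire/release pattern, not the PR's exact code: the DSN and lock name are hypothetical placeholders for the `DLM_DB_CONNECTION` and `target` globals the original tasks referenced.

```python
# Sketch of the pals acquire/release pattern removed from effectuation.py.
# Assumes the pals package and a reachable Postgres instance; the DSN and
# lock name below are hypothetical placeholders.
import pals

DLM_DB_CONNECTION = "postgresql://breeder:secret@localhost:5432/dlm"  # placeholder DSN
target = "some_breeder_target"  # placeholder lock name

locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
dlm_lock = locker.lock(target)

# acquire_timeout mirrors the original call
if not dlm_lock.acquire(acquire_timeout=600):
    print(f"Could not acquire lock for {target}")
else:
    try:
        # critical section: effectuation and reconnaissance ran between the
        # acquire and release tasks in the old DAG layout
        pass
    finally:
        dlm_lock.release()
```

With the locking moved into objective(), the DAG-level acquire/release tasks and their XCom hand-off are no longer needed.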

17 changes: 16 additions & 1 deletion breeder/linux_network_stack/objective.py
@@ -1,6 +1,6 @@


def objective(trial, identifier, archive_db_url, breeder_name):
def objective(trial, identifier, archive_db_url, locking_db_url, breeder_name):

###--- definition coroutines ---###
### We have to keep to coroutines in the objective function,
@@ -46,10 +46,25 @@ def objective(trial, identifier, archive_db_url, breeder_name):
if not is_setting_explored:
logger.warning('doing effectuation')
settings_data = dict(settings=settings)

# get lock to gate other objective runs
locker = pals.Locker('network_breeder_effectuation', locking_db_url)

dlm_lock = locker.lock(target)

if not dlm_lock.acquire(acquire_timeout=600):
task_logger.debug("Could not aquire lock for {target}")


asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data))

logger.warning('gathering recon')
metric = json.loads(asyncio.run(receive_msg_via_nats(subject=f'recon_{identifier}')))


# release lock to let other objective runs effectuation
dlm_lock.release()

metric_value = metric.get('metric')
rtt = float(metric_value['tcp_rtt'])
delivery_rate = float(metric_value['tcp_delivery_rate_bytes'])
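
The added lines gate each trial's effectuation/recon exchange behind the same pals lock, now built from the new locking_db_url parameter. The following is a sketch of that gated section, not the PR's exact code: the NATS helpers are placeholder stubs standing in for the coroutines objective.py defines, and the try/finally release is an illustrative defensive choice, whereas the diff releases the lock inline after the recon read.

```python
# Sketch of the lock-gated effectuation/recon exchange added to objective().
# Assumes pals for locking; send_msg_via_nats / receive_msg_via_nats are
# placeholder stubs for the coroutines defined in objective.py.
import asyncio
import json
import logging

import pals

logger = logging.getLogger(__name__)


async def send_msg_via_nats(subject, data_dict):
    """Placeholder for the NATS publish coroutine."""


async def receive_msg_via_nats(subject):
    """Placeholder for the NATS receive coroutine."""
    return json.dumps({"metric": {"tcp_rtt": 1.0, "tcp_delivery_rate_bytes": 1.0}})


def effectuate_with_lock(settings_data, identifier, target, locking_db_url):
    locker = pals.Locker('network_breeder_effectuation', locking_db_url)
    dlm_lock = locker.lock(target)

    # Gate other objective runs: only one trial pushes settings at a time.
    if not dlm_lock.acquire(acquire_timeout=600):
        logger.debug(f"Could not acquire lock for {target}")
        return None

    try:
        asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}',
                                      data_dict=settings_data))
        metric = json.loads(asyncio.run(
            receive_msg_via_nats(subject=f'recon_{identifier}')))
    finally:
        # Release so other objective runs can proceed with their effectuation.
        dlm_lock.release()

    return metric
```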
39 changes: 37 additions & 2 deletions breeder/linux_network_stack/optimization.py
@@ -27,6 +27,7 @@ def run_optimization():
###--- end coroutines ---###

archive_db_url = f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}'

__directions = list()

for __objective in config.get('objectives'):
@@ -37,7 +38,7 @@
# Create a study using Dask-compatible storage
storage = DaskStorage(InMemoryStorage())
study = optuna.create_study(directions=__directions, storage=storage)
objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url, config.get('name'))
objective_wrapped = lambda trial: objective(trial,identifier, archive_db_url, DLM_DB_CONNECTION,config.get('name'))
# Optimize in parallel on your Dask cluster
futures = [
client.submit(study.optimize, objective_wrapped, n_trials=10, pure=False)
@@ -46,6 +47,40 @@

optimization_step = run_optimization()

noop >> optimization_step
@dag.task(task_id="run_iter_count_step")
def run_iter_count(ti=None):
last_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
current_iteration = last_iteration + 1 if last_iteration else 0
return current_iteration

run_iter_count_step = run_iter_count()

@task.branch(task_id="stopping_decision_step")
def stopping_decision(max_iterations, ti=None):
task_logger.debug("Entering")
current_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
def is_stop_criteria_reached(iteration):
if iteration >= max_iterations:
return True
else:
return False

task_logger.debug("Done")
if is_stop_criteria_reached(current_iteration):
return "stop_step"
else:
return "continue_step"

continue_step = TriggerDagRunOperator(
task_id='continue_step',
trigger_dag_id=optimization_dag.dag_id,
dag=optimization_dag
)

stopping_conditional_step = stopping_decision(config.get('run').get('iterations').get('max'))

stop_step = EmptyOperator(task_id="stop_task", dag=optimization_dag)

optimization_step >> run_iter_count_step >> stopping_conditional_step >> [continue_step, stop_step]

return dag
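
optimization.py now also carries the continue/stop loop that previously lived in effectuation.py: an XCom-backed iteration counter, a branch on the configured maximum, and either a TriggerDagRunOperator that re-triggers the DAG or an empty terminal task. Below is a minimal, self-contained sketch of that pattern, assuming Airflow 2.x TaskFlow; the DAG id, schedule, MAX_ITERATIONS constant, and the use of include_prior_dates to carry the counter across runs are assumptions for illustration, not the PR's exact code.

```python
# Minimal sketch of the continue/stop loop: count iterations via XCom, branch
# on a maximum, then either re-trigger the DAG or end on an empty task.
# Assumes Airflow 2.x; ids and MAX_ITERATIONS are hypothetical.
import pendulum
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

MAX_ITERATIONS = 5  # the original reads this from config['run']['iterations']['max']


@dag(dag_id="optimization_loop_sketch",
     start_date=pendulum.datetime(2023, 11, 1, tz="UTC"),
     schedule=None,
     catchup=False)
def optimization_loop_sketch():

    @task(task_id="run_iter_count_step")
    def run_iter_count(ti=None):
        # Pull the counter written by earlier runs of this same task;
        # include_prior_dates lets the value survive across triggered runs.
        last_iteration = ti.xcom_pull(task_ids="run_iter_count_step",
                                      include_prior_dates=True)
        return (last_iteration or 0) + 1

    @task.branch(task_id="stopping_decision_step")
    def stopping_decision(ti=None):
        current_iteration = ti.xcom_pull(task_ids="run_iter_count_step")
        return "stop_step" if current_iteration >= MAX_ITERATIONS else "continue_step"

    continue_step = TriggerDagRunOperator(
        task_id="continue_step",
        trigger_dag_id="optimization_loop_sketch",  # re-trigger this same DAG
    )
    stop_step = EmptyOperator(task_id="stop_step")

    run_iter_count() >> stopping_decision() >> [continue_step, stop_step]


optimization_loop_sketch()
```

One detail worth noting: a branch task's return value must match a downstream task_id exactly, so the sketch keeps both as stop_step.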