From 840ebbadbff3c268a387c6aef4626851ffc9318e Mon Sep 17 00:00:00 2001
From: Matthias Tafelmeier
Date: Fri, 17 Nov 2023 15:40:48 +0100
Subject: [PATCH] breeder - decentralize dependency imports

Dependencies are no longer installed on the Airflow container, hence we
have to resolve them inside the tasks that run on the Dask workers.
---
 breeder/linux_network_stack/effectuation.py   | 21 +++++++++++++++++--
 .../linux_network_stack/nats_coroutines.py    |  3 +++
 breeder/linux_network_stack/optimization.py   | 13 +++++++++++-
 breeder/linux_network_stack/root_dag.py       | 20 ------------------
 4 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/breeder/linux_network_stack/effectuation.py b/breeder/linux_network_stack/effectuation.py
index e69e6eb6..1852e408 100644
--- a/breeder/linux_network_stack/effectuation.py
+++ b/breeder/linux_network_stack/effectuation.py
@@ -20,6 +20,8 @@ def create_target_interaction_dag(dag_id, config, target, identifier):
 
     @dag.task(task_id="pull_optimization_step")
     def run_pull_optimization():
+        import asyncio
+
         task_logger.debug("Entering")
 
         msg = asyncio.run(receive_msg_via_nats(subject=f'effectuation_{identifier}'))
@@ -36,7 +38,11 @@ def run_pull_optimization():
     def run_aquire_lock():
         task_logger.debug("Entering")
 
-        dlm_lock = LOCKER.lock(target)
+        import pals
+
+        locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
+
+        dlm_lock = locker.lock(target)
 
         if not dlm_lock.acquire(acquire_timeout=600):
             task_logger.debug("Could not aquire lock for {target}")
@@ -61,6 +67,12 @@ def run_release_lock():
 
     @dag.task(task_id="push_optimization_step")
     def run_push_optimization(ti=None):
+
+        import asyncio
+        from sqlalchemy import create_engine
+        from sqlalchemy import text
+
+        archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
         task_logger.debug("Entering")
 
         metric_value = ti.xcom_pull(task_ids="recon_step")
@@ -82,7 +94,7 @@ def run_push_optimization(ti=None):
                                  bindparam("setting_full", settings_full, type_=String),
                                  bindparam("setting_result", metric_data, type_=String))
 
-        ARCHIVE_DB_ENGINE.execute(query)
+        archive_db_engine.execute(query)
 
         task_logger.debug("Done")
 
@@ -92,6 +104,11 @@ def run_push_optimization(ti=None):
     @dag.task(task_id="recon_step")
     def run_reconnaissance():
+
+        from prometheus_api_client import PrometheusConnect, MetricsList, Metric
+        from prometheus_api_client.utils import parse_datetime
+        import urllib3
+
         task_logger.debug("Entering")
 
         prom_conn = PrometheusConnect(url=PROMETHEUS_URL,
                                       retry=urllib3.util.retry.Retry(total=3, raise_on_status=True, backoff_factor=0.5),
diff --git a/breeder/linux_network_stack/nats_coroutines.py b/breeder/linux_network_stack/nats_coroutines.py
index dc96a943..6070a8db 100644
--- a/breeder/linux_network_stack/nats_coroutines.py
+++ b/breeder/linux_network_stack/nats_coroutines.py
@@ -24,6 +24,9 @@ async def send_msg_via_nats(subject=None, data_dict=None):
 {% endraw %}
 
 async def receive_msg_via_nats(subject=None, timeout=300):
+    import nats
+    import time
+    import sys
     # Connect to NATS Server.
     nc = await nats.connect(NATS_SERVER_URL)
     sub = await nc.subscribe(f'{subject}')
diff --git a/breeder/linux_network_stack/optimization.py b/breeder/linux_network_stack/optimization.py
index cc8d0821..d9b35607 100644
--- a/breeder/linux_network_stack/optimization.py
+++ b/breeder/linux_network_stack/optimization.py
@@ -9,9 +9,15 @@ def objective(trial, identifier):
     ###--- end coroutines ---###
 
     import logging
+    from sqlalchemy import create_engine
+    from sqlalchemy import text
+
     logger = logging.getLogger('objective')
     logger.setLevel(logging.DEBUG)
+
+    archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
+
     logger.warning('entering')
 
     # Compiling settings for effectuation
 
@@ -35,7 +41,7 @@ def objective(trial, identifier):
         query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String),
                                  bindparam("setting_id", setting_id, type_=String))
 
-        archive_db_data = ARCHIVE_DB_ENGINE.execute(query).fetchall()
+        archive_db_data = archive_db_engine.execute(query).fetchall()
 
         if archive_db_data:
             is_setting_explored = True
@@ -79,6 +85,11 @@ def create_optimization_dag(dag_id, config, identifier):
     ## perform optimiziation run
     @dag.task(task_id="optimization_step")
     def run_optimization():
+        import optuna
+        from optuna.storages import InMemoryStorage
+        from optuna.integration import DaskStorage
+        from distributed import Client, wait
+
         __directions = list()
 
         for objective in config.get('objectvices'):
diff --git a/breeder/linux_network_stack/root_dag.py b/breeder/linux_network_stack/root_dag.py
index 3c2709e8..2d492056 100644
--- a/breeder/linux_network_stack/root_dag.py
+++ b/breeder/linux_network_stack/root_dag.py
@@ -27,25 +27,7 @@ from airflow.utils.dates import days_ago
 from airflow.decorators import task
 
-import optuna
-from optuna.storages import InMemoryStorage
-from optuna.integration import DaskStorage
-from distributed import Client, wait
-
-from sqlalchemy import create_engine
-from sqlalchemy import text
-
-from prometheus_api_client import PrometheusConnect, MetricsList, Metric
-from prometheus_api_client.utils import parse_datetime
 from datetime import timedelta
-import asyncio
-
-import pals
-import urllib3
-
-import nats
-import time
-import sys
 
 import random
 import logging
 
@@ -86,10 +68,8 @@
 
 DASK_SERVER_ENDPOINT = "{DASK_ENDPOINT}"
 
-ARCHIVE_DB_ENGINE = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
 DLM_DB_CONNECTION = 'postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}'
 
-LOCKER = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
 
 ###
 
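
The pattern this patch applies throughout: imports of worker-only dependencies
move from module level in root_dag.py into the task callables, so they are
resolved at run time in the environment of the Dask worker that executes the
task, rather than at DAG-parse time on the Airflow container. A minimal sketch
of that pattern, assuming the Airflow 2.x TaskFlow API; the DAG, task, and
dependency names (example_dag, compute_on_worker, numpy) are hypothetical and
not taken from this repository:

    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
    def example_dag():

        @task(task_id="compute_on_worker")
        def compute_on_worker():
            # Deferred import: resolved on the worker when the task runs,
            # not when the scheduler parses this DAG file.
            import numpy as np
            return float(np.mean([1.0, 2.0, 3.0]))

        compute_on_worker()


    example_dag()

The trade-off is that a missing dependency now surfaces only when the task
runs instead of at parse time, so the worker images have to be kept in sync
with whatever the task bodies import.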