breeder - decentralize dependency imports
Dependencies are no longer installed on the Airflow container, so they
have to be resolved inside the tasks that run on the Dask workers.
cherusk committed Nov 17, 2023
1 parent b2d5555 commit 840ebba
Showing 4 changed files with 34 additions and 23 deletions.
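
In short, every heavyweight import moves out of the module scope of the DAG file, which the Airflow container evaluates at parse time, and into the task body, which runs only on the Dask worker that executes it. A minimal sketch of the pattern (task name and dependency are illustrative, not from the diff):

from airflow.decorators import task

@task(task_id="example_step")
def run_example_step():
    # Imported inside the task body: the dependency is resolved on the
    # Dask worker at execution time, so the Airflow container that only
    # parses the DAG file never needs it installed.
    import asyncio

    async def probe():
        return "ok"

    return asyncio.run(probe())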
21 changes: 19 additions & 2 deletions breeder/linux_network_stack/effectuation.py
@@ -20,6 +20,8 @@ def create_target_interaction_dag(dag_id, config, target, identifier):
 
     @dag.task(task_id="pull_optimization_step")
     def run_pull_optimization():
+        import asyncio
+
         task_logger.debug("Entering")
 
         msg = asyncio.run(receive_msg_via_nats(subject=f'effectuation_{identifier}'))
@@ -36,7 +38,11 @@ def run_pull_optimization():
     def run_aquire_lock():
         task_logger.debug("Entering")
 
-        dlm_lock = LOCKER.lock(target)
+        import pals
+
+        locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
+
+        dlm_lock = locker.lock(target)
 
         if not dlm_lock.acquire(acquire_timeout=600):
             task_logger.debug("Could not aquire lock for {target}")
@@ -61,6 +67,12 @@ def run_release_lock():
 
     @dag.task(task_id="push_optimization_step")
     def run_push_optimization(ti=None):
+
+        import asyncio
+        from sqlalchemy import create_engine
+        from sqlalchemy import text
+
+        archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
         task_logger.debug("Entering")
 
         metric_value = ti.xcom_pull(task_ids="recon_step")
@@ -82,7 +94,7 @@ def run_push_optimization(ti=None):
                                  bindparam("setting_full", settings_full, type_=String),
                                  bindparam("setting_result", metric_data, type_=String))
 
-        ARCHIVE_DB_ENGINE.execute(query)
+        archive_db_engine.execute(query)
 
         task_logger.debug("Done")
 
@@ -92,6 +104,11 @@ def run_push_optimization(ti=None):
 
     @dag.task(task_id="recon_step")
     def run_reconnaissance():
+
+        from prometheus_api_client import PrometheusConnect, MetricsList, Metric
+        from prometheus_api_client.utils import parse_datetime
+        import urllib3
+
         task_logger.debug("Entering")
         prom_conn = PrometheusConnect(url=PROMETHEUS_URL,
                                       retry=urllib3.util.retry.Retry(total=3, raise_on_status=True, backoff_factor=0.5),
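
With the module-level LOCKER gone from root_dag.py, each acquiring task now constructs its own pals locker. A standalone sketch of the acquire/release cycle that run_aquire_lock and run_release_lock implement between them; the connection URL and lock name below are placeholders:

import pals

DLM_DB_CONNECTION = 'postgresql://user:password@dlm-db/dlm'  # placeholder
target = 'example_target'  # placeholder lock name

# Same construction as in run_aquire_lock above; pals hands out
# distributed locks backed by PostgreSQL advisory locks.
locker = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
dlm_lock = locker.lock(target)

# Wait up to 600 seconds for the lock, as the task above does.
if dlm_lock.acquire(acquire_timeout=600):
    try:
        pass  # effectuate settings on the locked target
    finally:
        dlm_lock.release()  # what run_release_lock does in its own task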
3 changes: 3 additions & 0 deletions breeder/linux_network_stack/nats_coroutines.py
@@ -24,6 +24,9 @@ async def send_msg_via_nats(subject=None, data_dict=None):
 {% endraw %}
 
 async def receive_msg_via_nats(subject=None, timeout=300):
+    import nats
+    import time
+    import sys
     # Connect to NATS Server.
     nc = await nats.connect(NATS_SERVER_URL)
     sub = await nc.subscribe(f'{subject}')
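
The same reasoning applies here: the coroutine body only runs on whatever worker awaits it, so nats, time, and sys are resolved there. A caller on the worker drives it synchronously, as the effectuation diff above shows (subject name illustrative):

import asyncio

msg = asyncio.run(receive_msg_via_nats(subject='effectuation_example', timeout=300))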
13 changes: 12 additions & 1 deletion breeder/linux_network_stack/optimization.py
@@ -9,9 +9,15 @@ def objective(trial, identifier):
     ###--- end coroutines ---###
 
     import logging
+    from sqlalchemy import create_engine
+    from sqlalchemy import text
+
     logger = logging.getLogger('objective')
     logger.setLevel(logging.DEBUG)
 
+
+    archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
+
     logger.warning('entering')
 
     # Compiling settings for effectuation
@@ -35,7 +41,7 @@ def objective(trial, identifier):
     query = query.bindparams(bindparam("table_name", breeder_table_name, type_=String),
                              bindparam("setting_id", setting_id, type_=String))
 
-    archive_db_data = ARCHIVE_DB_ENGINE.execute(query).fetchall()
+    archive_db_data = archive_db_engine.execute(query).fetchall()
 
     if archive_db_data:
         is_setting_explored = True
@@ -79,6 +85,11 @@ def create_optimization_dag(dag_id, config, identifier):
     ## perform optimiziation run
     @dag.task(task_id="optimization_step")
     def run_optimization():
+        import optuna
+        from optuna.storages import InMemoryStorage
+        from optuna.integration import DaskStorage
+        from distributed import Client, wait
+
         __directions = list()
 
         for objective in config.get('objectvices'):
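
The four imports added to run_optimization serve Optuna's Dask integration. A hedged sketch of how such a task might wire them together; only the imports come from the diff, while the helper name, study setup, trial counts, and endpoint value are illustrative:

from functools import partial

import optuna
from optuna.storages import InMemoryStorage
from optuna.integration import DaskStorage
from distributed import Client, wait

DASK_SERVER_ENDPOINT = 'tcp://dask-scheduler:8786'  # placeholder

def sketch_optimization(directions, objective, identifier, n_workers=4):
    with Client(DASK_SERVER_ENDPOINT) as client:
        # DaskStorage wraps an in-memory storage so all workers share
        # one study through the scheduler.
        storage = DaskStorage(InMemoryStorage(), client=client)
        study = optuna.create_study(directions=directions, storage=storage)

        # Fan trials out across the workers and wait for completion.
        futures = [
            client.submit(study.optimize,
                          partial(objective, identifier=identifier),
                          n_trials=5, pure=False)
            for _ in range(n_workers)
        ]
        wait(futures)
        return study.best_trials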
20 changes: 0 additions & 20 deletions breeder/linux_network_stack/root_dag.py
@@ -27,25 +27,7 @@
 from airflow.utils.dates import days_ago
 from airflow.decorators import task
 
-import optuna
-from optuna.storages import InMemoryStorage
-from optuna.integration import DaskStorage
-from distributed import Client, wait
-
-from sqlalchemy import create_engine
-from sqlalchemy import text
-
-from prometheus_api_client import PrometheusConnect, MetricsList, Metric
-from prometheus_api_client.utils import parse_datetime
-from datetime import timedelta
-import asyncio
-
-import pals
-import urllib3
 
-import nats
-import time
-import sys
 
 import random
 import logging
@@ -86,10 +68,8 @@
 
 DASK_SERVER_ENDPOINT = "{DASK_ENDPOINT}"
 
-ARCHIVE_DB_ENGINE = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
 
 DLM_DB_CONNECTION = 'postgresql://{DLM_DB_USER}:{DLM_DB_PASSWORD}@{DLM_DB_HOST}/{DLM_DB_DATABASE}'
-LOCKER = pals.Locker('network_breeder_effectuation', DLM_DB_CONNECTION)
 
 ###
 
