further results from integrating cooperative optimization #152

Merged: 18 commits, Nov 22, 2023
2 changes: 1 addition & 1 deletion .github/workflows/test_stack.yml
@@ -70,7 +70,7 @@ jobs:
       run: >
         source /etc/bashrc;
         mask --maskfile "${MASK_FILE}" infra create machines;
-        sleep 30;
+        sleep 50;
         mask --maskfile "${MASK_FILE}" config generate prometheus;
         mask --maskfile "${MASK_FILE}" config generate breeder "./examples/network_gen.yml";
         mask --maskfile "${MASK_FILE}" infra create network;
25 changes: 0 additions & 25 deletions Dockerfile-flowgrind
This file was deleted.
13 changes: 8 additions & 5 deletions api/archive_db.py
@@ -37,7 +37,7 @@ def create_breeder_table(table_name=None):
             (
                 setting_id bpchar NOT NULL,
                 setting_full jsonb NOT NULL,
-                setting_result FLOAT NOT NULL,
+                setting_result jsonb NOT NULL,
                 PRIMARY KEY (setting_id HASH)
             );
             """
@@ -100,12 +100,15 @@ def create_procedure(procedure_name=None, probability=1.0, source_table_name=None
                 random_value = random();

                 IF random_value < {probability} THEN
-                    INSERT INTO {target_table_name} (target_table_setting_id, target_table_setting_full, target_table_setting_result)
-                    SELECT source_table_setting_id, source_table_setting_full, source_table_setting_result FROM {source_table_name}
-                    ON CONFLICT
-                    DO UPDATE SET target_table_setting_result = source_table_setting_result WHERE target_table_setting_result < source_table_setting_result;
+                    INSERT INTO {target_table_name} AS target_table (setting_id, setting_full, setting_result)
+                    SELECT setting_id, setting_full, setting_result FROM {source_table_name}
+                    ON CONFLICT (setting_id)
+                    DO UPDATE SET setting_result = excluded.setting_result WHERE (excluded.setting_result->>0 < target_table.setting_result->>0 AND
+                        excluded.setting_result->>1 > target_table.setting_result->>1);
                 END IF;

                 RETURN NULL;

             END;
             $$ LANGUAGE plpgsql;
             """
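The rewritten procedure replicates explored settings into a cooperating breeder's table and, on a setting_id collision, keeps whichever result dominates: lower round-trip time at index 0, higher delivery rate at index 1. A minimal standalone sketch of that upsert, assuming SQLAlchemy, an illustrative table name, and explicit numeric casts (my addition, since jsonb's ->> operator yields text and would otherwise compare lexicographically):

~~~python
# Sketch of the dominance-based upsert from the replication procedure.
# Table name "target_breeder_table" and the ::float casts are assumptions;
# credentials match the env_vars/archive_db.env defaults shown below.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://yugabyte:yugabyte@archive-db:5433/archive_db")

upsert = text("""
    INSERT INTO target_breeder_table AS t (setting_id, setting_full, setting_result)
    VALUES (:sid, CAST(:sfull AS jsonb), CAST(:sres AS jsonb))
    ON CONFLICT (setting_id) DO UPDATE
        SET setting_result = excluded.setting_result
        WHERE (excluded.setting_result->>0)::float < (t.setting_result->>0)::float
          AND (excluded.setting_result->>1)::float > (t.setting_result->>1)::float
""")

with engine.begin() as conn:
    conn.execute(upsert, dict(sid="a1b2c3",
                              sfull='[{"net.core.somaxconn": 4096}]',
                              sres='[0.42, 125000.0]'))  # [rtt, delivery_rate]
~~~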
15 changes: 2 additions & 13 deletions breeder/linux_network_stack/effectuation.py
@@ -28,25 +28,15 @@ def run_pull_optimization():
 @dag.task(task_id="push_optimization_step")
 def run_push_optimization(ti=None):

-    archive_db_engine = create_engine(f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}')
     task_logger.debug("Entering")

     metric_value = ti.xcom_pull(task_ids="recon_step")
-    settings_full = ti.xcom_pull(task_ids="pull_optimization_step")
-
-    setting_id = hashlib.sha256(str.encode(settings_full)).hexdigest()[0:6]

     task_logger.debug(f"Metric : f{metric_value}")

     metric_data = dict(metric=metric_value)
     msg = asyncio.run(send_msg_via_nats(subject=f'recon_{identifier}', data_dict=metric_data))

-    breeder_table_name = config.get("name")
-
-    query = f"INSERT INTO {breeder_table_name} VALUES ({setting_id}, {setting_full}, {setting_result});"
-
-    archive_db_engine.execute(query)
-
     task_logger.debug("Done")

     return msg
@@ -71,10 +61,9 @@ def run_reconnaissance():

     if recon_service_type == 'prometheus':
         recon_query = objective.get('reconaissance').get('query')
-        query_name = objective.get('reconaissance').get('name')
-        query_string = query.get('query')
+        query_name = objective.get('name')

-        query_result = prom_conn.custom_query(query_string)
+        query_result = prom_conn.custom_query(recon_query)
         metric_value = query_result[0]
         metric_data[query_name] = metric_value.get('value')[1]
     else:
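With the duplicated archive write removed, the recon task's only job is to run the configured PromQL expression and forward the sampled value over NATS. A short sketch of that query path, assuming the prometheus-api-client package and an illustrative server URL and expression:

~~~python
# Sketch of the reconnaissance query path, assuming prometheus-api-client;
# the URL and the PromQL expression are illustrative placeholders.
from prometheus_api_client import PrometheusConnect

prom_conn = PrometheusConnect(url="http://prometheus:9090", disable_ssl=True)

recon_query = "avg(rate(tcp_delivery_rate_bytes[1m]))"
query_result = prom_conn.custom_query(recon_query)

# Each result is a dict like {'metric': {...}, 'value': [<timestamp>, '<value>']};
# the diff above keeps the value component of the first sample.
metric_value = query_result[0]
print(metric_value.get("value")[1])
~~~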
31 changes: 24 additions & 7 deletions breeder/linux_network_stack/objective.py
@@ -1,6 +1,7 @@


 def objective(trial,
+              run=None,
               identifier=None,
               archive_db_url=None,
               locking_db_url=None,
@@ -17,38 +18,47 @@ def objective(trial,
     logger = logging.getLogger('objective')
     logger.setLevel(logging.DEBUG)

+    logger.debug('entering')
+
     archive_db_engine = create_engine(archive_db_url)

-    logger.warning('entering')
-
     # Compiling settings for effectuation
     settings = []
+    setting_full = []
     for setting_name, setting_config in config.get('settings').get('sysctl').items():
         constraints = setting_config.get('constraints')
         step_width = setting_config.get('step')
         suggested_value = trial.suggest_int(setting_name, constraints.get('lower') , constraints.get('upper'), step_width)

-        settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';")
+        setting_full.append({ setting_name : suggested_value })
+
+        if setting_name in ['net.ipv4.tcp_rmem', 'net.ipv4.tcp_wmem']:
+            settings.append(f"sudo sysctl -w {setting_name}='4096 131072 {suggested_value}';")
+        else:
+            settings.append(f"sudo sysctl -w {setting_name}='{suggested_value}';")
     settings = '\n'.join(settings)
+    setting_full = json.dumps(setting_full)

     is_setting_explored = False
     setting_id = hashlib.sha256(str.encode(settings)).hexdigest()[0:6]

+    logger.debug('fetching setting data')
+
-    breeder_table_name = f"{breeder_name}"
+    breeder_table_name = f"{breeder_name}_{run}_{identifier}"
     query = f"SELECT * FROM {breeder_table_name} WHERE {breeder_table_name}.setting_id = '{setting_id}';"

     archive_db_data = archive_db_engine.execute(query).fetchall()

     if archive_db_data:
+        logger.debug('setting already explored')
         is_setting_explored = True
-        rtt = archive_db_data[0].get('setting_result').get('rtt')
-        delivery_rate = archive_db_data[0].get('setting_result').get('delivery_rate')
+        result_tuple = json.loads(archive_db_data[0])
+        rtt = result_tuple[0]
+        delivery_rate = result_tuple[1]

     if not is_setting_explored:
+        logger.warning('doing effectuation')
         is_setting_explored = True
         settings_data = dict(settings=settings)

         # get lock to gate other objective runs
@@ -62,7 +72,7 @@ def objective(trial,

         asyncio.run(send_msg_via_nats(subject=f'effectuation_{identifier}', data_dict=settings_data))

-        logger.warning('gathering recon')
+        logger.info('gathering recon')
         metric = json.loads(asyncio.run(receive_msg_via_nats(subject=f'recon_{identifier}')))


@@ -72,7 +82,14 @@ def objective(trial,
         metric_value = metric.get('metric')
         rtt = float(metric_value['tcp_rtt'])
         delivery_rate = float(metric_value['tcp_delivery_rate_bytes'])
-        logger.warning(f'metric received {metric_value}')
+        logger.info(f'metric received {metric_value}')
+
+        setting_result = json.dumps([rtt, delivery_rate])
+
+        query = f"INSERT INTO {breeder_table_name} VALUES ('{setting_id}', '{setting_full}', '{setting_result}');"
+        archive_db_engine.execute(query)
+
+        logger.warning('Result stored in Knowledge Archive')

     logger.warning('Done')
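The objective now derives a six-character setting_id from the rendered sysctl commands and consults the per-run archive table before effectuating, so a repeated suggestion is answered from the knowledge archive instead of being re-measured on the target. A compact sketch of that cache-or-measure flow, with an in-memory dict standing in for the archive table:

~~~python
# Sketch of the exploration cache keyed by a truncated SHA-256 of the
# rendered sysctl commands; names mirror the diff above, the dict and the
# lambda effectuator stand in for the archive table and the NATS round trip.
import hashlib
import json

archive = {}  # setting_id -> json-encoded [rtt, delivery_rate]

def lookup_or_effectuate(settings: str, effectuate):
    setting_id = hashlib.sha256(str.encode(settings)).hexdigest()[0:6]
    if setting_id in archive:
        rtt, delivery_rate = json.loads(archive[setting_id])
    else:
        rtt, delivery_rate = effectuate(settings)
        archive[setting_id] = json.dumps([rtt, delivery_rate])
    return rtt, delivery_rate

rtt, rate = lookup_or_effectuate("sudo sysctl -w net.core.somaxconn='4096';",
                                 lambda s: (0.42, 125000.0))
~~~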
5 changes: 3 additions & 2 deletions breeder/linux_network_stack/optimization.py
@@ -1,6 +1,6 @@


-def create_optimization_dag(dag_id, config, identifier):
+def create_optimization_dag(dag_id, config, run, identifier):

     dag = DAG(dag_id,
               default_args=DEFAULTS,
@@ -20,8 +20,9 @@ def run_optimization():
     {{ local_objective_includ()|indent(12) }} # default is indent of 4 spaces!
     ###--- end coroutines ---###

-    objective_kwargs = dict(archive_db_url=f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOST}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}',
+    objective_kwargs = dict(archive_db_url=f'postgresql://{ARCHIVE_DB_USER}:{ARCHIVE_DB_PASSWORD}@{ARCHIVE_DB_HOSTNAME}:{ARCHIVE_DB_PORT}/{ARCHIVE_DB_DATABASE}',
                             locking_db_url=DLM_DB_CONNECTION,
+                            run=run,
                             identifier=identifier,
                             breeder_name=config.get('name'),
                             )
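The extra run argument threads the run identifier through to the objective's keyword arguments. A minimal sketch of how such kwargs are typically bound into an Optuna objective, assuming a plain local study (the project executes trials on Dask workers) with illustrative connection strings and a placeholder metric:

~~~python
# Sketch of binding per-run kwargs into an Optuna objective via
# functools.partial; all values below are illustrative assumptions.
import functools
import optuna

def objective(trial, run=None, identifier=None, archive_db_url=None,
              locking_db_url=None, breeder_name=None):
    x = trial.suggest_int("net.core.somaxconn", 128, 8192, 128)
    return abs(x - 4096)  # placeholder metric instead of a real measurement

objective_kwargs = dict(archive_db_url="postgresql://user:pw@archive-db:5433/archive_db",
                        locking_db_url="postgresql://user:pw@locking-db:5432/locks",
                        run="run_0", identifier="target_0", breeder_name="breeder")

study = optuna.create_study(direction="minimize")
study.optimize(functools.partial(objective, **objective_kwargs), n_trials=10)
~~~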
4 changes: 2 additions & 2 deletions breeder/linux_network_stack/root_dag.py
@@ -93,7 +93,7 @@

 ARCHIVE_DB_USER = os.environ.get("ARCHIVE_DB_USER")
 ARCHIVE_DB_PASSWORD = os.environ.get("ARCHIVE_DB_PASSWORD")
-ARCHIVE_DB_HOST = os.environ.get("ARCHIVE_DB_HOST")
+ARCHIVE_DB_HOSTNAME = os.environ.get("ARCHIVE_DB_HOSTNAME")
 ARCHIVE_DB_PORT = os.environ.get("ARCHIVE_DB_PORT")
 ARCHIVE_DB_DATABASE = os.environ.get("ARCHIVE_DB_DATABASE")

@@ -143,7 +143,7 @@ def determine_config_shard(run_id=None,
     dag_id = f'{dag_name}_{run_id}'
     if not is_cooperative:
         config = determine_config_shard()
-    globals()[f'{dag_id}_optimization_{identifier}'] = create_optimization_dag(f'{dag_id}_optimization_{identifier}', config, identifier)
+    globals()[f'{dag_id}_optimization_{identifier}'] = create_optimization_dag(f'{dag_id}_optimization_{identifier}', config, run_id, identifier)
     globals()[f'{dag_id}_target_{identifier}'] = create_target_interaction_dag(f'{dag_id}_target_interaction_{identifier}', config, target, identifier)

     target_id += 1
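The root DAG registers one optimization DAG and one target-interaction DAG per target by assigning them into globals(), which is how the Airflow parser discovers dynamically generated DAGs at module load time. A stripped-down sketch of that pattern, assuming Airflow 2.4+ and illustrative run and target ids:

~~~python
# Sketch of Airflow's dynamic-DAG registration pattern used above; the
# EmptyOperator body and the two nested loops are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

def create_optimization_dag(dag_id):
    # Each call returns an independent DAG; binding it to a module-level
    # name via globals() makes the scheduler pick it up at parse time.
    dag = DAG(dag_id, start_date=datetime(2023, 11, 1), schedule=None)
    EmptyOperator(task_id="optimization_step", dag=dag)
    return dag

for run_id in ["run_0", "run_1"]:
    for identifier in ["target_0", "target_1"]:
        dag_id = f"breeder_{run_id}_optimization_{identifier}"
        globals()[dag_id] = create_optimization_dag(dag_id)
~~~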
3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -148,6 +148,7 @@ services:
     deploy:
       replicas: 2
     volumes:
+      - ./testing/infra/credentials/ssh/:/opt/airflow/credentials/
       - ./breeder/dags:/opt/airflow/dags/
       - airflow-logs-volume:/opt/airflow/logs/
       # for optuna parallel metaheuristics execution on dask
@@ -170,7 +171,7 @@ services:
     build:
       context: ./
       dockerfile: ./Dockerfile-dask
-    entrypoint: bash -c "dask-worker tcp://dask_scheduler:8786"
+    entrypoint: bash -c "dask-worker tcp://dask-scheduler-optuna:8786"
     deploy:
       replicas: 2
   prometheus:
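The worker entrypoint now resolves the scheduler through its compose service name dask-scheduler-optuna rather than the stale dask_scheduler alias. A client inside the same compose network would connect the same way; a sketch assuming dask.distributed:

~~~python
# Sketch of connecting to the scheduler by its compose service name,
# matching the worker entrypoint above; the address is only resolvable
# from inside the compose network.
from dask.distributed import Client

client = Client("tcp://dask-scheduler-optuna:8786")
print(client.scheduler_info()["workers"].keys())  # the two worker replicas
~~~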
2 changes: 1 addition & 1 deletion env_vars/archive_db.env
@@ -1,5 +1,5 @@
 ARCHIVE_DB_USER=yugabyte
 ARCHIVE_DB_PASSWORD=yugabyte
-ARCHIVE_DB_HOST=archive-db
+ARCHIVE_DB_HOSTNAME=archive-db
 ARCHIVE_DB_PORT=5433
 ARCHIVE_DB_DATABASE=archive_db
8 changes: 5 additions & 3 deletions testing/maskfile.md
@@ -80,7 +80,7 @@ cat "${__template_file}" | yq '.breeder.effectuation.targets |= []' - > ${output_file}

 for __target_ip_address in ${__target_ip_addresses_array[@]}
 do
-    export targets_object="{ "user": "godon_robot", "key_file": "/opt/airflow/credentials/id_rsa", "address": "${__target_ip_address}" }"
+    export target_object="{ "user": "godon_robot", "key_file": "/opt/airflow/credentials/id_rsa", "address": "${__target_ip_address}" }"
     yq -i '.breeder.effectuation.targets += env(target_object)' "${output_file}"
 done

@@ -107,6 +107,8 @@ __kcli_cmd="mask --maskfile ${MASKFILE_DIR}/maskfile.md util kcli run"
 echo "instanciating machines"
 ${__kcli_cmd} "create plan -f ./infra/machines/plan.yml ${__plan_name}"

+sleep 10
+
 ${__kcli_cmd} "list plan"
 ${__kcli_cmd} "list vm"

@@ -222,7 +224,7 @@ echo "provisioning infra instances"
 ~~~bash
 set -eEux

-docker-compose -f "${MASKFILE_DIR}/../docker-compose.yml" up --build -d
+docker-compose -f "${MASKFILE_DIR}/../docker-compose.yml" up --build -d --force-recreate

 ~~~

@@ -233,7 +235,7 @@ docker-compose -f "${MASKFILE_DIR}/../docker-compose.yml" up --build -d
 ~~~bash
 set -eEux

-docker-compose -f "${MASKFILE_DIR}/../docker-compose.yml" down
+docker-compose -f "${MASKFILE_DIR}/../docker-compose.yml" down --remove-orphans --volumes

 ~~~
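The corrected loop exports one target_object per VM address and appends it to the breeder config with yq, so the env() reference now matches the exported variable name. For illustration, the same transformation in Python, assuming PyYAML and placeholder file name and addresses:

~~~python
# Sketch of the targets-list construction performed by the yq loop above,
# assuming PyYAML; the file name and the addresses are illustrative.
import yaml

with open("network_gen.yml") as fh:
    config = yaml.safe_load(fh)

config["breeder"]["effectuation"]["targets"] = []
for address in ["192.0.2.10", "192.0.2.11"]:
    config["breeder"]["effectuation"]["targets"].append(
        dict(user="godon_robot",
             key_file="/opt/airflow/credentials/id_rsa",
             address=address))

with open("network_gen.yml", "w") as fh:
    yaml.safe_dump(config, fh)
~~~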