Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repair api creation endpoint #139

Merged
merged 13 commits into from
Nov 15, 2023
28 changes: 14 additions & 14 deletions api/archive_db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@

import psycopg2
import logging

class archive_db():

@classmethod
def __execute(db_info=None, query=""):
@staticmethod
def execute(db_info=None, query=""):
""" Function wrapping the curoser execute with
a dedicated connection for the execution."""

Expand All @@ -15,8 +16,8 @@ def __execute(db_info=None, query=""):
with db_connection.cursor() as db_cursor:
db_cursor.execute(query)

except OperationalError as Error:
print(f"Error connecting to the database : {Error}")
except psycopg2.OperationalError as Error:
logging.error(f"Error connecting to the database : {Error}")

finally:
if db_connection:
Expand All @@ -25,7 +26,7 @@ def __execute(db_info=None, query=""):

class queries():

@classmethod
@staticmethod
def create_breeder_table(table_name=None):
query = f"""
CREATE TABLE IF NOT EXISTS {table_name}
Expand All @@ -39,27 +40,26 @@ def create_breeder_table(table_name=None):

return query

@classmethod
@staticmethod
def create_trigger(trigger_name=None, table_name=None, procedure_name=None):
query = f"""
CREATE TRIGGER {trigger_name}
AFTER INSERT ON {table_name}
FOR EACH ROW
EXECUTE
procedure{procedure_name}
EXECUTE PROCEDURE {procedure_name} ();
"""

return query

@classmethod
@staticmethod
def create_procedure(procedure_name=None, probability=1.0, source_table_name=None, target_table_name=None):
query = f"""
CREATE OR REPLACE PROCEDURE {procedure_name}
LANGUAGE plpgsql
AS $body$
CREATE OR REPLACE FUNCTION {procedure_name}() RETURNS TRIGGER AS $$
DECLARE
random_value real;
BEGIN

random_value real := random();
random_value = random();

IF random_value < {probability} THEN
INSERT INTO {target_table_name} (target_table_setting_id, target_table_setting_full, target_table_setting_result)
Expand All @@ -69,7 +69,7 @@ def create_procedure(procedure_name=None, probability=1.0, source_table_name=Non
END IF;

END;
$body$;
$$ LANGUAGE plpgsql;
"""

return query
28 changes: 14 additions & 14 deletions api/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def breeders_post(content): # noqa: E501

def create_breeder(api_client, content):
api_instance = dag_run_api.DAGRunApi(api_client)
breeder_id = content.get('breeder').get('name')
breeder_config = dict(content)
breeder_config = dict(content.get('breeder'))
breeder_id = breeder_config.get('name')

# templating related
environment = Environment(loader=FileSystemLoader(DAG_TEMPLATES_DIR))
Expand All @@ -194,50 +194,50 @@ def create_breeder(api_client, content):

# set dbname to work with to breeder_id
db_config = ARCHIVE_DB_CONFIG.copy()
db_config.update(dict(dbname=breeder_id))
db_config.update(dict(dbname="archive_db"))

__query = archive.queries.create_breeder_table(table_name=dag_name)
archive.archive_db.__execute(db_info=db_config, query=__query)
archive.archive_db.execute(db_info=db_config, query=__query)

for target in targets:
identifier = str(abs(hash(target.get('address'))))[0:6]
for run_id in range(0, parallel_runs):
dag_id = f'{dag_name}_{run_id}_{identifier}'

__query = archive.queries.create_breeder_table(table_name=dag_id)
archive.archive_db.__execute(db_info=db_config, query=__query)
archive.archive_db.execute(db_info=db_config, query=__query)

__query = archive.queries.create_procedure(procedure_name=f"{dag_id}_procedure",
__query = archive.queries.create_procedure(procedure_name=f'{dag_id}_procedure',
probability=consolidation_probability,
source_table_name=dag_id,
target_table_name=dag_name)
archive.archive_db.__execute(db_info=db_config, query=__query)
archive.archive_db.execute(db_info=db_config, query=__query)

__query = archive.queries.create_trigger(trigger_name="{dag_id}_trigger",
__query = archive.queries.create_trigger(trigger_name=f'f{dag_id}_trigger',
table_name=dag_id,
procedure_name="{dag_id}_procedure")
archive.archive_db.__execute(db_info=db_config, query=__query)
procedure_name=f'{dag_id}_procedure')
archive.archive_db.execute(db_info=db_config, query=__query)

## create and fill breeder meta data db
db_config = META_DB_CONFIG.copy()
db_config.update(dict(dbname=breeder_id))
db_config.update(dict(dbname='meta_data'))
db_table_name = 'breeder_meta_data'

__query = meta_data.queries.create_meta_breeder_table(table_name=db_table_name)
archive.archive_db.__execute(db_info=db_config, query=__query)
archive.archive_db.execute(db_info=db_config, query=__query)

__query = meta_data.queries.insert_breeder_meta(table_name=db_table_name,
creation_ts=datetime.datetime.now(),
meta_state=breeder_config)
archive.archive_db.__execute(db_info=db_config, query=__query)
archive.archive_db.execute(db_info=db_config, query=__query)


with client.ApiClient(configuration) as api_client:
# Do not create connection dynamically for now
#api_response['breeder'] = create_breeder(api_client, content).to_dict()
create_breeder(api_client, content)

return Response(dict(), status=200, mimetype='application/json')
return Response(dict(), status=204, mimetype='application/json')


def breeders_put(content): # noqa: E501
Expand Down
4 changes: 2 additions & 2 deletions api/meta_data_db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

class queries():

@classmethod
@staticmethod
def create_meta_breeder_table(table_name=None):
query = f"""
CREATE TABLE IF NOT EXISTS {table_name}
Expand All @@ -13,7 +13,7 @@ def create_meta_breeder_table(table_name=None):

return query

@classmethod
@staticmethod
def insert_breeder_meta(table_name=None, creation_ts=None, meta_state=None):
query = f"""
INSERT INTO {table_name} (creation_tsz, definition)
Expand Down
13 changes: 7 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ services:
- 'AIRFLOW_CONN_LINUX_NETWORK_STACK_BREEDER_SSH={ "conn_type": "ssh", "login": "godon_robot", "host": "10.0.5.53", "port": 22, "extra": { "key_file": "/opt/airflow/credentials/id_rsa" } }'
- ARCHIVE_DB_USER=yugabyte
- ARCHIVE_DB_PASSWORD=yugabyte
- ARCHIVE_DB_HOST=archive_db
- ARCHIVE_DB_HOST=archive-db
- ARCHIVE_DB_PORT=5433
- ARCHIVE_DB_DATABASE=archive-db
- ARCHIVE_DB_DATABASE=archive_db
- META_DB_USER=meta_data
- META_DB_PASSWORD=meta_data
- META_DB_HOSTNAME=meta_data_db
- META_DB_HOSTNAME=meta-data-db
- META_DB_PORT=5433
- DLM_DB_USER=
- DLM_DB_PASSWORD=
Expand All @@ -57,6 +57,7 @@ services:
ports:
- 127.0.0.1:8080:8080
meta_data_db:
hostname: meta-data-db
image: postgres:13
environment:
POSTGRES_USER: meta_data
Expand Down Expand Up @@ -102,7 +103,7 @@ services:
restart: always
environment:
- AIRFLOW__URL=http://control_loop:8080
- ARCHIVE_DB_HOSTNAME=archive_db
- ARCHIVE_DB_HOSTNAME=archive-db
- ARCHIVE_DB_PORT=5433
volumes:
- ./breeder/linux_network_stack/:/usr/src/app/openapi_server/templates/
Expand Down Expand Up @@ -158,9 +159,9 @@ services:
- 9090:9090
archive_db:
image: yugabytedb/yugabyte:2.18.1.0-b84
container_name: "${DB_CONTAINER_NAME:-yugabytedb}"
hostname: archive-db
environment:
- "YSQL_DB=archive-db"
- "YSQL_DB=archive_db"
ports:
- ${DB_YSQL_PORT:-5433}:5433
- ${DB_YCQL_PORT:-9042}:9042
Expand Down
Loading