Skip to content

Commit

Permalink
api - repair archive db state cleanup
Browse files Browse the repository at this point in the history
Use cascading object dropping to get rid of triggers for now.
  • Loading branch information
cherusk committed Nov 16, 2023
1 parent 5bd9469 commit 65bc1fb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 14 deletions.
4 changes: 2 additions & 2 deletions api/archive_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def create_breeder_table(table_name=None):
@staticmethod
def delete_breeder_table(table_name=None):
query = f"""
DROP TABLE IF EXISTS {table_name};
DROP TABLE IF EXISTS {table_name} CASCADE;
"""

return query
Expand Down Expand Up @@ -115,7 +115,7 @@ def create_procedure(procedure_name=None, probability=1.0, source_table_name=Non
@staticmethod
def delete_procedure(procedure_name=None):
query = f"""
DROP FUNCTION IF EXISTS {procedure_name}();
DROP FUNCTION IF EXISTS {procedure_name}() CASCADE;
"""

return query
Expand Down
20 changes: 8 additions & 12 deletions api/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import openapi_server.controllers.archive_db as archive
import openapi_server.controllers.meta_data_db as meta_data

import logging

AIRFLOW_API_BASE_URL = os.environ.get('AIRFLOW__URL')
AIRFLOW_API_VERSION = "v1"
AIRFLOW_API_AUTH_USER = "airflow"
Expand Down Expand Up @@ -94,27 +96,21 @@ def breeders_name_delete(breeder_name): # noqa: E501
__query = archive.queries.delete_breeder_table(table_name=breeder_name)
archive.archive_db.execute(db_info=db_config, query=__query)


__query = archive.queries.fetch_procedures(breeder_name=breeder_name)
procedures = archive.archive_db.execute(db_info=db_config, query=__query, with_result=True)

for procedure_name in procedures:
__query = archive.queries.delete_procedure(procedure_name=f'{dag_id}_procedure')
archive.archive_db.execute(db_info=db_config, query=__query)

__query = archive.queries.fetch_triggers(breeder_name=breeder_name, )
triggers = archive.archive_db.execute(db_info=db_config, query=__query, with_result=True)

for trigger_name in triggers:
__query = archive.queries.delete_trigger(trigger_name=trigger_name,
table_name=dag_id)
logging.error(type(procedure_name))
logging.error(procedure_name)
__query = archive.queries.delete_procedure(procedure_name=procedure_name[0])
archive.archive_db.execute(db_info=db_config, query=__query)

__query = archive.queries.fetch_tables(breeder_name=breeder_name)
archive_tables = archive.archive_db.execute(db_info=db_config, query=__query, with_result=True)

for table_name in archive_tables:
__query = archive.queries.delete_breeder_table(table_name=table_name )
logging.error(table_name)
__query = archive.queries.delete_breeder_table(table_name=table_name[0])
archive.archive_db.execute(db_info=db_config, query=__query)


Expand Down Expand Up @@ -251,7 +247,7 @@ def create_breeder(api_client, content):
target_table_name=dag_name)
archive.archive_db.execute(db_info=db_config, query=__query)

__query = archive.queries.create_trigger(trigger_name=f'f{dag_id}_trigger',
__query = archive.queries.create_trigger(trigger_name=f'{dag_id}_trigger',
table_name=dag_id,
procedure_name=f'{dag_id}_procedure')
archive.archive_db.execute(db_info=db_config, query=__query)
Expand Down

0 comments on commit 65bc1fb

Please sign in to comment.