Skip to content

Commit

Permalink
Merge pull request #3 from noverde/gr/success-file
Browse files Browse the repository at this point in the history
Create a _SUCCESS marker file after export actions
  • Loading branch information
gabrielrubinobr authored Apr 8, 2020
2 parents 02ae37f + ad378ce commit a1acedc
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions novlake/lake.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def __init__(self, user_name):

# return dict()

def create_success_file(self, s3_repo):
    """Create an empty ``_SUCCESS`` marker object under *s3_repo*.

    Follows the Hadoop/Spark convention of writing a ``_SUCCESS`` file once
    an export has finished, so downstream consumers can detect that the
    dataset is complete.

    Args:
        s3_repo: Destination as an S3 URI, e.g. ``s3://bucket/some/prefix``.

    Returns:
        True on success (boto3 raises on failure).
    """
    settings = urlparse(s3_repo)
    # Strip slashes from BOTH ends so the key never contains an empty path
    # segment: a trailing "/" in s3_repo would otherwise yield
    # "prefix//_SUCCESS", and a bare bucket URI would yield "/_SUCCESS".
    prefix = settings.path.strip("/")
    key = f"{prefix}/_SUCCESS" if prefix else "_SUCCESS"
    s3 = boto3.resource('s3')
    success_file = s3.Object(settings.netloc, key)
    success_file.put(Body="")
    print(f"Success file created at ({s3_repo}) ")
    return True

def query(self, query, database=None, no_limit=False):
"""Queries data using Athena and returns pandas dataframe"""

Expand Down Expand Up @@ -144,9 +152,11 @@ def export(self, dataframe, table_name, database_name=None, bucket_name=None, fo
path=table_path,
# partition_cols=["col_name"],
)



print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
print(f"Preview data with: lake.preview('{database_name}.{table_name}')")
self.create_success_file(s3_repo=s3_path)

return table_path

Expand Down Expand Up @@ -229,6 +239,7 @@ def query_and_export(self, query, table_name, database_name=None, bucket_name=No

print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
print(f"Preview data with: lake.preview('{database_name}.{table_name}')")
self.create_success_file(s3_repo=table_path)

return table_path

Expand All @@ -243,7 +254,6 @@ def dump_pg_table(self, query, database_name, table_name, bucket="noverde-data-r
print(table_path)

self.session.s3.delete_objects(path=table_path)

connection = pg.connect("host='%s' dbname=%s user=%s password='%s'" % (
os.getenv(f'PG_{db_code}_HOST'),
os.getenv(f'PG_{db_code}_DATABASE'),
Expand All @@ -269,9 +279,10 @@ def dump_pg_table(self, query, database_name, table_name, bucket="noverde-data-r
mode="overwrite"
)


print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
print(f"Preview data with: lake.preview('{database_name}.{table_name}')")

self.create_success_file(s3_repo=table_path)

def dump_pg_table_using_spark(self, query, database_name, table_name, bucket="noverde-data-repo", ds=None, db_code='REPLICA'):
import findspark
Expand Down Expand Up @@ -318,9 +329,10 @@ def dump_pg_table_using_spark(self, query, database_name, table_name, bucket="no
replace_if_exists=True,
load_partitions=False)


print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
print(f"Preview data with: lake.preview('{database_name}.{table_name}')")

self.create_success_file(s3_repo=table_path)


def list(self, table_filter=None):
Expand Down

0 comments on commit a1acedc

Please sign in to comment.