Added method to dump postgres tables using Spark
Version 0.1.4
Pierre-Marie Leveque committed Nov 27, 2019
1 parent a1bb608 commit 0cb277c
Showing 4 changed files with 55 additions and 4 deletions.
2 changes: 1 addition & 1 deletion novlake/__init__.py
@@ -1 +1 @@
-__version__ = '0.1.3'
+__version__ = '0.1.4'
53 changes: 52 additions & 1 deletion novlake/lake.py
@@ -269,7 +269,58 @@ def dump_pg_table(self, query, database_name, table_name, bucket="noverde-data-r
            mode="overwrite"
        )

-        print("Dump done!")
+        print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
+        print(f"Preview data with: lake.preview('{database_name}.{table_name}')")
+
+
+    def dump_pg_table_using_spark(self, query, database_name, table_name, bucket="noverde-data-repo", ds=None, db_code='REPLICA'):
+        import findspark
+        import os
+        findspark.init()
+
+        import pyspark  # only run after findspark.init()
+        from pyspark.sql import SparkSession
+        spark = SparkSession.builder.getOrCreate()
+
+        if ds is None:
+            ds = str(dt.date.today() - dt.timedelta(days=1))
+
+        table_path = f"s3://{bucket}/{database_name}/{table_name}/ds={ds}"
+        table_path_s3a = f"s3a://{bucket}/{database_name}/{table_name}/ds={ds}"
+        print(table_path)
+
+        self.session.s3.delete_objects(path=table_path)
+
+        dataframe = spark.read \
+            .format("jdbc") \
+            .option("driver", "org.postgresql.Driver") \
+            .option("url", "jdbc:postgresql://%s:5432/noverde_loans" % os.getenv(f"PG_{db_code}_HOST")) \
+            .option("dbtable", "(%s) t" % query) \
+            .option("user", os.getenv(f"PG_{db_code}_USERNAME")) \
+            .option("password", os.getenv(f"PG_{db_code}_PASSWORD")) \
+            .load()
+
+        dataframe.printSchema()
+
+        (dataframe.write
+            .mode("overwrite")
+            .format("parquet")
+            .save(compression="gzip", path=table_path_s3a)
+        )
+
+        self.session.spark.create_glue_table(
+            dataframe=dataframe,
+            database=database_name,
+            table=table_name,
+            file_format="parquet",
+            path=table_path,
+            compression="gzip",
+            replace_if_exists=True,
+            load_partitions=False)
+
+        print(f"Successfully exported data to S3 ({table_path}) and registered table to Athena")
+        print(f"Preview data with: lake.preview('{database_name}.{table_name}')")
+

    def list(self, table_filter=None):
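For context, here is a minimal usage sketch of the new method. This is a hypothetical example, not part of the commit: the Lake constructor call, the query, and the database/table names are assumptions, and it presumes the PG_REPLICA_HOST, PG_REPLICA_USERNAME, and PG_REPLICA_PASSWORD environment variables are set and the Postgres JDBC driver is on the Spark classpath (e.g. via spark.jars.packages=org.postgresql:postgresql:42.2.9).

    from novlake.lake import Lake

    lake = Lake()  # hypothetical constructor; the real signature may differ

    # Writes the query result as gzip-compressed Parquet to
    # s3://noverde-data-repo/<database_name>/<table_name>/ds=<yesterday>
    # and registers the table in the Glue catalog for querying via Athena.
    lake.dump_pg_table_using_spark(
        query="SELECT id, status FROM loans",  # example query
        database_name="analytics",             # example database name
        table_name="loans_snapshot",           # example table name
    )

    lake.preview("analytics.loans_snapshot")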
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "novlake"
-version = "0.1.3"
+version = "0.1.4"
description = "Tools to work with our data lake"
authors = ["Pierre-Marie Leveque <[email protected]>"]

2 changes: 1 addition & 1 deletion tests/test_lake.py
@@ -3,7 +3,7 @@


def test_version():
-    assert __version__ == '0.1.3'
+    assert __version__ == '0.1.4'


def test_lake_init():
