# elt-with-snowflake.py
"""
Purpose: Retrieves data from the ABC API from the last hour and appends the
records to the Snowflake table "ABC_RAW"."RAW"."ABC_TABLE". On the first run,
this flow retrieves all historical data from the ABC API. After the first
successful run, the same flow retrieves only data since the last successful
load of data into Snowflake.
Requirements:
Assumes you have the following blocks in your workspace:
- a SnowflakeCredentials block named "abc-support-sysadmin"
- a SnowflakeConnector block named "abc-raw"
- a Secret block named "abc-credentials" containing the following:
{
"endpoint": "my-endpoint",
"api-user": "my-api-user",
"api-access-code": "my-api-access-code"
}
If you are unable to create these blocks in the UI because the SnowflakeCredentials
or SnowflakeConnector block types do not exist, run:
    `pip install prefect-snowflake`
    `prefect block register -m prefect_snowflake.credentials`
    `prefect block register -m prefect_snowflake.database`
"""
import asyncio
import json
from datetime import datetime as dt
from datetime import timedelta
from typing import List, Optional
import aiohttp
import pytz
from pandas import DataFrame, json_normalize
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.logging import get_run_logger
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.errors import ProgrammingError
from snowflake.connector.pandas_tools import write_pandas
# data to be loaded into Snowflake target table
abc_data = []
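# (populated by the _get_data_from_all_depts subflow and read by extract_data_from_api)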
@task(name="Step 1 of 3: Retrieve last time that data was loaded into Snowflake table")
def get_start_time(target_table: str) -> dt:
"""
A pre-check on the target table such that extract_data_from_api() will
pull only data after the last load time. This ensures no duplicate data
while fulfilling an APPEND ONLY requirement. If there is no data
in the target table (or it doesn't exist), we want to load all existing
data we can from the API. In that case, the start time is 07/01/2022.
Args:
- target_table (str): The name of the Snowflake table in which to
check whether rows exist.
Returns:
- start_datetime (dt): The datetime from which to begin loading data.
"""
logger = get_run_logger()
# Load a Snowflake block containing account, user, password, warehouse,
# database, schema, and role
snowflake_auth = SnowflakeConnector.load("abc-raw")
with snowflake_auth.get_connection() as conn:
with conn.cursor() as cur:
# attempt to retrieve data from the target_table
            try:
                select_from_table_stmt = (
                    f"SELECT MAX(ELT_LOADED_ON) FROM {target_table} LIMIT 1"
                )
                max_elt_loaded_on_tms = cur.execute(select_from_table_stmt).fetchone()[
                    0
                ]
            # if we cannot, then do a historical load (get all records) from the ABC API
            except ProgrammingError as e:
                if f"Object '{target_table}' does not exist or not authorized." in str(
                    e
                ):
                    logger.info(e)
                    max_elt_loaded_on_tms = "2022-07-01T00:00:01.000000Z"
                else:
                    logger.error(f"snowflake.connector error occurred: {str(e)}")
                    raise
            # if the table exists but holds no rows, MAX() returns None,
            # so fall back to a historical load as well
            if max_elt_loaded_on_tms is None:
                max_elt_loaded_on_tms = "2022-07-01T00:00:01.000000Z"
            logger.info(f"Max ELT_LOADED_ON tms: {max_elt_loaded_on_tms}")
    start_datetime = dt.strptime(max_elt_loaded_on_tms, "%Y-%m-%dT%H:%M:%S.%fZ")
    return start_datetime
@flow(name="Retrieve data from all depts since last successful run")
async def _get_data_from_all_depts(
dept_code_list: List[int], endpoint_, headers_, new_startDt: str
) -> str:
"""
    Retrieves data from all departments for all dates since the last successful
    run. As this flow runs hourly, it will typically retrieve only the last hour
    of data from each dept.
    Args:
        - dept_code_list: A list of dept IDs, each referenced as a deptCode.
        - endpoint_: ABC endpoint for requests.
        - headers_: Credentials passed with each request.
        - new_startDt: If the Snowflake table exists, one second is added to the
          max(ELT_LOADED_ON) value so that only data since that point in time is
          retrieved. If the table does not exist, new_startDt will be
          2022-07-01T00:00:01.000000Z.
    Returns:
        - abc_request_time (str): The time at which the API requests were made;
          used as the ELT_LOADED_ON value for this load.
"""
logger = get_run_logger()
async with aiohttp.ClientSession() as session:
tasks = []
# iterate through all necessary dates and depts
        for code in dept_code_list:
            tasks.append(
                asyncio.ensure_future(
                    session.get(
                        url=endpoint_,
                        headers=headers_,
                        params={
                            "startDate": new_startDt,
                            "deptCode": code,
                        },
                    )
                )
            )
# this will be used as the value in ELT_LOADED_ON column for this load
pst = pytz.timezone("America/Los_Angeles")
abc_request_time = dt.strftime(dt.now(tz=pst), "%Y-%m-%dT%H:%M:%S.%fZ")
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for response in responses:
            # with return_exceptions=True, failed requests come back as exceptions
            if isinstance(response, Exception):
                logger.error(f"Request Error: {response}")
            elif response.status == 200:
                abc_data.extend(await response.json())
            else:
                logger.error(
                    f"Response Error: {response.status} for request {response}"
                )
return abc_request_time
@flow(
name="Step 2 of 3: Extract load summary data from ABC API",
retries=2,
retry_delay_seconds=30,
)
def extract_data_from_api(start_time: dt, deptCode_list: List[int]) -> Optional[DataFrame]:
"""
Given a list of depts and a start time, will retrieve needed data from API.
Examples:
- If there is no data in the table, the startDate will be
2022-07-01T00:00:01.000000Z.
    - If there is data in the table, and the current time is
    2022-08-25T10:30:01.000000Z, we look at the latest ELT_LOADED_ON
    value in the Snowflake table to see when data was last loaded. Say
    MAX(ELT_LOADED_ON) = 2022-08-23T10:30:01.000000Z because there was an outage
    for a few days. We add one second to that most recent ELT_LOADED_ON value,
    so the startDate we pull from will be 2022-08-23T10:30:02.000000Z.
    This enables us to load the remaining data from 2022-08-23 (with no
    overlap), all data from 2022-08-24, and data up until now on 2022-08-25.
Args:
- start_time (dt): The time at which to start loading data
- deptCode_list (list): Dept IDs, each referenced as a
deptCode.
Returns:
    - df (DataFrame): Retrieved ABC API data, or None if there are no new records.
"""
logger = get_run_logger()
ABC_CREDENTIALS = json.loads(Secret.load("abc-credentials").get())
endpoint = ABC_CREDENTIALS["endpoint"]
headers = {
"api-user": ABC_CREDENTIALS["api-user"],
"api-access-code": ABC_CREDENTIALS["api-access-code"],
}
    # increment startDt by one second so already-loaded data is excluded
new_start_time = dt.strftime(
start_time + timedelta(seconds=1), "%Y-%m-%dT%H:%M:%S.%fZ"
)
time_of_abc_request = asyncio.run(
_get_data_from_all_depts(deptCode_list, endpoint, headers, new_start_time)
)
if abc_data:
df = json_normalize(abc_data)
# add tms when loaded into Snowflake
df["ELT_LOADED_ON"] = time_of_abc_request
logger.info(f"Nbr of records from all depts: {len(df)}")
assert len(abc_data) == len(df)
return df
else:
logger.info("No new records from any depts.")
return None
@task(name="Step 3 of 3: Load data into Snowflake")
def load_data_into_snowflake(
df: DataFrame, target_table: str, ddl: str
) -> tuple[bool, int]:
"""
Given ABC data, a Snowflake table name, and a DDL, will load the data
into the designated Snowflake table. If this run is a historical load,
likely we will need to create the table as it won't exist.
Args:
- df (DataFrame): ABC data to be loaded into the Snowflake table
- target_table (str): the table into which to load the data
- ddl (str): Snowflake DDL for designated table
Returns:
- tuple[bool, int]: True if successfully wrote df to Snowflake else
False; number of rows written as int.
"""
logger = get_run_logger()
# use Snowflake block
snowflake_auth = SnowflakeConnector.load("abc-raw")
database = snowflake_auth.database
schema = snowflake_auth.schema_
create_table_stmt = f"CREATE TABLE IF NOT EXISTS {target_table} ({ddl})"
with snowflake_auth.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(create_table_stmt)
logger.info("Executed SQL statement:" f"{create_table_stmt}")
# load the data into Snowflake
success, _, num_rows, _ = write_pandas(
conn=conn, df=df, table_name=target_table, database=database, schema=schema
)
logger.info(f"Success: {success}. Nbr of rows inserted: {num_rows}")
return success, num_rows
@flow(name="Update ABC Snowflake Table", retries=2, retry_delay_seconds=900)
def abc_elt_flow(deptCode_list: List[int]) -> None:
"""Flow which
1. Calculates the last successful load of data into Snowflake in order
to set the correct start_time.
2. Extracts the data since the start_time from ABC API into a df.
    3. If there is new data from the ABC API, loads the df into the Snowflake table.
Args:
- deptCode_list: A list of dept IDs each referenced as a
deptCode.
"""
target_table = "ABC_TABLE"
start_tms = get_start_time(target_table)
df = extract_data_from_api(start_tms, deptCode_list)
if isinstance(df, DataFrame):
# clean column names for Snowflake
df.columns = [col.replace(".", "_").upper() for col in df.columns]
# create ddl
ddl = "".join([col + " STRING, " for col in df.columns])[:-2]
load_data_into_snowflake(df, target_table, ddl)
if __name__ == "__main__":
abc_elt_flow(deptCode_list=[1, 3, 4, 8, 12, 13, 15])
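# To run this flow on an hourly schedule rather than ad hoc, one option is to serve
# it with a cron schedule instead of calling it directly. A minimal sketch, assuming
# Prefect 2.10.17+ and a deployment name of your choosing ("abc-elt-hourly" is a
# placeholder):
#
#   if __name__ == "__main__":
#       abc_elt_flow.serve(
#           name="abc-elt-hourly",  # placeholder deployment name
#           cron="0 * * * *",  # top of every hour
#           parameters={"deptCode_list": [1, 3, 4, 8, 12, 13, 15]},
#       )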