-
Notifications
You must be signed in to change notification settings - Fork 3
/
synctable.py
106 lines (88 loc) · 2.76 KB
/
synctable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
'''
This module handles the __sync table
'''
import logging
import pg
def get_status(tablename):
    """
    Look up the status stored in the __sync table for ``tablename``.

    Returns the content of the ``status`` column ('ready', 'error',
    'running', ...) of the matching row. When no row matches, an error is
    logged and None is returned.
    """
    log = logging.getLogger(__name__)
    cur = pg.cursor()
    query = 'SELECT status FROM {} WHERE tablename=%s'.format(
        pg.table_name('__sync'))
    cur.execute(query, (tablename,))
    row = cur.fetchone()
    if row is not None:
        # Single-column select: the status is the first field of the row.
        return row[0]
    log.error('TABLE %s not found in __sync', tablename)
    return None
def update(td, newstatus,
           update_syncuntil=False, update_last_refresh=False,
           required_status=None):
    """
    Write a new status for ``td`` into the __sync table, then commit.

    td: table description; ``td.name`` selects the __sync row and
        ``td.get_timestamp_name()`` names the column used for syncuntil.
    newstatus: value stored in the ``status`` column.
    update_syncuntil: when true, ``syncuntil`` is set to the max of the
        timestamp column read from ``pg.table_name(td.name)``.
    update_last_refresh: when true, ``last_refresh`` is set to the current
        timestamp at time zone UTC.
    required_status: when not None, the row is only updated if its current
        status equals this value.

    An error is logged when the statement updated no row. The transaction
    is committed in every case.
    """
    log = logging.getLogger(__name__)
    cur = pg.cursor()

    # Column name -> SQL expression, rendered verbatim into the SET clause.
    assignments = {'status': pg.escape_str(newstatus)}
    if update_syncuntil:
        ts_column = td.get_timestamp_name()
        subquery = (
            '\n(\n'
            'SELECT max({timefield})\n'
            'FROM {quoted_table_dest}\n'
            ')'
        )
        assignments['syncuntil'] = subquery.format(
            timefield=pg.escape_name(ts_column),
            quoted_table_dest=pg.table_name(td.name),
        )
    if update_last_refresh:
        assignments['last_refresh'] = "current_timestamp at time zone 'UTC'"

    sync_table = pg.table_name('__sync')
    set_clause = ','.join(
        f'{column}={expr}' for column, expr in assignments.items())
    table_literal = pg.escape_str(f'{td.name}')
    if required_status is None:
        status_filter = ''
    else:
        status_filter = f'AND status={pg.escape_str(required_status)}'

    statement = (
        f'UPDATE {sync_table}\n'
        f'SET {set_clause}\n'
        f'WHERE tablename={table_literal}\n'
        f'{status_filter}\n'
    )
    cur.execute(statement)
    if cur.rowcount == 0:
        # Either the table has no __sync row or required_status did not match.
        log.error('Cannot update __sync')
        # TODO print the current status
    pg.commit()
def insert(td, date_last_refresh):
    """
    Insert a "table is ready" entry in the __sync table, then commit.

    td: table description; ``td.name`` is stored in the ``tablename`` column.
    date_last_refresh: value stored in the ``syncuntil`` column. A UTC date
        should be given as the argument.

    ``last_refresh`` is set to the current timestamp at time zone UTC and
    ``status`` to 'ready'. If a row already exists for the table
    (conflict on ``tablename``), its ``syncuntil``, ``last_refresh`` and
    ``status`` are overwritten instead of inserting a second row.
    """
    logger = logging.getLogger(__name__)
    # Trace the value through logging instead of printing to stdout, so
    # callers control whether and where it shows up.
    logger.debug('refresh: %s', date_last_refresh)
    cursor = pg.cursor()
    sql = """
        INSERT INTO {} (tablename, syncuntil, last_refresh, status)
        VALUES(%s, %s, current_timestamp at time zone 'UTC', 'ready')
        ON CONFLICT (tablename)
        DO
        UPDATE
        SET syncuntil=EXCLUDED.syncuntil,
        last_refresh=EXCLUDED.last_refresh,
        status='ready'
        """.format(pg.table_name('__sync'))
    # The table name is formatted in; the values are passed as parameters.
    cursor.execute(sql, (td.name, date_last_refresh))
    pg.commit()  # TODO remove that?