-
Notifications
You must be signed in to change notification settings - Fork 47
/
contacts_model.py
145 lines (120 loc) · 4 KB
/
contacts_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import json
from operator import attrgetter
import time
from threading import Thread
from random import random
# ========================================================
# Contact Model
# ========================================================
PAGE_SIZE = 100
class Contact:
# mock contacts database
db = {}
def __init__(self, id_=None, first=None, last=None, phone=None, email=None):
self.id = id_
self.first = first
self.last = last
self.phone = phone
self.email = email
self.errors = {}
def __str__(self):
return json.dumps(self.__dict__, ensure_ascii=False)
def update(self, first, last, phone, email):
self.first = first
self.last = last
self.phone = phone
self.email = email
def validate(self):
if not self.email:
self.errors['email'] = "Email Required"
existing_contact = next(filter(lambda c: c.id != self.id and c.email == self.email, Contact.db.values()), None)
if existing_contact:
self.errors['email'] = "Email Must Be Unique"
return len(self.errors) == 0
def save(self):
if not self.validate():
return False
if self.id is None:
if len(Contact.db) == 0:
max_id = 1
else:
max_id = max(contact.id for contact in Contact.db.values())
self.id = max_id + 1
Contact.db[self.id] = self
Contact.save_db()
return True
def delete(self):
del Contact.db[self.id]
Contact.save_db()
@classmethod
def count(cls):
time.sleep(2)
return len(cls.db)
@classmethod
def all(cls, page=1):
page = int(page)
start = (page - 1) * PAGE_SIZE
end = start + PAGE_SIZE
return list(cls.db.values())[start:end]
@classmethod
def search(cls, text):
result = []
for c in cls.db.values():
match_first = c.first is not None and text in c.first
match_last = c.last is not None and text in c.last
match_email = c.email is not None and text in c.email
match_phone = c.phone is not None and text in c.phone
if match_first or match_last or match_email or match_phone:
result.append(c)
return result
@classmethod
def load_db(cls):
with open('contacts.json', 'r') as contacts_file:
contacts = json.load(contacts_file)
cls.db.clear()
for c in contacts:
cls.db[c['id']] = Contact(c['id'], c['first'], c['last'], c['phone'], c['email'])
@staticmethod
def save_db():
out_arr = [c.__dict__ for c in Contact.db.values()]
with open("contacts.json", "w") as f:
json.dump(out_arr, f, indent=2)
@classmethod
def find(cls, id_):
id_ = int(id_)
c = cls.db.get(id_)
if c is not None:
c.errors = {}
return c
class Archiver:
archive_status = "Waiting"
archive_progress = 0
thread = None
def status(self):
return Archiver.archive_status
def progress(self):
return Archiver.archive_progress
def run(self):
if Archiver.archive_status == "Waiting":
Archiver.archive_status = "Running"
Archiver.archive_progress = 0
Archiver.thread = Thread(target=self.run_impl)
Archiver.thread.start()
def run_impl(self):
for i in range(10):
time.sleep(1 * random())
if Archiver.archive_status != "Running":
return
Archiver.archive_progress = (i + 1) / 10
print("Here... " + str(Archiver.archive_progress))
time.sleep(1)
if Archiver.archive_status != "Running":
return
Archiver.archive_status = "Complete"
def archive_file(self):
return 'contacts.json'
def reset(self):
Archiver.archive_status = "Waiting"
@classmethod
def get(cls):
return Archiver()