-
Notifications
You must be signed in to change notification settings - Fork 0
/
DataKeeper.py
173 lines (129 loc) · 5.19 KB
/
DataKeeper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/home/sofyan/anaconda3/bin/python
'''
How to run this process
- ./DataKeeper.py ip PortOfDownload PortOfUpload
- in config file i need the (ip of master + the port of successful message)
- in download method
- the client will send an dictionary with (VIDEO_NAME) key
- in upload method
- the client will send an dictionary with (VIDEO_NAME , VIDEO)
'''
#############################################################################################################
# libraries
#############################################################################################################
import sys
import os
import time
import zmq
import json
import signal
import threading
##############################################################################################################
# Variables
##############################################################################################################
context = zmq.Context()
Download = context.socket(zmq.REP) # bind
Upload = context.socket(zmq.PULL) # bind
Successful = context.socket(zmq.PUSH) # connect
ReplicateOrder= context.socket(zmq.PULL)
MasterIP = None
PathOfVideos = None
MasterPortSuccessful = None
MasterPortReplicate = None
MyInfo = {}
##############################################################################################################
# Functions
##############################################################################################################
# Download
def DownloadMethod():
while True:
Message = Download.recv_pyobj()
File = Message["VIDEO_NAME"]
Path = PathOfVideos+"/"+File
with open(Path,'rb') as vfile:
Vid=vfile.read()
Download.send_pyobj(Vid)
SuccessfulMethod("Download",Message["VIDEO_NAME"])
print("The client has downloaded a video and the master has been told about that")
# Upload
def UploadMethod():
while True:
DataOfVideo = Upload.recv_pyobj()
Path=PathOfVideos+"/"+DataOfVideo["VIDEO_NAME"]
saveVideo(DataOfVideo["VIDEO"],Path)
SuccessfulMethod("Upload",DataOfVideo["VIDEO_NAME"])
print("The client has uploaded a video and the master has been told about that")
def SuccessfulMethod(Type,File='MOHAMED.MP4'):
Object = {}
Object["IPv4"] = MyInfo["IP"]
if Type == "Download":
Object["Port"] = MyInfo["PortDownload"]
Object["Type"] = "D"
#Object["Filename"] = File
elif Type == "Upload":
Object["Port"] = MyInfo["PortUpload"]
Object["Type"] = "U"
Object["Filename"] = File
print('i have sent ')
Successful.send_pyobj(Object)
def ReplicateMethod():
while True:
Data = ReplicateOrder.recv_pyobj()
Video_Name = Data["VIDEO_NAME"]
IPMachine = Data["IP"]
PortMachine = Data["PORT"]
contextMachine = zmq.Context()
ReplicateVideo = contextMachine.socket(zmq.PUSH)
ReplicateVideo.connect("tcp://"+IPMachine+":"+PortMachine)
Path = PathOfVideos+"/"+Video_Name
with open(Path,'rb') as vfile:
Vid=vfile.read()
MessageSent = {"VIDEO_NAME" : Video_Name , "VIDEO" : Vid}
ReplicateVideo.send_pyobj(MessageSent)
SuccessfulMethod("Replicate")
# Save Video
def saveVideo(video,Path:str):
try:
with open(Path,'wb') as myfile:
myfile.write(video)
print("I have saved a video")
return True
except:
print("I can't save the video")
return False
# Estaplish connection
def Connections():
dstring="tcp://"+MyInfo["IP"]+":"+MyInfo["PortDownload"]
print(dstring)
Download.bind("tcp://"+MyInfo["IP"]+":"+MyInfo["PortDownload"])
Upload.bind("tcp://"+MyInfo["IP"]+":"+MyInfo["PortUpload"])
Successful.connect("tcp://"+MasterIP+":"+MasterPortSuccessful)
ReplicateOrder.connect("tcp://"+MasterIP+":"+MasterPortReplicate)
##############################################################################################################
# Main
##############################################################################################################
if __name__ == "__main__":
# Initial values
with open('DKConfig.json') as config_file:
data = json.load(config_file)
PathOfVideos=data["PathOfVideos"]
MasterIP = data["MasterIP"]
MasterPortSuccessful = data["MasterPortSuccessful"]
MasterPortReplicate = data["MasterPortReplicate"]
MyInfo["IP"] = '127.0.0.1'
MyInfo["PortDownload"] = str(sys.argv[1])
MyInfo["PortUpload"] = str(sys.argv[2])
# Estaplish connections
Connections()
# Threading
DownloadThread = threading.Thread(target=DownloadMethod)
UploadThread = threading.Thread(target=UploadMethod)
ReplicateThread = threading.Thread(target=ReplicateMethod)
# Starting threads
DownloadThread.start()
UploadThread.start()
ReplicateThread.start()
DownloadThread.join()
UploadThread.join()
ReplicateThread.join()
print("i have finished")