-
Notifications
You must be signed in to change notification settings - Fork 10
/
control.py
74 lines (62 loc) · 2.35 KB
/
control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import pickle
import paho.mqtt.client as mqtt
from threading import Thread
import time
class BrokerCom:
def __init__(self, user, pw, ip, sub_topic):
self.user = user
self.pw = pw
self.ip = ip
self.port = 1883
self.topic = sub_topic
self.client = mqtt.Client()
self.stopped = []
self.run = 1
def on_connect(self, connect_client, userdata, flags, rc):
print("Connected with Code :" + str(rc))
# Subscribe Topic from here
connect_client.subscribe(self.topic)
def on_message(self, message_client, userdata, msg):
data = pickle.loads(msg.payload) # ['start', []], ['stop': id]
if (data[0] == 'stop') and (data[1] not in self.stopped):
self.stopped.append(data[1])
print(f'{data[1]} has stopped!')
def publish(self, topic, data):
self.client.publish(topic, data)
def broker_loop(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.username_pw_set(self.user, self.pw)
self.client.connect(self.ip, self.port, 60)
self.client.loop_start()
while True:
if self.run == 0:
self.client.loop_stop()
self.client.disconnect()
break
def __del__(self):
print('Broker Communication Object Deleted!')
def exp_control():
it = input('MEC Iterations(5 10 15): ')
# broker_dict = {'user': 'mec', 'pw': 'password', 'sub_topic': 'cache/control', 'ip': 'localhost'}
broker_dict = {'user': 'mec', 'pw': 'password', 'sub_topic': 'control', 'ip': 'localhost'}
if it == '':
exp_no = [5, 10, 15]
else:
exp_no = [int(i) for i in it.split()]
input('start: ')
messenger = BrokerCom(**broker_dict)
h1 = Thread(target=messenger.broker_loop)
h1.start()
for mec_no in exp_no:
messenger.publish(topic=broker_dict['sub_topic'], data=pickle.dumps(['start', [i for i in range(mec_no)], mec_no]))
print(f'Experiment {mec_no} has commenced!')
while len(messenger.stopped) != mec_no:
time.sleep(10)
messenger.stopped = []
print(f'Experiment {mec_no} is concluded!')
print('Waiting for 60 seconds Time Lapse!')
time.sleep(60)
messenger.run = 0
if __name__ == '__main__':
exp_control()