-
Notifications
You must be signed in to change notification settings - Fork 0
/
Node.py
178 lines (149 loc) · 6.07 KB
/
Node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import json
import zmq
import traceback
import numpy
import functools
import uuid
class Node:
    """Base class for nodes that talk to each other over ZeroMQ sockets.

    A node reads a JSON config file and creates one zmq socket per entry of
    the config's ``topics`` list.  The sockets are stored in ``self.topics``,
    a dict keyed by topic name.

    The config must contain an ``id`` entry and a ``topics`` list; each topic
    is an object with the keys ``name``, ``paradigm``, ``protocol``,
    ``address`` and ``port``.

    To create a node, write a config file, subclass ``Node`` and override
    ``loop()``.
    """

    def __init__(self, configPath):
        """Load the JSON config at ``configPath`` and create the sockets.

        Raises whatever ``open``/``json.load`` raise for an unreadable or
        malformed file, ``KeyError`` if the config has no ``id`` entry, and
        the errors documented on ``initzmq``.
        """
        self.configPath = configPath
        # The context manager closes the file even when json.load() raises.
        with open(configPath, encoding="utf-8") as f:
            self.configData = json.load(f)
        self.id = self.configData['id']
        self.context = zmq.Context()
        self.topics = {}
        self.initzmq()

    def stopzmq(self):
        """Shut down zmq by destroying this node's context."""
        self.context.destroy()

    def loop(self):
        """Run one iteration of the node's work.

        Subclasses override this method.  It is meant to be invoked once per
        iteration by whatever drives the node, so an override should return
        promptly and must not contain its own infinite loop.
        NOTE(review): the driver that calls loop() is not part of this class.
        """
        print(self.id + " needs an overridden loop method")

    # TODO: use new helper methods
    def initzmq(self):
        """Create a socket for every configured topic.

        Each socket is stored in ``self.topics`` under the topic's name.

        Raises ValueError if the config has no ``topics`` entry or a topic
        uses an unknown protocol or paradigm, and KeyError if a topic entry
        lacks one of its required keys.
        """
        if "topics" not in self.configData:
            raise ValueError("Topics not found in %s" % self.configPath)
        for topic in self.configData['topics']:
            addr = self.gen_address(topic['protocol'], topic['address'],
                                    topic['port'])
            socket = self.build_socket(topic['paradigm'], topic['name'], addr)
            self.topics[topic['name']] = socket

    def gen_address(self, protocol, address, port):
        """Build a zmq endpoint url from the pieces found in the config.

        ``protocol`` must be "tcp", "udp" or "ipc".  For tcp and udp the
        result is ``<protocol>://<address>:<port>``; for ipc the port is
        ignored and the result is ``ipc://<address>``.

        ``port`` may be an int or a string, since a JSON config can spell it
        either way.

        Raises ValueError if the protocol is not one of the three above.
        """
        if protocol not in ("tcp", "udp", "ipc"):
            raise ValueError("Protocol not ipc or udp or tcp")
        # NOTE(review): libzmq documents its udp transport as usable only
        # with RADIO/DISH sockets, so udp urls may be rejected by the socket
        # types build_socket creates -- confirm before relying on udp.
        url = protocol + "://" + address
        if protocol != "ipc":
            # str() so that a numeric JSON port does not raise TypeError.
            url += ":" + str(port)
        return url

    def build_socket(self, paradigm, topic, url):
        """Create a socket for ``paradigm`` and attach it to ``url``.

        "sub" and "req" sockets connect to the url; "pub" and "rep" sockets
        bind to it.  A "sub" socket is additionally subscribed to ``topic``.

        Returns the new socket.
        Raises ValueError if the paradigm is not one of the four above.
        """
        if paradigm == "sub":
            socket = self.context.socket(zmq.SUB)
            socket.connect(url)
            socket.setsockopt_string(zmq.SUBSCRIBE, topic)
        elif paradigm == "pub":
            socket = self.context.socket(zmq.PUB)
            socket.bind(url)
        elif paradigm == "req":
            socket = self.context.socket(zmq.REQ)
            socket.connect(url)
        elif paradigm == "rep":
            socket = self.context.socket(zmq.REP)
            socket.bind(url)
        else:
            raise ValueError("Please provide a valid paradigm")
        return socket

    def send(self, topic, msg):
        """Send ``msg`` on ``topic`` (pub pattern).

        The wire message is the topic name, a space, then ``msg``; the
        leading topic name is what "sub" sockets created by build_socket
        subscribe to.
        """
        self.topics[topic].send_string("%s %s" % (topic, msg))

    def recv_simple(self, topic):
        """Receive one message from ``topic`` and return it as a string."""
        return self.topics[topic].recv_string()

    # TODO: implement a timeout
    def recv(self, topic, callback):
        """Receive one message from ``topic`` and hand it to ``callback``.

        ``callback`` is called with the received string as its only
        argument.
        NOT VERIFIED: this call blocks until a message is received.
        """
        msg = self.topics[topic].recv_string()
        callback(msg)

    def request(self, topic, req, callback):
        """Send a request on ``topic`` and pass the reply to ``callback``.

        ``topic`` names the socket (in this case, the node) to send the
        request to.  ``req`` is handed to the socket's ``send()``, so it must
        be bytes.  ``callback`` is called with the reply, which comes from
        the socket's ``recv()`` and is therefore bytes as well.
        """
        socket = self.topics[topic]
        socket.send(req)
        callback(socket.recv())

    def reply(self, topic, callback):
        """Receive a request on ``topic`` and answer it.

        ``callback`` is called with the received request (bytes, from the
        socket's ``recv()``) and its return value is sent back with
        ``send()``, so it must return bytes.
        """
        socket = self.topics[topic]
        request = socket.recv()
        socket.send(callback(request))

    def send_nparray(self, topic, nparray, flags=0, copy=True, track=False):
        """Send a numpy array on ``topic`` as a two-part message.

        The first part is a JSON object holding the array's ``dtype`` and
        ``shape``; the second part is the raw array buffer.  ``flags``,
        ``copy`` and ``track`` are passed through to the socket's ``send()``.

        Returns whatever the socket's ``send()`` returns for the data part.
        """
        socket = self.topics[topic]
        md = dict(
            dtype=str(nparray.dtype),
            shape=nparray.shape,
        )
        # Sending needs one contiguous buffer; slices and transposed views
        # are copied into one here, already-contiguous arrays pass through.
        payload = numpy.ascontiguousarray(nparray)
        socket.send_json(md, flags | zmq.SNDMORE)
        return socket.send(payload, flags, copy=copy, track=track)

    def recv_nparray(self, topic, flags=0, copy=True, track=False):
        """Receive a numpy array sent by ``send_nparray`` from ``topic``.

        Reads the JSON metadata part, then the data part, and returns an
        array with the advertised dtype and shape built on the received
        buffer.  ``flags``, ``copy`` and ``track`` are passed through to the
        socket's receive calls.
        """
        socket = self.topics[topic]
        md = socket.recv_json(flags=flags)
        msg = socket.recv(flags=flags, copy=copy, track=track)
        flat = numpy.frombuffer(memoryview(msg), dtype=md['dtype'])
        return flat.reshape(md['shape'])