-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
59 lines (49 loc) · 1.75 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado import gen
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer
from struct import *
# Command-line option: TCP port the server listens on (--port=N).
define("port", default=9999, help="TCP port to use")

# Fixed three-byte frames written to a client for the 'on' / 'off' commands.
# Built with the same ">ch" layout used for the colour command header:
# one command byte followed by a big-endian 16-bit zero.
ON = pack(">ch", b"\x12", 0)   # == b"\x12\x00\x00"
OFF = pack(">ch", b"\x13", 0)  # == b"\x13\x00\x00"
class FlashlightServer(TCPServer):
    """Tornado asynchronous flashlight TCP server.

    For every accepted connection the operator is prompted on stdin for
    commands; each command is encoded by ``decode_data`` and written to
    the connected client.
    """

    # Addresses of the peers currently being served (shared class-wide).
    clients = set()

    def decode_data(self, data):
        """Encode an operator command as the bytes to send to a client.

        Args:
            data: Command text typed by the operator (``str``).

        Returns:
            bytes: ``ON`` if the text contains "on", ``OFF`` if it contains
            "off"; for text containing "color", a header made of the byte
            0x20 and the big-endian 16-bit value 3, followed by one
            unsigned byte per integer after the first word. Any other text
            is sent as its UTF-8 encoding. The result always ends with a
            newline: one is always appended to the fixed/colour frames,
            and to free-form text only when it is missing.

        Raises:
            ValueError: a colour component is not an integer.
            struct.error: a colour component does not fit in one byte.
        """
        # NOTE(review): these are substring matches, so free-form text that
        # merely contains "on"/"off"/"color" is treated as that command.
        if "on" in data:
            payload = ON
        elif "off" in data:
            payload = OFF
        elif "color" in data:
            # split() without an argument tolerates repeated spaces.
            components = [int(part) for part in data.split()[1:]]
            # NOTE(review): the header always announces 3, whatever the
            # number of components actually supplied -- confirm with the
            # client protocol whether other counts should be rejected.
            payload = pack(">ch", b"\x20", 3) + b"".join(
                pack(">B", component) for component in components
            )
        else:
            # IOStream.write() needs bytes, and str + bytes is a TypeError
            # on Python 3, so encode free-form text before framing it.
            payload = data.encode("utf-8")
            if payload.endswith(b"\n"):
                return payload
        return payload + b"\n"

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Serve one client: read commands from stdin and send them.

        Args:
            stream: The ``IOStream`` for the accepted connection.
            address: Peer address as passed by ``TCPServer``; only its
                first element (the host) is interpreted here.

        The loop ends when a write fails with ``StreamClosedError``. The
        address is removed from ``clients`` however the loop exits.
        """
        from struct import error as struct_error

        # IPv4 peers give (host, port) but IPv6 peers give a 4-tuple, so
        # index instead of unpacking into exactly two names.
        ip = address[0]
        print("Incoming connection from " + ip)
        FlashlightServer.clients.add(address)
        print("Command examples:\n" +
              "'on'\n'off'\n'color 200 150 3'\n or byte sequence...")
        try:
            while True:
                # NOTE: input() blocks the IOLoop while waiting for the
                # operator, so no other connection progresses meanwhile.
                try:
                    data = self.decode_data(input('(command) '))
                except (ValueError, struct_error) as exc:
                    # A mistyped command must not end the session.
                    print("Invalid command: " + str(exc))
                    continue
                try:
                    yield stream.write(data)
                except StreamClosedError:
                    print("Client " + str(address) + " left, cannot send message.")
                    break
                # Drop the trailing newline for display.
                print("Sending to client: ", data[:-1])
        finally:
            FlashlightServer.clients.discard(address)
if __name__ == "__main__":
    # Read --port (and tornado's built-in flags) before binding.
    options.parse_command_line()
    listen_port = options.port

    flashlight_server = FlashlightServer()
    flashlight_server.listen(listen_port)
    print("Starting server on tcp://localhost:" + str(listen_port))

    # Blocks until the loop is stopped.
    IOLoop.current().start()