forked from jvq2/blue_status
-
Notifications
You must be signed in to change notification settings - Fork 0
/
notifier.py
105 lines (77 loc) · 2.75 KB
/
notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import logging
import time
import bluetooth
from command import Command
logger = logging.getLogger(__name__)
class Notifier():
def __init__(self, device_prefix='ESP32test', port=1):
self.device_prefix = device_prefix
self._sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self._port = port
def discover(self, duration=4):
nearby = bluetooth.discover_devices(
duration=duration, lookup_names=True, flush_cache=True)
print('Found devices', nearby)
# logger.warn('Found devices', nearby)
for addr, name in nearby:
if name.startswith(self.device_prefix):
return addr, name
raise Exception('Could not find notifier')
def connect(self, addr, port=1):
# TODO: make sure any old connection is cleared first
self._sock.connect((addr, port))
# def command(self, *commands):
# self._sock.sendall(Command.Attention)
# for command in commands:
# self._sock.sendall(command)
# self._sock.sendall(Command.End)
def command(self, *commands):
# the format is as follows:
# Attention: AT
# message length including attention and length byte
# command and data
# END: \n
command = ''.join(commands)
message = Command.Attention
message += chr(len(command) + 3) # +3 for AT and length bit
message += command
message += Command.End
# print('command: %s' % message)
self._sock.send(message)
# for command in commands:
# self._sock.sendall(command)
# self._sock.sendall(Command.End)
return self.readUntil('\n')
# def read(self):
# message = ''
# while True:
# data = self._sock.recv(2)
# print('data: "%s"' % data)
# if :
# break
# message += data
# # message = self._sock.recv(2)
# print('message: "%s"' % message)
# return message
def readUntil(self, end):
message = ''
while True:
char = self._sock.recv(1).decode('utf-8')
if char == end:
if message != 'OK':
print('message: "%s"' % message)
return message
message += char
# def read(self):
# time.sleep(0.01)
# message = self._sock.recv(2048)
# print('message: "%s"' % message)
# return message
def hello(self):
self.command(Command.Info)
def set_rgb(self, red, green, blue):
return self.command(Command.SetRGB, red, green, blue)
def set_mode(self, mode):
return self.command(Command.SetMode, mode)
def set_speed(self, speed):
return self.command(Command.SetSpeed, str(speed))