forked from nikivanov/watney
-
Notifications
You must be signed in to change notification settings - Fork 0
/
offcharger.py
46 lines (36 loc) · 1.62 KB
/
offcharger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import concurrent
import concurrent.futures
import time

from events import Events
class OffCharger:
    """Announce and react when the robot comes off its charger.

    When enabled, the instance subscribes to the motion and charger events
    published by ``Events``. An off-charger event that arrives more than
    ``gracePeriod`` seconds after the last motion event schedules a background
    task. That task waits ``initialDelay`` seconds and, if no on-charger event
    has arrived in the meantime, speaks "Off Charger" and sends two bearing
    commands to the motor controller.
    """

    def __init__(self, config, tts, motorController):
        """Read the OFFCHARGER settings and subscribe to events if enabled.

        Args:
            config: Mapping with an ``"OFFCHARGER"`` section holding the
                string values ``"Enabled"``, ``"GracePeriod"`` and
                ``"InitialDelay"``. The last two are parsed with ``int`` and
                used as durations in seconds.
            tts: Object exposing ``sayText(text)``.
            motorController: Object exposing ``setBearing(bearing, flag)``.

        Raises:
            KeyError: If the section or one of its keys is missing.
            ValueError: If a duration is not an integer string.
        """
        section = config["OFFCHARGER"]
        enabled = section["Enabled"]
        self.gracePeriod = int(section["GracePeriod"])
        self.initialDelay = int(section["InitialDelay"])
        self.tts = tts
        self.motorController = motorController

        # Initialise all runtime state *before* subscribing, and regardless of
        # the enabled flag, so that no handler can ever observe an instance
        # with missing attributes.
        self.lastMotion = time.time()
        self.currentChargingState = False
        # Requires the ``concurrent.futures`` submodule to be imported at
        # module level; a bare ``import concurrent`` does not load it.
        self.delayedExecutor = concurrent.futures.ThreadPoolExecutor()

        if enabled.lower() == "true":
            print("Starting Off Charger Monitoring...")
            events = Events.getInstance()
            # Both motion edges count as "recent motion".
            events.motionOn.append(self.onMotion)
            events.motionOff.append(self.onMotion)
            events.offCharger.append(self.onOffCharger)
            events.onCharger.append(self.onOnCharger)

    def onMotion(self):
        """Record the time of the most recent motion event."""
        self.lastMotion = time.time()

    def onOffCharger(self):
        """Mark the robot as off the charger and maybe schedule the reaction.

        The reaction is scheduled only when the last motion event is older
        than ``gracePeriod`` seconds.
        """
        self.currentChargingState = False
        if time.time() - self.lastMotion > self.gracePeriod:
            self.delayedExecutor.submit(self.__doOffCharger)

    def onOnCharger(self):
        """Mark the robot as back on the charger."""
        self.currentChargingState = True

    def __doOffCharger(self):
        """Run the delayed off-charger reaction on an executor thread.

        Waits ``initialDelay`` seconds, then does nothing if an on-charger
        event arrived in the meantime.
        """
        time.sleep(self.initialDelay)
        if self.currentChargingState:
            # Back on the charger during the delay: nothing to do.
            return

        print("Off Charger detected")
        self.tts.sayText("Off Charger")
        time.sleep(2)
        # NOTE(review): presumably "n" drives forward and "0" stops, giving a
        # one-second nudge -- confirm against the motor controller.
        self.motorController.setBearing("n", False)
        time.sleep(1)
        self.motorController.setBearing("0", False)