# servocontroller.py (forked from nikivanov/watney)
import asyncio
import time
class ServoController:
    """Sweep a single servo between configured limits from an asyncio task.

    A background task (``timingLoop``) owns the servo position.  Callers only
    request a direction through ``forward()``, ``backward()`` and
    ``lookStop()``; the task then moves the position at
    ``changeVelocityPerSec`` units per second until a limit is reached or the
    direction is cleared.

    Args:
        gpio: Object exposing ``set_servo_pulsewidth(pin, width)``.
            NOTE(review): presumably a pigpio ``pi`` instance - confirm.
        config: Mapping with a ``"SERVO"`` section holding ``PWMPin``,
            ``Neutral``, ``Min`` and ``Max`` (each converted with ``int``).
            NOTE(review): the position values are presumably pulse widths in
            microseconds - confirm against the config file.
        audioManager: Object exposing ``lowerVolume(token)`` and
            ``restoreVolume(token)``; the volume is lowered while the servo
            is being driven and restored when it goes idle.
    """

    def __init__(self, gpio, config, audioManager):
        servoConfig = config["SERVO"]
        self.pwmPin = int(servoConfig["PWMPin"])
        self.neutral = int(servoConfig["Neutral"])
        self.min = int(servoConfig["Min"])
        self.max = int(servoConfig["Max"])

        self.audioManager = audioManager
        # Fixed token handed to the audio manager on every lower/restore call.
        self.audioToken = "927dff95-b82b-433c-885c-6ac9ac13d8b0"

        self.gpio = gpio
        # Position change per second of continuous movement.
        self.changeVelocityPerSec = 1000

        # 1 is forward, -1 is backward, 0 is stop
        self.direction = 0
        # Guards ``direction`` and wakes the timing loop when it changes.
        self.timingLock = asyncio.Condition()

        self.task = None
        self.startLoop()

    async def forward(self):
        """Request movement towards ``self.min`` and wake the timing loop."""
        async with self.timingLock:
            self.direction = 1
            self.timingLock.notify()

    async def backward(self):
        """Request movement towards ``self.max`` and wake the timing loop."""
        async with self.timingLock:
            self.direction = -1
            self.timingLock.notify()

    async def lookStop(self):
        """Clear the requested direction; notifies only if one was active."""
        async with self.timingLock:
            if self.direction != 0:
                self.direction = 0
                self.timingLock.notify()

    def startLoop(self):
        """Schedule ``timingLoop()`` on the loop from ``get_event_loop()``."""
        loop = asyncio.get_event_loop()
        self.task = loop.create_task(self.timingLoop())

    async def timingLoop(self):
        """Background task that moves the servo while a direction is active.

        The position starts at ``self.neutral``.  While idle (no direction, or
        already at the limit for the requested direction) the servo output is
        set to 0, the audio volume is restored and the task sleeps on
        ``timingLock``.  While moving, the position is advanced every 50 ms by
        ``changeVelocityPerSec * elapsed`` and clamped to ``[min, max]``.

        Cancelling the task sets the servo output to 0 and ends the task
        normally (the ``CancelledError`` is not re-raised).  Any other
        exception is printed and also ends the task.
        """
        print("Servo starting...")
        try:
            currentPosition = self.neutral
            lastChangeTime = None
            while True:
                async with self.timingLock:
                    if not self.__shouldBeMoving(currentPosition):
                        self.stopServo()
                        self.audioManager.restoreVolume(self.audioToken)
                        lastChangeTime = None
                        # Re-check after every wake-up: a notify that still
                        # leaves nothing to do (e.g. forward() while already
                        # at Min) must not lower the volume or pulse the servo.
                        while not self.__shouldBeMoving(currentPosition):
                            await self.timingLock.wait()
                        self.audioManager.lowerVolume(self.audioToken)

                    # Monotonic clock: a wall-clock step must not turn into a
                    # huge (or negative) position change.
                    now = time.monotonic()
                    if lastChangeTime is None:
                        # First tick of a movement: just publish the position.
                        changeDelta = 0
                    else:
                        changeDelta = self.changeVelocityPerSec * (now - lastChangeTime)
                    lastChangeTime = now

                    if self.direction == 1:
                        currentPosition = max(currentPosition - changeDelta, self.min)
                    elif self.direction == -1:
                        currentPosition = min(currentPosition + changeDelta, self.max)

                    self.changeServo(currentPosition)

                # Sleep outside the lock so direction requests are not delayed.
                await asyncio.sleep(0.05)
        except asyncio.CancelledError:
            self.stopServo()
            print("Servo stopped")
        except Exception as e:
            print("Unexpected exception in servo: " + str(e))

    def __shouldBeMoving(self, currentPosition):
        """Return True if a direction is set and its limit is not yet reached."""
        if self.direction == 0:
            return False
        elif self.direction == 1 and currentPosition <= self.min:
            return False
        elif self.direction == -1 and currentPosition >= self.max:
            return False
        return True

    def stopServo(self):
        """Set the servo pulse width to 0.

        NOTE(review): with pigpio a width of 0 switches servo pulses off -
        confirm the gpio object in use follows that convention.
        """
        self.gpio.set_servo_pulsewidth(self.pwmPin, 0)

    def changeServo(self, val):
        """Set the servo pulse width on ``pwmPin`` to ``val``."""
        self.gpio.set_servo_pulsewidth(self.pwmPin, val)

    def stop(self):
        """Stop the servo and clear the requested direction.

        Clearing ``direction`` keeps the timing loop from driving the servo
        again on its next tick; a later ``forward()``/``backward()`` still
        resumes movement.
        """
        self.direction = 0
        self.stopServo()