-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDHT22.py
121 lines (85 loc) · 1.97 KB
/
DHT22.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import RPi.GPIO as GPIO
import time
TIMEOUT = 2
ERROR_TIMEOUT = -1
ERROR_DATA_LOW = -2
ERROR_DATA_READ = -3
ERROR_DATA_EOF = -4
ERROR_DATA_LENGTH = -5
ERROR_DATA_PARITY = -6
ERROR_DATA_VALUE = -7
class DHT22:
def usleep(self, t):
tt = time.time() + (t / 1000000)
while time.time() < tt:
pass
def confirm(self, us, level):
cnt = int(us / 10)
if us % 10 > 0:
cnt = cnt + 1
for i in range(0, cnt):
self.usleep(10)
l = GPIO.input(self.pin)
if l != level:
return True
return False
def bits2byte(self, data, offset):
result = 0
b = 128
for i in range(0, 8):
result += data[offset + i] * b
b >>= 1
return result
def acquire(self):
t_start = time.time()
while True:
# init transmission
GPIO.setup(self.pin, GPIO.OUT)
GPIO.output(self.pin, GPIO.LOW)
self.usleep(1000)
GPIO.output(self.pin, GPIO.HIGH)
GPIO.setup(self.pin, GPIO.IN, pull_up_down = GPIO.PUD_DOWN)
if self.confirm(85, GPIO.LOW):
if self.confirm(85, GPIO.HIGH):
break
if time.time() - t_start > TIMEOUT:
return ERROR_TIMEOUT
# read transmission
data = []
for i in range(0, 40):
if not self.confirm(90, GPIO.LOW):
return ERROR_DATA_LOW
ok = False
for j in range(0, 8):
if GPIO.input(self.pin) != GPIO.HIGH:
ok = True
break
self.usleep(6)
if not ok:
return ERROR_DATA_READ
if j > 3:
data.append(1)
else:
data.append(0)
if not self.confirm(75, GPIO.LOW):
return ERROR_DATA_EOF
# decode
if len(data) != 40:
return ERROR_DATA_LENGTH
hh = self.bits2byte(data, 0)
hl = self.bits2byte(data, 8)
th = self.bits2byte(data, 16)
tl = self.bits2byte(data, 24)
p = self.bits2byte(data, 32)
s = (hh + hl + th + tl) & 255
if p != s:
return ERROR_DATA_PARITY
h = (hh << 8) | hl
t = (th << 8) | tl
if h == 0 and t == 0:
return ERROR_DATA_VALUE
return [h / 10, t / 10]
def begin(self, pin):
self.pin = pin
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)