-
Notifications
You must be signed in to change notification settings - Fork 4
/
sub2Dump1090.py
executable file
·145 lines (112 loc) · 4.26 KB
/
sub2Dump1090.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/python
"""
sub2Dump1090 by ThreeSixes (https://github.com/ThreeSixes)
This project is licensed under GPLv3. See COPYING for details.
This file is part of the airSuck project (https://github.com/ThreeSixes/airSUck).
"""
############
# Imports. #
############
try:
import config
except:
raise IOError("No configuration present. Please copy config/config.py to the airSuck folder and edit it.")
import sys
import redis
import time
import json
import threading
import time
import traceback
from libAirSuck import asLog
from socket import socket
from pprint import pprint
#################
# Configuration #
#################
# Destination dump1090 instance: data is submitted to it over TCP port 30001.
dump1090Dst = dict(
    host="127.0.0.1",
    port=30001,
    reconnectDelay=5,  # Seconds to sleep before retrying after a failure.
)
##############################
# Classes for handling data. #
##############################
class SubListener(threading.Thread):
    """
    Listen to the SSR channel for new data and relay it to dump1090.

    Each message received from the Redis pub/sub subscription is unwrapped and,
    when it is an airSSR frame that originated from dump1090, written to a TCP
    connection to the host/port configured in the module-level dump1090Dst.
    """

    def __init__(self, r, channels):
        """
        Create the thread and subscribe to the given channels.

        r is the Redis client; its pubsub() object is used for the subscription.
        channels is passed unchanged to pubsub.subscribe().
        """
        threading.Thread.__init__(self)

        # Set up REDIS feed
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channels)

    def worker(self, work, dSock):
        """
        worker(self, work, dSock) dumps one incoming pub/sub message to the open TCP socket for dump1090.

        work is the message dict produced by the pub/sub listener; work['data']
        is expected to hold a JSON document (bytes or text).
        dSock is a connected socket-like object providing sendall().

        Only JSON objects whose 'type' is "airSSR" and whose 'dataOrigin' is
        "dump1090" are forwarded. A frame is written as "*" + data + ";" and a
        newline, or as "@" + mlatData + data + ";" and a newline when the
        message carries 'mlatData'. Anything else (payloads that are not valid
        JSON, non-object JSON, other types/origins, frames without 'data') is
        dropped without touching the socket, so one bad message does not tear
        down the connection.
        """
        rawData = work['data']

        try:
            # Payloads may arrive as bytes; str() on bytes would produce the
            # "b'...'" repr under Python 3, which is never valid JSON.
            if isinstance(rawData, bytes):
                rawData = rawData.decode('utf-8')
            else:
                # Non-text payloads (e.g. integers) are stringified as before.
                rawData = str(rawData)

            # Break our SSR wrapped data out from the Redis queue
            ssrWrapped = json.loads(rawData)
        except ValueError:
            # Covers both undecodable bytes and malformed JSON.
            return

        # Make sure we parsed the JSON into an object.
        if not isinstance(ssrWrapped, dict):
            return

        # We only relay SSR data that actually came from dump1090.
        if ssrWrapped.get('type') != "airSSR":
            return

        if ssrWrapped.get('dataOrigin') != "dump1090":
            return

        # Set the data body.
        dataBody = ssrWrapped.get('data')

        if dataBody is None:
            return

        # Define start of data char
        dataHdr = "*"

        # Do we have MLAT data coming in?
        if 'mlatData' in ssrWrapped:
            # Set the header to an MLAT header
            dataHdr = "@"

            # Prepend the MLAT data on the data body.
            dataBody = ssrWrapped['mlatData'] + dataBody

        # Send the data over to the target dump1090. Sockets take bytes, so
        # encode the assembled frame once at the I/O boundary.
        dSock.sendall((dataHdr + dataBody + ";\n").encode('utf-8'))

    def run(self):
        """
        run(self) is the thread body: connect to dump1090 and relay messages forever.

        Uses the module-level dump1090Dst settings and the module-level logger.
        On any failure the error is logged, the thread sleeps for
        dump1090Dst["reconnectDelay"] seconds and a new connection is attempted.
        """
        host = dump1090Dst["host"]
        port = dump1090Dst["port"]

        # Infinite loop
        while True:
            try:
                # A socket object cannot be reused once it has been closed or
                # has failed, so build a fresh one for every attempt.
                dump1090Sock = socket()

                try:
                    # Attempt a connection.
                    logger.log("Connecting to %s:%s" % (host, port))

                    # Connect up.
                    dump1090Sock.connect((host, port))
                    logger.log("Connected.")

                    # Handle incoming data.
                    for work in self.pubsub.listen():
                        self.worker(work, dump1090Sock)
                finally:
                    # Always release the descriptor, on success and on error.
                    dump1090Sock.close()

            except KeyboardInterrupt:
                # Same effect as quit(), without depending on the site module.
                raise SystemExit

            except Exception:
                tb = traceback.format_exc()
                logger.log("Failed to connect to %s:%s\n%s" % (host, port, tb))
                logger.log("Sleeping %s sec" % dump1090Dst["reconnectDelay"])
                time.sleep(dump1090Dst["reconnectDelay"])
# Script entry point.
if __name__ == "__main__":
    # The listener thread logs through this module-level name, so it has to
    # exist before the thread is started.
    logger = asLog('stdout')

    # Build the Redis client from the publisher settings, wrap it in our
    # threaded client, and start it as a daemon thread.
    pubSettings = config.connPub
    redisConn = redis.Redis(host=pubSettings['host'], port=pubSettings['port'])
    listener = SubListener(redisConn, [pubSettings['qName']])
    listener.daemon = True
    listener.start()

    try:
        # Keep the main thread alive while the listener does the work.
        while True:
            time.sleep(10)

    except KeyboardInterrupt:
        # Die nicely.
        quit()

    except Exception:
        logger.log("Unexpected exception\n%s" % traceback.format_exc())