-
Notifications
You must be signed in to change notification settings - Fork 4
/
mongoDump.py
executable file
·134 lines (102 loc) · 4.25 KB
/
mongoDump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python
"""
mongoDump by ThreeSixes (https://github.com/ThreeSixes)
This project is licensed under GPLv3. See COPYING for details.
This file is part of the airSuck project (https://github.com/ThreeSixes/airSuck).
"""
try:
import config
except:
raise IOError("No configuration present. Please copy config/config.py to the airSuck folder and edit it.")
import sys
import redis
import pymongo
import time
import json
import datetime
import traceback
from libAirSuck import asLog
from pprint import pprint
# Redis queue name.
# NOTE(review): not referenced anywhere else in the visible code; the main loop
# pops from config.connRel['qName'] instead -- confirm whether this is still needed.
targetQ = "airReliable"
# Redis client for the queue that records are popped from (host/port from config.connRel).
rQ = redis.StrictRedis(host=config.connRel['host'], port=config.connRel['port'])
# Delay this many seconds when the queue is empty so the polling loop
# does not burn CPU busy-waiting.
checkDelay = config.connMongo['checkDelay']
# MongoDB client, database and target collection, all taken from config.connMongo.
connMongo = pymongo.MongoClient(config.connMongo['host'], config.connMongo['port'])
mDB = connMongo[config.connMongo['dbName']]
mDBColl = mDB[config.connMongo['coll']]
# Buffer of decoded records waiting to be bulk-inserted into MongoDB.
insertBuff = []
# Main-loop run flag; the loop clears it when it receives a KeyboardInterrupt.
keepRunning = True
# Convert datetime objects expressed as a string back to datetime
def str2Datetime(strDateTime):
    """
    Convert a utcnow() datetime string back to a datetime object.

    strDateTime is expected in the form "YYYY-MM-DD HH:MM:SS.ffffff"; the
    19-character form without the fractional part is accepted as well.

    Returns the parsed datetime.datetime, or None when the value cannot be
    converted (the failure is logged through the module-level logger).
    """
    retVal = None

    try:
        # Frames received exactly on the second lack the %f portion of the
        # timestamp, so pad it to keep a single strptime format.
        if len(strDateTime) == 19:
            strDateTime = strDateTime + ".000000"

        # Attempt to convert the date.
        retVal = datetime.datetime.strptime(strDateTime, "%Y-%m-%d %H:%M:%S.%f")

    except Exception:
        # Catch Exception rather than everything so that KeyboardInterrupt
        # and SystemExit still reach the caller instead of being logged as
        # a conversion failure.
        tb = traceback.format_exc()
        logger.log("Failed to convert string to datetime:\n%s" %tb)

    return retVal
# Unwrap a JSON-encoded queue message.
def dejsonify(msg):
    """
    Decode the JSON text in msg and return the resulting Python object.
    """
    decoded = json.loads(msg)
    return decoded
# Insert records into specified mongo instance
def serializeADSB(entry):
    """
    Insert one record (a dict) or a batch of records (an iterable of dicts)
    into the configured MongoDB collection.

    An empty batch is skipped, because an empty bulk insert is rejected by
    the driver. Returns None.
    """
    # Normalise to a list so both insert paths receive the same shape.
    if isinstance(entry, dict):
        docs = [entry]
    else:
        docs = list(entry)

    # Nothing buffered, nothing to write.
    if not docs:
        return

    # Collection.insert() is deprecated in PyMongo 3 and removed in PyMongo 4.
    # Check the class rather than the instance: attribute access on a
    # Collection instance yields a sub-collection for unknown names, so
    # hasattr() on the instance would always be true.
    if hasattr(type(mDBColl), "insert_many"):
        mDBColl.insert_many(docs)
    else:
        mDBColl.insert(docs)
# Set up the logger used by str2Datetime and the main loop below; the log mode
# comes from config.connMongo['logMode'].
logger = asLog(config.connMongo['logMode'])
# Main entry: drain the Redis queue into MongoDB in timed batches.
# If this mongo engine is enabled...
if config.connMongo['enabled'] == True:
    # Minimum time between two bulk inserts.
    timeLimit = datetime.timedelta(seconds=config.connMongo['insertDelay'])

    # timeThen is the time of the last flush (or startup, before the first one).
    timeThen = datetime.datetime.now()
    timeNow = datetime.datetime.now()

    logger.log("Dumping connector data from queue to MongoDB.")

    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is
    # honoured wherever it lands, including during the back-off sleep in the
    # error handler.
    try:
        # Run until interrupted.
        while keepRunning:
            try:
                # Pull oldest entry from the queue.
                dQd = rQ.rpop(config.connRel['qName'])

                if dQd is None:
                    # Queue is empty: sleep for the configured delay to save CPU.
                    time.sleep(checkDelay)
                else:
                    # We have data so we should break it out of JSON formatting
                    # and turn its timestamp string back into a datetime.
                    xDqd = dejsonify(dQd)
                    xDqd['dts'] = str2Datetime(xDqd['dts'])

                    # Add record to buffer.
                    insertBuff.append(xDqd)

                # What time is it now?
                timeNow = datetime.datetime.now()

                # Time to store? This is checked on idle iterations as well,
                # so buffered records are not left waiting for the next
                # message when the queue goes quiet.
                if insertBuff and (timeNow - timeThen) > timeLimit:
                    # Bulk insert
                    serializeADSB(insertBuff)

                    # Reset our then.
                    timeThen = datetime.datetime.now()

                    # Clear the buffer in place only after a successful insert;
                    # if the insert raised, the records stay queued for a retry.
                    insertBuff[:] = []

            except Exception:
                # Covers Redis, JSON decoding and MongoDB failures alike; the
                # log text is kept unchanged for anything matching on it.
                tb = traceback.format_exc()
                logger.log("Failed to pull from the Redis queue. Sleeping %s sec\n%s" %(checkDelay, tb))

                # Actually back off, so a persistent failure does not spin the
                # CPU and flood the log.
                time.sleep(checkDelay)

    except KeyboardInterrupt:
        keepRunning = False

        # Best-effort final flush of whatever is still buffered.
        if insertBuff:
            try:
                # Store records.
                serializeADSB(insertBuff)
            except Exception:
                tb = traceback.format_exc()
                logger.log("Failed to store buffered records on shutdown:\n%s" %tb)

else:
    logger.log("The connector mongoDB engine is not enabled in the configuration.")