-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparsetest.py~
125 lines (112 loc) · 4.33 KB
/
parsetest.py~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 8 10:43:57 2012
@author: mike
"""
import shelve
from basicTwitter import auth
from tweepy import models
import sys
from pyspatialite import dbapi2 as spatialdb
# Status attribute names of interest.  Nothing in the visible part of this
# module reads this list.
statusCols = [
    'created_at',
    'id',
    'place',
    'source',
    'text',
]

# User attribute names read by dumpUser, listed in the same order as the
# columns of its INSERT statement.
userCols = [
    'created_at',
    'description',
    'followers_count',
    'friends_count',
    'geo_enabled',
    'id',
    'lang',
    'location',
    'name',
    'screen_name',
    'status',
    'statuses_count',
    'time_zone',
    'url',
    'utc_offset',
    'verified',
]
def parseStreamDict(inDict):
    """Parse one tweet dictionary taken from a stream.

    ``inDict`` is passed to tweepy's status model parser together with the
    API handle returned by ``auth()``.

    Returns the 4-tuple ``(status, user, entities, place)``; the last three
    items are the ``user``, ``entities`` and ``place`` attributes of the
    parsed status.
    """
    # A handle is requested from auth() on every call.
    api_handle = auth()
    parsed = models.ModelFactory.status.parse(api_handle, inDict)
    return parsed, parsed.user, parsed.entities, parsed.place
def dumpStatus(cur, status, user_id, place_name):
    """Insert one status into the STATUS table.

    Parameters:
        cur        -- database cursor used for the INSERT.
        status     -- parsed status; ``created_at``, ``id``, ``source``,
                      ``text`` and ``coordinates['coordinates']`` are read.
        user_id    -- id of the author, stored in the ``user_id`` column.
        place_name -- place name, or None when the tweet has no place (stored
                      as NULL rather than the literal text 'None').

    The insert is best-effort: any exception is printed and swallowed so
    that the caller can carry on.  ``True`` is printed after a successful
    insert.
    """
    try:
        created = status.created_at.ctime()
        # Do not run None through unicode(): that would store the string
        # u'None' in the place column instead of NULL.
        place_value = None if place_name is None else unicode(place_name)
        point = status.coordinates['coordinates']
        statusParams = [
            created,
            status.id,
            place_value,
            unicode(status.source),
            unicode(status.text),
            user_id,
            # The first two coordinate values become the X and Y arguments
            # of MakePoint; presumably longitude then latitude (GeoJSON
            # order) -- confirm against the feed.
            point[0],
            point[1],
        ]
        # The coordinates are bound as parameters instead of being formatted
        # into the SQL text, so data from the feed never becomes SQL.
        cur.execute("""INSERT INTO STATUS
(created_at, tweet_id, place, source, tweet_text, user_id, coords)
values(?,?,?,?,?,?,MakePoint(?, ?, 4326))""", statusParams)
        print(True)
    except Exception as e:
        # Deliberately broad: one bad status must not stop the import.
        print(e)
def dumpUser(cur, user):
    """Insert one user into the twitteruser table.

    The row is built from the attributes named in ``userCols``; an attribute
    the user object does not have is stored as None.  An IntegrityError
    raised by the insert is ignored.
    """
    insert_sql = """insert into twitteruser
(created_at, description, followers_count, friends_count,
geo_enabled, user_id, lang, location, name, screen_name, status,
statuses_count, time_zone, url, utc_offset, verified)
values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
    # One value per column, in userCols order.
    row = [getattr(user, field, None) for field in userCols]
    try:
        cur.execute(insert_sql, row)
    except spatialdb.IntegrityError:
        # Presumably the user is already stored -- nothing more to do.
        pass
def dumpEntities(cur, entities, status):
    """Insert the hashtags, urls and user mentions of one status.

    ``entities`` must have the keys 'hashtags', 'urls' and 'user_mentions'.
    Each entry produces one row made of the status id, the entry's own
    fields and its 'indices' values.  An empty or None entity list inserts
    nothing.
    """
    # (entities key, INSERT statement, fields copied from each entry)
    plans = (
        ('hashtags',
         """INSERT INTO HASH_TWEET values(?,?,?,?)""",
         ('text',)),
        ('urls',
         """insert into tweeturl values(?,?,?,?,?)""",
         ('url', 'expanded_url')),
        ('user_mentions',
         """insert into mention values (?,?,?,?)""",
         ('id',)),
    )
    for key, sql, fields in plans:
        # "or ()" skips falsy values exactly as a preceding truth test would.
        for item in entities[key] or ():
            row = [status.id]
            row.extend(item.get(field) for field in fields)
            row.extend(item['indices'])
            cur.execute(sql, row)
def dumpWords(cur, status):
    """Insert every space-separated token of the status text into words.

    Each row is ``(token, status id, position)`` with positions counted from
    zero.  The text is split on single spaces only, so consecutive spaces
    yield empty tokens.  Nothing is inserted when the text is empty or None.
    """
    if not status.text:
        return
    tokens = status.text.split(' ')
    for position, token in enumerate(tokens):
        cur.execute("""insert into words values(?,?,?)""",
                    [token, status.id, position])
def run(inDict, cur, conn):
    """Parse one tweet dictionary and store it.

    Only tweets that have coordinates and whose author's ``lang`` is 'en'
    are stored; for those the status, user, entities and words are written
    through ``cur`` and the transaction is committed on ``conn``.
    """
    status, user, entities, place = parseStreamDict(inDict)
    place_name = place.get('name') if place else None

    # Skip anything that is not a geotagged tweet from an 'en' account.
    if not (status.coordinates and user.lang == 'en'):
        return

    dumpStatus(cur, status, user.id, place_name)
    dumpUser(cur, user)
    dumpEntities(cur, entities, status)
    dumpWords(cur, status)
    # Persist the rows written for this tweet.
    conn.commit()
def runShelve(inDb, outDb):
    """Load every tweet stored in a shelve file into the spatial database.

    Parameters:
        inDb  -- path of the shelve file whose values are tweet dictionaries.
        outDb -- path handed to ``spatialdb.Connection``.

    Each stored value is passed to ``run`` with a cursor and the connection.
    The shelve and the connection are closed even if a record raises.
    """
    conn = spatialdb.Connection(outDb)
    try:
        cur = conn.cursor()
        db = shelve.open(inDb)
        try:
            # itervalues() (Python 2) yields records one at a time rather
            # than building a list of every stored value.
            for val in db.itervalues():
                # run() needs the connection as well, so it can commit.
                run(val, cur, conn)
        finally:
            db.close()
    finally:
        conn.close()
def main():
    """Command-line entry point.

    Expects two arguments: the input shelve file and the output database
    path, which are handed to ``runShelve``.  Extra arguments are ignored.
    When fewer than two are given the script exits with a usage message and
    a non-zero status instead of an IndexError traceback.
    """
    if len(sys.argv) < 3:
        prog = sys.argv[0] if sys.argv else 'parsetest.py'
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit("usage: %s IN_SHELVE OUT_DB" % prog)
    inDb = sys.argv[1]
    outDb = sys.argv[2]
    runShelve(inDb, outDb)
# Run the import only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()