-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_collector.py
112 lines (90 loc) · 3.67 KB
/
data_collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import settings
import os
import psycopg2
import re
import tweepy
import pandas as pd
from textblob import TextBlob
class MyStreamListener(tweepy.StreamListener):
    """Tweepy stream listener that scores tweets and stores them in PostgreSQL.

    Each status that is not skipped is scored with TextBlob sentiment and
    inserted into ``settings.table_name`` through the module-level
    ``connection``. After every insert, the ``TRIM_BATCH`` oldest rows (by
    ``created_at``) are deleted whenever the table holds more than
    ``MAX_ROWS`` rows.
    """

    # Row-count ceiling and the number of oldest rows removed per trim.
    MAX_ROWS = 9500
    TRIM_BATCH = 200

    # Destination columns, in the same order as the tuple from _build_row().
    _COLUMNS = (
        'id_str', 'created_at', 'text', 'polarity', 'subjectivity',
        'user_created_at', 'user_location', 'user_description',
        'user_followers_count', 'longitude', 'latitude',
        'retweet_count', 'favorite_count',
    )

    # Matches an @mention, any character other than ASCII letters, digits,
    # space or tab, or a URL. Raw string so \w and \S stay regex escapes.
    _CLEAN_RE = re.compile(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)")

    def on_status(self, status):
        """Store one streamed status, then trim the table if it grew too large.

        Args:
            status: the tweepy Status object delivered by the stream.

        Returns:
            True when the status is skipped, otherwise None.
        """
        # NOTE(review): per the Twitter API, ``retweeted`` says whether the
        # *authenticating* user retweeted this tweet, so streamed retweets may
        # still get through; ``hasattr(status, 'retweeted_status')`` is the
        # usual retweet test. Confirm which filter is intended.
        if status.retweeted:
            return True
        columns = ', '.join(self._COLUMNS)
        placeholders = ', '.join(['%s'] * len(self._COLUMNS))
        insert_sql = 'INSERT INTO {} ({}) VALUES ({})'.format(
            settings.table_name, columns, placeholders)
        # The with-block closes the cursor even if execute() raises.
        with connection.cursor() as sql_cursor:
            sql_cursor.execute(insert_sql, self._build_row(status))
            connection.commit()
            self._trim_table(sql_cursor)
            connection.commit()

    def _build_row(self, status):
        """Return the values for ``_COLUMNS`` extracted from *status*."""
        user = status.user
        text = deEmojify(status.text)
        # deEmojify() returns None for empty input, which TextBlob rejects.
        sentiment = TextBlob(text or '').sentiment
        longitude = latitude = None
        if status.coordinates:
            point = status.coordinates['coordinates']
            longitude, latitude = point[0], point[1]
        return (
            status.id_str,
            status.created_at,
            text,
            sentiment.polarity,
            sentiment.subjectivity,
            user.created_at,
            deEmojify(user.location),
            deEmojify(user.description),
            user.followers_count,
            longitude,
            latitude,
            status.retweet_count,
            status.favorite_count,
        )

    def _trim_table(self, sql_cursor):
        """Delete the TRIM_BATCH oldest rows if the table exceeds MAX_ROWS rows."""
        # Every table reference, including the COUNT(*), must use the
        # configured table rather than a fixed name.
        delete_query = '''
            DELETE FROM {0}
            WHERE id_str IN (
                SELECT id_str
                FROM {0}
                ORDER BY created_at ASC
                LIMIT %s) AND (SELECT COUNT(*) FROM {0}) > %s;
        '''.format(settings.table_name)
        sql_cursor.execute(delete_query, (self.TRIM_BATCH, self.MAX_ROWS))

    def on_error(self, status_code):
        """Handle an HTTP error status code reported by the Twitter stream.

        Returns False for 420 (the code Twitter used for rate limiting) so the
        stream disconnects; any other code returns None.
        """
        if status_code == 420:
            # Returning False disconnects the stream.
            return False
        return None

    def clean_tweet(self, tweet):
        """Return *tweet* with mentions, URLs and non-alphanumeric characters removed.

        Each match of ``_CLEAN_RE`` is replaced with a space. Runs of whitespace
        are then collapsed to single spaces, and leading/trailing whitespace is
        dropped.
        """
        return ' '.join(self._CLEAN_RE.sub(' ', tweet).split())
def deEmojify(text):
    """Return *text* with every non-ASCII character removed.

    Despite the name, this drops all non-ASCII characters (accented letters,
    CJK, symbols, ...), not only emoji. Falsy input (None or "") yields None.
    """
    if not text:
        return None
    ascii_bytes = text.encode('ascii', errors='ignore')
    return ascii_bytes.decode('ascii')
# Connect to the PostgreSQL database named by the DATABASE_URL environment
# variable. The module-level ``connection`` is also used by MyStreamListener.
DATABASE_URL = os.environ['DATABASE_URL']
connection = psycopg2.connect(DATABASE_URL, sslmode='require')
sql_cursor = connection.cursor()
# Check whether the table exists. SELECT COUNT(*) always returns exactly one
# row, so the count value itself (not cursor.rowcount) answers the question.
sql_cursor.execute(
    """
    SELECT COUNT(*)
    FROM information_schema.tables
    WHERE table_name = %s
    """,
    (settings.table_name,),
)
table_check = sql_cursor.fetchone()[0] > 0
print(table_check)
if not table_check:
    # IF NOT EXISTS keeps this safe if the lookup above missed an existing
    # table (e.g. a name differing only in case).
    sql_cursor.execute("CREATE TABLE IF NOT EXISTS {} ({});".format(
        settings.table_name, settings.table_attributes))
    connection.commit()
sql_cursor.close()
# Twitter API credentials are read from environment variables.
CONSUMER_KEY = os.environ['CONSUMER_KEY']
CONSUMER_SECRET = os.environ['CONSUMER_SECRET']
ACCESS_KEY = os.environ['ACCESS_KEY']
ACCESS_SECRET = os.environ['ACCESS_SECRET']
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_KEY, ACCESS_SECRET)
api = tweepy.API(auth)
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
try:
    # Stream English-language tweets matching the configured track terms.
    myStream.filter(languages=["en"], track=settings.data_on_word)
finally:
    # Close the database connection even if the stream stops with an error.
    connection.close()