-
Notifications
You must be signed in to change notification settings - Fork 1
/
avatar-fetcher.py
101 lines (83 loc) · 3.26 KB
/
avatar-fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#! /usr/bin/env python3
from collections import Counter
from urllib.request import urlopen, Request
import argparse
import base64
import json
import os
import re
import shutil
import urllib.parse
import urllib.error
# Twitter screen names: 1-15 characters drawn from ASCII letters, digits and
# underscore.
#  * re.ASCII stops re.I from also accepting the non-ASCII letters that
#    case-fold onto ASCII ones (U+0130, U+0131, U+017F, U+212A).
#  * \Z anchors at the very end of the string; '$' would also match just
#    before a trailing newline and let 'name\n' through.
reScreenName = re.compile(r'^[a-z0-9_]{1,15}\Z', re.I | re.ASCII)


def validateScreenName (screenName):
    """Tell whether *screenName* is a syntactically valid Twitter screen name.

    Args:
        screenName: Candidate screen name (without a leading ``@``).

    Returns:
        True when the name consists of 1-15 ASCII letters, digits or
        underscores; False otherwise. A rejected name is reported on stdout.
    """
    if reScreenName.match(screenName):
        return True
    # Print the encoded bytes so odd characters show up as escapes instead of
    # depending on what the console encoding can display.
    print('Invalid screen name:', screenName.encode())
    return False
class TwitterAvatarFetcher:
    """Download Twitter profile pictures through the v1.1 REST API.

    Credentials (``consumer_key`` / ``consumer_secret``) are read from
    ``config.json`` in the current working directory and exchanged for an
    application-only bearer token.
    """

    # Number of screen names sent with each users/lookup request.
    batchSize = 50

    def __init__ (self, targetFolder):
        """Prepare the output folder and load the API credentials.

        Args:
            targetFolder: Directory the avatars are written to; it is created
                (including missing parents) when it does not exist yet.

        Raises:
            OSError: The folder cannot be created or ``config.json`` cannot
                be read.
            KeyError: ``config.json`` lacks ``consumer_key`` or
                ``consumer_secret``.
        """
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(targetFolder, exist_ok=True)
        self.targetFolder = targetFolder
        # Filled in by retrieveBearerToken(); getUsers() fetches it on demand.
        self.bearerToken = None

        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        with open('config.json', encoding='utf-8') as f:
            config = json.load(f)
        credentials = '{}:{}'.format(config['consumer_key'], config['consumer_secret'])
        self.key = base64.b64encode(credentials.encode()).decode()

    def retrieveBearerToken (self):
        """Exchange the consumer credentials for an application bearer token.

        The token is stored in ``self.bearerToken``.

        Raises:
            urllib.error.URLError: The token request failed (``HTTPError`` for
                an error status returned by the server).
        """
        req = Request(
            'https://api.twitter.com/oauth2/token',
            data=b'grant_type=client_credentials',
            headers={
                'Authorization': 'Basic {}'.format(self.key),
                'Content-Type': 'application/x-www-form-urlencoded;charset=utf-8',
            },
            method='POST',
        )
        with urlopen(req) as resp:
            data = json.loads(resp.read().decode())
        self.bearerToken = data['access_token']

    def getUsers (self, users):
        """Look up the user objects for a collection of screen names.

        Args:
            users: Iterable of screen names to send in one ``users/lookup``
                request.

        Returns:
            The decoded JSON response; ``downloadAvatars`` iterates it as a
            sequence of user dicts.

        Raises:
            urllib.error.URLError: The lookup request failed (``HTTPError``
                for an error status returned by the server).
        """
        # Obtain the token lazily so the method also works when the caller
        # did not call retrieveBearerToken() first.
        if getattr(self, 'bearerToken', None) is None:
            self.retrieveBearerToken()

        req = Request(
            'https://api.twitter.com/1.1/users/lookup.json',
            data=urllib.parse.urlencode({ 'screen_name': ','.join(users) }).encode(),
            headers={
                'Authorization': 'Bearer {}'.format(self.bearerToken),
                'Content-type': 'application/x-www-form-urlencoded',
            },
            method='POST',
        )
        with urlopen(req) as resp:
            return json.loads(resp.read().decode())

    def downloadAvatars (self, users):
        """Download the avatar of every valid screen name in *users*.

        Invalid names are reported and skipped, duplicates are reported and
        downloaded once, and names the API returns no user for are listed as
        skipped. A failed download is reported and does not stop the run.

        Args:
            users: Iterable of screen names.

        Raises:
            urllib.error.URLError: A lookup request failed for a reason other
                than "no matching users".
        """
        # Screen names are looked up case-insensitively, so fold the case
        # before counting: 'Jack' and 'jack' are the same account.
        counts = Counter(name.lower() for name in users if validateScreenName(name))
        for name, count in counts.items():
            if count > 1:
                print('Duplicated screen name: "{}" ({} times)'.format(name, count))

        names = list(counts)
        print('Downloading avatars for {} users'.format(len(names)))

        for start in range(0, len(names), self.batchSize):
            # Names still waiting for an answer from the API in this batch.
            pending = set(names[start:start + self.batchSize])
            try:
                found = self.getUsers(pending)
            except urllib.error.HTTPError as e:
                # users/lookup answers 404 when none of the requested names
                # exists; that must not abort the remaining batches.
                if e.code != 404:
                    raise
                found = []

            for user in found:
                screenName = user['screen_name']
                # discard(): tolerate a name the API returns unexpectedly.
                pending.discard(screenName.lower())
                # Prefer the HTTPS URL so the image is not fetched in clear text.
                imageUrl = user.get('profile_image_url_https') or user['profile_image_url']
                try:
                    self._downloadAvatar(screenName, imageUrl)
                except Exception as e:
                    # Best effort: report the failure and go on with the next user.
                    print('Download failed for "{}"\n {}'.format(screenName, e))

            if pending:
                print('Skipped by the Twitter API:', pending)

    def _downloadAvatar (self, screenName, imageUrl):
        """Save the largest available rendition of one avatar.

        The API URL points at the small ``_normal`` rendition. The original
        upload (no size suffix) is tried first, then the ``_400x400``
        rendition, then the URL exactly as returned by the API. The next
        candidate is tried only when the current one answers 404.

        Args:
            screenName: Used as the file name inside the target folder.
            imageUrl: Avatar URL as returned by the API.

        Raises:
            urllib.error.HTTPError: A candidate failed with a status other
                than 404, or the last candidate answered 404.
            OSError: The file could not be written.
        """
        stem, ext = os.path.splitext(imageUrl)
        path = os.path.join(self.targetFolder, screenName + ext)

        sizeSuffix = '_normal'
        if stem.endswith(sizeSuffix):
            # Only rewrite the trailing size suffix; '_normal' may also occur
            # elsewhere in the file name of an older upload.
            base = stem[:-len(sizeSuffix)]
            candidates = [base + ext, base + '_400x400' + ext, imageUrl]
        else:
            candidates = [imageUrl]

        for url in candidates:
            try:
                # urlopen comes first so that a 404 never creates an empty file.
                with urlopen(url) as res, open(path, 'wb') as f:
                    shutil.copyfileobj(res, f)
                return
            except urllib.error.HTTPError as e:
                if e.code != 404 or url == imageUrl:
                    raise
def main ():
    """Command-line entry point: download avatars for the names in a file.

    Reads one screen name per line from the file given on the command line
    and saves each avatar into the target folder (``output`` by default).
    Exits with a usage error when ``config.json`` is missing.
    """
    parser = argparse.ArgumentParser(description='Fetch user avatars from Twitter.')
    parser.add_argument('file', help='File with usernames')
    parser.add_argument('-t', '--target', default='output', help='target path')
    args = parser.parse_args()

    if not os.path.exists('config.json'):
        parser.error('You need to create a config.json file first. Check the README file for details.')

    # Read the names before touching the network so a bad path fails fast.
    # 'utf-8-sig' reads plain UTF-8 too, but strips a leading byte-order mark
    # that would otherwise make the first name invalid. Blank lines are
    # dropped instead of being reported as invalid names.
    with open(args.file, encoding='utf-8-sig') as f:
        users = [name for name in (line.strip() for line in f) if name]

    dl = TwitterAvatarFetcher(args.target)
    dl.retrieveBearerToken()
    dl.downloadAvatars(users)


# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()