crawler.py
import fetcher, time, urlparse, robotexclusionrulesparser, pickle, os
from datetime import datetime, timedelta
class Crawler:
""" The main crawler class. Instantiate this with a seed and tell it
to start crawling with .crawl()
"""
def __init__(self,
seed, # The initial list of urls to visit
robots_txt_name = "crawler", # which useragent to obey robots.txt rules for
useragent_string = "crawler", # useragent string to send when crawling
default_crawl_delay = 20, # the minimum time between fetches from a domain
obey_robots_txt = True, # Be nice?
schemes = ["http"], # link types to follow
crawl_domains = [], # optionally, restrict the crawler to these domains
pass_time = 0.1, # how long to wait after each crawl management pass
document_fetchers = 15,
robots_txt_fetchers = 5,
outputdir = "output"):
# The number of documents that may be scheduled for fetching concurrently
# + the number of robots txt fetchers should not exceed the number of celery
# workers.
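        # With the defaults above, that means at least 15 + 5 = 20 celery workers.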
        # store configuration on the instance
self.seed = seed
self.robots_txt_name = robots_txt_name
self.useragent_string = useragent_string
self.obey_robots_txt = obey_robots_txt
self.default_crawl_delay = default_crawl_delay
self.schemes = schemes
self.crawl_domains = crawl_domains
self.document_fetchers = document_fetchers
self.robots_txt_fetchers = robots_txt_fetchers
self.outputdir = outputdir
#setup
self.pass_time = pass_time
# stuff we discover while crawling:
self.urls = {}
self.links = {}
self.domains = {}
self.results = {None:None}
# queues for crawl management
self.result_queue = []
self.candidate_queue = []
self.in_progress_queue = []
self.robots_txt_wait_queue = []
# for statistics
self.start_stop_tuples = [(time.time(), -1)]
# to avoid having a website starve the crawling process with excessive crawl delays:
self.too_far_ahead_to_schedule = timedelta(seconds=self.pass_time)
# Let's try to avoid sorting the retrieval candidate queue if possible
self.new_links = True
def add_url(self, url_tuple):
""" add a url to the retrieval queue, without starting to download it
This creates a Domain instance if the domain hasn't been seen before,
to keep track of crawl intervals, robot exclusion etc. It also
updates some internal data so that statistics may be calculated,
and so that a more accurate rank score can be given to other urls.
"""
#avoid parsing more than once
parts = urlparse.urlparse(url_tuple[0])
#if it's a url we want to visit
if parts[0] in self.schemes and (len(self.crawl_domains) == 0 or (parts[1] in self.crawl_domains)):
# extract domain
dname = parts[1]
#if we've never seen this domain
if self.domains.get(dname) == None:
# create a Domain instance
self.domains[dname] = Domain(name = dname, crawler = self)
# and an array to keep track of urls for this domain
self.urls[dname] = []
# if this url is new to us
if self.results.get(url_tuple[0]) == None:
#add to urls for domain
self.urls[dname].append(url_tuple[0])
# create a Document instance for url
document = Document(url_tuple[0],
self.results[url_tuple[1]],
self.domains[dname])
# add url to list of candidates for retrieval
self.candidate_queue.append(document)
# keep track of Document
self.results[url_tuple[0]] = document
else:
#if we have seen this url before, we just track the extra incoming link
self.results.get(url_tuple[0]).add_referrer(self.results[url_tuple[1]])
def crawl(self, save_frequency = timedelta(seconds = 60), termination_checker = lambda c: False):
""" Start crawl. Default termination checker says to never stop.
Plug your own in that says otherwise if this is wanted.
"""
# Add seed urls to queue
while len(self.seed) > 0:
self.add_url((self.seed.pop(0), None))
# Setup tracking of last periodic save time
last_save = datetime.now() - save_frequency
# start of crawling:
# yes we are managing the crawling process with polling.
# celery would do so internally anyway, and we want more control
while not termination_checker(self) and not self.out_of_work():
pass_start = time.time()
# make an attempt to save the state periodically
if save_frequency + last_save < datetime.now():
print "Periodic save made to " + self.suspend()
last_save = datetime.now()
self.check_progress()
self.process_results()
self.start_new_retrievals()
            # sleep away whatever remains of the pass interval, so we don't poll too often
            time.sleep(max(0.0, self.pass_time - (time.time() - pass_start)))
print "\nStopping.."
print "Out of work: " + str(self.out_of_work())
print "Termination criteria reached: " + str(termination_checker(self))
def process_results(self):
""" Consider adding urls from result to crawling queue """
        # iterate over a copy, since items are removed from the queue as we go
        for doc in self.result_queue[:]:
self.result_queue.remove(doc)
if len(doc.get_contents()[2].result) > 0:
self.new_links = True
for url_tuple in doc.get_contents()[2].result:
if self.links.get(doc.url)==None:
self.links[doc.url] = [url_tuple[0]]
else:
self.links[doc.url] = self.links[doc.url] + [url_tuple[0]]
self.add_url(url_tuple)
def start_new_retrievals(self):
""" Consider crawling some new urls from queue: """
if self.document_fetchers - len(self.in_progress_queue) > 0:
if self.new_links:
                # highest-ranked candidates should be crawled first
                self.candidate_queue.sort(key=self.rank, reverse=True)
self.new_links = False
            # iterate over copies of the queues below, since entries are removed mid-loop
            for domain in self.robots_txt_wait_queue[:]:
if domain.robots_txt_task.ready():
self.robots_txt_wait_queue.remove(domain)
domain.parse_robots_txt()
            for doc in self.candidate_queue[:]:
# if we're not allowed to crawl the site before the next crawl management pass,
# skip it to avoid starving the crawling process with waiting workers
if doc.domain.robots_txt_in_place():
if self.within_scheduling_scope(doc):
self.candidate_queue.remove(doc)
doc.retrieve()
self.in_progress_queue.append(doc)
else:
if not doc.domain in self.robots_txt_wait_queue and self.robots_txt_fetchers - len(self.robots_txt_wait_queue) > 0:
doc.domain.setup_robots_txt()
self.robots_txt_wait_queue.append(doc.domain)
if self.document_fetchers - len(self.in_progress_queue) == 0:
break
def check_progress(self):
""" Consider moving crawled documents from in progress queue to
url extraction queue, or remove them from crawling altogether
if we were blocked by robots.txt
"""
        # iterate over a copy, since documents are removed from the queue mid-loop
        for doc in self.in_progress_queue[:]:
if doc.blocked:
# robots.txt block => remove so we don't starve the crawl process
self.in_progress_queue.remove(doc)
if doc.task.ready() and doc.task.result[2].ready():
self.in_progress_queue.remove(doc)
self.result_queue.append(doc)
def within_scheduling_scope(self,document):
return not document.domain.too_long_until_crawl(too_long = self.too_far_ahead_to_schedule)
def suspend(self):
""" Suspends crawl to file and returns filename """
if not os.path.exists(self.outputdir):
os.makedirs(self.outputdir)
thisrun = self.outputdir + os.sep + str(int(self.start_stop_tuples[0][0]))
if not os.path.exists(thisrun):
os.makedirs(thisrun)
# store stop time
self.start_stop_tuples.append( (self.start_stop_tuples.pop()[0], time.time()) )
filename = thisrun + os.sep + str(int(time.time()))+".suspended_crawl"
f = open(filename,"w")
pickle.dump(self, f)
f.close()
return filename
def out_of_work(self):
return (len(self.candidate_queue) == 0 and len(self.in_progress_queue) == 0 and len(self.result_queue) == 0)
def rank(self, document):
""" Assigns score to a document, used for sorting retrieval queue to find next
urls to crawl. This is probably not the best ranking method but hey.. WIP^TM
"""
# if nobody thinks the site is worth linking to, then who are we to argue?
if len(document.referrers) == 0:
return 0.0
# assign a score based on who links to the document
ancestor_linkjuice = 0.0
for referrer in document.referrers:
            if referrer != None and self.links.get(referrer.url) != None and not document.domain == referrer.domain:
ancestor_linkjuice += 1.0/(len(self.links[referrer.url]))
return ancestor_linkjuice/len(self.results)
class Document:
""" Class representing a document (url + incoming link info + crawl
state + contents)
"""
def __init__(self, url, referrer, domain):
self.url = url
self.referrers = [referrer]
self.domain = domain
self.crawl_time = None
self.blocked = False
self.task = fetcher.FakeAsyncResult(ready=False)
def retrieve(self):
""" Start retrieval of document.
Treats crawl-delay as a *guideline*, as tasks scheduled for a specific
time may be executed later. But on average, it will be obeyed.
This is a limitation of the crawler, and should be fixed.
To really obey crawl-delay, a safety margin should be added, and made into
a task expiration time.
"""
# check if we're allowed to crawl
if self.domain.allows_crawling(self.url):
# farm out crawl job
self.crawl_time = self.domain.claim_next_crawl_time()
self.task = fetcher.fetch_document.apply_async(args = [ self.url, self.domain.crawler.useragent_string],
eta = self.crawl_time)
else:
# Blocked by robots.txt
self.blocked = True
def __eq__(self, other):
""" Comparison override to be able to use class sensibly in queues """
if other == None:
return False
return self.url == other.url
def __hash__(self):
""" Hash override to be able to use class in dicts """
return hash(self.url)
def get_contents(self):
""" Blocking method to wait for and get the retrieval result """
try:
self.contents
        except AttributeError:
try:
print "<<< " + self.url
except UnicodeEncodeError:
print "<<< (unable to decode url to ascii)"
self.contents = self.task.wait()
self.domain.downloaded += self.contents[3]
self.domain.downloaded_count += 1
return self.contents
def add_referrer(self, referrer):
""" gain an incoming link """
self.referrers.append(referrer)
class Domain:
""" A class representing a single domain, so we can keep track of
allowable crawltimes, robot exclusion etc
"""
def __init__(self, name, crawler):
# config
self.name = name
self.crawler = crawler
# counters for statistics
self.downloaded = 0
self.downloaded_count = 0
# robots.txt handling
self.crawl_delay = timedelta(seconds=crawler.default_crawl_delay)
self.last_crawl_time = datetime.now() - self.crawl_delay
self.rp = robotexclusionrulesparser.RobotExclusionRulesParser()
self.robots_txt_task = fetcher.FakeAsyncResult(ready=False)
self.parsed_robots_txt = False
self.setup_robots_txt()
def setup_robots_txt(self):
""" Download and parse robots.txt if we care about it """
if self.crawler.obey_robots_txt:
self.parsed_robots_txt = False
try:
# This should be made async later
self.robots_txt_task = fetcher.fetch_robots_txt.apply_async(
args = ['http://'+self.name+'/robots.txt', self.crawler.useragent_string])
#self.rp.fetch()
            except Exception:
# if we couldn't get robots.txt, that's just too bad.. :p
pass
def parse_robots_txt(self):
""" take our async result and parse it (blocking) """
self.rp.parse(self.robots_txt_task.wait()[1])
if self.rp.get_crawl_delay(self.crawler.robots_txt_name) != None:
self.crawl_delay = max(timedelta(seconds = self.rp.get_crawl_delay(self.crawler.robots_txt_name)), self.crawl_delay)
self.parsed_robots_txt = True
def __eq__(self,other):
""" Override eq so we can do better than object id comparisons """
if other == None:
return False
return self.name == other.name
def __hash__(self):
""" Override has so we can use Domain as key in dicts etc """
return hash(self.name)
def claim_next_crawl_time(self):
""" Claim a crawl time """
if self.rp.is_expired():
self.setup_robots_txt()
if not self.parsed_robots_txt:
self.parse_robots_txt()
self.last_crawl_time = self.last_crawl_time + self.crawl_delay
return self.last_crawl_time
def robots_txt_in_place(self):
return not self.crawler.obey_robots_txt or (not self.rp.is_expired() and self.parsed_robots_txt)
def defer_crawl(self):
""" Undo claim of crawl time """
self.last_crawl_time = self.last_crawl_time - self.crawl_delay
def allows_crawling(self, url):
""" Can we crawl this url? """
if not self.crawler.obey_robots_txt:
return True
# if robots.txt is expired, refresh it
if self.rp.is_expired():
self.setup_robots_txt()
# some more thought needs to go into how to avoid blocking on robots.txt
if not self.parsed_robots_txt:
self.parse_robots_txt()
try:
return self.rp.is_allowed(self.crawler.robots_txt_name, urlparse.urlparse(url)[2])
except UnicodeDecodeError:
return False
def too_long_until_crawl(self, too_long):
""" Avoid starvation of crawling process """
return self.last_crawl_time + self.crawl_delay > datetime.now() + too_long
def resume(suspended_crawl):
""" Reads suspended crawl from file and returns crawler object. Restart
crawling with .crawl()
Remember to handle file exceptions..
"""
crawler = pickle.load(open(suspended_crawl))
# store start time
crawler.start_stop_tuples.append( (time.time(), -1) )
return crawler
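# Example usage, as a sketch: "http://example.com/" and the 500-url limit are
# illustrative, and a celery worker pool serving the fetcher tasks is assumed to
# already be running.
if __name__ == "__main__":
    crawler = Crawler(seed = ["http://example.com/"],
                      crawl_domains = ["example.com"])
    # stop once roughly 500 urls have been discovered
    crawler.crawl(termination_checker = lambda c: len(c.results) > 500)
    saved = crawler.suspend()
    print "Crawl suspended to " + saved
    # a suspended crawl can be picked up again later; handle missing/corrupt files
    try:
        crawler = resume(saved)
        crawler.crawl()
    except IOError:
        print "Could not reopen " + saved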