-
Notifications
You must be signed in to change notification settings - Fork 0
/
cacheunittest.py
280 lines (224 loc) · 7.54 KB
/
cacheunittest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import http.client
import http.cookiejar
import json
import random
import re
import ssl
import time
import unittest
from xml.dom import minidom
import logging
logger = logging.getLogger(__name__)
__unittest = True
PROXYHOST = ""
PROXYPORT = ""
PROXYHTTPSPORT = ""
'''
Based on https://stackoverflow.com/questions/61280350/how-to-set-the-sni-via-http-client-httpsconnection
'''
class WrapSSSLContext(ssl.SSLContext):
"""
HTTPSConnection provides no way to specify the
server_hostname in the underlying socket. We
accomplish this by wrapping the context to
overrride the wrap_socket behavior (called later
by HTTPSConnection) to specify the
server_hostname that we want.
"""
def __new__(cls, server_hostname, *args, **kwargs):
return super().__new__(cls, *args, *kwargs)
def __init__(self, server_hostname, *args, **kwargs):
super().__init__(*args, **kwargs)
self._server_hostname = server_hostname
def wrap_socket(self, sock, *args, **kwargs):
kwargs['server_hostname'] = self._server_hostname
return super().wrap_socket(sock, *args, **kwargs)
class CacheUnitTest(unittest.TestCase):
"""
Basis class common to all CMS and tests
"""
def setUp(self, proxy, port, website, https=False):
self.headers = None
self.proxies = None
if proxy:
self.proxies = {"http": proxy, "https": proxy}
self.proxy = proxy
if port:
self.port = port
else:
if not https:
self.port = 80
else:
self.port = 443
if website:
self.website = website
self.headers = {'Host': self.website}
self.https = https
self.response_content = ""
self.response_content_bytes = 2048
# Add specific header
def set_header(self, header_value):
self.headers.update(header_value)
def purge_request(self, url):
return self.request("PURGE", url)
def get_request(self, url):
return self.request("GET", url)
def request(self, method, url):
# Recoding get request to allow proxy
if not self.https:
logging.debug("Establishing HTTP Connection with {} on port {}".format(self.proxy, self.port))
conn = http.client.HTTPConnection(self.proxy, self.port)
else:
proxy_to_server_context = WrapSSSLContext(self.proxy)
logging.debug("Establishing HTTPS Connection with {} on port {}".format(self.proxy, self.port))
conn = http.client.HTTPSConnection(self.proxy, self.port, context=proxy_to_server_context)
logging.debug("Pushing request on url {} with method {}".format(url, method))
conn.putrequest(method, url, skip_host=True)
for header in self.headers.keys():
logging.debug("Specifying header {} with value {}".format(str(header), self.headers.get(header)))
conn.putheader(str(header), str(self.headers.get(header)))
conn.endheaders()
response = conn.getresponse()
self.response_header = response.headers
self.response_content = str(response.read(self.response_content_bytes))
conn.close()
return response
def build_url(self, path):
"""
Construct an absolute url by appending a path to a domain.
"""
return 'http://%s%s' % (self.website, path)
def get_once(self, url, needpurge=False, **kwargs):
if needpurge:
self.purge_request(url)
response = self.get_request(url)
return response
def get_twice(self, url, **kwargs):
"""
Fetch a url twice and return the second response (for testing cache hits).
"""
self.get_request(url)
# time.sleep(2)
response = self.get_request(url)
return response
def get_twice_tokenized(self, url, tokenname=None, **kwargs):
"""
Fetch a url twice with two different tokens and return the 2nd response
"""
if tokenname is not None:
token = tokenname + "=" + str(random.randint(10000, 999999))
else:
token = str(random.randint(10000, 999999))
# print("url1: " + url + "?" + token)
self.get_request(url + "?" + token)
if tokenname is not None:
token = tokenname + "=" + str(random.randint(10000, 999999))
else:
token = str(random.randint(10000, 999999))
# print("url2: " + url + "?" + token)
response = self.get_request(url + "?" + token)
return response
def purgeandget_twice(self, url, **kwargs):
"""
Fetch a url twice and return the second response (for testing cache hits).
"""
self.purge_request(url)
time.sleep(1)
self.get_request(url)
time.sleep(2)
response = self.get_request(url)
return response
"""
Assertions
"""
def assertHit(self, response):
"""
Assert that a given response contains the header indicating a cache hit.
"""
self.assertEqual(str(response.headers['X-Cache']).lower(), 'HIT'.lower(),
msg='Uncached while cache was expected')
def assertMiss(self, response):
"""
Assert that a given response contains the header indicating a cache miss.
"""
self.assertEqual(str(response.headers['X-Cache']).lower(), 'miss'.lower())
def assertPass(self, response):
"""
Assert that a given response contains the header indicating a pass.
"""
self.assertEqual(str(response.headers['X-Cache']).lower(), 'pass'.lower())
def assertSynth(self, response):
"""
Assert that a given response contains the header indicating a pass.
"""
self.assertEqual(str(response.headers['X-Cache']).lower(), 'synth'.lower())
def assertMaxAge(self, response, value):
"""
Assert that a given response contains the header indicating specific "max-age" value.
"""
max_age_regex = re.compile('max-age\s*=\s*(\d+)')
try:
cache_control = response.headers['cache-control']
except KeyError:
try:
cache_control = response.headers['Cache-Control']
except:
raise AssertionError('No cache-control header.')
max_age = max_age_regex.match(cache_control)
if not max_age:
raise AssertionError('No max-age specified in cache-control header.')
self.assertEqual(int(max_age.group(1)), value)
def assert200(self, response):
# Ok
self.assertEqual(response.status, 200)
def assert30X(self, response):
self.assertRegex(str(response.status), '30?')
def assert301(self, response):
# Permanent redirect
self.assertEqual(response.status, 301)
def assert302(self, response):
# Temporary redirect
self.assertEqual(response.status, 302)
def assert304(self, response):
# Not modified
self.assertEqual(response.status, 304)
def assert40X(self, response):
self.assertRegex(str(response.status), '40?')
def assert400(self, response):
# Bad Request
self.assertEqual(response.status, 400)
def assert401(self, response):
# Unauthorized
self.assertEqual(response.status, 401)
def assert403(self, response):
# Forbidden
self.assertEqual(response.status, 403)
def assert404(self, response):
# Not found
self.assertEqual(response.status, 404)
def assert405(self, response):
# Method Not allowed
self.assertEqual(response.status, 405)
def assert50X(self, response):
# Method Not allowed
self.assertRegex(str(response.status), '50?')
def assertBackend(self, response, backend):
self.assertEqual(str(response.headers['X-Back']).lower(), backend.lower())
def assertRedirectURL(self, response, url):
self.assertEqual(str(response.headers['location']).lower(), url.lower())
def assertValidJSON(self, response):
try:
logging.debug("Parsing response {} first {} bytes, expecting valid JSON".format(self.response_content,
self.response_content_bytes))
json_object = json.loads(self.response_content)
return True
except ValueError as error:
return False
def assertValidXML(self, response):
try:
logging.debug("Parsing response {} first {} bytes, expecting valid JSON".format(self.response_content,
self.response_content_bytes))
minidom.parseString(self.response_content)
return True
except ValueError as error:
return False