This repository has been archived by the owner on Jan 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_bad_test.py
117 lines (94 loc) · 3.68 KB
/
server_bad_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
Script for testing how servers respond the various 'bad' cookies.
"""
import argparse
import json
import multiprocessing as mp
import signal
import sys
import time
from functools import partial
import dns.resolver
from dns.edns import GenericOption
from dns.message import make_query
from tqdm import tqdm
COOKIE_OPT = 10
CLIENT_COOKIE = "1e4ddeb526a1da40"
FAKE_SERVER_COOKIE = "00112233445566778899aabbccddeeff"
query_keys = ["sent", "edns", "scook", "response", "rcode", "err"]
methods = ['normal', 'none', 'fake']
json_keys = ["ip", "domain", "num_sent"]
json_keys.extend(methods)
def makedict(default=None, keys=json_keys):
return {key: default for key in keys}
def extract_scook(r: dns.message.Message) -> bytes:
for o in r.options:
if o.otype == COOKIE_OPT:
return o.data[8:]
return bytes()
def ind_query(domain: str, ip: str, scookie: str, attempts: int = 3) -> dict:
d = makedict(keys=query_keys)
for i in range(attempts):
try:
cookie_opt = GenericOption(COOKIE_OPT, bytes.fromhex(CLIENT_COOKIE + scookie))
q = make_query(domain, dns.rdatatype.A, use_edns=True, want_dnssec=False, options=[cookie_opt])
d["sent"] = scookie
r: dns.message.Message = dns.query.udp(q, ip, timeout=5)
except Exception as e:
d['err'] = str(e)
continue
else:
if len(r.answer) > 0:
d['response'] = str(r.answer[0]).split()[-1]
d["scook"] = extract_scook(r).hex()
d["rcode"] = r.rcode()
d["edns"] = r.edns >= 0
break
return d
def query(params):
res = makedict()
res["ip"] = params["ip"]
res["domain"] = params["domain"]
res["num_sent"] = params["number"]
for m in methods:
qry = partial(ind_query, params["domain"], params["ip"])
prev_query = qry("")
res[m] = []
for _ in range(params["number"]):
time.sleep(1)
if m == "none":
res[m].append(qry(""))
elif m == "fake":
res[m].append(qry(FAKE_SERVER_COOKIE))
else:
q = qry(prev_query["scook"])
res[m].append(q)
if q["scook"] is not None:
prev_query = q
return res
def main(args):
parser = argparse.ArgumentParser(description="Running a series of dns queries on a list of IPs")
parser.add_argument('input', help="Input file containing a json lines with ip and domain keys")
parser.add_argument('output', help="Output file to write results to")
parser.add_argument('-t', '--num-threads', help="Number of threads to execute queries", default=64, type=int)
parser.add_argument('-n', '--num-queries', help="Number of queries to run on a single IP", default=5, type=int)
args = parser.parse_args(args)
with open(args.input, 'r') as in_file:
targets = [json.loads(t) for t in in_file.readlines()]
for t in targets:
t["number"] = args.num_queries
if "domain" not in t.keys():
t["domain"] = "cookie-test.example.com"
threads = min(args.num_threads, len(targets))
print("Beginning threads...")
with open(args.output, 'w') as output:
with mp.Pool(processes=threads, initializer=lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)) as p:
try:
for result in tqdm(p.imap_unordered(query, targets), total=len(targets), unit="query"):
output.write(json.dumps(result) + "\n")
except KeyboardInterrupt:
p.terminate()
p.join()
print("Exiting early from queries.")
if __name__ == "__main__":
main(sys.argv[1:])