-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_resolver.py
117 lines (85 loc) · 3.71 KB
/
test_resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import re
import unittest
from dnslib import CLASS
from dnslib.client import DNSRecord, DNSQuestion, QTYPE
from dnslib.server import DNSServer
from ipaddress import IPv4Address
from src.resolver import WeaponizedResolver
from src.storage import SubdomainTable
from src.helpers import get_random_ip_v4
# Address the test DNS server is bound to and the test client sends to.
DNS_HOST = "127.0.0.1"
# Port shared by the test DNS server and the test client.
DNS_PORT = 55053
# TTL value passed to WeaponizedResolver when the resolver tests start up.
DNS_TTL = 10
class TestStorage(unittest.TestCase):
    """Unit tests for ``SubdomainTable``: insertion, size cap and lookup."""

    def setUp(self):
        """Create a fresh two-slot table for every test.

        A table shared by the whole class would let records from one test
        leak into the next, so the ``len(...) == 1`` check in ``test_add``
        would only pass when that test happened to run first.
        """
        self._storage = SubdomainTable(max_size=2)

    def test_add(self):
        """Adding a record stores it once; re-adding it does not grow the table."""
        ip = IPv4Address("1.2.3.4")
        self._storage.add(subdomain="subdomain", ip=ip)
        self.assertEqual(len(self._storage._records), 1)
        # test duplicates
        self._storage.add(subdomain="subdomain", ip=ip)
        self.assertEqual(len(self._storage._records), 1)

    def test_max_size(self):
        """A third insert into a two-slot table drops the first-added record."""
        self._storage.add(subdomain="subdomain-max-1", ip=get_random_ip_v4())
        self._storage.add(subdomain="subdomain-max-2", ip=get_random_ip_v4())
        self._storage.add(subdomain="subdomain-max-3", ip=get_random_ip_v4())
        self.assertEqual(len(self._storage._records), 2)
        self.assertIsNone(self._storage.get("subdomain-max-1"))
        self.assertIsNotNone(self._storage.get("subdomain-max-2"))
        self.assertIsNotNone(self._storage.get("subdomain-max-3"))

    def test_get(self):
        """``get`` returns the IP that was stored for the subdomain."""
        ip = get_random_ip_v4()
        self._storage.add(subdomain="subdomain-get-test", ip=ip)
        self.assertEqual(self._storage.get("subdomain-get-test"), ip)
class TestWeaponizedResolver(unittest.TestCase):
    """End-to-end tests that query a live ``WeaponizedResolver`` over UDP."""

    @classmethod
    def setUpClass(cls):
        """Start one resolver-backed DNS server thread for the whole class."""
        resolver = WeaponizedResolver(ttl=DNS_TTL)
        udp_server = DNSServer(resolver, port=DNS_PORT, address=DNS_HOST)
        udp_server.start_thread()
        cls._resolver = resolver
        # Kept so tearDownClass can shut the listener down again.
        cls._udp_server = udp_server

    @classmethod
    def tearDownClass(cls):
        """Stop the server thread started in ``setUpClass``."""
        cls._udp_server.stop()

    @staticmethod
    def _query(name):
        """Send an A/IN question for *name* to the test server over UDP.

        Returns the reply parsed into a ``DNSRecord``.
        """
        question = DNSQuestion(name, qtype=QTYPE.A, qclass=CLASS.IN)
        raw_reply = DNSRecord(q=question).send(
            dest=DNS_HOST, port=DNS_PORT, tcp=False
        )
        return DNSRecord.parse(raw_reply)

    def test_bind(self):
        """``<a-b-c-d>.bind.localhost`` is answered with the IP ``a.b.c.d``."""
        bind_ip_query = "11-22-33-44"
        bind_ip_expected = bind_ip_query.replace("-", ".")
        reply = self._query(f"{bind_ip_query}.bind.localhost")
        self.assertIn(bind_ip_expected, str(reply.a))

    def test_random(self):
        """``random.localhost`` is answered with something shaped like an IPv4."""
        reply = self._query("random.localhost")
        self.assertIsNotNone(re.search(r"\d+\.\d+\.\d+\.\d+", str(reply.a)))

    def test_as(self):
        """``<name>.as.<a-b-c-d>.localhost`` stores the pair and answers with it."""
        as_ip_query = "55-66-77-88"
        as_ip_expected = as_ip_query.replace("-", ".")
        reply = self._query(f"unittest.as.{as_ip_query}.localhost")
        self.assertEqual(len(self._resolver.storage._records), 1)
        record_subdomain, record_ip = self._resolver.storage._records[0]
        self.assertEqual(record_subdomain, "unittest")
        self.assertEqual(str(record_ip), as_ip_expected)
        self.assertIn(as_ip_expected, str(reply.a))

    def test_lookup(self):
        """``<name>.lookup.localhost`` is answered with the IP stored for *name*."""
        lookup_ip_expected = "55.66.77.88"
        # Seed the record here instead of relying on test_as having run first.
        # It is the same (subdomain, ip) pair test_as creates, so the tests
        # pass in either order.
        # NOTE(review): assumes resolver.storage is a SubdomainTable whose
        # add() ignores duplicates, as TestStorage.test_add exercises - confirm.
        self._resolver.storage.add(
            subdomain="unittest", ip=IPv4Address(lookup_ip_expected)
        )
        reply = self._query("unittest.lookup.localhost")
        self.assertIn(lookup_ip_expected, str(reply.a))
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()