Skip to content

Commit

Permalink
137 add godaddy dns support (#160)
Browse files Browse the repository at this point in the history
* feat: added GoDaddy Provider

* fix: add dns timeout

* feat: add early exit

* chore: update readme

---------

Co-authored-by: Simon Gurney <[email protected]>
  • Loading branch information
VKotwicki and SimonGurney authored Jul 20, 2023
1 parent 581a2ff commit 74928ad
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 130 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ providers:
> cloudflare - Scan multiple domains by fetching them from Cloudflare
> digitalocean - Scan multiple domains by fetching them from Digital Ocean
> file - Read domains from a file (or folder of files), one per line
> godaddy - Scan multiple domains by fetching them from GoDaddy
> projectdiscovery - Scan multiple domains by fetching them from Project Discovery
> single - Scan a single domain by providing a domain on the commandline
> zonetransfer - Scan multiple domains by fetching records via DNS zone transfer
Expand Down Expand Up @@ -164,6 +166,15 @@ projectdiscovery:
--pd-domains PD_DOMAINS
Required
godaddy:
Scan multiple domains by fetching them from GoDaddy
--gd-api-key GD_API_KEY
Required
--gd-api-secret GD_API_SECRET
Required
--gd-domains GD_DOMAINS
Optional
file:
Read domains from a file (or folder of files), one per line
Expand Down
1 change: 1 addition & 0 deletions domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(self, domain, fetch_standard_records=True, ns=None):
self.resolver = dns.resolver
else:
self.set_custom_NS(ns)
self.resolver.timeout = 1
self.should_fetch_std_records = fetch_standard_records

@lru_cache
Expand Down
251 changes: 129 additions & 122 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,129 @@
from scan import scan_domain
import signatures
import output
import detection_enums
import providers
from os import linesep

from multiprocessing.pool import ThreadPool
import threading
from functools import partial

import logging
from sys import stderr, exit, argv

import argparsing

import colorama

import time

start_time = time.time()

if "--nocolour" in argv:
print(argparsing.banner, file=stderr)
else:
colorama.init()
print(argparsing.banner_with_colour, file=stderr)

args = argparsing.parse_args()

###### verbosity

if args.verbose == 0:
verbosity_level = logging.WARN
if args.verbose == 1:
verbosity_level = logging.INFO
if args.verbose > 1:
verbosity_level = logging.DEBUG

logging.basicConfig(format="%(message)s", level=verbosity_level)
logging.StreamHandler(stderr)

if not args.verbose > 2:
for module in ["boto", "requests"]:
logger = logging.getLogger(module)
logger.setLevel(logging.CRITICAL)
###### domain ingestion

provider = getattr(providers, args.provider)
domains = list(provider.fetch_domains(**args.__dict__))

if len(domains) == 0:
logging.error("ERROR: No domains to scan")
exit(-1)

###### signatures

signatures = [getattr(signatures, signature) for signature in signatures.__all__]

# replace name for each signature
for signature in signatures:
signature.__name__ = signature.__name__.replace("signatures.", "")

if args.signature:
signatures = [s for s in signatures if s.__name__ in args.signature]

if args.exclude_signature:
signatures = [s for s in signatures if s.__name__ not in args.exclude_signature]

if not args.enable_unlikely:
signatures = [
s
for s in signatures
if s.test.CONFIDENCE != detection_enums.CONFIDENCE.UNLIKELY
]

if args.disable_probable:
signatures = [
s
for s in signatures
if s.test.CONFIDENCE != detection_enums.CONFIDENCE.POTENTIAL
]

logging.warning(f"Testing with {len(signatures)} signatures")


###### scanning

findings = []
lock = threading.Lock()

if "--out" not in argv:
# using default out location, need to append our format
args.out = f"{args.out}.{args.out_format}"

with output.Output(args.out_format, args.out) as o:
scan = partial(
scan_domain,
signatures=signatures,
output_handler=o,
lock=lock,
findings=findings,
name_servers=args.resolver.replace(" ", "").split(","),
)
pool = ThreadPool(processes=args.parallelism)
pool.map(scan, domains)
pool.close()
pool.join()

###### exit
logging.warning(f"\n\nWe found {len(findings)} takeovers ☠️")
for finding in findings:
msg = f"-- DOMAIN '{finding.domain}' :: SIGNATURE '{finding.signature}' :: CONFIDENCE '{finding.confidence}'"
msg += f"{linesep}{finding.populated_records()}"
if args.nocolour == False:
msg = colorama.Fore.RED + msg + colorama.Fore.RESET
logging.warning(msg)
logging.warning(f"\n⏱️ We completed in {round(time.time() - start_time, 2)} seconds")
logging.warning(f"...Thats all folks!")
if args.pipeline:
logging.debug(f"Pipeline flag set - Exit code: {len(findings)}")
exit(len(findings))
from scan import scan_domain
import signatures
import output
import detection_enums
import providers
from os import linesep

from multiprocessing.pool import ThreadPool
import threading
from functools import partial

import logging
from sys import stderr, exit, argv

import argparsing

import colorama

import time

start_time = time.time()

if "--nocolour" in argv:
print(argparsing.banner, file=stderr)
else:
colorama.init()
print(argparsing.banner_with_colour, file=stderr)

args = argparsing.parse_args()

###### verbosity

if args.verbose == 0:
verbosity_level = logging.WARN
if args.verbose == 1:
verbosity_level = logging.INFO
if args.verbose > 1:
verbosity_level = logging.DEBUG

logging.basicConfig(format="%(message)s", level=verbosity_level)
logging.StreamHandler(stderr)

if not args.verbose > 2:
for module in ["boto", "requests"]:
logger = logging.getLogger(module)
logger.setLevel(logging.CRITICAL)
###### domain ingestion

provider = getattr(providers, args.provider)
domains = list(provider.fetch_domains(**args.__dict__))

if len(domains) == 0:
logging.error("ERROR: No domains to scan")
exit(-1)

###### signatures

signatures = [getattr(signatures, signature) for signature in signatures.__all__]

# replace name for each signature
for signature in signatures:
signature.__name__ = signature.__name__.replace("signatures.", "")

if args.signature:
signatures = [s for s in signatures if s.__name__ in args.signature]

if args.exclude_signature:
signatures = [s for s in signatures if s.__name__ not in args.exclude_signature]

if not args.enable_unlikely:
signatures = [
s
for s in signatures
if s.test.CONFIDENCE != detection_enums.CONFIDENCE.UNLIKELY
]

if args.disable_probable:
signatures = [
s
for s in signatures
if s.test.CONFIDENCE != detection_enums.CONFIDENCE.POTENTIAL
]

logging.warning(f"Testing with {len(signatures)} signatures")


###### scanning

findings = []
lock = threading.Lock()

if "--out" not in argv:
# using default out location, need to append our format
args.out = f"{args.out}.{args.out_format}"

with output.Output(args.out_format, args.out) as o:
scan = partial(
scan_domain,
signatures=signatures,
output_handler=o,
lock=lock,
findings=findings,
name_servers=args.resolver.replace(" ", "").split(","),
)
pool = ThreadPool(processes=args.parallelism)
res = pool.map_async(scan, domains)
try:
while not res.ready():
time.sleep(2)
except KeyboardInterrupt:
logging.warning("Caught KeyboardInterrupt, terminating early...")
lock.acquire()
else:
pool.close()
pool.join()

###### exit
logging.warning(f"\n\nWe found {len(findings)} takeovers ☠️")
for finding in findings:
msg = f"-- DOMAIN '{finding.domain}' :: SIGNATURE '{finding.signature}' :: CONFIDENCE '{finding.confidence}'"
msg += f"{linesep}{finding.populated_records()}"
if args.nocolour == False:
msg = colorama.Fore.RED + msg + colorama.Fore.RESET
logging.warning(msg)
logging.warning(f"\n⏱️ We completed in {round(time.time() - start_time, 2)} seconds")
logging.warning(f"...Thats all folks!")
if args.pipeline:
logging.debug(f"Pipeline flag set - Exit code: {len(findings)}")
exit(len(findings))
107 changes: 107 additions & 0 deletions providers/godaddy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import requests

from domain import Domain

description = "Scan multiple domains by fetching them from GoDaddy"


class DomainNotFoundError(Exception):
def __init__(self, domain):
self.message = "Domain not found: " + domain
super().__init__(self.message)


class GDApi:
def __init__(self, api_key, api_secret):
self.session = requests.session()
self.session.headers.update(
{
"Content-Type": "application/json",
"Authorization": "sso-key " + api_key + ":" + api_secret,
}
)

@staticmethod
def check_response(response: requests.Response):
if response.status_code == 401:
raise ValueError("Invalid API key specified.")

if response.status_code < 200 or response.status_code >= 300:
raise ValueError("Invalid response received from API: " + response.json())

return response

def make_request(self, endpoint):
return self.session.prepare_request(
requests.Request("GET", "https://api.godaddy.com/v1/" + endpoint)
)

def list_domains(self):
req = self.make_request("domains")

return self.check_response(self.session.send(req))

def get_records(self, domain):
req = self.make_request(f"domains/{domain}/records")
res = self.session.send(req)

if 404 == res.status_code:
raise DomainNotFoundError(domain)

return self.check_response(res)


def convert_records_to_domains(records, root_domain):
buf = {}
for record in records:
if "@" == record["name"]:
continue

record_name = f"{record['name']}.{root_domain}"

if record_name not in buf.keys():
buf[record_name] = {}

if record["type"] not in buf[record_name].keys():
buf[record_name][record["type"]] = []

if "data" in record.keys():
buf[record_name][record["type"]].append(record["data"])

def extract_records(desired_type):
return [r.rstrip(".") for r in buf[subdomain][desired_type]]

for subdomain in buf.keys():
domain = Domain(subdomain.rstrip("."), fetch_standard_records=False)

if "A" in buf[subdomain].keys():
domain.A = extract_records("A")
if "AAAA" in buf[subdomain].keys():
domain.AAAA = extract_records("AAAA")
if "CNAME" in buf[subdomain].keys():
domain.CNAME = extract_records("CNAME")
if "NS" in buf[subdomain].keys():
domain.NS = extract_records("NS")

yield domain


def fetch_domains(gd_api_key: str, gd_api_secret: str, gd_domains: str = None, **args):
root_domains = []
domains = []
api = GDApi(gd_api_key, gd_api_secret)

if gd_domains is not None and len(gd_domains):
root_domains = [domain.strip(" ") for domain in gd_domains.split(",")]
else:
resp_data = api.list_domains().json()
root_domains = [domain["domain"] for domain in resp_data]

for domain in root_domains:
if "" == domain or domain is None:
continue

records = api.get_records(domain).json()
domains.extend(convert_records_to_domains(records, domain))

return domains
Loading

0 comments on commit 74928ad

Please sign in to comment.