diff --git a/tests/attack/test_mod_network_device.py b/tests/attack/test_mod_network_device.py index 14189ec68..5e3cd8bd2 100644 --- a/tests/attack/test_mod_network_device.py +++ b/tests/attack/test_mod_network_device.py @@ -1,3 +1,4 @@ +import json from asyncio import Event from unittest.mock import AsyncMock @@ -363,3 +364,51 @@ async def test_raise_on_request_error(): await module.check_forti("http://perdu.com/") assert exc_info.value.args[0] == "RequestError occurred: [Errno -2] Name or service not known" + + +@pytest.mark.asyncio +@respx.mock +async def test_detect_harbor_with_version(): + json_data = { + "auth_mode": "db_auth", + "banner_message": "", + "harbor_version": "v2.10", + "oidc_provider_name": "", + "primary_auth_mode": False, + "self_registration": True + } + respx.get("http://perdu.com/").mock( + return_value=httpx.Response( + 200, + content='Hello

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider

\ + ' + ) + ) + respx.get("http://perdu.com/api/v2.0/systeminfo").mock( + return_value=httpx.Response( + 200, + headers={"Content-Type": "application/json"}, + content=json.dumps(json_data) + ) + ) + + respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404)) + + persister = AsyncMock() + + request = Request("http://perdu.com/") + request.path_id = 1 + + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) + async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: + options = {"timeout": 10, "level": 2, "tasks": 20} + + module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration) + + await module.attack(request) + + assert persister.add_payload.call_count == 1 + assert persister.add_payload.call_args_list[0][1]["info"] == ( + '{"name": "Harbor", "version": "v2.10", "categories": ["Network Equipment"], "groups": ["Content"]}' + ) diff --git a/wapitiCore/attack/mod_network_device.py b/wapitiCore/attack/mod_network_device.py index 340847ebd..9de710063 100644 --- a/wapitiCore/attack/mod_network_device.py +++ b/wapitiCore/attack/mod_network_device.py @@ -1,6 +1,7 @@ from asyncio import Event from typing import Optional +from wapitiCore.attack.network_devices.mod_harbor import ModuleHarbor from wapitiCore.attack.network_devices.mod_forti import ModuleForti from wapitiCore.attack.network_devices.mod_ubika import ModuleUbika from wapitiCore.attack.attack import Attack @@ -28,7 +29,7 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non async def attack(self, request: Request, response: Optional[Response] = None): self.finished = True request_to_root = Request(request.url) - modules_list = [ModuleUbika, ModuleForti] + modules_list = [ModuleHarbor, ModuleUbika, ModuleForti] for module in modules_list: mod = module( self.crawler, self.persister, self.options, Event(), self.crawler_configuration diff --git a/wapitiCore/attack/network_devices/mod_harbor.py b/wapitiCore/attack/network_devices/mod_harbor.py new file mode 100644 index 000000000..84e6ff087 --- /dev/null +++ b/wapitiCore/attack/network_devices/mod_harbor.py @@ -0,0 +1,80 @@ +import json +import re +from typing import Optional +from urllib.parse import urljoin + +from httpx import RequestError + +from wapitiCore.attack.attack import Attack +from wapitiCore.net import Request +from wapitiCore.net.response import Response +from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE +from wapitiCore.main.log import log_blue, logging + +MSG_NO_HARBOR = "No Harbor Product Detected" +MSG_HARBOR_DETECTED = "{0} {1} Detected !" + + +class ModuleHarbor(Attack): + """Detect Harbor.""" + + device_name = "Harbor" + version = "" + + async def check_harbor(self, url): + check_list = ['api/v2.0/systeminfo'] + + for item in check_list: + full_url = urljoin(url, item) + request = Request(full_url, 'GET') + try: + response: Response = await self.crawler.async_send(request, follow_redirects=False) + except RequestError: + self.network_errors += 1 + continue + + if (response.is_success and "content-type" in response.headers + and "json" in response.headers["content-type"]): + await self.detect_harbor_version(response.content) + return True + + return False + + async def detect_harbor_version(self, response_content): + try: + # Parse the JSON content + data = json.loads(response_content) + # Extract the harbor_version value + self.version = data.get("harbor_version") + except (json.JSONDecodeError, KeyError): + return None + + async def attack(self, request: Request, response: Optional[Response] = None): + self.finished = True + request_to_root = Request(request.url) + + try: + if await self.check_harbor(request_to_root.url): + harbor_detected = { + "name": self.device_name, + "version": self.version, + "categories": ["Network Equipment"], + "groups": ["Content"] + } + log_blue( + MSG_HARBOR_DETECTED, + self.device_name, + self.version + ) + + await self.add_addition( + category=TECHNO_DETECTED, + request=request_to_root, + info=json.dumps(harbor_detected), + wstg=WSTG_CODE + ) + else: + log_blue(MSG_NO_HARBOR) + except RequestError as req_error: + self.network_errors += 1 + logging.error(f"Request Error occurred: {req_error}")