diff --git a/tests/attack/test_mod_network_device.py b/tests/attack/test_mod_network_device.py index 3c24cb0b4..6630a2c06 100644 --- a/tests/attack/test_mod_network_device.py +++ b/tests/attack/test_mod_network_device.py @@ -696,7 +696,7 @@ async def test_detect_citrix_from_title(): assert persister.add_payload.call_count == 1 assert persister.add_payload.call_args_list[0][1]["info"] == ( - '{"name": "Citrix Gateway", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}' + '{"name": "Citrix Gateway", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}' ) @@ -736,5 +736,38 @@ async def test_detect_citrix_from_class(): assert persister.add_payload.call_count == 1 assert persister.add_payload.call_args_list[0][1]["info"] == ( - '{"name": "NetscalerGateway", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}' + '{"name": "NetscalerGateway", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}' + ) + + +@pytest.mark.asyncio +@respx.mock +async def test_detect_citrix_in_root_url(): + respx.get("http://perdu.com/").mock( + return_value=httpx.Response( + 200, + content='NetScaler ADC

Perdu sur Internet ?

\ +

Pas de panique, on va vous aider NetScaler ADC

\ + ' + ) + ) + + respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404)) + + persister = AsyncMock() + + request = Request("http://perdu.com/") + request.path_id = 1 + + crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/")) + async with AsyncCrawler.with_configuration(crawler_configuration) as crawler: + options = {"timeout": 10, "level": 2, "tasks": 20} + + module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration) + + await module.attack(request) + + assert persister.add_payload.call_count == 1 + assert persister.add_payload.call_args_list[0][1]["info"] == ( + '{"name": "NetScaler ADC", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}' ) diff --git a/wapitiCore/attack/network_devices/mod_citrix.py b/wapitiCore/attack/network_devices/mod_citrix.py index 1b1f2073f..9e5140599 100644 --- a/wapitiCore/attack/network_devices/mod_citrix.py +++ b/wapitiCore/attack/network_devices/mod_citrix.py @@ -39,10 +39,10 @@ class ModuleCitrix(Attack): """Detect Citrix Devices.""" device_name = "Citrix" - version = "" + version = [] async def check_citrix(self, url): - check_list = ['logon/LogonPoint/'] + check_list = ['logon/LogonPoint/', ''] for item in check_list: full_url = urljoin(url, item) @@ -80,6 +80,17 @@ async def detect_citrix_product(self, response_content): # Extract the product name from the title self.device_name = title return True + if "NetScaler" in title: + # Search the product name in the content + product_names = ["NetScaler ADC", "Citrix NetScaler", "NetScaler", "NetScaler AWS"] + matches = soup.find_all('span', text=lambda text: text in product_names) + if matches: + # Set product_name to the first matched product name + self.device_name = matches[0].text + else: + self.device_name = "Citrix" + return True + return False async def attack(self, request: Request, response: Optional[Response] = None): @@ -90,7 +101,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): if await self.check_citrix(request_to_root.url): citrix_detected = { "name": self.device_name, - "version": "", + "versions": self.version, "categories": ["Network Equipment"], "groups": ["Content"] } diff --git a/wapitiCore/attack/network_devices/mod_forti.py b/wapitiCore/attack/network_devices/mod_forti.py index 62475f7ad..97f5b2c4b 100644 --- a/wapitiCore/attack/network_devices/mod_forti.py +++ b/wapitiCore/attack/network_devices/mod_forti.py @@ -37,7 +37,7 @@ class ModuleForti(NetworkDeviceCommon): """Detect Forti.""" device_name = "Fortinet" - version = "" + version = [] fortinet_pattern = re.compile(r'Forti\w+') async def check_forti(self, url): @@ -169,7 +169,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): forti_detected = { "name": self.device_name, - "versions": [], + "versions": self.version, "categories": ["Network Equipment"], "groups": ["Content"] } diff --git a/wapitiCore/attack/network_devices/mod_harbor.py b/wapitiCore/attack/network_devices/mod_harbor.py index ddf7c90e8..97bae4e77 100644 --- a/wapitiCore/attack/network_devices/mod_harbor.py +++ b/wapitiCore/attack/network_devices/mod_harbor.py @@ -37,7 +37,7 @@ class ModuleHarbor(NetworkDeviceCommon): """Detect Harbor.""" device_name = "Harbor" - version = "" + version = [] async def check_harbor(self, url): check_list = ['api/v2.0/systeminfo'] @@ -67,7 +67,7 @@ async def detect_harbor_version(self, response_content): data = json.loads(response_content) # Extract the harbor_version value if data.get("harbor_version"): - self.version = data.get("harbor_version") + self.version.append(data.get("harbor_version")) except (json.JSONDecodeError, KeyError) as json_error: raise ValueError("The URL doesn't contain a valid JSON.") from json_error @@ -79,7 +79,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): if await self.check_harbor(request_to_root.url): harbor_detected = { "name": self.device_name, - "versions": [self.version] if self.version else [], + "versions": self.version, "categories": ["Network Equipment"], "groups": ["Content"] } @@ -95,6 +95,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): info=json.dumps(harbor_detected), wstg=WSTG_CODE ) + self.version.clear() else: log_blue(MSG_NO_HARBOR) except RequestError as req_error: diff --git a/wapitiCore/attack/network_devices/mod_ubika.py b/wapitiCore/attack/network_devices/mod_ubika.py index 3ca7536e4..e605c9938 100644 --- a/wapitiCore/attack/network_devices/mod_ubika.py +++ b/wapitiCore/attack/network_devices/mod_ubika.py @@ -36,7 +36,7 @@ class ModuleUbika(NetworkDeviceCommon): """Detect Ubika.""" - version = "" + version = [] async def check_ubika(self, url): check_list = ['app/monitor/'] @@ -53,7 +53,7 @@ async def check_ubika(self, url): return response.is_success and title_tag and "UBIKA" in title_tag.text.strip() async def get_ubika_version(self, url): - version = "" + versions = [] version_uri = "app/monitor/api/info/product" full_url = urljoin(url, version_uri) request = Request(full_url, 'GET') @@ -65,7 +65,9 @@ async def get_ubika_version(self, url): if response.is_success: version = response.json.get("result", {}).get("product", {}).get("version", '') - return version + if version: + versions.append(version) + return versions async def attack(self, request: Request, response: Optional[Response] = None): self.finished = True @@ -81,7 +83,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): ubika_detected = { "name": "UBIKA WAAP", - "versions": [self.version] if self.version else [], + "versions": self.version, "categories": ["Network Equipment"], "groups": ["Content"] } @@ -97,6 +99,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): info=json.dumps(ubika_detected), wstg=WSTG_CODE ) + self.version.clear() else: log_blue(MSG_NO_UBIKA) except RequestError as req_error: