Skip to content

Commit

Permalink
Add Citrix to mod_network_device
Browse files Browse the repository at this point in the history
Add Citrix products detection for module network_device
  • Loading branch information
OussamaBeng committed Apr 24, 2024
1 parent 6bee660 commit 77ea165
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 1 deletion.
80 changes: 80 additions & 0 deletions tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,83 @@ async def test_detect_harbor_raise_on_request_error():
await module.check_harbor("http://perdu.com/")

assert exc_info.value.args[0] == "RequestError occurred: [Errno -2] Name or service not known"


@pytest.mark.asyncio
@respx.mock
async def test_detect_citrix_from_title():
respx.get("http://perdu.com/logon/LogonPoint/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Citrix Gateway</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
</body></html>'
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Citrix Gateway", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_citrix_from_class():
respx.get("http://perdu.com/logon/LogonPoint/").mock(
return_value=httpx.Response(
200,
content='<html><head><title class="_ctxstxt_NetscalerGateway">Hello</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
</body></html>'
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "NetscalerGateway", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)
3 changes: 2 additions & 1 deletion wapitiCore/attack/mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional

from wapitiCore.attack.network_devices.mod_harbor import ModuleHarbor
from wapitiCore.attack.network_devices.mod_citrix import ModuleCitrix
from wapitiCore.attack.network_devices.mod_forti import ModuleForti
from wapitiCore.attack.network_devices.mod_ubika import ModuleUbika
from wapitiCore.attack.attack import Attack
Expand All @@ -25,7 +26,7 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non
async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)
modules_list = [ModuleHarbor, ModuleUbika, ModuleForti]
modules_list = [ModuleUbika, ModuleForti, ModuleHarbor, ModuleCitrix]
for module in modules_list:
mod = module(
self.crawler, self.persister, self.options, Event(), self.crawler_configuration
Expand Down
94 changes: 94 additions & 0 deletions wapitiCore/attack/network_devices/mod_citrix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import re
from typing import Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from httpx import RequestError

from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE
from wapitiCore.main.log import log_blue, logging

MSG_NO_CITRIX = "No Citrix Product Detected"
MSG_CITRIX_DETECTED = "{0}{1} Detected !"


class ModuleCitrix(Attack):
"""Detect Citrix Devices."""

device_name = "Citrix"
version = ""

async def check_citrix(self, url):
check_list = ['logon/LogonPoint/']

for item in check_list:
full_url = urljoin(url, item)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
except RequestError:
self.network_errors += 1
continue

if response.is_success:
return await self.detect_citrix_product(response.content)

return False

async def detect_citrix_product(self, response_content):
soup = BeautifulSoup(response_content, 'html.parser')
title_tag = soup.title
# If title tag exists and has a class attribute
if title_tag:
if 'class' in title_tag.attrs:
title_class = title_tag['class']
# Assuming class is a list, extracting the first class
if title_class:
extract_pattern = r"^_ctxstxt_(.*)$"
match = re.search(extract_pattern, title_class[0])
if match:
self.device_name = match.group(1)
return True

return False
else:
title = title_tag.text
if "Citrix" in title:
# Extract the product name from the title
self.device_name = title
return True
return False

async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)

try:
if await self.check_citrix(request_to_root.url):
citrix_detected = {
"name": self.device_name,
"version": "",
"categories": ["Network Equipment"],
"groups": ["Content"]
}
log_blue(
MSG_CITRIX_DETECTED,
self.device_name,
self.version
)

await self.add_addition(
category=TECHNO_DETECTED,
request=request_to_root,
info=json.dumps(citrix_detected),
wstg=WSTG_CODE
)
else:
log_blue(MSG_NO_CITRIX)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")
1 change: 1 addition & 0 deletions wapitiCore/attack/network_devices/network_device_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

MSG_TECHNO_VERSIONED = "{0} {1} detected"


class NetworkDeviceCommon(Attack):
"""Base class for detecting version."""
name = "network_device"

0 comments on commit 77ea165

Please sign in to comment.