Skip to content

Commit

Permalink
Add Fortinet detection
Browse files Browse the repository at this point in the history
Add Fortinet products detection for module network_device
  • Loading branch information
OussamaBeng committed Mar 18, 2024
1 parent 56e46e3 commit e5ffb3d
Show file tree
Hide file tree
Showing 4 changed files with 482 additions and 72 deletions.
237 changes: 235 additions & 2 deletions tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
from unittest.mock import AsyncMock

import httpx
from httpx import RequestError
import respx
import pytest

from wapitiCore.net.classes import CrawlerConfiguration
from wapitiCore.net import Request
from wapitiCore.net.crawler import AsyncCrawler
from wapitiCore.attack.mod_network_device import ModuleNetworkDevice
from wapitiCore.attack.network_devices.mod_forti import ModuleForti


@pytest.mark.asyncio
@respx.mock
async def test_no_ubika():
# Test no UBIKA detected
async def test_no_net_device():
# Test no network device detected
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
Expand Down Expand Up @@ -130,3 +132,234 @@ async def test_ubika_with_version():
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "UBIKA WAAP", "version": "6.5.6", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortimanager():
respx.get("http://perdu.com/p/login/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<div class="sign-in-header" style="visibility: hidden"><span class="platform">FortiManager-3000G</span>'
'</body></html>'
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "FortiManager", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_ssl_vpn():
respx.get("http://perdu.com/remote/login?lang=en").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Login</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> '
)
)

respx.get("http://perdu.com/remote/fgt_lang?lang=fr").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Fortinet SSL-VPN", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortinet():
respx.get("http://perdu.com/login/?next=/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Login</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> '
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Fortinet", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortiportal_from_title():
respx.get("http://perdu.com/fpc/app/login").mock(
return_value=httpx.Response(
200,
content='<html><head><title>FortiPortal</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> '
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "FortiPortal", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_detect_fortimail():
respx.get("http://perdu.com/admin/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>FortiMail</title><meta name="FortiMail" content="width=device-width, initial-scale=1">\
</head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> '
)
)
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "FortiMail", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_raise_on_request_error():
"""Tests that a RequestError is raised when calling the module with wrong URL."""

respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content='<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> </body></html>'
)
)

respx.get(url__regex=r"http://perdu.com/.*").mock(side_effect=RequestError("RequestError occurred: [Errno -2] Name or service not known"))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleForti(crawler, persister, options, Event(), crawler_configuration)

with pytest.raises(RequestError) as exc_info:
await module.check_forti("http://perdu.com/")

assert exc_info.value.args[0] == "RequestError occurred: [Errno -2] Name or service not known"
79 changes: 9 additions & 70 deletions wapitiCore/attack/mod_network_device.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import json

from asyncio import Event
from typing import Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from httpx import RequestError

from wapitiCore.main.log import logging
from wapitiCore.attack.network_devices.mod_forti import ModuleForti
from wapitiCore.attack.network_devices.mod_ubika import ModuleUbika
from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE

from wapitiCore.main.log import log_blue

MSG_TECHNO_VERSIONED = "{0} {1} detected"
MSG_NO_UBIKA = "No UBIKA Detected"
Expand All @@ -20,36 +15,6 @@
class ModuleNetworkDevice(Attack):
"""Base class for detecting version."""
name = "network_device"
version = ""

async def check_ubika(self, url):
check_list = ['app/monitor/']
for item in check_list:
full_url = urljoin(url, item)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
raise
soup = BeautifulSoup(response.content, 'html.parser')
title_tag = soup.title
return response.is_success and title_tag and "UBIKA" in title_tag.text.strip()

async def get_ubika_version(self, url):
version = ""
version_uri = "app/monitor/api/info/product"
full_url = urljoin(url, version_uri)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
raise

if response.is_success:
version = response.json.get("result", {}).get("product", {}).get("version", '')
return version

async def must_attack(self, request: Request, response: Optional[Response] = None):
if self.finished:
Expand All @@ -63,35 +28,9 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non
async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)

try:
if await self.check_ubika(request_to_root.url):
try:
self.version = await self.get_ubika_version(request_to_root.url)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")

ubika_detected = {
"name": "UBIKA WAAP",
"version": self.version,
"categories": ["Network Equipment"],
"groups": ["Content"]
}
log_blue(
MSG_TECHNO_VERSIONED,
"UBIKA WAAP",
self.version
)

await self.add_addition(
category=TECHNO_DETECTED,
request=request_to_root,
info=json.dumps(ubika_detected),
wstg=WSTG_CODE
)
else:
log_blue(MSG_NO_UBIKA)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")
modules_list = [ModuleUbika, ModuleForti]
for module in modules_list:
mod = module(
self.crawler, self.persister, self.options, Event(), self.crawler_configuration
)
await mod.attack(request_to_root)
Loading

0 comments on commit e5ffb3d

Please sign in to comment.