Skip to content

Commit

Permalink
Add module for network devices
Browse files Browse the repository at this point in the history
Add a module to detect network devices with version when possible
  • Loading branch information
OussamaBeng committed Feb 20, 2024
1 parent 206d6ec commit 26744ea
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 0 deletions.
132 changes: 132 additions & 0 deletions tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from asyncio import Event
from unittest.mock import AsyncMock

import httpx
import respx
import pytest

from wapitiCore.net.classes import CrawlerConfiguration
from wapitiCore.net import Request
from wapitiCore.net.crawler import AsyncCrawler
from wapitiCore.attack.mod_network_device import ModuleNetworkDevice


# Test no Drupal detected
@pytest.mark.asyncio
@respx.mock
async def test_no_ubika():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert not persister.add_payload.call_count

@pytest.mark.asyncio
@respx.mock
async def test_ubika_without_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/app/monitor/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>UBIKA WAAP GATEWAY</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "UBIKA WAAP", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)

@pytest.mark.asyncio
@respx.mock
async def test_ubika_with_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/app/monitor/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>UBIKA WAAP GATEWAY</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)

respx.get("http://perdu.com/app/monitor/api/info/product").mock(
return_value=httpx.Response(
200,
content='{"result":{"api":{"version":"1.0","logLevel":"info"},\
"appliance":{"name":"Management","role":"manager","ip":"192.168.0.169","port":3002},\
"product":{"version":"6.5.6"}},"_info":{"apiVersion":"1.0","serverTimestamp":1708417838114,\
"responseTime":"0s","responseStatus":200}}'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["category"] == "Fingerprint web technology"
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "UBIKA WAAP", "version": "6.5.6", "categories": ["Network Equipment"], "groups": ["Content"]}'
)
1 change: 1 addition & 0 deletions wapitiCore/attack/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"https_redirect",
"log4shell",
"methods",
"network_device",
"nikto",
"permanentxss",
"redirect",
Expand Down
92 changes: 92 additions & 0 deletions wapitiCore/attack/mod_network_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import json

from typing import Optional

from bs4 import BeautifulSoup
from httpx import RequestError

from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE

from wapitiCore.main.log import log_blue

MSG_TECHNO_VERSIONED = "{0} {1} detected"
MSG_NO_UBIKA = "No UBIKA Detected"


class ModuleNetworkDevice(Attack):
"""Base class for detecting version."""
name = "network_device"
version = ""

async def check_ubika(self, url):
check_list = ['app/monitor/']
for item in check_list:
request = Request(f'{url}{item}', 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
else:
soup = BeautifulSoup(response.content, 'html.parser')
title_tag = soup.title
if (
response.is_success
and title_tag
and "UBIKA" in title_tag.text.strip()
):
return True
return False

async def get_ubika_version(self, url):
version_uri = "app/monitor/api/info/product"
request = Request(f'{url}{version_uri}', 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
else:
if response.is_success:
json_data = response.json
ubika_version = json_data.get("result", {}).get("product", {}).get("version")
if ubika_version:
self.version = ubika_version

async def must_attack(self, request: Request, response: Optional[Response] = None):
if self.finished:
return False

if request.method == "POST":
return False

return request.url == await self.persister.get_root_url()

async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)

if await self.check_ubika(request_to_root.url):
await self.get_ubika_version(request_to_root.url)

ubika_detected = {
"name": "UBIKA WAAP",
"version": self.version,
"categories": ["Network Equipment"],
"groups": ["Content"]
}
log_blue(
MSG_TECHNO_VERSIONED,
"UBIKA WAAP",
self.version
)

await self.add_addition(
category=TECHNO_DETECTED,
request=request_to_root,
info=json.dumps(ubika_detected),
wstg=WSTG_CODE
)
else:
log_blue(MSG_NO_UBIKA)

0 comments on commit 26744ea

Please sign in to comment.