Skip to content

Commit

Permalink
Add module for network devices
Browse files Browse the repository at this point in the history
Add a module to detect network devices with version when possible
  • Loading branch information
OussamaBeng committed Mar 1, 2024
1 parent 206d6ec commit b504243
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 0 deletions.
132 changes: 132 additions & 0 deletions tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from asyncio import Event
from unittest.mock import AsyncMock

import httpx
import respx
import pytest

from wapitiCore.net.classes import CrawlerConfiguration
from wapitiCore.net import Request
from wapitiCore.net.crawler import AsyncCrawler
from wapitiCore.attack.mod_network_device import ModuleNetworkDevice


@pytest.mark.asyncio
@respx.mock
async def test_no_ubika():
# Test no UBIKA detected
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Etes Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert not persister.add_payload.call_count

@pytest.mark.asyncio
@respx.mock
async def test_ubika_without_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/app/monitor/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>UBIKA WAAP GATEWAY</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "UBIKA WAAP", "version": "", "categories": ["Network Equipment"], "groups": ["Content"]}'
)

@pytest.mark.asyncio
@respx.mock
async def test_ubika_with_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/app/monitor/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>UBIKA WAAP GATEWAY</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)

respx.get("http://perdu.com/app/monitor/api/info/product").mock(
return_value=httpx.Response(
200,
content='{"result":{"api":{"version":"1.0","logLevel":"info"},\
"appliance":{"name":"Management","role":"manager","ip":"192.168.0.169","port":3002},\
"product":{"version":"6.5.6"}},"_info":{"apiVersion":"1.0","serverTimestamp":1708417838114,\
"responseTime":"0s","responseStatus":200}}'
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["category"] == "Fingerprint web technology"
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "UBIKA WAAP", "version": "6.5.6", "categories": ["Network Equipment"], "groups": ["Content"]}'
)
1 change: 1 addition & 0 deletions wapitiCore/attack/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"https_redirect",
"log4shell",
"methods",
"network_device",
"nikto",
"permanentxss",
"redirect",
Expand Down
97 changes: 97 additions & 0 deletions wapitiCore/attack/mod_network_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import json

from typing import Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from httpx import RequestError

from wapitiCore.main.log import logging
from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE

from wapitiCore.main.log import log_blue

MSG_TECHNO_VERSIONED = "{0} {1} detected"
MSG_NO_UBIKA = "No UBIKA Detected"


class ModuleNetworkDevice(Attack):
"""Base class for detecting version."""
name = "network_device"
version = ""

async def check_ubika(self, url):
check_list = ['app/monitor/']
for item in check_list:
full_url = urljoin(url, item)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
raise
soup = BeautifulSoup(response.content, 'html.parser')
title_tag = soup.title
return response.is_success and title_tag and "UBIKA" in title_tag.text.strip()

async def get_ubika_version(self, url):
version = ""
version_uri = "app/monitor/api/info/product"
full_url = urljoin(url, version_uri)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=True)
except RequestError:
self.network_errors += 1
raise

if response.is_success:
version = response.json.get("result", {}).get("product", {}).get("version", '')
return version

async def must_attack(self, request: Request, response: Optional[Response] = None):
if self.finished:
return False

if request.method == "POST":
return False

return request.url == await self.persister.get_root_url()

async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)

try:
if await self.check_ubika(request_to_root.url):
try:
self.version = await self.get_ubika_version(request_to_root.url)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")

ubika_detected = {
"name": "UBIKA WAAP",
"version": self.version,
"categories": ["Network Equipment"],
"groups": ["Content"]
}
log_blue(
MSG_TECHNO_VERSIONED,
"UBIKA WAAP",
self.version
)

await self.add_addition(
category=TECHNO_DETECTED,
request=request_to_root,
info=json.dumps(ubika_detected),
wstg=WSTG_CODE
)
else:
log_blue(MSG_NO_UBIKA)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")

0 comments on commit b504243

Please sign in to comment.