Skip to content

Commit

Permalink
Ajout de la détection de Check Point
Browse files Browse the repository at this point in the history
Adapter le module network device afin d'identifier les produits Check Point
  • Loading branch information
OussamaBeng authored and fwininger committed Jul 11, 2024
1 parent 01a3884 commit 7e84c4c
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 2 deletions.
126 changes: 126 additions & 0 deletions tests/attack/test_mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async def test_ubika_without_version():
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"


@pytest.mark.asyncio
@respx.mock
async def test_ubika_with_version():
Expand Down Expand Up @@ -771,3 +772,128 @@ async def test_detect_citrix_in_root_url():
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "NetScaler ADC", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}'
)


@pytest.mark.asyncio
@respx.mock
async def test_checkpoint_without_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/Login/Login").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Hello</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Check Point Software Technologies Ltd</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Check Point", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}'
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"

@pytest.mark.asyncio
@respx.mock
async def test_checkpoint_based_on_realmsArrJSON():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/Login/Login").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Hello</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Hello</h2><script type='text/javascript'>var realmsArrJSON = '[]'</script> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Check Point", "versions": [], "categories": ["Network Equipment"], "groups": ["Content"]}'
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"


@pytest.mark.asyncio
@respx.mock
async def test_checkpoint_with_version():
respx.get("http://perdu.com/").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Vous Perdu ?</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)
respx.get("http://perdu.com/cgi-bin/home.tcl").mock(
return_value=httpx.Response(
200,
content="<html><head><title>Login</title></head><body><h1>Perdu sur l'Internet ?</h1> \
<h2>Pas de panique, on va vous aider</h2> \
<div><script src='/login/login.js'></script></div> <script>var hostname='';var version='R80.20';</script>\
<strong><pre> * <----- vous &ecirc;tes ici</pre></strong></body></html>"
)
)

respx.get(url__regex=r"http://perdu.com/.*?").mock(return_value=httpx.Response(404))

persister = AsyncMock()

request = Request("http://perdu.com/")
request.path_id = 1

crawler_configuration = CrawlerConfiguration(Request("http://perdu.com/"))
async with AsyncCrawler.with_configuration(crawler_configuration) as crawler:
options = {"timeout": 10, "level": 2, "tasks": 20}

module = ModuleNetworkDevice(crawler, persister, options, Event(), crawler_configuration)

await module.attack(request)

assert persister.add_payload.call_count == 1
assert persister.add_payload.call_args_list[0][1]["category"] == "Fingerprint web technology"
assert persister.add_payload.call_args_list[0][1]["info"] == (
'{"name": "Check Point", "versions": ["R80.20"], "categories": ["Network Equipment"], "groups": ["Content"]}'
)
assert persister.add_payload.call_args_list[0][1]["module"] == "network_device"
5 changes: 3 additions & 2 deletions wapitiCore/attack/mod_network_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
from asyncio import Event
from typing import Optional

from wapitiCore.attack.network_devices.mod_harbor import ModuleHarbor
from wapitiCore.attack.network_devices.mod_checkpoint import ModuleCheckPoint
from wapitiCore.attack.network_devices.mod_citrix import ModuleCitrix
from wapitiCore.attack.network_devices.mod_forti import ModuleForti
from wapitiCore.attack.network_devices.mod_harbor import ModuleHarbor
from wapitiCore.attack.network_devices.mod_ubika import ModuleUbika
from wapitiCore.attack.attack import Attack
from wapitiCore.net import Request
Expand All @@ -48,7 +49,7 @@ async def must_attack(self, request: Request, response: Optional[Response] = Non
async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)
modules_list = [ModuleUbika, ModuleForti, ModuleHarbor, ModuleCitrix]
modules_list = [ModuleCheckPoint, ModuleCitrix, ModuleForti, ModuleHarbor, ModuleUbika]
for module in modules_list:
mod = module(
self.crawler, self.persister, self.options, Event(), self.crawler_configuration
Expand Down
113 changes: 113 additions & 0 deletions wapitiCore/attack/network_devices/mod_checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of the Wapiti project (https://wapiti-scanner.github.io)
# Copyright (C) 2024 Cyberwatch
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

import json
import re
from typing import Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from httpx import RequestError

from wapitiCore.attack.network_devices.network_device_common import NetworkDeviceCommon, MSG_TECHNO_VERSIONED
from wapitiCore.net import Request
from wapitiCore.net.response import Response
from wapitiCore.definitions.fingerprint import NAME as TECHNO_DETECTED, WSTG_CODE
from wapitiCore.main.log import log_blue, logging

MSG_NO_CHECKPOINT = "No Check Point Product Detected"
MSG_CHECKPOINT_DETECTED = "{0} {1} Detected !"


class ModuleCheckPoint(NetworkDeviceCommon):
"""Detect Check Point Devices."""

device_name = "Check Point"
version = []

async def check_checkpoint(self, url):
check_list = ['cgi-bin/home.tcl', 'sslvpn/Login/Login',
'sslvpn/ICS/scanPage', 'portail/Login/Login', 'Login/Login']

for item in check_list:
full_url = urljoin(url, item)
request = Request(full_url, 'GET')
try:
response: Response = await self.crawler.async_send(request, follow_redirects=False)
except RequestError:
self.network_errors += 1
continue

if response.is_success:
soup = BeautifulSoup(response.content, 'html.parser')
scripts = soup.find_all('script')
for script in scripts:
if script.has_attr('src') and script['src'].endswith('/login/login.js'):
self.version = await self.detect_checkpoint_version(response.content)
return True
if script.string and "realmsArrJSON =" in script.string:
return True
if 'Check Point Software Technologies Ltd' in response.content:
return True

return False

async def detect_checkpoint_version(self, response_content):
version = []
soup = BeautifulSoup(response_content, 'html.parser')
scripts = [script for script in soup.find_all('script') if not script.has_attr('src')]
for script in scripts:
if script.string:
# Define the pattern to match the version variable
pattern = r"var version='([^']*)'"
match = re.search(pattern, script.string)
if match:
version.append(match.group(1))
return version

async def attack(self, request: Request, response: Optional[Response] = None):
self.finished = True
request_to_root = Request(request.url)

try:
if await self.check_checkpoint(request_to_root.url):
checkpoint_detected = {
"name": self.device_name,
"versions": self.version,
"categories": ["Network Equipment"],
"groups": ["Content"]
}
log_blue(
MSG_TECHNO_VERSIONED,
self.device_name,
self.version
)

await self.add_addition(
category=TECHNO_DETECTED,
request=request_to_root,
info=json.dumps(checkpoint_detected),
wstg=WSTG_CODE
)
else:
log_blue(MSG_NO_CHECKPOINT)
except RequestError as req_error:
self.network_errors += 1
logging.error(f"Request Error occurred: {req_error}")

0 comments on commit 7e84c4c

Please sign in to comment.