Use Quasarr CAPTCHA solution method #813

Merged 4 commits on Dec 23, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -34,7 +34,7 @@ FeedCrawler conveniently automates adding links for JDownloader.
### Prerequisites

* [Python 3.8](https://www.python.org/downloads/) or newer (only
-  4 [external dependencies](https://github.com/rix1337/FeedCrawler/blob/main/requirements.txt)!)
+  5 [external dependencies](https://github.com/rix1337/FeedCrawler/blob/main/requirements.txt)!)
* [JDownloader 2](http://www.jdownloader.org/jdownloader2) with an
  active [My JDownloader account](https://my.jdownloader.org)
* _optional: [FlareSolverr 3](https://github.com/FlareSolverr/FlareSolverr) to bypass Cloudflare blocking_
Empty file.
285 changes: 285 additions & 0 deletions feedcrawler/external_sites/captcha/filecrypt.py
@@ -0,0 +1,285 @@
# -*- coding: utf-8 -*-
# FeedCrawler
# Project by https://github.com/rix1337

import base64
import json
import random
import re
import xml.dom.minidom
from urllib.parse import urlparse

import dukpy
from Cryptodome.Cipher import AES
from bs4 import BeautifulSoup

from feedcrawler.providers.http_requests.request_handler import request, Session


class CNL:
    """Decrypts a Click'n'Load payload: a "jk" JavaScript key function plus an AES-CBC encrypted link list."""

    def __init__(self, crypted_data):
        self.crypted_data = crypted_data

    def jk_eval(self, f_def):
        # Evaluate the "jk" JavaScript function to obtain the hex-encoded AES key
        js_code = f"""
        {f_def}
        f();
        """

        result = dukpy.evaljs(js_code).strip()

        return result

    def aes_decrypt(self, data, key):
        try:
            encrypted_data = base64.b64decode(data)
        except Exception as e:
            raise ValueError("Failed to decode base64 data") from e

        try:
            key_bytes = bytes.fromhex(key)
        except Exception as e:
            raise ValueError("Failed to convert key to bytes") from e

        # Click'n'Load reuses the AES key as the CBC initialization vector
        iv = key_bytes
        cipher = AES.new(key_bytes, AES.MODE_CBC, iv)

        try:
            decrypted_data = cipher.decrypt(encrypted_data)
        except ValueError as e:
            raise ValueError("Decryption failed") from e

        try:
            return decrypted_data.decode('utf-8').replace('\x00', '').replace('\x08', '')
        except UnicodeDecodeError as e:
            raise ValueError("Failed to decode decrypted data") from e

    def decrypt(self):
        crypted = self.crypted_data[2]
        jk = "function f(){ return '" + self.crypted_data[1] + "';}"
        key = self.jk_eval(jk)
        uncrypted = self.aes_decrypt(crypted, key)
        urls = [result for result in uncrypted.split("\r\n") if len(result) > 0]

        return urls
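
As a sanity check, the CNL scheme can be exercised end to end without Filecrypt. The sketch below is not part of this diff; key, URLs, and title are invented. It encrypts a link list the way Click'n'Load does (zero-padded AES-CBC with the IV equal to the key, where the key travels as a hex string that `decrypt()` wraps in a JavaScript function for `jk_eval`), then feeds the result back through the class.

```python
# Hypothetical round trip for the CNL class (key/URLs/title are made up).
import base64

from Cryptodome.Cipher import AES

from feedcrawler.external_sites.captcha.filecrypt import CNL

key_hex = "00112233445566778899aabbccddeeff"
key = bytes.fromhex(key_hex)

plaintext = b"https://example.com/a.rar\r\nhttps://example.com/b.rar"
plaintext += b"\x00" * (-len(plaintext) % 16)  # zero-pad to the AES block size

# Click'n'Load encrypts with AES-CBC and reuses the key as the IV
crypted = base64.b64encode(AES.new(key, AES.MODE_CBC, key).encrypt(plaintext)).decode()

# crypted_data layout expected by CNL: [unused, jk key string, crypted blob, title]
urls = CNL(["", key_hex, crypted, "Demo.Title"]).decrypt()
assert urls == ["https://example.com/a.rar", "https://example.com/b.rar"]
```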


class DLC:
    """Decrypts a DLC container by exchanging the container key with the jdownloader.org DLC service."""

    def __init__(self, dlc_file):
        self.data = dlc_file
        self.KEY = b"cb99b5cbc24db398"
        self.IV = b"9bc24cb995cb8db3"
        self.API_URL = "http://service.jdownloader.org/dlcrypt/service.php?srcType=dlc&destType=pylo&data="

    def parse_packages(self, start_node):
        return [
            (
                base64.b64decode(node.getAttribute("name")).decode("utf-8"),
                self.parse_links(node)
            )
            for node in start_node.getElementsByTagName("package")
        ]

    def parse_links(self, start_node):
        return [
            base64.b64decode(node.getElementsByTagName("url")[0].firstChild.data).decode("utf-8")
            for node in start_node.getElementsByTagName("file")
        ]

    def decrypt(self):
        if not isinstance(self.data, bytes):
            raise TypeError("data must be bytes.")

        all_urls = []

        try:
            data = self.data.strip()

            # Restore base64 padding before splitting off the 88-character DLC key
            data += b"=" * (-len(data) % 4)

            dlc_key = data[-88:].decode("utf-8")
            dlc_data = base64.b64decode(data[:-88])
            dlc_content = request(self.API_URL + dlc_key).text

            rc = base64.b64decode(re.search(r"<rc>(.+)</rc>", dlc_content, re.S).group(1))[:16]

            cipher = AES.new(self.KEY, AES.MODE_CBC, self.IV)
            key = iv = cipher.decrypt(rc)

            cipher = AES.new(key, AES.MODE_CBC, iv)
            xml_data = base64.b64decode(cipher.decrypt(dlc_data)).decode("utf-8")

            root = xml.dom.minidom.parseString(xml_data).documentElement
            content_node = root.getElementsByTagName("content")[0]

            packages = self.parse_packages(content_node)

            for package in packages:
                urls = package[1]
                all_urls.extend(urls)

        except Exception as e:
            print("DLC Error: " + str(e))
            return None

        return all_urls


def get_filecrypt_links(shared_state, token, title, url, password=None):
    shared_state.logger.debug("Attempting to decrypt Filecrypt link: " + url)
    session = Session()

    password_field = None
    if password:
        try:
            output = session.get(url, headers={'User-Agent': shared_state.values["user_agent"]})
            soup = BeautifulSoup(output.text, 'html.parser')
            input_element = soup.find('input', placeholder=lambda value: value and 'password' in value.lower())
            password_field = input_element['name']
            shared_state.logger.debug("Password field name identified: " + password_field)
            url = output.url
        except Exception:
            shared_state.logger.debug("No password field found. Skipping password entry!")

    if password and password_field:
        shared_state.logger.debug("Using Password: " + password)
        output = session.post(url, data=password_field + "=" + password,
                              headers={'User-Agent': shared_state.values["user_agent"],
                                       'Content-Type': 'application/x-www-form-urlencoded'})
    else:
        output = session.get(url, headers={'User-Agent': shared_state.values["user_agent"]})

    url = output.url
    soup = BeautifulSoup(output.text, 'html.parser')
    if bool(soup.findAll("input", {"id": "p4assw0rt"})):
        shared_state.logger.debug(f"Password was wrong or missing. Could not get links for {title}")
        return False

    no_captcha_present = bool(soup.find("form", {"class": "cnlform"}))
    if no_captcha_present:
        shared_state.logger.debug("No CAPTCHA present. Skipping token!")
    else:
        circle_captcha = bool(soup.findAll("div", {"class": "circle_captcha"}))
        i = 0
        while circle_captcha and i < 3:
            shared_state.logger.debug("Sending fake solution to skip Circle-CAPTCHA...")
            random_x = str(random.randint(100, 200))
            random_y = str(random.randint(100, 200))
            output = session.post(url, data="buttonx.x=" + random_x + "&buttonx.y=" + random_y,
                                  headers={'User-Agent': shared_state.values["user_agent"],
                                           'Content-Type': 'application/x-www-form-urlencoded'})
            url = output.url
            soup = BeautifulSoup(output.text, 'html.parser')
            circle_captcha = bool(soup.findAll("div", {"class": "circle_captcha"}))
            i += 1  # advance the retry counter so the loop cannot run forever

        output = session.post(url, data="cap_token=" + token,
                              headers={'User-Agent': shared_state.values["user_agent"],
                                       'Content-Type': 'application/x-www-form-urlencoded'})
        url = output.url
        soup = BeautifulSoup(output.text, 'html.parser')

    solved = bool(soup.findAll("div", {"class": "container"}))
    if not solved:
        shared_state.logger.debug(f"Filecrypt did not accept the token! Could not get links for {title}")
        return False
    else:
        season_number = ""
        episode_number = ""
        episode_in_title = re.findall(r'.*\.s(\d{1,3})e(\d{1,3})\..*', title, re.IGNORECASE)
        season_in_title = re.findall(r'.*\.s(\d{1,3})\..*', title, re.IGNORECASE)
        if episode_in_title:
            try:
                season_number = str(int(episode_in_title[0][0]))
                episode_number = str(int(episode_in_title[0][1]))
            except Exception:
                pass
        elif season_in_title:
            try:
                season_number = str(int(season_in_title[0]))
            except Exception:
                pass

        season = ""
        episode = ""
        tv_show_selector = soup.find("div", {"class": "dlpart"})
        if tv_show_selector:
            season = "season="
            episode = "episode="

            season_selection = soup.find("div", {"id": "selbox_season"})
            try:
                if season_selection:
                    season += str(season_number)
            except Exception:
                pass

            episode_selection = soup.find("div", {"id": "selbox_episode"})
            try:
                if episode_selection:
                    episode += str(episode_number)
            except Exception:
                pass

        links = []

        mirrors = []
        mirrors_available = soup.select("a[href*=mirror]")
        if mirrors_available:
            for mirror in mirrors_available:
                try:
                    mirror_query = mirror.get("href").split("?")[1]
                    base_url = url.split("?")[0] if "mirror" in url else url
                    mirrors.append(f"{base_url}?{mirror_query}")
                except IndexError:
                    continue
        else:
            mirrors = [url]

        for mirror in mirrors:
            if len(mirrors) > 1:
                output = session.get(mirror, headers={'User-Agent': shared_state.values["user_agent"]})
                url = output.url
                soup = BeautifulSoup(output.text, 'html.parser')

            try:
                crypted_payload = soup.find("form", {"class": "cnlform"}).get('onsubmit')
                crypted_data = re.findall(r"'(.*?)'", crypted_payload)
                if not title:
                    title = crypted_data[3]
                crypted_data = [
                    crypted_data[0],
                    crypted_data[1],
                    crypted_data[2],
                    title
                ]
                if episode and season:
                    domain = urlparse(url).netloc
                    filtered_cnl_secret = soup.find("input", {"name": "hidden_cnl_id"}).attrs["value"]
                    filtered_cnl_link = f"https://{domain}/_CNL/{filtered_cnl_secret}.html?{season}&{episode}"
                    filtered_cnl_result = session.post(filtered_cnl_link,
                                                       headers={'User-Agent': shared_state.values["user_agent"]})
                    if filtered_cnl_result.status_code == 200:
                        filtered_cnl_data = json.loads(filtered_cnl_result.text)
                        if filtered_cnl_data["success"]:
                            crypted_data = [
                                crypted_data[0],
                                filtered_cnl_data["data"][0],
                                filtered_cnl_data["data"][1],
                                title
                            ]
                links.extend(CNL(crypted_data).decrypt())
            except Exception:
                shared_state.logger.debug("Click'n'Load not found! Falling back to DLC...")
                crypted_payload = soup.find("button", {"class": "dlcdownload"}).get("onclick")
                crypted_data = re.findall(r"'(.*?)'", crypted_payload)
                dlc_secret = crypted_data[0]
                domain = urlparse(url).netloc
                if episode and season:
                    dlc_link = f"https://{domain}/DLC/{dlc_secret}.dlc?{episode}&{season}"
                else:
                    dlc_link = f"https://{domain}/DLC/{dlc_secret}.dlc"
                dlc_file = session.get(dlc_link, headers={'User-Agent': shared_state.values["user_agent"]}).content
                links.extend(DLC(dlc_file).decrypt())

    return links
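
For orientation, here is a hypothetical call site for `get_filecrypt_links`; it is not part of this diff. The `shared_state` stub, token, container URL, and password are placeholders: the function only assumes an object exposing `logger` and `values["user_agent"]`, plus a CAPTCHA token solved elsewhere (per the Quasarr method this PR adopts).

```python
# Hypothetical call site (shared_state stub, token, URL and password are placeholders).
import logging
from types import SimpleNamespace

from feedcrawler.external_sites.captcha.filecrypt import get_filecrypt_links

logging.basicConfig(level=logging.DEBUG)
shared_state = SimpleNamespace(
    logger=logging.getLogger("feedcrawler"),
    values={"user_agent": "Mozilla/5.0"},
)

token = "<solved-captcha-token>"  # obtained from an external CAPTCHA solver
links = get_filecrypt_links(
    shared_state,
    token,
    "Some.Show.S01E02.German.1080p.WEB.x264-GROUP",
    "https://filecrypt.co/Container/0123456789.html",  # placeholder container
    password="example",
)
if links:
    print("\n".join(links))
```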
53 changes: 53 additions & 0 deletions feedcrawler/providers/http_requests/request_handler.py
@@ -242,3 +242,56 @@ def create_ipv4_socket(address, timeout=None, source_address=None):
    resp_url = url

    return Response(req, content, text, json, status_code, resp_url, headers, cookiejar)


class Session:
    def __init__(self):
        # Create a single CookieJar to manage session cookies
        self.cookiejar = CookieJar()
        # Shared headers for all requests
        self.headers = {}

    def request(self, url, method="GET", **kwargs):
        """
        Make a request while persisting cookies across the session.
        """
        # Merge session headers with request-specific headers
        kwargs["headers"] = {**self.headers, **kwargs.get("headers", {})}

        # Provide the persistent CookieJar to the request
        kwargs["cookiejar"] = self.cookiejar

        # Perform the HTTP request
        response = request(url, method=method, **kwargs)

        # Persist cookies from the response into the shared CookieJar
        if response.cookiejar:
            for cookie in response.cookiejar:
                self.cookiejar.set_cookie(cookie)

        return response

    def get(self, url, **kwargs):
        """
        Convenience method for GET requests.
        """
        return self.request(url, method="GET", **kwargs)

    def post(self, url, **kwargs):
        """
        Convenience method for POST requests.
        """
        return self.request(url, method="POST", **kwargs)

    def update_headers(self, headers):
        """
        Update shared headers for all requests in this session.
        """
        self.headers.update(headers)

    def clear_cookies(self):
        """
        Clear all cookies stored in the session.
        """
        self.cookiejar.clear()
        print("All session cookies cleared.")
21 changes: 0 additions & 21 deletions feedcrawler/providers/myjd_connection.py
@@ -931,27 +931,6 @@ def do_package_merge(title, uuids, linkids):
    return False


def download_decrypted_links_from_cnl(title, password, cnl_packages):
    linkids = []
    uuids = []
    urls = ""
    for cnl_package in cnl_packages:
        for linkid in cnl_package['linkids']:
            linkids.append(linkid)
        uuids.append(cnl_package['uuid'])
        urls = urls + ensure_string(cnl_package['urls']).replace("\n\n", "\n")

    links = ensure_string(urls).replace("\n\n", "\n")
    if remove_from_linkgrabber(linkids, uuids):
        if download(title, "FeedCrawler", links, password):
            episode = re.findall(r'.*\.S\d{1,3}E(\d{1,3})\..*', title)
            if episode:
                FeedDb('episode_remover').store(title, str(int(episode[0])))
            print("[Click'n'Load-Automatik erfolgreich] - " + title)
            return [True, title]
    return False


def add_for_manual_decryption(title, link, password, replace=False):
    try:
        if check_is_site(link):