Skip to content

Commit

Permalink
✨ First version of video scrapper
Browse files Browse the repository at this point in the history
- update dependencies
- bypass rgpd form post
- new proxy api for thumbnails
  • Loading branch information
essembeh committed Nov 12, 2024
1 parent 9859317 commit d1aa12d
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 299 deletions.
430 changes: 197 additions & 233 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ pydantic = "^2.7.1"
pydantic-yaml = "^1.3.0"
pydantic-settings = "^2.2.1"
pydantic-xml = "^2.11.0"
rapid-api-client = "^0.5.0"
rapid-api-client = "^0.6.0"
jsonpath-ng = "^1.7.0"


[tool.poetry.group.dev.dependencies]
Expand Down
15 changes: 15 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,18 @@ async def test_proxy_home(client):
== user.headers["Location"]
== "https://www.youtube.com/channel/UCVooVnzQxPSTXTMzSi1s6uw"
)


@pytest.mark.anyio
async def test_proxy_thumbnail(client):
    """The thumbnail proxy redirects (307) to ytimg, honoring the instance query param."""
    # default instance is 1 -> i1.ytimg.com
    default_resp = await client.get("/proxy/thumbnail/XivF3Nx3exA")
    # explicit instance=2 -> i2.ytimg.com
    alt_resp = await client.get("/proxy/thumbnail/XivF3Nx3exA?instance=2")

    for resp in (default_resp, alt_resp):
        assert resp.status_code == 307

    assert (
        default_resp.headers["Location"]
        == "https://i1.ytimg.com/vi/XivF3Nx3exA/hqdefault.jpg"
    )
    assert (
        alt_resp.headers["Location"]
        == "https://i2.ytimg.com/vi/XivF3Nx3exA/hqdefault.jpg"
    )
58 changes: 37 additions & 21 deletions tests/test_youtube.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
from http.cookiejar import CookieJar

import pytest
from httpx import AsyncClient, get
from httpx import AsyncClient

from yourss.youtube import (
YoutubeMetadata,
YoutubeRssApi,
YoutubeWebApi,
)
from yourss.youtube.utils import html_get_rgpd_forms
from yourss.youtube.scrapper import VideoScrapper
from yourss.youtube.utils import bs_parse


def is_rgpd_applicable():
resp = get("https://ifconfig.io/country_code")
return resp.status_code == 200 and resp.text.strip() == "FR"


@pytest.mark.skipif(not is_rgpd_applicable(), reason="Not applicable outside Europe")
@pytest.mark.asyncio(loop_scope="module")
async def test_rgpd_with_cookies():
api = YoutubeWebApi(AsyncClient(cookies=CookieJar()))
async def test_rgpd():
api = YoutubeWebApi()

url = "https://www.youtube.com/@jonnygiger"
url = "/@jonnygiger"

# first call should fail
resp = await api.get_html(url)
assert resp.status_code == 200
assert len(html_get_rgpd_forms(resp.text)) > 0
assert (
len(
bs_parse(resp.text).find_all(
"form",
attrs={"method": "POST", "action": "https://consent.youtube.com/save"},
)
)
== 0
)

# this call automatically accept the rgpd form
resp = await api.get_rgpd_html(url)
resp = await api.get_html(url, ucbcb=0)
assert resp.status_code == 200
assert len(html_get_rgpd_forms(resp.text)) == 0

# now we can get the page without the rgpd form
resp = await api.get_html(url)
assert resp.status_code == 200
assert len(html_get_rgpd_forms(resp.text)) == 0
assert (
len(
bs_parse(resp.text).find_all(
"form",
attrs={"method": "POST", "action": "https://consent.youtube.com/save"},
)
)
> 0
)


@pytest.mark.asyncio(loop_scope="module")
Expand Down Expand Up @@ -81,3 +85,15 @@ async def test_metadata_user():
meta.url.geturl() == "https://www.youtube.com/channel/UCVooVnzQxPSTXTMzSi1s6uw"
)
assert meta.avatar_url is not None


@pytest.mark.asyncio(loop_scope="module")
async def test_scrap_videos():
    # live-network test: scrapes a real Youtube channel and follows one
    # continuation through the json browse api
    scrapper = VideoScrapper(YoutubeWebApi())

    page_iterator = scrapper.iter_videos("UCVooVnzQxPSTXTMzSi1s6uw")
    page1 = await anext(page_iterator)
    # NOTE(review): exact page size depends on live Youtube layout — brittle
    assert len(page1) == 30
    page2 = await anext(page_iterator)
    assert len(page2) > 10
    # the continuation page must contain different videos than the first one
    assert page1 != page2
16 changes: 16 additions & 0 deletions yourss/jsonutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Dict, Iterator, Type, TypeVar

from jsonpath_ng import parse

T = TypeVar("T")


def json_iter(path: str, payload: Dict, cls: Type[T] | None = None) -> Iterator[T]:
    """Yield every value matching the jsonpath *path* in *payload*.

    None values are always skipped; when *cls* is given, values that are not
    instances of *cls* are skipped as well.
    """
    matches = parse(path).find(payload)
    for found in matches:
        value = found.value
        if value is None:
            continue
        if cls is not None and not isinstance(value, cls):
            continue
        yield value


def json_first(path: str, payload: Dict, cls: Type[T] | None = None) -> T:
    """Return the first value matching *path* (see json_iter).

    Raises StopIteration when no value matches.
    """
    iterator = json_iter(path, payload, cls=cls)
    return next(iterator)
8 changes: 8 additions & 0 deletions yourss/routers/proxy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from httpx import AsyncClient
from pydantic import PositiveInt
from starlette.status import HTTP_404_NOT_FOUND

from ..youtube import (
Expand Down Expand Up @@ -70,3 +71,10 @@ async def home(
meta = YoutubeMetadata.from_response(homepage)

return RedirectResponse(meta.url.geturl())


@router.get("/thumbnail/{video_id}", response_class=RedirectResponse)
async def thumbnail(video_id: str, instance: PositiveInt = 1):
    """Redirect to the Youtube thumbnail image of a video.

    video_id: the Youtube video identifier (url-safe characters only)
    instance: selects the ytimg server (i1, i2, ...), defaults to 1
    """
    # video_id is untrusted input reflected into the redirect Location:
    # only accept the url-safe-base64 alphabet used by Youtube ids
    # (alphanumerics plus "-" and "_") to avoid smuggling arbitrary text.
    if not video_id or not all(
        c.isascii() and (c.isalnum() or c in "-_") for c in video_id
    ):
        raise HTTPException(HTTP_404_NOT_FOUND, f"Invalid video id: {video_id}")
    return RedirectResponse(
        f"https://i{instance}.ytimg.com/vi/{video_id}/hqdefault.jpg"
    )
51 changes: 18 additions & 33 deletions yourss/youtube/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from dataclasses import dataclass
from typing import Annotated, Dict
from urllib.parse import urlparse
from typing import Annotated, Any, Dict

from httpx import Response
from loguru import logger
from rapid_api_client import FormBody, Path
from pydantic import TypeAdapter
from rapid_api_client import FormBody, Path, Query
from rapid_api_client.annotations import JsonBody
from rapid_api_client.async_ import AsyncRapidApi, get, post

from .schema import Feed
from .utils import (
ALLOWED_HOSTS,
MOZILLA_USER_AGENT,
html_get_rgpd_forms,
is_channel_id,
is_user,
)
Expand All @@ -24,43 +22,30 @@
@dataclass
class YoutubeWebApi(AsyncRapidApi):
def __post_init__(self):
self.client.base_url = BASE_URL
self.client.follow_redirects = True
self.client.headers.setdefault("user-agent", MOZILLA_USER_AGENT)
self.client.headers["user-agent"] = MOZILLA_USER_AGENT
self.client.headers["accept-language"] = "en"
self.client.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")

@get("{url}")
async def get_html(self, url: Annotated[str, Path()]): ...
@get("{path}")
async def get_html(
self, path: Annotated[str, Path()], ucbcb: Annotated[int, Query(default=1)]
): ...

@post("{url}")
@post("{path}")
async def post_html(
self, url: Annotated[str, Path()], form: Annotated[Dict, FormBody()]
self, path: Annotated[str, Path()], form: Annotated[Dict, FormBody()]
): ...

async def get_rgpd_html(self, url: str) -> Response:
logger.debug("Get youtube page: {}", url)
parsed_url = urlparse(url)
assert (
parsed_url.hostname in ALLOWED_HOSTS
), f"Invalid host: {parsed_url.hostname}"
response = await self.get_html(url)
response.raise_for_status()
if len(forms := html_get_rgpd_forms(response.text)) > 0:
logger.debug("Page {} has RGPD forms", url)
response = await self.post_html(
(forms[0].attrs["action"]),
form={
element.attrs["name"]: element.attrs["value"]
for element in forms[0].find_all("input")
if "name" in element.attrs and "value" in element.attrs
},
)
response.raise_for_status()
return response
@post("/youtubei/v1/browse", response_class=TypeAdapter(Dict[str, Any]))
async def api_browse(self, data: Annotated[dict, JsonBody()]): ...

async def get_homepage(self, name: str) -> Response:
if is_channel_id(name):
return await self.get_rgpd_html(f"https://www.youtube.com/channel/{name}")
return await self.get_html(f"/channel/{name}")
if is_user(name):
return await self.get_rgpd_html(f"https://www.youtube.com/{name}")
return await self.get_html(f"/{name}")
raise ValueError(f"Cannot find homepage for: {name}")


Expand Down
97 changes: 96 additions & 1 deletion yourss/youtube/metadata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import json
import re
from collections import UserDict
from typing import Self
from dataclasses import dataclass
from functools import cached_property
from typing import Any, Dict, Iterator, Literal, Self
from urllib.parse import ParseResult, urlparse

from bs4 import BeautifulSoup
from httpx import Response

from yourss.jsonutils import json_first, json_iter

from .utils import ALLOWED_HOSTS, html_get_metadata, is_channel_id

YTCFG_PATTERN = r"ytcfg\.set\((?P<json>(?:\"[^\"]*\"|'[^']*'|[^()])*)\)"
YTINITIALDATA_PATTERN = r"ytInitialData = (?P<json>{.*?});"


class YoutubeMetadata(UserDict[str, str]):
@classmethod
Expand Down Expand Up @@ -34,3 +44,88 @@ def channel_id(self) -> str:
last_segment = self.url.path.split("/")[-1]
assert is_channel_id(last_segment), f"Invalid channel_id: {last_segment}"
return last_segment


@dataclass
class VideoDescription:
    """Minimal description of a scraped Youtube video."""

    # the Youtube video identifier (last path segment of the watch url)
    video_id: str
    # the video title as extracted from the json payload
    title: str


class VideoData(UserDict):
    """Dict wrapper around a Youtube json payload (ytInitialData or browse api)."""

    @classmethod
    def from_json(cls, text: str) -> Self:
        """Build an instance from a raw json string."""
        payload = json.loads(text)
        return cls(payload)

    def iter_videos(
        self, selector: Literal["videoRenderer", "reelItemRenderer"] = "videoRenderer"
    ) -> Iterator[VideoDescription]:
        """Yield a VideoDescription for every renderer found in the payload."""
        for renderer in json_iter(f"$..{selector}", self.data):
            identifier = json_first("$.videoId", renderer, str)
            label = json_first("$.title.runs[0].text", renderer, str)
            yield VideoDescription(video_id=identifier, title=label)

    @property
    def continuation_token(self) -> str | None:
        """Token used to fetch the next page, None when there is no next page."""
        tokens = json_iter("$..continuationCommand.token", self.data, str)
        return next(tokens, None)

    @property
    def click_tracking_params(self) -> str:
        """First clickTrackingParams blob, required by the browse api call."""
        return json_first("$..clickTrackingParams", self.data, str)


@dataclass
class YoutubeWebPage:
    """Parsed view of a Youtube html page.

    Wraps the httpx Response and lazily exposes the html soup, the og:* meta
    tags and the json payloads embedded in inline scripts (ytcfg / ytInitialData).
    """

    # raw http response of the page; checked for success in __post_init__
    response: Response

    def __post_init__(self):
        # fail fast: a non-success page cannot be parsed
        self.response.raise_for_status()

    @cached_property
    def soup(self) -> BeautifulSoup:
        """Html tree of the response body, parsed once on first access."""
        return BeautifulSoup(self.response.text, features="html.parser")

    @cached_property
    def metadata(self) -> Dict[str, str]:
        """The page's meta tags (og:title, og:url, ...), extracted once."""
        return html_get_metadata(self.response.text)

    @property
    def metadata_title(self) -> str:
        """The og:title value; raises KeyError when the tag is absent."""
        return self.metadata["og:title"]

    @property
    def metadata_avatar_url(self) -> str | None:
        """The og:image value, or None when the page has none."""
        return self.metadata.get("og:image")

    @property
    def metadata_url(self) -> ParseResult:
        """The canonical og:url, validated against ALLOWED_HOSTS."""
        home_url = self.metadata["og:url"]
        out = urlparse(home_url)
        assert out.hostname in ALLOWED_HOSTS, f"Not a valid youtube url: {home_url}"
        return out

    @property
    def metadata_channel_id(self) -> str:
        """The channel id, i.e. the last segment of the canonical url path."""
        last_segment = self.metadata_url.path.split("/")[-1]
        assert is_channel_id(last_segment), f"Invalid channel_id: {last_segment}"
        return last_segment

    def iter_scripts(self) -> Iterator[str]:
        """Yield the text content of every inline <script> element."""
        for script in self.soup.find_all("script"):
            if script.string is not None:
                yield script.string

    def find_initial_video_data(self) -> VideoData | None:
        """Extract the ytInitialData json blob, or None when no script matches."""
        for script in self.iter_scripts():
            if (m := re.search(YTINITIALDATA_PATTERN, script, re.DOTALL)) is not None:
                return VideoData.from_json(m.group("json"))

    def find_client_data(self) -> Dict[str, Any] | None:
        """Extract the INNERTUBE_CONTEXT client dict from a ytcfg.set(...) call.

        Returns None when no script contains the marker.
        NOTE(review): json_first raises StopIteration if the matched ytcfg
        payload lacks INNERTUBE_CONTEXT.client — confirm this cannot happen.
        """
        for script in self.iter_scripts():
            if (
                "INNERTUBE_CONTEXT" in script
                and (m := re.search(YTCFG_PATTERN, script, re.DOTALL)) is not None
            ):
                payload = json.loads(m.group("json"))
                return json_first("$.INNERTUBE_CONTEXT.client", payload)
52 changes: 52 additions & 0 deletions yourss/youtube/scrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from asyncio import sleep
from dataclasses import dataclass
from typing import AsyncIterator, List

from .client import YoutubeWebApi
from .metadata import VideoData, VideoDescription, YoutubeWebPage


@dataclass
class VideoScrapper:
    """Scrape a channel's videos: first the html page, then the json browse api."""

    # low-level web client used for both the html page and the json api
    youtube_api: YoutubeWebApi

    async def iter_videos(
        self, channel_id: str, *, shorts: bool = False, delay: float = 0
    ) -> AsyncIterator[List[VideoDescription]]:
        """Yield pages of VideoDescription for *channel_id*.

        shorts: scrape the /shorts tab instead of /videos
        delay: seconds to sleep between two consecutive api calls
        """
        tab = "shorts" if shorts else "videos"
        response = await self.youtube_api.get_html(
            f"https://www.youtube.com/channel/{channel_id}/{tab}"
        )
        landing = YoutubeWebPage(response)
        assert (client_data := landing.find_client_data()) is not None
        assert (data := landing.find_initial_video_data()) is not None
        renderer = "reelItemRenderer" if shorts else "videoRenderer"
        while True:
            page = list(data.iter_videos(selector=renderer))
            if not page:
                # could not find any video, stop here
                break
            # yield all videos found on the current page
            yield page
            token = data.continuation_token
            if token is None:
                # no continuation token means this was the last page
                break
            if delay > 0:
                # optional throttling between two api calls
                await sleep(delay)
            # fetch the next page through the json browse api
            payload = {
                "context": {
                    "clickTracking": {
                        "clickTrackingParams": data.click_tracking_params
                    },
                    "client": client_data,
                },
                "continuation": token,
            }
            data = VideoData(await self.youtube_api.api_browse(payload))
Loading

0 comments on commit d1aa12d

Please sign in to comment.