diff --git a/.gitignore b/.gitignore
index 4c925f739ec..7ad5bb35de8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ __pycache__/
 *.so
 
 # Distribution / packaging
+.devcontainer/
 .Python
 build/
 develop-eggs/
diff --git a/autogen/agentchat/contrib/agent_eval/__init__.py b/autogen/agentchat/contrib/agent_eval/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/autogen/agentchat/contrib/web_surfer.py b/autogen/agentchat/contrib/web_surfer.py
index af07be6d343..29e448a4e3b 100644
--- a/autogen/agentchat/contrib/web_surfer.py
+++ b/autogen/agentchat/contrib/web_surfer.py
@@ -9,7 +9,7 @@
 from typing_extensions import Annotated
 
 from ... import Agent, AssistantAgent, ConversableAgent, GroupChat, GroupChatManager, OpenAIWrapper, UserProxyAgent
-from ...browser_utils import SimpleTextBrowser
+from ...browser_utils.browser_creator import TextBrowserEnum
 from ...code_utils import content_str
 from ...oai.openai_utils import filter_config
 from ...token_count_utils import count_token, get_max_token_limit
@@ -40,6 +40,7 @@ def __init__(
         llm_config: Optional[Union[Dict, Literal[False]]] = None,
         summarizer_llm_config: Optional[Union[Dict, Literal[False]]] = None,
         default_auto_reply: Optional[Union[str, Dict, None]] = "",
+        browser_name: Literal["bing", "google"] = "bing",
         browser_config: Optional[Union[Dict, None]] = None,
     ):
         super().__init__(
@@ -58,7 +59,8 @@ def __init__(
         self._create_summarizer_client(summarizer_llm_config, llm_config)
 
         # Create the browser
-        self.browser = SimpleTextBrowser(**(browser_config if browser_config else {}))
+        self.browser_name = browser_name
+        self.browser = TextBrowserEnum.get_browser(browser_name)(**(browser_config if browser_config else {}))
 
         inner_llm_config = copy.deepcopy(llm_config)
 
@@ -136,7 +138,7 @@ def _browser_state() -> Tuple[str, str]:
             description="Perform an INFORMATIONAL web search query then return the search results.",
         )
         def _informational_search(query: Annotated[str, "The informational web search query to perform."]) -> str:
-            self.browser.visit_page(f"bing: {query}")
+            self.browser.visit_page(f"{self.browser_name}: {query}")
             header, content = _browser_state()
             return header.strip() + "\n=======================\n" + content
 
@@ -146,7 +148,7 @@ def _informational_search(query: Annotated[str, "The informational web search qu
             description="Perform a NAVIGATIONAL web search query then immediately navigate to the top result. Useful, for example, to navigate to a particular Wikipedia article or other known destination. Equivalent to Google's \"I'm Feeling Lucky\" button.",
         )
         def _navigational_search(query: Annotated[str, "The navigational web search query to perform."]) -> str:
-            self.browser.visit_page(f"bing: {query}")
+            self.browser.visit_page(f"{self.browser_name}: {query}")
 
             # Extract the first linl
             m = re.search(r"\[.*?\]\((http.*?)\)", self.browser.page_content)
diff --git a/autogen/browser_utils.py b/autogen/browser_utils.py
index 99e51fcd4ca..07a50b6624f 100644
--- a/autogen/browser_utils.py
+++ b/autogen/browser_utils.py
@@ -28,7 +28,7 @@
     pass
 
 
-class SimpleTextBrowser:
+class TextBrowserBase:
     """(In preview) An extremely simple text-based web browser comparable to Lynx. 
Suitable for Agentic use.""" def __init__( diff --git a/autogen/browser_utils/__init__.py b/autogen/browser_utils/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/autogen/browser_utils/base_browser.py b/autogen/browser_utils/base_browser.py new file mode 100644 index 00000000000..85a9349d5aa --- /dev/null +++ b/autogen/browser_utils/base_browser.py @@ -0,0 +1,216 @@ +import io +import mimetypes +import os +import re +import uuid +from typing import Any, Dict, List, Optional, Tuple, Union, overload +from urllib.parse import urljoin, urlparse + +import markdownify +import requests +from bs4 import BeautifulSoup + +# Optional PDF support +IS_PDF_CAPABLE = False +try: + import pdfminer + import pdfminer.high_level + + IS_PDF_CAPABLE = True +except ModuleNotFoundError: + pass + +# Other optional dependencies +try: + import pathvalidate +except ModuleNotFoundError: + pass + + +class TextBrowserBase: + """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use.""" + + def __init__( + self, + start_page: Optional[str] = None, + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + base_url: str = None, + api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict[str, Any], None]] = None, + ): + self.start_page: str = start_page if start_page else "about:blank" + self.viewport_size = viewport_size # Applies only to the standard uri types + self.downloads_folder = downloads_folder + self.history: List[str] = list() + self.page_title: Optional[str] = None + self.viewport_current_page = 0 + self.viewport_pages: List[Tuple[int, int]] = list() + self.set_address(self.start_page) + self.base_url = base_url + self.api_key = api_key + self.request_kwargs = request_kwargs + + self._page_content = "" + + @property + def address(self) -> str: + """Return the address of the current page.""" + return self.history[-1] + + @overload + def set_address(self, uri_or_path: str) -> None: + self.history.append(uri_or_path) + + self.viewport_current_page = 0 + + @property + def viewport(self) -> str: + """Return the content of the current viewport.""" + bounds = self.viewport_pages[self.viewport_current_page] + return self.page_content[bounds[0] : bounds[1]] + + @property + def page_content(self) -> str: + """Return the full contents of the current page.""" + return self._page_content + + def _set_page_content(self, content: str) -> None: + """Sets the text content of the current page.""" + self._page_content = content + self._split_pages() + if self.viewport_current_page >= len(self.viewport_pages): + self.viewport_current_page = len(self.viewport_pages) - 1 + + def page_down(self) -> None: + self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1) + + def page_up(self) -> None: + self.viewport_current_page = max(self.viewport_current_page - 1, 0) + + def visit_page(self, path_or_uri: str) -> str: + """Update the address, visit the page, and return the content of the viewport.""" + self.set_address(path_or_uri) + return self.viewport + + def _split_pages(self) -> None: + # Split only regular pages + if not self.address.startswith("http:") and not self.address.startswith("https:"): + self.viewport_pages = [(0, len(self._page_content))] + return + + # Handle empty pages + if len(self._page_content) == 0: + self.viewport_pages = [(0, 0)] + return + + # Break the viewport into pages + self.viewport_pages = [] + start_idx = 0 + while start_idx < 
len(self._page_content): + end_idx = min(start_idx + self.viewport_size, len(self._page_content)) # type: ignore[operator] + # Adjust to end on a space + while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]: + end_idx += 1 + self.viewport_pages.append((start_idx, end_idx)) + start_idx = end_idx + + def _fetch_page(self, url: str) -> None: + try: + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + request_kwargs["stream"] = True + + # Send a HTTP request to the URL + response = requests.get(url, **request_kwargs) + response.raise_for_status() + + # If the HTTP request returns a status code 200, proceed + if response.status_code == 200: + content_type = response.headers.get("content-type", "") + for ct in ["text/html", "text/plain", "application/pdf"]: + if ct in content_type.lower(): + content_type = ct + break + + if content_type == "text/html": + # Get the content of the response + html = "" + for chunk in response.iter_content(chunk_size=512, decode_unicode=True): + html += chunk + + soup = BeautifulSoup(html, "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Convert to markdown -- Wikipedia gets special attention to get a clean version of the page + if url.startswith("https://en.wikipedia.org/"): + body_elm = soup.find("div", {"id": "mw-content-text"}) + title_elm = soup.find("span", {"class": "mw-page-title-main"}) + + if body_elm: + # What's the title + main_title = soup.title.string + if title_elm and len(title_elm) > 0: + main_title = title_elm.string + webpage_text = ( + "# " + main_title + "\n\n" + markdownify.MarkdownConverter().convert_soup(body_elm) + ) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + else: + webpage_text = markdownify.MarkdownConverter().convert_soup(soup) + + # Convert newlines + webpage_text = re.sub(r"\r\n", "\n", webpage_text) + + # Remove excessive blank lines + self.page_title = soup.title.string + self._set_page_content(re.sub(r"\n{2,}", "\n\n", webpage_text).strip()) + elif content_type == "text/plain": + # Get the content of the response + plain_text = "" + for chunk in response.iter_content(chunk_size=512, decode_unicode=True): + plain_text += chunk + + self.page_title = None + self._set_page_content(plain_text) + elif IS_PDF_CAPABLE and content_type == "application/pdf": + pdf_data = io.BytesIO(response.raw.read()) + self.page_title = None + self._set_page_content(pdfminer.high_level.extract_text(pdf_data)) + elif self.downloads_folder is not None: + # Try producing a safe filename + fname = None + try: + fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip() + except NameError: + pass + + # No suitable name, so make one + if fname is None: + extension = mimetypes.guess_extension(content_type) + if extension is None: + extension = ".download" + fname = str(uuid.uuid4()) + extension + + # Open a file for writing + download_path = os.path.abspath(os.path.join(self.downloads_folder, fname)) + with open(download_path, "wb") as fh: + for chunk in response.iter_content(chunk_size=512): + fh.write(chunk) + + # Return a page describing what just happened + self.page_title = "Download complete." 
+ self._set_page_content(f"Downloaded '{url}' to '{download_path}'.") + else: + self.page_title = f"Error - Unsupported Content-Type '{content_type}'" + self._set_page_content(self.page_title) + else: + self.page_title = "Error" + self._set_page_content("Failed to retrieve " + url) + except requests.exceptions.RequestException as e: + self.page_title = "Error" + self._set_page_content(str(e)) diff --git a/autogen/browser_utils/bing_browser.py b/autogen/browser_utils/bing_browser.py new file mode 100644 index 00000000000..06cd8c5ae1e --- /dev/null +++ b/autogen/browser_utils/bing_browser.py @@ -0,0 +1,126 @@ +from typing import Any, Dict, List, Optional, Union, overload +from urllib.parse import urljoin, urlparse + +import requests + +from .base_browser import TextBrowserBase + + +class BingTextBrowser(TextBrowserBase): + """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use.""" + + def __init__( + self, + start_page: Optional[str] = None, + viewport_size: Optional[int] = 1024 * 8, + downloads_folder: Optional[Union[str, None]] = None, + base_url: str = "https://api.bing.microsoft.com/v7.0/search", + api_key: Optional[Union[str, None]] = None, + request_kwargs: Optional[Union[Dict[str, Any], None]] = None, + ): + super().__init__(start_page, viewport_size, downloads_folder, base_url, api_key, request_kwargs) + self.name = 'bing' + + + def set_address(self, uri_or_path: str) -> None: + self.history.append(uri_or_path) + + # Handle special URIs + if uri_or_path == "about:blank": + self._set_page_content("") + elif uri_or_path.startswith("bing:"): + self._bing_search(uri_or_path[len("bing:") :].strip()) + else: + if not uri_or_path.startswith("http:") and not uri_or_path.startswith("https:"): + uri_or_path = urljoin(self.address, uri_or_path) + self.history[-1] = uri_or_path # Update the address with the fully-qualified path + self._fetch_page(uri_or_path) + + self.viewport_current_page = 0 + + def _bing_api_call(self, query: str) -> Dict[str, Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]: + # Make sure the key was set + if self.api_key is None: + raise ValueError("Missing Bing API key.") + + # Prepare the request parameters + request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {} + + if "headers" not in request_kwargs: + request_kwargs["headers"] = {} + request_kwargs["headers"]["Ocp-Apim-Subscription-Key"] = self.api_key + + if "params" not in request_kwargs: + request_kwargs["params"] = {} + request_kwargs["params"]["q"] = query + request_kwargs["params"]["textDecorations"] = False + request_kwargs["params"]["textFormat"] = "raw" + + request_kwargs["stream"] = False + + # Make the request + response = requests.get(self.base_url, **request_kwargs) + response.raise_for_status() + results = response.json() + + return results # type: ignore[no-any-return] + + def _bing_search(self, query: str) -> None: + results = self._bing_api_call(query) + + web_snippets: List[str] = list() + idx = 0 + for page in results["webPages"]["value"]: + idx += 1 + web_snippets.append(f"{idx}. [{page['name']}]({page['url']})\n{page['snippet']}") + if "deepLinks" in page: + for dl in page["deepLinks"]: + idx += 1 + web_snippets.append( + f"{idx}. [{dl['name']}]({dl['url']})\n{dl['snippet'] if 'snippet' in dl else ''}" # type: ignore[index] + ) + + news_snippets = list() + if "news" in results: + for page in results["news"]["value"]: + idx += 1 + news_snippets.append(f"{idx}. 
[{page['name']}]({page['url']})\n{page['description']}")
+
+        self.page_title = f"{query} - Search"
+
+        content = (
+            f"A Bing search for '{query}' found {len(web_snippets) + len(news_snippets)} results:\n\n## Web Results\n"
+            + "\n\n".join(web_snippets)
+        )
+        if len(news_snippets) > 0:
+            content += "\n\n## News Results:\n" + "\n\n".join(news_snippets)
+        self._set_page_content(content)
diff --git a/autogen/browser_utils/browser_creator.py b/autogen/browser_utils/browser_creator.py
new file mode 100644
index 00000000000..8b0b63cf80d
--- /dev/null
+++ b/autogen/browser_utils/browser_creator.py
@@ -0,0 +1,14 @@
+from .bing_browser import BingTextBrowser
+from .google_browser import GoogleTextBrowser
+
+
+class TextBrowserEnum:
+    """Enum class for creating different text browsers. Make sure to add newly registered browsers here"""
+
+    bing = BingTextBrowser
+    google = GoogleTextBrowser
+
+    @classmethod
+    def get_browser(cls, browser_str):
+        return getattr(cls, browser_str)
+ 
\ No newline at end of file
diff --git a/autogen/browser_utils/google_browser.py b/autogen/browser_utils/google_browser.py
new file mode 100644
index 00000000000..6a296bb626e
--- /dev/null
+++ b/autogen/browser_utils/google_browser.py
@@ -0,0 +1,112 @@
+from typing import Any, Dict, List, Optional, Union, overload
+from urllib.parse import urljoin, urlparse
+
+import requests
+
+from .base_browser import TextBrowserBase
+
+
+class GoogleTextBrowser(TextBrowserBase):
+    """(In preview) An extremely simple text-based web browser comparable to Lynx. 
Suitable for Agentic use."""
+
+    def __init__(
+        self,
+        start_page: Optional[str] = None,
+        viewport_size: Optional[int] = 1024 * 8,
+        downloads_folder: Optional[Union[str, None]] = None,
+        base_url: str = "https://customsearch.googleapis.com/customsearch/v1",
+        api_key: Optional[Union[str, None]] = None,
+        # Programmable Search Engine ID by Google
+        cx: str = None,
+        request_kwargs: Optional[Union[Dict[str, Any], None]] = None,
+    ):
+        super().__init__(start_page, viewport_size, downloads_folder, base_url, api_key, request_kwargs)
+        self.cx = cx
+        self.name = 'google'
+
+    def set_address(self, uri_or_path: str) -> None:
+        self.history.append(uri_or_path)
+
+        # Handle special URIs
+        if uri_or_path == "about:blank":
+            self._set_page_content("")
+        elif uri_or_path.startswith("google:"):
+            self._google_search(uri_or_path[len("google:") :].strip())
+        else:
+            if not uri_or_path.startswith("http:") and not uri_or_path.startswith("https:"):
+                uri_or_path = urljoin(self.address, uri_or_path)
+                self.history[-1] = uri_or_path  # Update the address with the fully-qualified path
+            self._fetch_page(uri_or_path)
+
+        self.viewport_current_page = 0
+
+    def _google_api_call(self, query: str) -> Dict[str, Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]:
+        # Make sure the key was set
+        if self.api_key is None:
+            raise ValueError("Missing Google API key.")
+
+        # Prepare the request parameters
+        request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {}
+
+        if "params" not in request_kwargs:
+            request_kwargs["params"] = {}
+        request_kwargs["params"]["q"] = query
+        request_kwargs["params"]["cx"] = self.cx
+        request_kwargs["params"]["key"] = self.api_key
+
+        # Make the request
+        response = requests.get(self.base_url, **request_kwargs)
+        response.raise_for_status()
+        results = response.json()
+
+        return results  # type: ignore[no-any-return]
+
+    def _google_search(self, query: str) -> None:
+        results = self._google_api_call(query)
+        news_snippets = list()
+
+        web_snippets: List[str] = list()
+        idx = 0
+        for page in results["items"]:
+            idx += 1
+            web_snippets.append(f"{idx}. [{page['title']}]({page['link']})\n{page['snippet']}")
+
+        self.page_title = f"{query} - Search"
+
+        content = (
+            f"A Google search for '{query}' found {len(web_snippets) + len(news_snippets)} results:\n\n## Web Results\n"
+            + "\n\n".join(web_snippets)
+        )
+        if len(news_snippets) > 0:
+            content += "\n\n## News Results:\n" + "\n\n".join(news_snippets)
+        self._set_page_content(content)
diff --git a/test/test_browser_utils.py b/test/test_browser_utils_bing.py
similarity index 96%
rename from test/test_browser_utils.py
rename to test/test_browser_utils_bing.py
index 659dcce84ae..1ec5212433b 100755
--- a/test/test_browser_utils.py
+++ b/test/test_browser_utils_bing.py
@@ -29,7 +29,7 @@
 BING_STRING = f"A Bing search for '{BING_QUERY}' found"
 
 try:
-    from autogen.browser_utils import SimpleTextBrowser
+    from autogen.browser_utils.bing_browser import BingTextBrowser
 except ImportError:
     skip_all = True
 else:
@@ -66,7 +66,7 @@ def test_simple_text_browser():
     # Instantiate the browser
     user_agent = "python-requests/" + requests.__version__
     viewport_size = 1024
-    browser = SimpleTextBrowser(
+    browser = BingTextBrowser(
         downloads_folder=downloads_folder,
         viewport_size=viewport_size,
         request_kwargs={
@@ -128,7 +128,7 @@ def test_simple_text_browser():
         response.raise_for_status()
         expected_md5 = hashlib.md5(response.raw.read()).hexdigest()
 
-        # Visit an image causing it to be downloaded by the SimpleTextBrowser, then compute its md5
+        # Visit an image causing it to be downloaded by the BingTextBrowser, then compute its md5
         viewport = browser.visit_page(IMAGE_URL)
         m = re.search(r"Downloaded '(.*?)' to '(.*?)'", viewport)
         fetched_url = m.group(1)
@@ -156,7 +156,7 @@ def test_simple_text_browser():
 def test_bing_search():
     # Instantiate the browser
     user_agent = "python-requests/" + requests.__version__
-    browser = SimpleTextBrowser(
+    browser = BingTextBrowser(
         bing_api_key=BING_API_KEY,
         viewport_size=1024,
         request_kwargs={
diff --git a/test/test_browser_utils_google.py b/test/test_browser_utils_google.py
new file mode 100755
index 00000000000..108d797e8ea
--- /dev/null
+++ b/test/test_browser_utils_google.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python3 -m pytest
+
+import hashlib
+import math
+import os
+import re
+import sys
+
+import pytest
+import requests
+from agentchat.test_assistant_agent import KEY_LOC  # noqa: E402
+
+BLOG_POST_URL = "https://microsoft.github.io/autogen/0.2/blog/2023/04/21/LLM-tuning-math"
+BLOG_POST_TITLE = "Does Model and Inference Parameter Matter in LLM Applications? - A Case Study for MATH | AutoGen"
+BLOG_POST_STRING = "Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?"
+
+WIKIPEDIA_URL = "https://en.wikipedia.org/wiki/Microsoft"
+WIKIPEDIA_TITLE = "Microsoft - Wikipedia"
+WIKIPEDIA_STRING = "Redmond"
+
+PLAIN_TEXT_URL = "https://raw.githubusercontent.com/microsoft/autogen/main/README.md"
+IMAGE_URL = "https://github.com/afourney.png"
+
+PDF_URL = "https://arxiv.org/pdf/2308.08155.pdf"
+PDF_STRING = "Figure 1: AutoGen enables diverse LLM-based applications using multi-agent conversations." 
+
+GOOGLE_QUERY = "Microsoft"
+GOOGLE_TITLE = f"{GOOGLE_QUERY} - Search"
+GOOGLE_STRING = f"A Google search for '{GOOGLE_QUERY}' found"
+
+
+skip_all = None
+skip_google = None
+
+
+try:
+    from autogen.browser_utils.google_browser import GoogleTextBrowser
+except ImportError:
+    skip_all = True
+else:
+    skip_all = False
+
+try:
+    GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
+    GOOGLE_CX = os.environ["GOOGLE_CX"]
+except KeyError:
+    skip_google = True
+except NameError:
+    skip_google = True
+else:
+    skip_google = False
+
+
+def _rm_folder(path):
+    """Remove all the regular files in a folder, then delete the folder. Assumes a flat file structure, with no subdirectories."""
+    for fname in os.listdir(path):
+        fpath = os.path.join(path, fname)
+        if os.path.isfile(fpath):
+            os.unlink(fpath)
+    os.rmdir(path)
+
+
+@pytest.mark.skipif(
+    (skip_all or skip_google),
+    reason="do not run if dependency is not installed",
+)
+def test_simple_text_browser():
+    # Create a downloads folder (removing any leftover ones from prior tests)
+    downloads_folder = os.path.join(KEY_LOC, "downloads")
+    if os.path.isdir(downloads_folder):
+        _rm_folder(downloads_folder)
+    os.mkdir(downloads_folder)
+
+    # Instantiate the browser
+    user_agent = "python-requests/" + requests.__version__
+    viewport_size = 1024
+    browser = GoogleTextBrowser(
+        downloads_folder=downloads_folder,
+        viewport_size=viewport_size,
+        cx=GOOGLE_CX,
+        request_kwargs={
+            "headers": {"User-Agent": user_agent},
+        },
+    )
+
+    # Test that we can visit a page and find what we expect there
+    top_viewport = browser.visit_page(BLOG_POST_URL)
+    assert browser.viewport == top_viewport
+    assert browser.page_title.strip() == BLOG_POST_TITLE.strip()
+    assert BLOG_POST_STRING in browser.page_content
+
+    # Check if page splitting works
+    approx_pages = math.ceil(len(browser.page_content) / viewport_size)  # May be fewer, since it aligns to word breaks
+    assert len(browser.viewport_pages) <= approx_pages
+    assert abs(len(browser.viewport_pages) - approx_pages) <= 1  # allow only a small deviation
+    assert browser.viewport_pages[0][0] == 0
+    assert browser.viewport_pages[-1][1] == len(browser.page_content)
+
+    # Make sure we can reconstruct the full contents from the split pages
+    buffer = ""
+    for bounds in browser.viewport_pages:
+        buffer += browser.page_content[bounds[0] : bounds[1]]
+    assert buffer == browser.page_content
+
+    # Test scrolling (scroll all the way to the bottom)
+    for i in range(1, len(browser.viewport_pages)):
+        browser.page_down()
+        assert browser.viewport_current_page == i
+    # Test scrolling beyond the limits
+    for i in range(0, 5):
+        browser.page_down()
+        assert browser.viewport_current_page == len(browser.viewport_pages) - 1
+
+    # Test scrolling (scroll back up to the top)
+    for i in range(len(browser.viewport_pages) - 2, 0, -1):
+        browser.page_up()
+        assert browser.viewport_current_page == i
+    # Test scrolling beyond the limits
+    for i in range(0, 5):
+        browser.page_up()
+        assert browser.viewport_current_page == 0
+
+    # Test Wikipedia handling
+    assert WIKIPEDIA_STRING in browser.visit_page(WIKIPEDIA_URL)
+    assert WIKIPEDIA_TITLE.strip() == browser.page_title.strip()
+
+    # Visit a plain-text file
+    response = requests.get(PLAIN_TEXT_URL)
+    response.raise_for_status()
+    expected_results = response.text
+
+    browser.visit_page(PLAIN_TEXT_URL)
+    assert browser.page_content.strip() == expected_results.strip()
+
+    # Directly download an image, and compute its md5
+    response = requests.get(IMAGE_URL, 
stream=True)
+    response.raise_for_status()
+    expected_md5 = hashlib.md5(response.raw.read()).hexdigest()
+
+    # Visit an image causing it to be downloaded by the GoogleTextBrowser, then compute its md5
+    viewport = browser.visit_page(IMAGE_URL)
+    m = re.search(r"Downloaded '(.*?)' to '(.*?)'", viewport)
+    fetched_url = m.group(1)
+    download_loc = m.group(2)
+    assert fetched_url == IMAGE_URL
+
+    with open(download_loc, "rb") as fh:
+        downloaded_md5 = hashlib.md5(fh.read()).hexdigest()
+
+    # MD5s should match
+    assert expected_md5 == downloaded_md5
+
+    # Fetch a PDF
+    viewport = browser.visit_page(PDF_URL)
+    # assert PDF_STRING in viewport
+
+    # Clean up
+    _rm_folder(downloads_folder)
+
+
+@pytest.mark.skipif(
+    (skip_all or skip_google),
+    reason="do not run google tests if key is missing",
+)
+def test_google_search():
+    # Instantiate the browser
+    user_agent = "python-requests/" + requests.__version__
+    browser = GoogleTextBrowser(
+        api_key=GOOGLE_API_KEY,
+        viewport_size=1024,
+        cx=GOOGLE_CX,
+        request_kwargs={
+            "headers": {"User-Agent": user_agent},
+        },
+    )
+
+    assert GOOGLE_STRING in browser.visit_page("google: " + GOOGLE_QUERY)
+    assert GOOGLE_TITLE == browser.page_title
+    assert len(browser.viewport_pages) == 1
+    assert browser.viewport_pages[0] == (0, len(browser.page_content))
+
+
+if __name__ == "__main__":
+    """Runs this file's tests from the command line."""
+    test_simple_text_browser()
+    test_google_search()
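
For context, here is a minimal usage sketch of the browser-selection path this diff introduces. It is illustrative only, not part of the patch: the `browser_config` keys are assumptions read off the `GoogleTextBrowser` constructor above (`api_key`, `cx`, `viewport_size`), and the environment-variable names are placeholders.

```python
# Illustrative sketch (not part of this diff): exercising TextBrowserEnum and the
# new browser_name parameter on WebSurferAgent introduced by this change.
import os

from autogen.agentchat.contrib.web_surfer import WebSurferAgent
from autogen.browser_utils.browser_creator import TextBrowserEnum

# Resolve a browser class by name, as WebSurferAgent now does internally,
# then instantiate it with engine-specific settings.
browser_cls = TextBrowserEnum.get_browser("google")
browser = browser_cls(
    viewport_size=1024 * 8,
    api_key=os.environ["GOOGLE_API_KEY"],  # Custom Search JSON API key (assumed env var)
    cx=os.environ["GOOGLE_CX"],  # Programmable Search Engine ID (assumed env var)
)
print(browser.visit_page("google: AutoGen multi-agent framework"))

# Or let WebSurferAgent build the browser from browser_config:
surfer = WebSurferAgent(
    name="web_surfer",
    llm_config={"config_list": [{"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY"]}]},
    summarizer_llm_config={"config_list": [{"model": "gpt-4", "api_key": os.environ["OPENAI_API_KEY"]}]},
    browser_name="google",
    browser_config={"api_key": os.environ["GOOGLE_API_KEY"], "cx": os.environ["GOOGLE_CX"]},
)
```

Adding another engine would follow the same shape: implement a `TextBrowserBase` subclass with its own `set_address` and search handling, then register it as a class attribute on `TextBrowserEnum` so `get_browser` can resolve it by name.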