feat(python-sdk): add async scraping and crawling methods using aiohttp and asyncio
RutamBhagat committed Dec 21, 2024
1 parent f877fbf commit 49f7fec
Showing 3 changed files with 148 additions and 9 deletions.
149 changes: 142 additions & 7 deletions apps/python-sdk/firecrawl/firecrawl.py
@@ -18,6 +18,8 @@
import requests
import pydantic
import websockets
import aiohttp
import asyncio

logger: logging.Logger = logging.getLogger("firecrawl")

@@ -26,8 +28,8 @@ class ExtractParams(pydantic.BaseModel):
"""
Parameters for the extract operation.
"""
-    prompt: str
-    schema: Optional[Any] = None
+    prompt: Optional[str] = None
+    schema_: Optional[Any] = pydantic.Field(None, alias='schema')
system_prompt: Optional[str] = None
allow_external_links: Optional[bool] = False
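
The renamed schema_ field keeps 'schema' available to callers through a pydantic alias while avoiding a clash with BaseModel's built-in schema() method. A minimal sketch of the behavior (standalone, not part of the diff):

from typing import Any, Optional
import pydantic

class ExtractParams(pydantic.BaseModel):
    prompt: Optional[str] = None
    schema_: Optional[Any] = pydantic.Field(None, alias='schema')

params = ExtractParams(schema={'type': 'object'})  # populated via the alias
print(params.schema_)  # {'type': 'object'}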

@@ -106,6 +108,37 @@ def scrape_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
else:
self._handle_error(response, 'scrape URL')

async def scrape_url_async(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""Async version of scrape_url"""
headers = self._prepare_headers()
scrape_params = {'url': url}
if params:
extract = params.get('extract', {})
if extract:
if 'schema' in extract and hasattr(extract['schema'], 'schema'):
extract['schema'] = extract['schema'].schema()
scrape_params['extract'] = extract
for key, value in params.items():
if key not in ['extract']:
scrape_params[key] = value

        endpoint = '/v1/scrape'
response = await self._async_post_request(
f'{self.api_url}{endpoint}',
scrape_params,
headers
)
if response.status == 200:
response_json = await response.json()
if response_json['success'] and 'data' in response_json:
return response_json['data']
elif "error" in response_json:
raise Exception(f'Failed to scrape URL. Error: {response_json["error"]}')
else:
raise Exception(f'Failed to scrape URL. Error: {response_json}')
else:
await self._handle_error_async(response, 'scrape URL')
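
A minimal usage sketch for the new coroutine (the api_key constructor argument matches the existing synchronous client; the URL and params are illustrative):

import asyncio
from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key='fc-YOUR-API-KEY')  # hypothetical key

async def main():
    # Await the async scrape instead of blocking the event loop like scrape_url would
    data = await app.scrape_url_async('https://example.com', params={'formats': ['markdown']})
    print(data)

asyncio.run(main())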

def search(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""
Perform a search using the Firecrawl API.
@@ -162,6 +195,23 @@ def crawl_url(self, url: str,
else:
self._handle_error(response, 'start crawl job')

async def crawl_url_async(self, url: str,
params: Optional[Dict[str, Any]] = None,
poll_interval: Optional[int] = 2,
idempotency_key: Optional[str] = None) -> Any:
"""Async version of crawl_url"""
        endpoint = '/v1/crawl'
headers = self._prepare_headers(idempotency_key)
json_data = {'url': url}
if params:
json_data.update(params)
response = await self._async_post_request(f'{self.api_url}{endpoint}', json_data, headers)
if response.status == 200:
response_json = await response.json()
            crawl_id = response_json.get('id')
            return await self._async_monitor_job_status(crawl_id, headers, poll_interval)
else:
await self._handle_error_async(response, 'start crawl job')
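
Like the synchronous crawl_url, this coroutine only resolves once the job finishes, polling every poll_interval seconds via _async_monitor_job_status. A hedged sketch, reusing the hypothetical app from the earlier example (the 'limit' param is illustrative):

async def run_crawl():
    result = await app.crawl_url_async('https://example.com',
                                       params={'limit': 10},
                                       poll_interval=5)
    print(result['status'], len(result.get('data', [])))

asyncio.run(run_crawl())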

def async_crawl_url(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
"""
@@ -353,8 +403,33 @@ def batch_scrape_urls(self, urls: list[str],
else:
self._handle_error(response, 'start batch scrape job')

async def _async_monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int) -> Any:
"""Async version of _monitor_job_status"""
while True:
api_url = f'{self.api_url}/v1/crawl/{id}'
status_response = await self._async_get_request(api_url, headers)
if status_response.status == 200:
status_data = await status_response.json()
if status_data['status'] == 'completed':
if 'data' in status_data:
data = status_data['data']
while 'next' in status_data:
status_response = await self._async_get_request(status_data['next'], headers)
status_data = await status_response.json()
data.extend(status_data.get('data', []))
status_data['data'] = data
return status_data
else:
raise Exception('Crawl job completed but no data was returned')
elif status_data['status'] in ['active', 'paused', 'pending', 'queued', 'waiting', 'scraping']:
poll_interval = max(poll_interval, 2)
await asyncio.sleep(poll_interval)
else:
raise Exception(f'Crawl job failed or was stopped. Status: {status_data["status"]}')
else:
await self._handle_error_async(status_response, 'check crawl status')
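
When a completed crawl is paginated, the monitor follows 'next' links and concatenates each page's data. An illustrative sketch of that merge (the payload shapes are assumptions inferred from the loop above):

# Two hypothetical status pages; the real payloads come from /v1/crawl/{id}
page1 = {'status': 'completed', 'data': [{'url': 'https://example.com/a'}],
         'next': 'https://api.firecrawl.dev/v1/crawl/<id>?skip=1'}
page2 = {'status': 'completed', 'data': [{'url': 'https://example.com/b'}]}

data = page1['data']
data.extend(page2.get('data', []))  # mirrors the while-'next' loop
print(len(data))  # 2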

-    def async_batch_scrape_urls(self, urls: list[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
+    async def async_batch_scrape_urls(self, urls: list[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
"""
Initiate a batch scrape job asynchronously.
@@ -374,11 +449,11 @@
json_data = {'urls': urls}
if params:
json_data.update(params)
-        response = self._post_request(f'{self.api_url}{endpoint}', json_data, headers)
-        if response.status_code == 200:
-            return response.json()
+        response = await self._async_post_request(f'{self.api_url}{endpoint}', json_data, headers)
+        if response.status == 200:
+            return await response.json()
         else:
-            self._handle_error(response, 'start batch scrape job')
+            await self._handle_error_async(response, 'start batch scrape job')

def batch_scrape_urls_and_watch(self, urls: list[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> 'CrawlWatcher':
"""
@@ -549,6 +624,21 @@ def _post_request(self, url: str,
return response
return response

    async def _async_post_request(self, url: str,
                                  data: Dict[str, Any],
                                  headers: Dict[str, str],
                                  retries: int = 3,
                                  backoff_factor: float = 0.5) -> aiohttp.ClientResponse:
        """Async version of _post_request"""
        async with aiohttp.ClientSession() as session:
            for attempt in range(retries):
                async with session.post(url, headers=headers, json=data) as response:
                    # Cache the body while the connection is still open, so callers
                    # can await response.json() after the session has closed.
                    await response.read()
                    if response.status == 502:
                        await asyncio.sleep(backoff_factor * (2 ** attempt))
                        continue
                    return response
            return response
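
Both this helper and the matching GET helper below retry only on HTTP 502, sleeping backoff_factor * (2 ** attempt) between attempts. With the defaults the waits work out as follows:

delays = [0.5 * (2 ** attempt) for attempt in range(3)]
print(delays)  # [0.5, 1.0, 2.0] seconds; after the last 502 the response is returned as-is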

def _get_request(self, url: str,
headers: Dict[str, str],
retries: int = 3,
@@ -575,6 +665,20 @@ def _get_request(self, url: str,
else:
return response
return response

    async def _async_get_request(self, url: str,
                                 headers: Dict[str, str],
                                 retries: int = 3,
                                 backoff_factor: float = 0.5) -> aiohttp.ClientResponse:
        """Async version of _get_request"""
        async with aiohttp.ClientSession() as session:
            for attempt in range(retries):
                async with session.get(url, headers=headers) as response:
                    # Cache the body while the connection is still open, so callers
                    # can await response.json() after the session has closed.
                    await response.read()
                    if response.status == 502:
                        await asyncio.sleep(backoff_factor * (2 ** attempt))
                        continue
                    return response
            return response

def _delete_request(self, url: str,
headers: Dict[str, str],
@@ -670,6 +774,37 @@ def _handle_error(self, response: requests.Response, action: str) -> None:
# Raise an HTTPError with the custom message and attach the response
raise requests.exceptions.HTTPError(message, response=response)

    async def _handle_error_async(self, response: aiohttp.ClientResponse, action: str) -> None:
        """
        Handle errors from async API responses.

        Args:
            response (aiohttp.ClientResponse): The response object from the async API request.
            action (str): Description of the action that was being performed.

        Raises:
            aiohttp.ClientError: An error with a message containing the status code and error details from the response.
        """
        try:
            error_json = await response.json()
            error_message = error_json.get('error', 'No error message provided.')
            error_details = error_json.get('details', 'No additional error details provided.')
        except Exception as e:
            # The body could not be read or was not valid JSON; surface that directly
            # rather than letting a blanket except swallow the error raised below.
            raise aiohttp.ClientError(f"Error processing response: {str(e)}")

        if response.status == 402:
            message = f"Payment Required: Failed to {action}. {error_message} - {error_details}"
        elif response.status == 408:
            message = f"Request Timeout: Failed to {action} as the request timed out. {error_message} - {error_details}"
        elif response.status == 409:
            message = f"Conflict: Failed to {action} due to a conflict. {error_message} - {error_details}"
        elif response.status == 500:
            message = f"Internal Server Error: Failed to {action}. {error_message} - {error_details}"
        else:
            message = f"Unexpected error during {action}: Status code {response.status}. {error_message} - {error_details}"

        raise aiohttp.ClientError(message)

class CrawlWatcher:
def __init__(self, id: str, app: FirecrawlApp):
self.id = id
4 changes: 3 additions & 1 deletion apps/python-sdk/pyproject.toml
@@ -12,7 +12,9 @@ dependencies = [
"requests",
"python-dotenv",
"websockets",
"nest-asyncio"
"nest-asyncio",
"aiohttp>=3.8.0",
"asyncio>=3.4.3",
]
authors = [{name = "Mendable.ai",email = "[email protected]"}]
maintainers = [{name = "Mendable.ai",email = "[email protected]"}]
4 changes: 3 additions & 1 deletion apps/python-sdk/requirements.txt
@@ -2,4 +2,6 @@ requests
pytest
python-dotenv
websockets
-nest-asyncio
+nest-asyncio
+aiohttp
+# asyncio comes with the standard library; no separate requirement is needed
