Skip to content

Commit

Permalink
Supported custom ernie_api_base & Implemented asynchronous for ErnieE…
Browse files Browse the repository at this point in the history
…mbeddings (langchain-ai#10398)

Description: Supported custom ernie_api_base & Implemented asynchronous
for ErnieEmbeddings
 - ernie_api_base:Support Ernie Service custom endpoints
 - Support asynchronous 

Issue: None
Dependencies: None
Tag maintainer:
Twitter handle: @JohnMai95
  • Loading branch information
johnmai-dev authored Sep 9, 2023
1 parent e0d45e6 commit ee3f950
Showing 1 changed file with 53 additions and 1 deletion.
54 changes: 53 additions & 1 deletion libs/langchain/langchain/embeddings/ernie.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
import threading
from functools import partial
from typing import Dict, List, Optional

import requests
Expand All @@ -14,6 +16,7 @@
class ErnieEmbeddings(BaseModel, Embeddings):
"""`Ernie Embeddings V1` embedding models."""

ernie_api_base: Optional[str] = None
ernie_client_id: Optional[str] = None
ernie_client_secret: Optional[str] = None
access_token: Optional[str] = None
Expand All @@ -26,6 +29,9 @@ class ErnieEmbeddings(BaseModel, Embeddings):

@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
values["ernie_api_base"] = get_from_dict_or_env(
values, "ernie_api_base", "ERNIE_API_BASE", "https://aip.baidubce.com"
)
values["ernie_client_id"] = get_from_dict_or_env(
values,
"ernie_client_id",
Expand All @@ -40,7 +46,7 @@ def validate_environment(cls, values: Dict) -> Dict:

def _embedding(self, json: object) -> dict:
base_url = (
"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings"
f"{self.ernie_api_base}/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings"
)
resp = requests.post(
f"{base_url}/embedding-v1",
Expand Down Expand Up @@ -71,6 +77,15 @@ def _refresh_access_token_with_lock(self) -> None:
self.access_token = str(resp.json().get("access_token"))

def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs.
Args:
texts: The list of texts to embed
Returns:
List[List[float]]: List of embeddings, one for each text.
"""

if not self.access_token:
self._refresh_access_token_with_lock()
text_in_chunks = [
Expand All @@ -90,6 +105,15 @@ def embed_documents(self, texts: List[str]) -> List[List[float]]:
return lst

def embed_query(self, text: str) -> List[float]:
"""Embed query text.
Args:
text: The text to embed.
Returns:
List[float]: Embeddings for the text.
"""

if not self.access_token:
self._refresh_access_token_with_lock()
resp = self._embedding({"input": [text]})
Expand All @@ -100,3 +124,31 @@ def embed_query(self, text: str) -> List[float]:
else:
raise ValueError(f"Error from Ernie: {resp}")
return resp["data"][0]["embedding"]

async def aembed_query(self, text: str) -> List[float]:
"""Asynchronous Embed query text.
Args:
text: The text to embed.
Returns:
List[float]: Embeddings for the text.
"""

return await asyncio.get_running_loop().run_in_executor(
None, partial(self.embed_query, text)
)

async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
"""Asynchronous Embed search docs.
Args:
texts: The list of texts to embed
Returns:
List[List[float]]: List of embeddings, one for each text.
"""

result = await asyncio.gather(*[self.aembed_query(text) for text in texts])

return list(result)

0 comments on commit ee3f950

Please sign in to comment.