diff --git a/llm-service/app/ai/indexing/index.py b/llm-service/app/ai/indexing/index.py index 85ac6993..fcbc822f 100644 --- a/llm-service/app/ai/indexing/index.py +++ b/llm-service/app/ai/indexing/index.py @@ -38,9 +38,10 @@ import logging import os +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, Type +from typing import Dict, Generator, List, Type from llama_index.core.base.embeddings.base import BaseEmbedding from llama_index.core.node_parser import SentenceSplitter @@ -48,7 +49,10 @@ from llama_index.core.schema import BaseNode, Document, TextNode from llama_index.readers.file import DocxReader +from ...services.utils import batch_sequence, flatten_sequence from ...services.vector_store import VectorStore +from .readers.csv import CSVReader +from .readers.json import JSONReader from .readers.nop import NopReader from .readers.pdf import PDFReader @@ -59,6 +63,8 @@ ".txt": NopReader, ".md": NopReader, ".docx": DocxReader, + ".csv": CSVReader, + ".json": JSONReader, } CHUNKABLE_FILE_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"} @@ -81,7 +87,7 @@ def __init__( self.embedding_model = embedding_model self.chunks_vector_store = chunks_vector_store - def index_file(self, file_path: Path, file_id: str) -> None: + def index_file(self, file_path: Path, document_id: str) -> None: logger.debug(f"Indexing file: {file_path}") file_extension = os.path.splitext(file_path)[1] @@ -93,7 +99,7 @@ def index_file(self, file_path: Path, file_id: str) -> None: logger.debug(f"Parsing file: {file_path}") - documents = self._documents_in_file(reader, file_path, file_id) + documents = self._documents_in_file(reader, file_path, document_id) if file_extension in CHUNKABLE_FILE_EXTENSIONS: logger.debug(f"Chunking file: {file_path}") chunks = [ @@ -104,34 +110,34 @@ def index_file(self, file_path: Path, file_id: str) -> None: else: chunks = documents - texts = [chunk.text for chunk in 
chunks] - logger.debug(f"Embedding {len(texts)} chunks") - embeddings = self.embedding_model.get_text_embedding_batch(texts) + logger.debug(f"Embedding {len(chunks)} chunks") - for chunk, embedding in zip(chunks, embeddings): - chunk.embedding = embedding + chunks_with_embeddings = flatten_sequence(self._compute_embeddings(chunks)) - logger.debug(f"Adding {len(chunks)} chunks to vector store") - chunks_vector_store = self.chunks_vector_store.access_vector_store() + acc = 0 + for chunk_batch in batch_sequence(chunks_with_embeddings, 1000): + acc += len(chunk_batch) + logger.debug(f"Adding {acc}/{len(chunks)} chunks to vector store") - # We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode) - # because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because - # we're capturing "text". - converted_chunks: List[BaseNode] = [chunk for chunk in chunks] - chunks_vector_store.add(converted_chunks) + # We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode) + # because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because + # we're capturing "text". 
+ converted_chunks: List[BaseNode] = [chunk for chunk in chunk_batch] + + chunks_vector_store = self.chunks_vector_store.access_vector_store() + chunks_vector_store.add(converted_chunks) logger.debug(f"Indexing file: {file_path} completed") def _documents_in_file( - self, reader: BaseReader, file_path: Path, file_id: str + self, reader: BaseReader, file_path: Path, document_id: str ) -> List[Document]: documents = reader.load_data(file_path) for i, document in enumerate(documents): - # Update the document metadata - document.id_ = file_id + document.id_ = document_id document.metadata["file_name"] = os.path.basename(file_path) - document.metadata["document_id"] = file_id + document.metadata["document_id"] = document_id document.metadata["document_part_number"] = i document.metadata["data_source_id"] = self.data_source_id @@ -155,3 +161,25 @@ def _chunks_in_document(self, document: Document) -> List[TextNode]: converted_chunks.append(chunk) return converted_chunks + + def _compute_embeddings( + self, chunks: List[TextNode] + ) -> Generator[List[TextNode], None, None]: + batched_chunks = list(batch_sequence(chunks, 100)) + batched_texts = [[chunk.text for chunk in batch] for batch in batched_chunks] + + with ThreadPoolExecutor(max_workers=20) as executor: + futures = [ + executor.submit( + # Bind i now via a default arg; a plain closure would read it late, after the loop has advanced. + lambda b, i=i: (i, self.embedding_model.get_text_embedding_batch(b)), + batch, + ) + for i, batch in enumerate(batched_texts) + ] + logger.debug(f"Waiting for {len(futures)} futures") + for future in as_completed(futures): + i, batch_embeddings = future.result() + for chunk, embedding in zip(batched_chunks[i], batch_embeddings): + chunk.embedding = embedding + yield batched_chunks[i] diff --git a/llm-service/app/ai/indexing/readers/csv.py b/llm-service/app/ai/indexing/readers/csv.py new file mode 100644 index 00000000..5061627c --- /dev/null +++ b/llm-service/app/ai/indexing/readers/csv.py @@ -0,0 +1,56 @@ +# +# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP) +# (C) Cloudera, Inc. 
2024 +# All rights reserved. +# +# Applicable Open Source License: Apache 2.0 +# +# NOTE: Cloudera open source products are modular software products +# made up of hundreds of individual components, each of which was +# individually copyrighted. Each Cloudera open source product is a +# collective work under U.S. Copyright Law. Your license to use the +# collective work is as provided in your written agreement with +# Cloudera. Used apart from the collective work, this file is +# licensed for your use pursuant to the open source license +# identified above. +# +# This code is provided to you pursuant a written agreement with +# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute +# this code. If you do not have a written agreement with Cloudera nor +# with an authorized and properly licensed third party, you do not +# have any rights to access nor to use this code. +# +# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the +# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY +# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED +# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO +# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU, +# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS +# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE +# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR +# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES +# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF +# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF +# DATA. 
+# + +import json +from pathlib import Path +from typing import List + +import pandas as pd +from llama_index.core.readers.base import BaseReader +from llama_index.core.schema import Document + + +class CSVReader(BaseReader): + def load_data(self, file_path: Path) -> List[Document]: + # Read the CSV file into a pandas DataFrame + df = pd.read_csv(file_path) + # Convert the dataframe into a list of dictionaries, one per row + rows = df.to_dict(orient="records") + # Convert each dictionary into a Document + documents = [Document(text=json.dumps(row, sort_keys=True)) for row in rows] + return documents diff --git a/llm-service/app/ai/indexing/readers/json.py b/llm-service/app/ai/indexing/readers/json.py new file mode 100644 index 00000000..e122a1e7 --- /dev/null +++ b/llm-service/app/ai/indexing/readers/json.py @@ -0,0 +1,51 @@ +# +# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP) +# (C) Cloudera, Inc. 2024 +# All rights reserved. +# +# Applicable Open Source License: Apache 2.0 +# +# NOTE: Cloudera open source products are modular software products +# made up of hundreds of individual components, each of which was +# individually copyrighted. Each Cloudera open source product is a +# collective work under U.S. Copyright Law. Your license to use the +# collective work is as provided in your written agreement with +# Cloudera. Used apart from the collective work, this file is +# licensed for your use pursuant to the open source license +# identified above. +# +# This code is provided to you pursuant a written agreement with +# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute +# this code. If you do not have a written agreement with Cloudera nor +# with an authorized and properly licensed third party, you do not +# have any rights to access nor to use this code. +# +# Absent a written agreement with Cloudera, Inc. 
("Cloudera") to the +# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY +# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED +# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO +# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU, +# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS +# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE +# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR +# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES +# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF +# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF +# DATA. +# + +import json +from pathlib import Path +from typing import List + +from llama_index.core.readers.base import BaseReader +from llama_index.core.schema import Document + + +class JSONReader(BaseReader): + def load_data(self, file_path: Path) -> List[Document]: + with open(file_path, "r") as f: + content = json.load(f) + return [Document(text=json.dumps(content, sort_keys=True))] diff --git a/llm-service/app/ai/indexing/readers/nop.py b/llm-service/app/ai/indexing/readers/nop.py index 3320e79f..aaa01e20 100644 --- a/llm-service/app/ai/indexing/readers/nop.py +++ b/llm-service/app/ai/indexing/readers/nop.py @@ -36,12 +36,14 @@ # DATA. 
# +from pathlib import Path from typing import List + from llama_index.core.readers.base import BaseReader from llama_index.core.schema import Document class NopReader(BaseReader): - def load_data(self, file_path: str) -> List[Document]: + def load_data(self, file_path: Path) -> List[Document]: with open(file_path, "r") as f: return [Document(text=f.read())] diff --git a/llm-service/app/services/utils.py b/llm-service/app/services/utils.py index 6ac9893c..6aeee03f 100644 --- a/llm-service/app/services/utils.py +++ b/llm-service/app/services/utils.py @@ -37,8 +37,7 @@ # ############################################################################## import re -from typing import List, Tuple - +from typing import Generator, List, Sequence, Tuple, TypeVar, Union # TODO delete this if it's not being used @@ -82,3 +81,27 @@ def parse_choice_select_answer_fn( def get_last_segment(path: str) -> str: return path.split("/")[-1] + + +T = TypeVar("T") + + +def batch_sequence( + sequence: Union[Sequence[T], Generator[T, None, None]], batch_size: int +) -> Generator[List[T], None, None]: + batch = [] + for val in sequence: + batch.append(val) + if len(batch) == batch_size: + yield batch + batch = [] + if batch: + yield batch + + +def flatten_sequence( + sequence: Union[Sequence[Sequence[T]], Generator[Sequence[T], None, None]], +) -> Generator[T, None, None]: + for sublist in sequence: + for item in sublist: + yield item diff --git a/llm-service/app/tests/routers/index/test_data_source.py b/llm-service/app/tests/routers/index/test_data_source.py index b6450f46..5498d623 100644 --- a/llm-service/app/tests/routers/index/test_data_source.py +++ b/llm-service/app/tests/routers/index/test_data_source.py @@ -74,7 +74,7 @@ def test_create_document( assert document_id is not None index = get_vector_store_index(data_source_id) vectors = index.vector_store.query( - VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id]) + VectorStoreQuery(query_embedding=[0.66] * 1024) ) 
assert len(vectors.nodes or []) == 1 @@ -93,7 +93,7 @@ def test_delete_data_source( index = get_vector_store_index(data_source_id) vectors = index.vector_store.query( - VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id]) + VectorStoreQuery(query_embedding=[0.66] * 1024) ) assert len(vectors.nodes or []) == 1 @@ -122,9 +122,9 @@ def test_delete_document( index = get_vector_store_index(data_source_id) vectors = index.vector_store.query( - VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id]) + VectorStoreQuery(query_embedding=[0.2] * 1024) ) assert len(vectors.nodes or []) == 1 response = client.delete( f"/data_sources/{data_source_id}/documents/{document_id}" @@ -133,7 +133,7 @@ def test_delete_document( index = get_vector_store_index(data_source_id) vectors = index.vector_store.query( - VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id]) + VectorStoreQuery(query_embedding=[0.2] * 1024) ) assert len(vectors.nodes or []) == 0 diff --git a/llm-service/pdm.lock b/llm-service/pdm.lock index cddb261f..f7cc5668 100644 --- a/llm-service/pdm.lock +++ b/llm-service/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:2a9b3e86ee90d639241d72fdeee20e779c2e7c42e90ab2e43e335c18454e0858" +content_hash = "sha256:37f4e048b91c6850dc6133d0ab73273fa296454ed6776ad177856e619ff421d3" [[metadata.targets]] requires_python = "==3.10.*" diff --git a/llm-service/pyproject.toml b/llm-service/pyproject.toml index d2218846..eeff2848 100644 --- a/llm-service/pyproject.toml +++ b/llm-service/pyproject.toml @@ -5,7 +5,7 @@ description = "Default template for PDM package" authors = [ {name = "Conrado Silva Miranda", email = "csilvamiranda@cloudera.com"}, ] -dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", 
"llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8"] +dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", "llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8", "pandas>=2.2.3"] requires-python = "==3.10.*" readme = "README.md" license = {text = "APACHE"}