
Commit

Merge pull request #412 from microsoft/geeal/6317-embeddings-retry
Geeal/6317 embeddings retry
georearl authored Dec 20, 2023
2 parents 610eeba + e9dc181 commit 5710995
Showing 1 changed file with 17 additions and 8 deletions.
app/enrichment/app.py (25 changes: 17 additions & 8 deletions)
@@ -318,10 +318,10 @@ def poll_queue() -> None:
         chunk_list = container_client.list_blobs(name_starts_with=chunk_folder_path)
         chunks = list(chunk_list)
         i = 0

         for chunk in chunks:

-            statusLog.update_document_state( blob_path, f"Indexing {i+1}/{len(chunks)}", State.PROCESSING)
+            statusLog.update_document_state( blob_path, f"Indexing {i+1}/{len(chunks)}", State.INDEXING)
             # open the file and extract the content
             blob_path_plus_sas = utilities_helper.get_blob_and_sas(
                 ENV["AZURE_BLOB_STORAGE_CONTAINER"] + '/' + chunk.name)
@@ -345,15 +345,18 @@ def poll_queue() -> None:
                 chunk_dict["content"]
             )

-            # create embedding
-            embedding = embed_texts(target_embeddings_model, [text])
-            if 'data' in embedding:
-                embedding_data = embedding['data']
-            else:
-                raise ValueError(embedding['message'])
+            try:
+                # try first to read the embedding from the chunk, in case it was already created
+                embedding_data = chunk_dict['contentVector']
+            except KeyError:
+                # create embedding
+                embedding = embed_texts(target_embeddings_model, [text])
+                if 'data' in embedding:
+                    embedding_data = embedding['data']
+                else:
+                    raise ValueError(embedding['message'])

             tag_list = get_tags_and_upload_to_cosmos(blob_service_client, chunk_dict["file_name"])

             # Prepare the index schema based representation of the chunk with the embedding
             index_chunk = {}
             index_chunk['id'] = statusLog.encode_document_id(chunk.name)
             index_chunk['processed_datetime'] = f"{chunk_dict['processed_datetime']}+00:00"
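
The net effect of this hunk is a cache-or-create lookup: reuse the vector already stored on the chunk (left over from an earlier, partially failed run) and call the embedding service only on a KeyError. A minimal, self-contained sketch of that pattern follows; embed_texts is stubbed with the response shape the diff implies ('data' on success, 'message' on failure), and get_embedding is a hypothetical wrapper for illustration, not a function in app.py.

def embed_texts(model: str, texts: list) -> dict:
    # stub standing in for the real service call in app.py
    return {'data': [[0.0, 0.1, 0.2]]}

def get_embedding(chunk_dict: dict, model: str) -> list:
    try:
        # reuse the vector persisted by a previous run, if present
        return chunk_dict['contentVector']
    except KeyError:
        # otherwise create a fresh embedding
        embedding = embed_texts(model, [chunk_dict['content']])
        if 'data' in embedding:
            return embedding['data']
        raise ValueError(embedding['message'])

# first call embeds; a retry sees contentVector and skips the service call
chunk = {'content': 'some text'}
chunk['contentVector'] = get_embedding(chunk, 'text-embedding-ada-002')
assert get_embedding(chunk, 'text-embedding-ada-002') is chunk['contentVector']

One consequence of this design worth noting: the cache key is simply the presence of contentVector, so if the target embedding model changes between runs, a stale vector from the old model would be reused rather than regenerated.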
@@ -371,9 +374,15 @@ def poll_queue() -> None:
             index_chunk['entities'] = chunk_dict["entities"]
             index_chunk['key_phrases'] = chunk_dict["key_phrases"]
             index_chunks.append(index_chunk)
+
+            # write the updated chunk, with embedding, to storage in case of failure
+            chunk_dict['contentVector'] = embedding_data
+            json_str = json.dumps(chunk_dict, indent=2, ensure_ascii=False)
+            block_blob_client = blob_service_client.get_blob_client(container=ENV["AZURE_BLOB_STORAGE_CONTAINER"], blob=chunk.name)
+            block_blob_client.upload_blob(json_str, overwrite=True)
             i += 1

-            # push batch of content to index
+            # push batch of content to index, rather than each individual chunk
             if i % 200 == 0:
                 index_sections(index_chunks)
                 index_chunks = []
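
Together, the last two hunks make each loop iteration checkpoint its work (persist the enriched chunk, vector included, back to blob storage) while deferring index writes into batches of 200. A rough, self-contained sketch of that flow, with the blob upload and index push stubbed out; upload_chunk, process, and BATCH_SIZE are illustrative names, not the app.py API.

import json

BATCH_SIZE = 200  # the diff flushes to the index every 200 chunks

def upload_chunk(name: str, chunk_dict: dict) -> None:
    # stub: app.py uploads json.dumps(chunk_dict) to blob storage with overwrite=True
    print(f"checkpointed {name} ({len(json.dumps(chunk_dict))} bytes)")

def index_sections(index_chunks: list) -> None:
    # stub: app.py pushes the accumulated batch to the search index
    print(f"indexed {len(index_chunks)} chunks")

def process(chunks: dict) -> None:
    index_chunks = []
    i = 0
    for name, chunk_dict in chunks.items():
        index_chunks.append({'id': name, 'contentVector': chunk_dict['contentVector']})
        # checkpoint first, so a crash before the index push loses no embeddings
        upload_chunk(name, chunk_dict)
        i += 1
        # push a whole batch to the index rather than each individual chunk
        if i % BATCH_SIZE == 0:
            index_sections(index_chunks)
            index_chunks = []
    # flush whatever is left when the chunk count is not a multiple of the batch size
    if index_chunks:
        index_sections(index_chunks)

process({'chunk-0.json': {'contentVector': [0.0, 0.1]}})

The final flush is an assumption on my part: chunks beyond the last full batch of 200 must be indexed somewhere after the loop, and that code falls outside the lines this diff shows.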
