Snowflake Cortex destination : Bug fixes #38206

Merged
merged 5 commits into master from bindi/snowflake-cortex-minor-fixups on May 15, 2024

Conversation

@bindipankhudi (Contributor) commented May 15, 2024

This PR addresses the following:

  1. UI-related fixes
  • Reorder params to match the Snowflake destination
  • Remove the secret field from params (it was hiding what was being typed)
  • Update the doc link (it previously pointed to the Pinecone docs)
  2. Update to the write logic
  • For destinationMode=Overwrite, we first delete all records for the specified stream and then call cortexProcessor with WriteStrategy.APPEND; otherwise the batch size enforced by vector_db_based causes records to be overwritten every time the batch size is hit. We usually do this for all vector DBs; I had missed it earlier. (See the sketch below.)
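
For context, here is a minimal sketch of that delete-then-append flow, assuming PyAirbyte's WriteStrategy enum and the Airbyte CDK catalog models; the class shape and the `_delete_records_for_stream` helper are illustrative, not the connector's actual code:

```python
from airbyte.strategies import WriteStrategy            # assumed PyAirbyte import path
from airbyte_cdk.models import ConfiguredAirbyteCatalog, DestinationSyncMode


class OverwriteFlowSketch:
    """Illustrative only: delete first, then append, for overwrite streams."""

    def __init__(self, catalog: ConfiguredAirbyteCatalog) -> None:
        self.catalog = catalog

    def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
        # Runs once before any batches arrive: clear out streams configured
        # for overwrite so every subsequent batch can simply append.
        for stream in catalog.streams:
            if stream.destination_sync_mode == DestinationSyncMode.overwrite:
                self._delete_records_for_stream(stream.stream.name)

    def get_write_strategy(self, stream_name: str) -> WriteStrategy:
        # Always append during the sync; overwrite semantics were already
        # handled in pre_sync, so later batches never clobber earlier ones.
        return WriteStrategy.APPEND

    def _delete_records_for_stream(self, stream_name: str) -> None:
        # Hypothetical helper: issue a DELETE against the stream's table.
        raise NotImplementedError
```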

@bindipankhudi bindipankhudi requested a review from a team as a code owner May 15, 2024 03:18

@bindipankhudi changed the title from "Snowflake Cortex destination : Minor fixes post release" to "Snowflake Cortex destination : Bug fixes" on May 15, 2024
@@ -16,7 +16,7 @@
from destination_snowflake_cortex.config import ConfigModel
from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer

-BATCH_SIZE = 32
+BATCH_SIZE = 150
@bindipankhudi (Contributor, Author) commented:

32 seemed too low in general; each batch results in one PyAirbyte write call.
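
(Roughly, the constant matters because records are buffered and handed off one batch at a time; the buffer/flush names below are illustrative, not the connector's actual code.)

```python
BATCH_SIZE = 150  # per the comment above: each flushed batch is one PyAirbyte write

buffer: list[dict] = []

def process_record(record: dict) -> None:
    # Accumulate records and hand them off once the batch is full.
    buffer.append(record)
    if len(buffer) >= BATCH_SIZE:
        flush_batch(buffer)   # hypothetical: performs the single PyAirbyte write
        buffer.clear()
```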

@@ -85,7 +86,7 @@ def _get_updated_catalog(self) -> ConfiguredAirbyteCatalog:
metadata -> metadata of the record
embedding -> embedding of the document content
"""
-updated_catalog = self.catalog
+updated_catalog = copy.deepcopy(self.catalog)
@bindipankhudi (Contributor, Author) commented:

Needed so we don't mutate the original catalog, since this method is called twice.
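
(A generic illustration of the aliasing issue the deep copy avoids; the dict below is a stand-in, not the real catalog model.)

```python
import copy

catalog = {"streams": [{"name": "products", "columns": ["id"]}]}

aliased = catalog                               # no copy: same object
aliased["streams"][0]["columns"].append("embedding")
print(catalog["streams"][0]["columns"])         # ['id', 'embedding'] -- original mutated

catalog = {"streams": [{"name": "products", "columns": ["id"]}]}
copied = copy.deepcopy(catalog)                 # fully independent object graph
copied["streams"][0]["columns"].append("embedding")
print(catalog["streams"][0]["columns"])         # ['id'] -- original untouched
```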

pass

def pre_sync(self, catalog: ConfiguredAirbyteCatalog) -> None:
@bindipankhudi (Contributor, Author) commented:

New method, meant to be implemented by vector DB destinations; it deletes existing records up front for streams in overwrite mode.

@@ -144,7 +145,8 @@ def get_write_strategy(self, stream_name: str) -> WriteStrategy:
for stream in self.catalog.streams:
if stream.stream.name == stream_name:
if stream.destination_sync_mode == DestinationSyncMode.overwrite:
-return WriteStrategy.REPLACE
+# we will use append here since we will remove the existing records and add new ones.
+return WriteStrategy.APPEND
@bindipankhudi (Contributor, Author) commented:

For overwrite mode, we delete records first and then just use append in PyAirbyte, since data is sent in batches.

@aaronsteers (Collaborator) left a comment

Approving with one caveat. When in 'replace' mode, we would ideally load to a stage table and then swap the table name with the existing table after the load is complete. The SQLProcessor class should do this automatically when in 'replace' mode, but it may require a refactor to actually implement this with confidence.

So, non-blocking, but something to think about for next iterations.

Otherwise, this all looks great! :shipit:
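
For reference, a rough sketch of the stage-and-swap pattern being suggested, assuming a snowflake-connector-python style cursor; the table names and the `_append_batch` helper are illustrative:

```python
def overwrite_via_swap(cursor, final_table: str, batches) -> None:
    """Illustrative stage-and-swap: readers never see a half-loaded table."""
    stage_table = f"{final_table}_stage"
    # Build an empty staging table with the same schema as the final table.
    cursor.execute(f"CREATE OR REPLACE TABLE {stage_table} LIKE {final_table}")
    for batch in batches:
        _append_batch(cursor, stage_table, batch)  # hypothetical batch loader
    # Atomic cut-over (Snowflake's SWAP WITH), then drop the now-old data.
    cursor.execute(f"ALTER TABLE {final_table} SWAP WITH {stage_table}")
    cursor.execute(f"DROP TABLE {stage_table}")
```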

@bindipankhudi (Contributor, Author) replied:

> Approving with one caveat. When in 'replace' mode, we would ideally load to a stage table and then swap the table name with the existing table after the load is complete. The SQLProcessor class should do this automatically when in 'replace' mode, but it may require a refactor to actually implement this with confidence.
>
> So, non-blocking, but something to think about for next iterations.

Yea, that makes sense. Created this issue: https://github.com/airbytehq/airbyte-internal-issues/issues/7928

@bindipankhudi bindipankhudi merged commit e19e634 into master May 15, 2024
39 checks passed
@bindipankhudi bindipankhudi deleted the bindi/snowflake-cortex-minor-fixups branch May 15, 2024 16:48
Labels
area/connectors, area/documentation, connectors/destination/snowflake-cortex
3 participants