I have a Spark application that I want to use to stream messages into BigQuery. Instead of manually running a merge operation on the BQ side, I'd like to use the native CDC functionality that allows for UPSERTs and DELETEs. I created a table in BQ with a PK, and it is clustered, so the prerequisites are covered there.
I added the field _CHANGE_TYPE to the data I wanted to load and omitted that field from the BQ table because it's a pseudo-column. When I try to use the Storage Write API, I get errors about it trying to write that pseudo-column and there being a schema mismatch between the DataFrame and the BQ schema.
It seems this CDC feature isn't supported by the connector. Is this something that can be done? If not, is it in the planning stages?
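For context, the CDC prerequisites I set up were an unenforced primary key plus clustering on the key column. A rough sketch of the DDL I used, with the table and column names taken from the word-count example below (the max_staleness value here is just an assumed setting, not necessarily what you'd want in production):

```sql
-- Hypothetical DDL matching the word-count example below.
CREATE TABLE demo_data.cdc_wordcount (
  word STRING,
  word_count INT64,
  PRIMARY KEY (word) NOT ENFORCED   -- BigQuery primary keys are unenforced
)
CLUSTER BY word                      -- clustering, the other CDC prerequisite
OPTIONS (max_staleness = INTERVAL 15 MINUTE);  -- assumed staleness bound
```

Note that _CHANGE_TYPE is intentionally absent from the DDL; it is a pseudo-column supplied per row at write time, which is why I omitted it from the table schema.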
A simple example:

spark.conf.set('spark.datasource.bigquery.enableModeCheckForSchemaFields', False)
spark.conf.set('spark.datasource.bigquery.writeAtLeastOnce', True)

word_count = spark.sql(
    "SELECT word, SUM(word_count) AS word_count, 'UPSERT' AS _CHANGE_TYPE FROM words GROUP BY word")

# Save the data to BigQuery
word_count.write.format('bigquery') \
    .option('writeMethod', 'direct') \
    .mode('append') \
    .save('demo_data.cdc_wordcount')