
Error writing decimal(38, 10) to BigQuery BIGNUMERIC #1331

Open
natansilva opened this issue Dec 26, 2024 · 3 comments
@natansilva

Hi,
We are trying to write a Spark DataFrame to BigQuery, but we have been facing an issue related to the type the connector chooses. The library attempts to use the NUMERIC type, whose scale is insufficient; the column should instead be written using the BIGNUMERIC type.
Our jobs run on a Dataproc cluster with image version 2.2.40-debian12 and SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.1.jar.

I have also tried creating the cluster with the SPARK_BQ_CONNECTOR_VERSION=0.35.0 and SPARK_BQ_CONNECTOR_VERSION=0.41.1 settings.

BigQuery data type: [screenshot of the destination table schema; field_10 is BIGNUMERIC]

Dataframe data type

StructField("field_10", DecimalType(38, 10), True),

We are writing using these options:

    (
        dataframe.write
            .format("com.google.cloud.spark.bigquery")
            .mode("append")
            .option("project", billing_project_id)
            .option("table", f"{project_id}.{dataset}.{destination_table}")
            .option("writeMethod", "indirect")
            .option("temporaryGcsBucket", f"{destination_bucket}")
            .option("intermediateFormat", "avro")
            .option("useAvroLogicalTypes", True)
            .option("bigNumericDefaultPrecision", 38)
            .option("bigNumericDefaultScale", 10)
            .save()
    )

Error

: java.lang.RuntimeException: Failed to write to BigQuery
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:94)
        at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
        at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:111)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy53.call(Unknown Source)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:53)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:53)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table sample_table. Field field_10 has changed type from BIGNUMERIC to NUMERIC
        at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:419)
        at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:252)
        at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:305)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.finishedJob$lzycompute$1(BigQueryWriteHelper.scala:154)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.finishedJob$1(BigQueryWriteHelper.scala:154)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.$anonfun$loadDataToBigQuery$3(BigQueryWriteHelper.scala:156)
        at org.apache.spark.internal.Logging.logInfo(Logging.scala:60)
        at org.apache.spark.internal.Logging.logInfo$(Logging.scala:59)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.logInfo(BigQueryWriteHelper.scala:34)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:156)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:91)
        ... 75 more
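
For reference, a minimal sketch of why a scale of 10 rules out NUMERIC here: BigQuery NUMERIC is a fixed decimal(38, 9), while BIGNUMERIC allows a scale of up to 38, so a DecimalType(38, 10) column can only map to BIGNUMERIC:

    from decimal import Decimal

    NUMERIC_SCALE = 9       # BigQuery NUMERIC: precision 38, scale 9
    BIGNUMERIC_SCALE = 38   # BigQuery BIGNUMERIC: scale up to 38

    value = Decimal("1.0123456789")  # 10 fractional digits
    fractional_digits = -value.as_tuple().exponent

    assert fractional_digits > NUMERIC_SCALE       # cannot be stored as NUMERIC
    assert fractional_digits <= BIGNUMERIC_SCALE   # fits BIGNUMERIC
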
@isha97 (Member) commented Jan 1, 2025

Hi @natansilva,

Can you please specify how you are creating the DataFrame? Please also open a ticket with Google support to provide more details about the BigQuery job so that we can investigate further.

@vpaithankar

This seems like a related issue; I am also facing the same problem, since scale beyond 9 is not supported by the Spark BigQuery connector. I am using version 0.26.
Issue #525 was closed stating that this was fixed in version 0.31.

Can someone confirm whether this is still a limitation of the Spark BigQuery connector?

@isha97 (Member) commented Jan 3, 2025

@vpaithankar you can configure the precision and scale using the bigNumericDefaultPrecision and bigNumericDefaultScale parameters. Please check the README for more details.
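
For illustration, a hedged sketch of passing those options on a read, assuming an active SparkSession; the table reference is a placeholder:

    # Option names come from the connector README, as referenced above.
    df = (
        spark.read.format("bigquery")
            .option("bigNumericDefaultPrecision", "38")
            .option("bigNumericDefaultScale", "10")
            .load("my-project.my_dataset.sample_table")
    )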
