Skip to content

Commit

Permalink
[SPARK-50112][SQL] Allowing the TransformWithState operator to use Av…
Browse files Browse the repository at this point in the history
…ro encoding

### What changes were proposed in this pull request?

With the introduction of the TransformWithState operator, we are going to persist all state information in the StateStore with Avro encoding, instead of UnsafeRow. This is because UnsafeRow is not backwards compatible and can change between Spark releases, potentially causing StateStore corruptions. Avro is backwards-compatible and can be reliably used. This change will move the necessary files to the sql/core directory so Avro encoding can be used by stateful streaming operators.

### Why are the changes needed?

To allow classes within sql/core, like the StateTypesEncoder class, to be able to use Avro encoding.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests are sufficient to ensure compilation

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48650 from ericm-db/moving-avro.

Lead-authored-by: Eric Marnadi <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
2 people authored and HyukjinKwon committed Nov 5, 2024
1 parent b81da76 commit 2c4f748
Show file tree
Hide file tree
Showing 13 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ object CheckConnectJvmClientCompatibility {
// Filter unsupported rules:
// Note when muting errors for a method, checks on all overloading methods are also muted.

// Skip any avro files
ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),

// Skip unsupported packages
ProblemFilters.exclude[Problem]("org.apache.spark.sql.api.*"), // Java, Python, R
ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalyst.*"),
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ object MimaExcludes {

// SPARK-49748: Add getCondition and deprecate getErrorClass in SparkThrowable
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkThrowable.getCondition"),

// SPARK-50112: Moving avro files from connector to sql/core
ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.io._

import scala.util.control.NonFatal

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.LogicalType
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import java.nio.ByteBuffer

import scala.jdk.CollectionConverters._

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
import org.apache.avro.LogicalTypes.{LocalTimestampMicros, LocalTimestampMillis, TimestampMicros, TimestampMillis}
import org.apache.avro.Schema
import org.apache.avro.Schema.Type
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed, Record}
import org.apache.avro.util.Utf8

import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.sql.avro

import org.apache.avro.LogicalType
import org.apache.avro.Schema
import org.apache.avro.{LogicalType, Schema}

import org.apache.spark.sql.types.DecimalType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.LogicalTypes.{Date, Decimal, LocalTimestampMicros, LocalTimestampMillis, TimestampMicros, TimestampMillis}
import org.apache.avro.LogicalTypes.{Decimal, _}
import org.apache.avro.Schema.Type._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{FIELD_NAME, FIELD_TYPE, RECURSIVE_DEPTH}
import org.apache.spark.internal.MDC
import org.apache.spark.sql.avro.AvroOptions.RECURSIVE_FIELD_MAX_DEPTH_LIMIT
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._
Expand Down

0 comments on commit 2c4f748

Please sign in to comment.