[Spark] Refactor out Delta read path (#4041)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Refactored the following read path functionality out of `DeltaAnalysis` into standalone extractor objects:
- V2 -> V1 relation conversion (`FallbackToV1DeltaRelation`).
- Resolution of tables with partition filters (`ResolveDeltaTableWithPartitionFilters`).

This is a refactor-only change to support the single-pass Analyzer
project in Spark: https://issues.apache.org/jira/browse/SPARK-49834.

The extracted objects will be used in single-pass resolver extensions:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverExtension.scala.
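
For illustration, here is a minimal sketch of how the two extracted objects can be consumed by a resolution rule; it mirrors the pattern matches in `DeltaAnalysis` shown in the diff below, and the wrapper object name is hypothetical rather than part of this PR.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.{FallbackToV1DeltaRelation, ResolveDeltaTableWithPartitionFilters}

// Hypothetical wrapper; only the two extractors matched below come from this PR.
object DeltaReadPathResolutionSketch {
  def resolve(plan: LogicalPlan): Option[LogicalPlan] = plan match {
    // Lift the partition filters carried by the TahoeLogFileIndex into an explicit Filter node.
    case ResolveDeltaTableWithPartitionFilters(withFilter) => Some(withFilter)
    // Replace a DataSourceV2Relation over a DeltaTableV2 with the equivalent V1 LogicalRelation.
    case FallbackToV1DeltaRelation(v1Relation) => Some(v1Relation)
    case _ => None
  }
}
```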

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

No.
vladimirg-db authored Jan 14, 2025
1 parent b1139d8 commit c979a89
Showing 3 changed files with 71 additions and 10 deletions.
@@ -265,12 +265,7 @@ class DeltaAnalysis(session: SparkSession)
       }
       DeltaDynamicPartitionOverwriteCommand(r, d, adjustedQuery, o.writeOptions, o.isByName)

-    // Pull out the partition filter that may be part of the FileIndex. This can happen when someone
-    // queries a Delta table such as spark.read.format("delta").load("/some/table/partition=2")
-    case l @ DeltaTable(index: TahoeLogFileIndex) if index.partitionFilters.nonEmpty =>
-      Filter(
-        index.partitionFilters.reduce(And),
-        DeltaTableUtils.replaceFileIndex(l, index.copy(partitionFilters = Nil)))
+    case ResolveDeltaTableWithPartitionFilters(plan) => plan

     // SQL CDC table value functions "table_changes" and "table_changes_by_path"
     case stmt: CDCStatementBase if stmt.functionArgs.forall(_.resolved) =>
@@ -442,10 +437,7 @@ class DeltaAnalysis(session: SparkSession)

     case d: DescribeDeltaHistory if d.childrenResolved => d.toCommand

-    // This rule falls back to V1 nodes, since we don't have a V2 reader for Delta right now
-    case dsv2 @ DataSourceV2Relation(d: DeltaTableV2, _, _, _, options)
-        if dsv2.getTagValue(DeltaRelation.KEEP_AS_V2_RELATION_TAG).isEmpty =>
-      DeltaRelation.fromV2Relation(d, dsv2, options)
+    case FallbackToV1DeltaRelation(v1Relation) => v1Relation

     case ResolvedTable(_, _, d: DeltaTableV2, _) if d.catalogTable.isEmpty && !d.tableExists =>
       // This is DDL on a path based table that doesn't exist. CREATE will not hit this path, most
@@ -0,0 +1,32 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
 * Fall back to V1 nodes, since we don't have a V2 reader for Delta right now
 */
object FallbackToV1DeltaRelation {
  def unapply(dsv2: DataSourceV2Relation): Option[LogicalRelation] = dsv2.table match {
    case d: DeltaTableV2 if dsv2.getTagValue(DeltaRelation.KEEP_AS_V2_RELATION_TAG).isEmpty =>
      Some(DeltaRelation.fromV2Relation(d, dsv2, dsv2.options))
    case _ => None
  }
}
@@ -0,0 +1,37 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Filter}
import org.apache.spark.sql.delta.files.TahoeLogFileIndex

/**
 * Pull out the partition filter that may be part of the FileIndex. This can happen when someone
 * queries a Delta table such as spark.read.format("delta").load("/some/table/partition=2")
 */
object ResolveDeltaTableWithPartitionFilters {
  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
    case relation @ DeltaTable(index: TahoeLogFileIndex) if index.partitionFilters.nonEmpty =>
      val result = Filter(
        index.partitionFilters.reduce(And),
        DeltaTableUtils.replaceFileIndex(relation, index.copy(partitionFilters = Nil))
      )
      Some(result)
    case _ => None
  }
}
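
As a usage illustration (not part of the diff), the query shape described in the comment above is a path-based read that targets a single partition directory. The following spark-shell style snippet is a sketch with a hypothetical path and application name; the Delta session configuration shown is the standard one for Delta Lake on Spark.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: reading a partition directory directly. The partition predicate
// carried by the file index is surfaced as an explicit Filter node during analysis,
// as done by the extractor above.
val spark = SparkSession.builder()
  .appName("delta-partition-path-read")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

val df = spark.read.format("delta").load("/some/table/partition=2")
df.explain(extended = true) // the analyzed plan should show the partition predicate as a Filter node
```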
