Introduce hybrid (CPU) scan for Parquet read #11720
base: branch-25.02
Conversation
It's a draft; I may have missed some code changes and will double-check later.
Please elaborate in the headline and description what this PR is doing. C2C is not a well-known acronym in the project and is not very descriptive.
Just a quick look at the code. Nothing too in depth.
Passed IT. Tested with the conventional Spark-Rapids jar and the regular Spark-Rapids jar.
I need to do some manual testing on my own to try and understand what is happening here and how this is all working. It may take a while.
case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
// For the time being, BinaryType is not supported yet
case _: BinaryType => false
case _ => true
facebookincubator/velox#9560 I am not an expert, and I don't even know what version of Velox we will end up using. It sounds like it is pluggable. But according to this, even the latest version of Velox cannot handle bytes/TINYINT. We are also not checking for spaces in the names of columns, among other issues. I know that other implementations fall back for even more things. Should we be concerned about this?
Gluten uses another Velox repo (code link):
VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=gluten-1.2.1
This will be something we should remember once we switch to use facebookincubator/velox directly.
My main concern is that if the gluten/velox version we use is pluggable, then we need to have some clear documentation on exactly which version you need to be based off of.
Yeah, Chong has added hybrid-execution.md to clarify that Gluten version 1.2.0 is expected.
…park 322,331,343,351
…ly supports 3.2.2, 3.3.1, 3.4.2, and 3.5.1.
build
Depends on deploying the Hybrid 25.02 jar into the Maven repo. @NvTimLiu
build
"Hybrid jar is not in the classpath, Please add Hybrid jar into the class path, or " + | ||
"Please disable Hybrid feature by setting " + | ||
"spark.rapids.sql.parquet.useHybridReader=false") |
Wrong exception message.
I didn't get the point; could you provide the message you expect?
It's in checkJavaVersion. Shouldn't the message be related to the Java version? I think you copied the code from another place but forgot to modify it.
Will remove this check if other Java versions are compatible.
Done, passed for Java 11, removed this check.
try {
  Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
} catch {
  case e: ClassNotFoundException => throw new RuntimeException(
    "Hybrid jar is not in the classpath, Please add Hybrid jar into the class path, or " +
    "Please disable Hybrid feature by setting " +
    "spark.rapids.sql.parquet.useHybridReader=false", e)
}
I think this way of checking the class only works on the driver side.
Do we need to check on the executor side as well?
I think this way of checking the class only works on the driver side.
Yes.
Do we need to check on the executor side as well?
Yes. Will check.
Done.
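For reference, a minimal sketch of how the same classpath check could be factored out so both the driver-side and the executor-side plugin initialization can call it. The object and method names are illustrative, and the class-name string is an assumption based on the HybridPluginWrapper mentioned later in this thread:

object HybridJarCheck {
  // Assumed value; the PR only shows the constant name HYBRID_JAR_PLUGIN_CLASS_NAME.
  private val HYBRID_JAR_PLUGIN_CLASS_NAME =
    "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"

  /** Fails fast if the Hybrid jar is missing; intended to be called from both the
    * driver plugin init and the executor plugin init (a sketch, not the PR's code). */
  def ensureHybridJarOnClasspath(): Unit = {
    try {
      Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
    } catch {
      case e: ClassNotFoundException => throw new RuntimeException(
        "Hybrid jar is not in the classpath. Please add the Hybrid jar to the classpath, " +
        "or disable the Hybrid feature by setting " +
        "spark.rapids.sql.parquet.useHybridReader=false", e)
    }
  }
}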
Mostly just more questions for me to understand what is happening. This looks a lot better. I assume a lot of the code that is very picky about getting the exact setup right is here just because that is what this code has been tested with.
# MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)
],
]
Can we add some tests to validate that predicate push down and filtering work correctly? It would be nice to have:
- simple filters
- complex filters that are not supported by normal Parquet predicate push down (like ORs at the top level instead of ANDs)
- filters that have operators in them that Velox does not support, but spark-rapids does
Discussed internally before; the decision is to put this into a follow-up PR.
Follow-up issue filed: #11892
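For illustration (in Scala rather than the Python integration tests), a sketch of the query shapes such tests could cover; the column names and the choice of RLIKE as an operator the native backend may not support are assumptions:

import org.apache.spark.sql.SparkSession

object HybridFilterShapes {
  // Column names (id, price, name, tag) are hypothetical; any Parquet dataset with
  // similar columns would work.
  def examples(spark: SparkSession, path: String): Unit = {
    val df = spark.read.parquet(path)

    // 1. A simple filter that ordinary Parquet predicate push down handles.
    df.filter("id > 10").show()

    // 2. A complex filter with ORs at the top level instead of ANDs, which ordinary
    //    Parquet predicate push down does not handle.
    df.filter("(id > 10 AND price < 5.0) OR (name = 'x' AND tag IS NOT NULL)").show()

    // 3. A filter with an operator the native backend may not support while
    //    spark-rapids does (RLIKE chosen here purely as a hypothetical example).
    df.filter("name RLIKE '^key_[0-9]+$'").show()
  }
}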
lazy val allSupportedTypes = fsse.requiredSchema.exists { field =>
  TrampolineUtil.dataTypeExistsRecursively(field.dataType, {
    // For the time being, the native backend may return incorrect results over nestedMap
    case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
What about if it is a MapType, but kt or vt is not directly a map, and is instead a LIST of MAP, or a STRUCT with a MAP in it? Do we know the cause of this error so that we can limit things properly? If not, then I would rather just stick with a MAP at the top level and not allow any nested maps.
Also, what happens if the data is a LIST<STRUCT<KEY, VALUE>>? Internally in Parquet a MAP is just a LIST<STRUCT<KEY, VALUE>>, so would we have similar issues if one of them were nested?
Hi @revans2, I am sorry that I did not carefully audit which types are unsupported by the native backend. I just ran a rather comprehensive test:
hybrid_gens_test = [
# failed
[decimal_gen_32bit_neg_scale],
[decimal_gen_128bit],
decimal_64_map_gens,
[MapGen(TimestampGen(nullable=False), ArrayGen(string_gen))],
[MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), TimestampGen())],
[MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), decimal_gen_32bit)],
[MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), decimal_gen_64bit)],
# failed
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), decimal_gen_128bit)],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(ArrayGen(long_gen)))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(ArrayGen(string_gen)))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
StructGen([['child0', string_gen],
['child1', double_gen],
['child2', int_gen],
['child3', StructGen([['child0', ArrayGen(byte_gen)],
['child1', byte_gen],
['child2', float_gen],
['child3', decimal_gen_64bit]])]]))
],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
StructGen([['child0', ArrayGen(ArrayGen(long_gen))],
['child1', ArrayGen(string_gen)],
['child2', ArrayGen(ArrayGen(string_gen))]]))
],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
ArrayGen(MapGen(LongGen(nullable=False), long_gen)))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
ArrayGen(MapGen(IntegerGen(nullable=False), string_gen)))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False),
ArrayGen(ArrayGen(MapGen(IntegerGen(nullable=False), string_gen))))],
[ArrayGen(ArrayGen(string_gen))],
[ArrayGen(ArrayGen(long_gen))],
# failed
[ArrayGen(MapGen(LongGen(nullable=False), long_gen))],
# failed
[ArrayGen(MapGen(StringGen(pattern='key_[0-9]', nullable=False), long_gen))],
# failed
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), MapGen(LongGen(nullable=False), long_gen))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), MapGen(LongGen(nullable=False), string_gen))],
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
# failed
[StructGen([['child0', MapGen(LongGen(nullable=False), long_gen)],
['child1', MapGen(StringGen(pattern='key_[0-9]', nullable=False), long_gen)],
['child2', MapGen(IntegerGen(nullable=False), decimal_gen_64bit)],
['child3', StructGen([["cc", MapGen(IntegerGen(nullable=False), decimal_gen_32bit)]])]
]),
],
[StructGen([['cc', MapGen(IntegerGen(nullable=False), decimal_gen_64bit)]])],
# failed
[StructGen([['cc', ArrayGen(MapGen(IntegerGen(nullable=False), string_gen))]])],
[StructGen([['cc', ArrayGen(ArrayGen(MapGen(IntegerGen(nullable=False), string_gen)))]])],
]
The test result suggested the unsupported types are:
- Decimal with negative scale is NOT supported
- Decimal128 inside nested types is NOT supported
- BinaryType is NOT supported
- MapType inside nested types (Struct of Map/Array of Map/Map of Map) is NOT fully supported
I reworked the typeCheck function and the integration tests according to the new findings.
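A hedged sketch (not the PR's actual code) of what a per-field type check reflecting those findings could look like; the object and method names, and the precision-18 threshold used to mean Decimal128, are assumptions:

import org.apache.spark.sql.types._

object HybridScanTypeCheck {
  // A sketch of the findings above; returns true if a field's type is believed to be
  // supported by the native (Gluten/Velox) Parquet reader.
  def isSupported(dt: DataType, nested: Boolean = false): Boolean = dt match {
    // Decimal with negative scale is NOT supported
    case d: DecimalType if d.scale < 0 => false
    // Decimal128 (assumed: precision > 18) inside nested types is NOT supported
    case d: DecimalType if d.precision > 18 && nested => false
    // BinaryType is NOT supported
    case _: BinaryType => false
    // MapType inside nested types (Struct of Map / Array of Map / Map of Map)
    // is NOT fully supported
    case _: MapType if nested => false
    case MapType(kt, vt, _) =>
      isSupported(kt, nested = true) && isSupported(vt, nested = true)
    case ArrayType(et, _) => isSupported(et, nested = true)
    case StructType(fields) => fields.forall(f => isSupported(f.dataType, nested = true))
    case _ => true
  }
}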
if (javaVersion == null) {
  throw new RuntimeException("Hybrid feature: Can not read java.version, get null")
}
if (!javaVersion.startsWith("1.8")) {
Why does it only work with java 1.8? Newer versions are supposed to be backwards compatible.
Will test other Java versions.
Done, passed for Java 11, removed this check.
 */
private def checkScalaVersion(): Unit = {
  val scalaVersion = scala.util.Properties.versionString
  if (!scalaVersion.startsWith("version 2.12")) {
We already have shims and a separate jar for Scala 2.13. @gerashegalov is there a way for us to have Scala 2.13-specific code that would just fail instead of doing a check like this?
(fsse, conf, p, r) => {
  // TODO: HybridScan supports DataSourceV2
  if (HybridFileSourceScanExecMeta.useHybridScan(conf, fsse)) {
    // Check if runtimes are satisfied: Spark is not Databricks or CDH; Java version is 1.8;
Why not databricks or CDH? Is it just that we have not tested with these yet?
Yes, because we have not tested with CDH and Databricks.
We currently do not have customers using CDH or Databricks, and we did not test performance on CDH and Databricks.
We are not confident that the Hybrid feature fully supports Databricks Spark, so for the first version we decided not to support Databricks.
@@ -2895,6 +2912,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS)

lazy val useHybridParquetReader: Boolean = get(HYBRID_PARQUET_READER)

lazy val loadHybridBackend: Boolean = get(LOAD_HYBRID_BACKEND)
Right now it's only used as a requirement: it must be true if useHybridParquetReader is true.
Where is the code that checks this config and then loads the backend?
LOAD_HYBRID_BACKEND is a startup config, while HYBRID_PARQUET_READER is not. The user can set LOAD_HYBRID_BACKEND to true at startup time and then enable/disable HYBRID_PARQUET_READER at runtime on the fly. This is more flexible.
Should we have some code to check LOAD_HYBRID_BACKEND then try to load the jar when initializing the driver and executor plugin?
It's in the Hybrid execution repo. It's like:
HybridPluginWrapper.java:
HybridPluginWrapper {
  DriverPluginWrapper {
    if (LOAD_HYBRID_BACKEND) {
      load_impl()
    }
  }
  ExecutorPluginWrapper {
    if (LOAD_HYBRID_BACKEND) {
      load_impl()
    }
  }
}
Also, the Hybrid execution repo provides a config file, spark-rapids-extra-plugins, containing:
com.nvidia.spark.rapids.hybrid.HybridPluginWrapper
If the Hybrid jar is in the classpath, the Rapids Plugin uses a reflection approach to load the HybridPluginWrapper and initialize it. Of course, if LOAD_HYBRID_BACKEND is disabled, the hybrid plugin will not be loaded.
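A rough sketch of how such reflection-based loading of the plugin classes listed in spark-rapids-extra-plugins might look; the helper object, method name, and instantiation details are assumptions, not the actual plugin code:

import scala.io.Source

object HybridBackendLoader {
  // Resource name taken from the comment above; each line lists a plugin class to load.
  private val ResourceFile = "spark-rapids-extra-plugins"

  /** Load and instantiate extra plugin wrappers found on the classpath (hypothetical helper). */
  def loadExtraPlugins(loadHybridBackend: Boolean): Seq[AnyRef] = {
    if (!loadHybridBackend) return Seq.empty
    val cl = Thread.currentThread().getContextClassLoader
    val urls = cl.getResources(ResourceFile)
    val classNames = scala.collection.mutable.Buffer[String]()
    while (urls.hasMoreElements) {
      val src = Source.fromURL(urls.nextElement())
      try classNames ++= src.getLines().map(_.trim).filter(_.nonEmpty)
      finally src.close()
    }
    classNames.toSeq.map { name =>
      // e.g. com.nvidia.spark.rapids.hybrid.HybridPluginWrapper
      Class.forName(name, true, cl).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    }
  }
}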
Signed-off-by: sperlingxx <[email protected]>
My issues have pretty much all been addressed and my questions answered. I do want to see a follow-on issue filed for #11720 (comment).
I also want to understand the plan for documentation. I get that this is still very early and the configs are all marked as internal so I am okay with where it is at right now. I am not going to approve it yet because I want to hear from others on this too.
As discussed with Chong, we also need a doc describing how to build the Gluten/Velox jar in case external users want to try it.
Introduce hybrid (CPU) scan for Parquet read
This PR leverages Gluten/Velox to do the scan on CPU.
The hybrid feature contains gluten-public and rapids-hybrid-execution, branch 1.2.
This PR:
- Adds shims: builds for all shims (320-324, 330-334, 340-344, 350-353, CDHs, Databricks) and throws a runtime error if it's a CDH or Databricks runtime.
- Adds checks.
- Calls into the Hybrid JNI to do the Parquet scan.
Limitations
- Supports more Spark versions than Gluten officially supports. The Gluten official doc says it only supports Spark 322, 331, 342, 351, while Hybrid supports 19 Spark versions in total (320-324, 330-334, 340-344, 350-353). Documentation is added to the HYBRID_PARQUET_READER config noting that versions other than those Gluten officially supports are not fully tested.
Tests
Signed-off-by: sperlingxx [email protected]
Signed-off-by: Chong Gao [email protected]
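For context, a minimal usage sketch (not from the PR) showing how the runtime toggle named in this thread might be used once the Hybrid jar is on the classpath and the backend has been loaded at startup; the app name, data path, and column name are hypothetical, and the startup key for LOAD_HYBRID_BACKEND is omitted because its key string is not shown in this thread:

import org.apache.spark.sql.SparkSession

object HybridScanExample {
  def main(args: Array[String]): Unit = {
    // Assumes the spark-rapids plugin and the Hybrid jar are already on the classpath
    // and the hybrid backend was loaded at startup (LOAD_HYBRID_BACKEND).
    val spark = SparkSession.builder()
      .appName("hybrid-parquet-scan-example")
      // Runtime toggle named in this PR; set to false to fall back to the GPU reader.
      .config("spark.rapids.sql.parquet.useHybridReader", "true")
      .getOrCreate()

    // The hybrid (CPU) path applies to Parquet scans like this one.
    spark.read.parquet("/path/to/data.parquet").filter("id > 10").show()

    spark.stop()
  }
}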