Checkpoint write to blob storage using hadoop configuration #674

Open
ddevidence opened this issue Jun 1, 2023 · 3 comments

ddevidence commented Jun 1, 2023

Bug Report:

Description: A Spark Scala streaming application reads a dataset from Event Hubs and writes the processed dataset to ADLS Gen2. That part of the application (without the Hadoop configuration) works fine with client credentials, using the following libraries (a sketch of this write path follows the list):

  • azure-eventhubs-spark_2.12 (ver 2.3.22)
  • DataLakeServiceClientBuilder/azure-storage-file-datalake (ver 12.15.0)
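
For reference, a minimal sketch (an assumption, not the actual application code) of how the working ADLS Gen2 write path can be wired up with client credentials via azure-storage-file-datalake; all account/tenant/client values are placeholders:

import com.azure.identity.ClientSecretCredentialBuilder
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder

// Placeholder values -- the real application supplies its own.
val tenantId     = "XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX"
val clientId     = sys.env("AZURE_CLIENT_ID")
val clientSecret = sys.env("AZURE_CLIENT_SECRET")
val adlsAccount  = "XXXXXXXX"

// Service-principal (client credentials) token credential.
val credential = new ClientSecretCredentialBuilder()
  .tenantId(tenantId)
  .clientId(clientId)
  .clientSecret(clientSecret)
  .build()

// DataLakeServiceClient against the account's DFS (Data Lake) endpoint.
val serviceClient = new DataLakeServiceClientBuilder()
  .endpoint(s"https://$adlsAccount.dfs.core.windows.net")
  .credential(credential)
  .buildClient()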

The issue is that the Hadoop configuration using the same client credentials does not work for writing checkpoints to Blob storage for the streaming application described above.

  • Actual behavior: When the application is launched with the Hadoop configuration, checkpoint I/O to Blob storage fails with the following stack trace.

Sorry about the long stack trace..

23/05/30 20:46:21 ERROR AzureNativeFileSystemStore: Service returned StorageException when checking existence of container XXXXXXXXXXX in account XXXXXXXX.blob.core.windows.net
com.microsoft.azure.storage.StorageException: An unknown failure occurred : Connection reset
    at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:67)
    at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:209)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:769)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:756)
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobContainerWrapperImpl.exists(StorageInterfaceImpl.java:233)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:892)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1118)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:566)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1423)
    at org.apache.hadoop.fs.DelegateToFileSystem.<init>(DelegateToFileSystem.java:54)
    at org.apache.hadoop.fs.azure.Wasbs.<init>(Wasbs.java:40)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:143)
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:181)
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:266)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
    at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:316)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:357)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:89)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:42)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:40)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:31)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:39)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:270)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:346)
    at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:430)
    at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:365)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.executeStreamProcessor(StreamEvidenceProcessor.scala:290)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.main(StreamEvidenceProcessor.scala:304)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor.main(StreamEvidenceProcessor.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The Hadoop configuration code:

def createSparkSessionWithHadoopConfigBlobWrite(appName: String): SparkSession = {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName(appName)
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.sql.streaming.log.enableTimeStamps", "true")
    .config("spark.sql.streaming.log.malformedEventLogEnabled", "true")
    .config(s"spark.hadoop.fs.azure.account.auth.type.$blobStorageAccount.blob.core.windows.net", "OAuth")
    .config(s"spark.hadoop.fs.azure.account.oauth.provider.type.$blobStorageAccount.blob.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.id.$blobStorageAccount.blob.core.windows.net", clientId)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.secret.$blobStorageAccount.blob.core.windows.net", clientSecret)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$blobStorageAccount.blob.core.windows.net", "https://login.microsoftonline.com/XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX/oauth2/token")
    .config("spark.sql.streaming.checkpointLocation", s"wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath")
    .getOrCreate()
  println(s"CHKPOINT Location: wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath")
  spark
}
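
Worth noting: in the stack trace above the failure happens inside AzureNativeFileSystemStore.connectUsingAnonymousCredentials, i.e. the legacy WASB driver behind the wasbs:// scheme falls back to anonymous access. The fs.azure.account.auth.type / fs.azure.account.oauth2.* keys and the ClientCredsTokenProvider class belong to the newer ABFS driver (org.apache.hadoop.fs.azurebfs), so the WASB driver appears to ignore these OAuth settings entirely.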

  • Expected behavior: The Spark Scala streaming application should be able to perform checkpoint writes to Blob storage based on the Hadoop configuration provided via SparkSession.builder().config (an illustrative sketch of the streaming write follows these bullets).

  • Spark version: 3.4.0

  • spark-eventhubs artifactId and version: azure-eventhubs-spark_2.12 (ver 2.3.22)
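
For context, a hypothetical sketch of how such a checkpointed streaming write is started; the checkpoint location configured on the session is resolved when start() is called. The sink format, output path, and trigger here are illustrative placeholders, not the actual application code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// processedDf: the dataset produced from the Event Hubs stream (placeholder name).
def startCheckpointedWrite(processedDf: DataFrame, outputPath: String): StreamingQuery =
  processedDf.writeStream
    .format("parquet")                           // illustrative sink format
    .option("path", outputPath)                  // e.g. an ADLS Gen2 path
    .trigger(Trigger.ProcessingTime("1 minute")) // illustrative trigger
    .start()                                     // checkpoint dir taken from
                                                 // spark.sql.streaming.checkpointLocation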

@yamin-msft
Contributor

Currently, we only support a checkpoint location that is a path in an HDFS-compatible file system. We don't have a firm deadline for this feature yet.

@ddevidence
Author

Ref: https://learn.microsoft.com/en-us/azure/hdinsight/overview-data-lake-storage-gen2#core-functionality-of-azure-data-lake-storage-gen2

• Access that is compatible with Hadoop: In Azure Data Lake Storage Gen2, you can manage and access data just as you would with a Hadoop Distributed File System (HDFS). The Azure Blob File System (ABFS) driver is available within all Apache Hadoop environments, including Azure HDInsight and Azure Databricks. Use ABFS to access data stored in Data Lake Storage Gen2.

– this reference seems to indicate that ADLS Gen2 supports Hadoop operations, which led to the assumption that the library would support checkpoint writes as part of the Spark hadoopConfiguration (a hypothetical ABFS-based sketch follows below).
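
For what it's worth, a hypothetical sketch (not verified against this application) of routing the checkpoint through the ABFS driver, which does understand these OAuth client-credentials keys. This assumes the storage account has the hierarchical namespace (ADLS Gen2) enabled, and every name below is a placeholder:

import org.apache.spark.sql.SparkSession

// Placeholders -- substitute real values.
val storageAccount = "XXXXXXXX"
val containerName  = "XXXXXXXXXXX"
val checkpointPath = "checkpoints/stream"
val tenantId       = "XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX"
val clientId       = sys.env("AZURE_CLIENT_ID")
val clientSecret   = sys.env("AZURE_CLIENT_SECRET")

// ABFS config keys are scoped to the dfs.core.windows.net endpoint, and the
// checkpoint URI uses the abfss:// scheme instead of wasbs://.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("abfs-checkpoint-sketch")
  .config(s"spark.hadoop.fs.azure.account.auth.type.$storageAccount.dfs.core.windows.net", "OAuth")
  .config(s"spark.hadoop.fs.azure.account.oauth.provider.type.$storageAccount.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
  .config(s"spark.hadoop.fs.azure.account.oauth2.client.id.$storageAccount.dfs.core.windows.net", clientId)
  .config(s"spark.hadoop.fs.azure.account.oauth2.client.secret.$storageAccount.dfs.core.windows.net", clientSecret)
  .config(s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$storageAccount.dfs.core.windows.net",
    s"https://login.microsoftonline.com/$tenantId/oauth2/token")
  .config("spark.sql.streaming.checkpointLocation",
    s"abfss://$containerName@$storageAccount.dfs.core.windows.net/$checkpointPath")
  .getOrCreate()

If the account is Blob-only (no hierarchical namespace), this sketch would not apply, which may be exactly the gap this issue is asking about.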

Can you provide insight into whether this would be a significant feature update or something on the lower end of effort? I just want to have this available as a priority for the use case at hand.

@yamin-msft
Contributor

This will require some work, and I am not able to provide a deadline for it.
