Add saveMode to CustomFS #1636

Merged · 1 commit · Dec 24, 2021
Conversation

@chncaesar (Contributor) commented Dec 24, 2021

What changes were proposed in this pull request?

Problem description

Azure Blob Storage supports the overwrite operation; however, `save overwrite` through CustomFS throws a `path wasb://xxx/tmp/json_names_1209 already exists` exception. Full exception stack:

path wasb://<container_name>@<account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1209 already exists.
org.apache.spark.sql.AnalysisException: path wasb://<container-name>@<account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1209 already exists.
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:122)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
tech.mlsql.datasource.impl.CustomFS.save(CustomFS.scala:55)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
tech.mlsql.dsl.adaptor.SaveAdaptor.$anonfun$parse$2(SaveAdaptor.scala:123)
scala.Option.map(Option.scala:230)
tech.mlsql.dsl.adaptor.SaveAdaptor.parse(SaveAdaptor.scala:114)
streaming.dsl.ScriptSQLExecListener.execute$1(ScriptSQLExec.scala:408)
streaming.dsl.ScriptSQLExecListener.exitSql(ScriptSQLExec.scala:426)
streaming.dsl.parser.DSLSQLParser$SqlContext.exitRule(DSLSQLParser.java:296)
org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:47)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:30)
org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
streaming.dsl.ScriptSQLExec$._parse(ScriptSQLExec.scala:160)
streaming.dsl.ScriptSQLExec$.parse(ScriptSQLExec.scala:147)
streaming.rest.RestController.$anonfun$script$1(RestController.scala:153)
tech.mlsql.job.JobManager$.run(JobManager.scala:74)
tech.mlsql.job.JobManager$$anon$1.run(JobManager.scala:91)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

The code snippet that throws the exception (in Spark's `InsertIntoHadoopFsRelationCommand`):

val pathExists = fs.exists(qualifiedOutputPath)
(mode, pathExists) match {
  case (SaveMode.ErrorIfExists, true) =>
    throw new AnalysisException(s"path $qualifiedOutputPath already exists.")

This shows that the save mode is `ErrorIfExists`, Spark's default when none is specified. To fix the issue, the user-specified save mode should be passed to `DataFrameWriter`.
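A minimal standalone sketch of the idea (illustrative names only, not the actual CustomFS code): map the connector's `mode` option to Spark's `SaveMode` and hand it to the writer via `DataFrameWriter.mode(...)` before `save`. A local enum stands in for `org.apache.spark.sql.SaveMode` so the sketch runs without a Spark dependency.

```scala
// Hypothetical sketch mirroring the fix; the enum below stands in for
// org.apache.spark.sql.SaveMode.
object SaveModeSketch {
  sealed trait SaveMode
  case object Overwrite     extends SaveMode
  case object Append        extends SaveMode
  case object Ignore        extends SaveMode
  case object ErrorIfExists extends SaveMode

  // Resolve the user-supplied `mode` option from the connector config.
  // When no mode is given, fall back to ErrorIfExists -- Spark's default,
  // which is what caused the "path ... already exists" AnalysisException.
  def resolveSaveMode(config: Map[String, String]): SaveMode =
    config.getOrElse("mode", "errorifexists").toLowerCase match {
      case "overwrite" => Overwrite
      case "append"    => Append
      case "ignore"    => Ignore
      case _           => ErrorIfExists
    }
}
```

In the real connector the resolved mode would then be applied along the lines of `df.write.options(config).mode(resolvedMode).save(path)`, so `mode="overwrite"` in the script reaches the writer instead of being silently dropped.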

How was this patch tested?

Test code:

set rawData=''' 
{"jack":1,"jack2":2}
{"jack":2,"jack2":3}
''';
load jsonStr.`rawData` as table1;

SAVE overwrite table1 as FS.`wasb://<container_name>@<azure_account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1` 
where `fs.azure.account.key.<azzure_account_name>.blob.core.chinacloudapi.cn`="<account_key>"
and `fs.AbstractFileSystem.wasb.impl`="org.apache.hadoop.fs.azure.Wasb"
and `fs.wasb.impl`="org.apache.hadoop.fs.azure.NativeAzureFileSystem"
and `fs.AbstractFileSystem.wasbs.impl`="org.apache.hadoop.fs.azure.Wasbs"
and `fs.wasbs.impl`="org.apache.hadoop.fs.azure.NativeAzureFileSystem"
and implClass="parquet"
and mode="overwrite";

Please note that the blob already exists before the save runs.

Test result: the blob is overwritten. Log output:

21/12/24 14:22:20  INFO DefaultMLSQLJobProgressListener: [owner] [admin] [groupId] [fd414d7c-070a-49fd-954d-fd7a377b4c09] __MMMMMM__ Total jobs: 1 current job:1 job script:SAVE overwrite table1 as FS.`wasb://<container_name>@<azure_account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1` 
where `fs.azure.account.key.<azzure_account_name>.blob.core.chinacloudapi.cn`="<account_key>"
and `fs.AbstractFileSystem.wasb.impl`="org.apache.hadoop.fs.azure.Wasb"
and `fs.wasb.impl`="org.apache.hadoop.fs.azure.NativeAzureFileSystem"
and `fs.AbstractFileSystem.wasbs.impl`="org.apache.hadoop.fs.azure.Wasbs"
and `fs.wasbs.impl`="org.apache.hadoop.fs.azure.NativeAzureFileSystem"
and implClass="parquet"
and mode="overwrite"
21/12/24 14:22:25  INFO log: Logging initialized @586842ms
21/12/24 14:22:25  WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-azure-file-system.properties,hadoop-metrics2.properties
21/12/24 14:22:25  INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
21/12/24 14:22:25  INFO MetricsSystemImpl: azure-file-system metrics system started
21/12/24 14:22:26  WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
21/12/24 14:22:26  INFO AzureFileSystemThreadPoolExecutor: Time taken for Delete operation is: 167 ms with threads: 0
21/12/24 14:22:29  INFO CodecPool: Got brand-new compressor [.snappy]
21/12/24 14:22:29  INFO CodecPool: Got brand-new compressor [.snappy]
21/12/24 14:22:29  INFO CodecPool: Got brand-new compressor [.snappy]
21/12/24 14:22:31  WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Rename operation as thread count 0 is <= 1
21/12/24 14:22:31  WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Rename operation as thread count 0 is <= 1
21/12/24 14:22:31  WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Rename operation as thread count 0 is <= 1
21/12/24 14:22:31  INFO AzureFileSystemThreadPoolExecutor: Time taken for Rename operation is: 149 ms with threads: 0
21/12/24 14:22:31  INFO AzureFileSystemThreadPoolExecutor: Time taken for Rename operation is: 158 ms with threads: 0
21/12/24 14:22:31  INFO AzureFileSystemThreadPoolExecutor: Time taken for Rename operation is: 156 ms with threads: 0
21/12/24 14:22:31  INFO FileOutputCommitter: Saved output of task 'attempt_202112241422273423253795147150269_0001_m_000000_3' to wasb://<container_name>@<account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1/_temporary/0/task_202112241422273423253795147150269_0001_m_000000
21/12/24 14:22:31  INFO FileOutputCommitter: Saved output of task 'attempt_202112241422272323792383049341764_0001_m_000001_4' to wasb://<container_name>@<account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1/_temporary/0/task_202112241422272323792383049341764_0001_m_000001
21/12/24 14:22:31  INFO FileOutputCommitter: Saved output of task 'attempt_202112241422273686934055686068635_0001_m_000002_5' to wasb://<container_name>@<account_name>.blob.core.chinacloudapi.cn/tmp/json_names_1/_temporary/0/task_202112241422273686934055686068635_0001_m_000002
21/12/24 14:22:35  WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1

Are there any docs that need to be updated?

No doc changes

Spark Core Compatibility

Add saveMode to CustomFS
@allwefantasy (Contributor)

Very clear. LGTM.

@lwz9103 lwz9103 merged commit ab07ba8 into byzer-org:master Dec 24, 2021