From d5dc3075ecea59dfc5949b2bd5b400c51b9c37c8 Mon Sep 17 00:00:00 2001 From: Kevin Pis Date: Sun, 14 Feb 2021 10:54:38 +0800 Subject: [PATCH 1/8] [SPARK-34432][SQL][TEST][FOLLOWUP] Add a Java implementation of simple writable data source in `DataSourceV2Suite` ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/19269 In #19269, there is only a Scala implementation of the simple writable data source in `DataSourceV2Suite`. This PR adds a Java implementation of it. ### Why are the changes needed? To improve test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test suites. --- .../JavaSimpleWritableDataSource.java | 374 ++++++++++++++++++ .../sql/connector/DataSourceV2Suite.scala | 3 +- 2 files changed, 375 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java new file mode 100644 index 000000000000..d342614dcf02 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.sql.connector; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.spark.deploy.SparkHadoopUtil; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.TestingV2Source; +import org.apache.spark.sql.connector.catalog.SessionConfigSupport; +import org.apache.spark.sql.connector.catalog.SupportsWrite; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.connector.write.*; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.apache.spark.util.SerializableConfiguration; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; + +public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport { + + private final StructType tableSchema = new StructType().add("i", "long").add("j", "long"); + + @Override + public String keyPrefix() { + return "javaSimpleWritableDataSource"; + } + + @Override + public Table getTable(CaseInsensitiveStringMap options) { + return new MyTable(options); + } + + static class JavaCSVInputPartitionReader implements InputPartition { + private String path; + + JavaCSVInputPartitionReader(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + } + + static class JavaCSVReaderFactory implements PartitionReaderFactory { + + private final SerializableConfiguration conf; + + JavaCSVReaderFactory(SerializableConfiguration conf) { + this.conf = conf; + } + + @Override + public PartitionReader createReader(InputPartition partition) { + String path = ((JavaCSVInputPartitionReader) partition).getPath(); + Path filePath = new Path(path); + try { + FileSystem fs = filePath.getFileSystem(conf.value()); + return new PartitionReader() { + private final FSDataInputStream inputStream = fs.open(filePath); + private final Iterator lines = + new BufferedReader(new InputStreamReader(inputStream)).lines().iterator(); + private String currentLine = ""; + + @Override + public boolean next() { + if (lines.hasNext()) { + currentLine = lines.next(); + return true; + } else { + return false; + } + } + + @Override + public InternalRow get() { + Object[] objects = + Arrays.stream(currentLine.split(",")) + .map(String::trim) + .map(Long::parseLong) + .toArray(); + return new GenericInternalRow(objects); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + }; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static class JavaSimpleCounter { + private static Integer count = 0; + + public static void increaseCounter() { + count += 1; + } + + public static int getCounter() { + return count; + } + + public static void resetCounter() { + count = 0; + } + } + + static class JavaCSVDataWriterFactory implements DataWriterFactory { + private final String path; + private final String jobId; + private final SerializableConfiguration conf; + + JavaCSVDataWriterFactory(String path, String jobId, SerializableConfiguration 
conf) { + this.path = path; + this.jobId = jobId; + this.conf = conf; + } + + @Override + public DataWriter createWriter(int partitionId, long taskId) { + try { + Path jobPath = new Path(new Path(path, "_temporary"), jobId); + Path filePath = new Path(jobPath, String.format("%s-%d-%d", jobId, partitionId, taskId)); + FileSystem fs = filePath.getFileSystem(conf.value()); + return new JavaCSVDataWriter(fs, filePath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static class JavaCSVDataWriter implements DataWriter { + private final FileSystem fs; + private final Path file; + private final FSDataOutputStream out; + + JavaCSVDataWriter(FileSystem fs, Path file) throws IOException { + this.fs = fs; + this.file = file; + out = fs.create(file); + } + + @Override + public void write(InternalRow record) throws IOException { + out.writeBytes(String.format("%d,%d\n", record.getLong(0), record.getLong(1))); + } + + @Override + public WriterCommitMessage commit() throws IOException { + out.close(); + return null; + } + + @Override + public void abort() throws IOException { + try { + out.close(); + } finally { + fs.delete(file, false); + } + } + + @Override + public void close() {} + } + + class MyScanBuilder extends JavaSimpleScanBuilder { + private final String path; + private final Configuration conf; + + MyScanBuilder(String path, Configuration conf) { + this.path = path; + this.conf = conf; + } + + @Override + public InputPartition[] planInputPartitions() { + Path dataPath = new Path(this.path); + try { + FileSystem fs = dataPath.getFileSystem(conf); + if (fs.exists(dataPath)) { + return Arrays.stream(fs.listStatus(dataPath)) + .filter( + status -> { + String name = status.getPath().getName(); + return !name.startsWith("_") && !name.startsWith("."); + }) + .map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())) + .toArray(InputPartition[]::new); + } else { + return new InputPartition[0]; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public StructType readSchema() { + return tableSchema; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + SerializableConfiguration serializableConf = new SerializableConfiguration(conf); + return new JavaCSVReaderFactory(serializableConf); + } + } + + static class MyWriteBuilder implements WriteBuilder, SupportsTruncate { + private final String path; + private final String queryId; + private boolean needTruncate = false; + + MyWriteBuilder(String path, LogicalWriteInfo info) { + this.path = path; + this.queryId = info.queryId(); + } + + @Override + public WriteBuilder truncate() { + this.needTruncate = true; + return this; + } + + @Override + public Write build() { + return new MyWrite(path, queryId, needTruncate); + } + } + + static class MyWrite implements Write { + private final String path; + private final String queryId; + private final boolean needTruncate; + + MyWrite(String path, String queryId, boolean needTruncate) { + this.path = path; + this.queryId = queryId; + this.needTruncate = needTruncate; + } + + @Override + public BatchWrite toBatch() { + Path hadoopPath = new Path(path); + Configuration hadoopConf = SparkHadoopUtil.get().conf(); + try { + FileSystem fs = hadoopPath.getFileSystem(hadoopConf); + if (needTruncate) { + fs.delete(hadoopPath, true); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + String pathStr = hadoopPath.toUri().toString(); + return new MyBatchWrite(queryId, pathStr, hadoopConf); + } + } + + static 
class MyBatchWrite implements BatchWrite { + + private final String queryId; + private final String path; + private final Configuration conf; + + MyBatchWrite(String queryId, String path, Configuration conf) { + this.queryId = queryId; + this.path = path; + this.conf = conf; + } + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + JavaSimpleCounter.resetCounter(); + return new JavaCSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf)); + } + + @Override + public void onDataWriterCommit(WriterCommitMessage message) { + JavaSimpleCounter.increaseCounter(); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + Path finalPath = new Path(this.path); + Path jobPath = new Path(new Path(finalPath, "_temporary"), queryId); + try { + FileSystem fs = jobPath.getFileSystem(conf); + FileStatus[] fileStatuses = fs.listStatus(jobPath); + try { + for (FileStatus status : fileStatuses) { + Path file = status.getPath(); + Path dest = new Path(finalPath, file.getName()); + if (!fs.rename(file, dest)) { + throw new IOException(String.format("failed to rename(%s, %s)", file, dest)); + } + } + } finally { + fs.delete(jobPath, true); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void abort(WriterCommitMessage[] messages) { + try { + Path jobPath = new Path(new Path(this.path, "_temporary"), queryId); + FileSystem fs = jobPath.getFileSystem(conf); + fs.delete(jobPath, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + class MyTable extends JavaSimpleBatchTable implements SupportsWrite { + private final String path; + private final Configuration conf = SparkHadoopUtil.get().conf(); + + MyTable(CaseInsensitiveStringMap options) { + this.path = options.get("path"); + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + return new MyScanBuilder(new Path(path).toUri().toString(), conf); + } + + @Override + public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { + return new MyWriteBuilder(path, info); + } + + @Override + public StructType schema() { + return tableSchema; + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 28cb448c400c..49a107880055 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -228,8 +228,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS } test("simple writable data source") { - // TODO: java implementation. - Seq(classOf[SimpleWritableDataSource]).foreach { cls => + Seq(classOf[SimpleWritableDataSource], classOf[JavaSimpleWritableDataSource]).foreach { cls => withTempPath { file => val path = file.getCanonicalPath assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty) From 88c6edf49ac19549aba9aa5ae04e6b7c5665191f Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Thu, 18 Feb 2021 21:33:47 +0800 Subject: [PATCH 2/8] ADD: 1. add a class description for JavaSimpleWritableDataSource OPTIMIZE: 1. re-order the import 2. match the class layout with the existing SimpleWritableDataSource UPDATE: 1. catch the specific exception(IOException) instead of Exception. 2. 
use SimpleCounter --- .../JavaSimpleWritableDataSource.java | 323 +++++++++++++++++---------- 1 file changed, 160 insertions(+), 163 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index d342614dcf02..91b2104326ab 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -17,11 +17,19 @@ package test.org.apache.spark.sql.connector; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; + import org.apache.spark.deploy.SparkHadoopUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.SimpleCounter; import org.apache.spark.sql.connector.TestingV2Source; import org.apache.spark.sql.connector.catalog.SessionConfigSupport; import org.apache.spark.sql.connector.catalog.SupportsWrite; @@ -35,14 +43,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.SerializableConfiguration; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.Iterator; - +/** + * An HDFS-based transactional writable data source implemented in Java. + * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`. + * Each job moves files from `target/_temporary/uniqueId/` to `target`. 
+ */ public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport { + private final StructType tableSchema = new StructType().add("i", "long").add("j", "long"); @Override @@ -50,153 +58,6 @@ public String keyPrefix() { return "javaSimpleWritableDataSource"; } - @Override - public Table getTable(CaseInsensitiveStringMap options) { - return new MyTable(options); - } - - static class JavaCSVInputPartitionReader implements InputPartition { - private String path; - - JavaCSVInputPartitionReader(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - public void setPath(String path) { - this.path = path; - } - } - - static class JavaCSVReaderFactory implements PartitionReaderFactory { - - private final SerializableConfiguration conf; - - JavaCSVReaderFactory(SerializableConfiguration conf) { - this.conf = conf; - } - - @Override - public PartitionReader createReader(InputPartition partition) { - String path = ((JavaCSVInputPartitionReader) partition).getPath(); - Path filePath = new Path(path); - try { - FileSystem fs = filePath.getFileSystem(conf.value()); - return new PartitionReader() { - private final FSDataInputStream inputStream = fs.open(filePath); - private final Iterator lines = - new BufferedReader(new InputStreamReader(inputStream)).lines().iterator(); - private String currentLine = ""; - - @Override - public boolean next() { - if (lines.hasNext()) { - currentLine = lines.next(); - return true; - } else { - return false; - } - } - - @Override - public InternalRow get() { - Object[] objects = - Arrays.stream(currentLine.split(",")) - .map(String::trim) - .map(Long::parseLong) - .toArray(); - return new GenericInternalRow(objects); - } - - @Override - public void close() throws IOException { - inputStream.close(); - } - }; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - static class JavaSimpleCounter { - private static Integer count = 0; - - public static void increaseCounter() { - count += 1; - } - - public static int getCounter() { - return count; - } - - public static void resetCounter() { - count = 0; - } - } - - static class JavaCSVDataWriterFactory implements DataWriterFactory { - private final String path; - private final String jobId; - private final SerializableConfiguration conf; - - JavaCSVDataWriterFactory(String path, String jobId, SerializableConfiguration conf) { - this.path = path; - this.jobId = jobId; - this.conf = conf; - } - - @Override - public DataWriter createWriter(int partitionId, long taskId) { - try { - Path jobPath = new Path(new Path(path, "_temporary"), jobId); - Path filePath = new Path(jobPath, String.format("%s-%d-%d", jobId, partitionId, taskId)); - FileSystem fs = filePath.getFileSystem(conf.value()); - return new JavaCSVDataWriter(fs, filePath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - static class JavaCSVDataWriter implements DataWriter { - private final FileSystem fs; - private final Path file; - private final FSDataOutputStream out; - - JavaCSVDataWriter(FileSystem fs, Path file) throws IOException { - this.fs = fs; - this.file = file; - out = fs.create(file); - } - - @Override - public void write(InternalRow record) throws IOException { - out.writeBytes(String.format("%d,%d\n", record.getLong(0), record.getLong(1))); - } - - @Override - public WriterCommitMessage commit() throws IOException { - out.close(); - return null; - } - - @Override - public void abort() throws IOException { - try { - out.close(); - } finally { 
- fs.delete(file, false); - } - } - - @Override - public void close() {} - } - class MyScanBuilder extends JavaSimpleScanBuilder { private final String path; private final Configuration conf; @@ -213,17 +74,17 @@ public InputPartition[] planInputPartitions() { FileSystem fs = dataPath.getFileSystem(conf); if (fs.exists(dataPath)) { return Arrays.stream(fs.listStatus(dataPath)) - .filter( - status -> { - String name = status.getPath().getName(); - return !name.startsWith("_") && !name.startsWith("."); - }) - .map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())) - .toArray(InputPartition[]::new); + .filter( + status -> { + String name = status.getPath().getName(); + return !name.startsWith("_") && !name.startsWith("."); + }) + .map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())) + .toArray(InputPartition[]::new); } else { return new InputPartition[0]; } - } catch (Exception e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -304,13 +165,13 @@ static class MyBatchWrite implements BatchWrite { @Override public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { - JavaSimpleCounter.resetCounter(); + SimpleCounter.resetCounter(); return new JavaCSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf)); } @Override public void onDataWriterCommit(WriterCommitMessage message) { - JavaSimpleCounter.increaseCounter(); + SimpleCounter.increaseCounter(); } @Override @@ -371,4 +232,140 @@ public StructType schema() { return tableSchema; } } + + @Override + public Table getTable(CaseInsensitiveStringMap options) { + return new MyTable(options); + } + + @Override + public StructType inferSchema(CaseInsensitiveStringMap options) { + return tableSchema; + } + + static class JavaCSVInputPartitionReader implements InputPartition { + private String path; + + JavaCSVInputPartitionReader(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + } + + static class JavaCSVReaderFactory implements PartitionReaderFactory { + + private final SerializableConfiguration conf; + + JavaCSVReaderFactory(SerializableConfiguration conf) { + this.conf = conf; + } + + @Override + public PartitionReader createReader(InputPartition partition) { + String path = ((JavaCSVInputPartitionReader) partition).getPath(); + Path filePath = new Path(path); + try { + FileSystem fs = filePath.getFileSystem(conf.value()); + return new PartitionReader() { + private final FSDataInputStream inputStream = fs.open(filePath); + private final Iterator lines = + new BufferedReader(new InputStreamReader(inputStream)).lines().iterator(); + private String currentLine = ""; + + @Override + public boolean next() { + if (lines.hasNext()) { + currentLine = lines.next(); + return true; + } else { + return false; + } + } + + @Override + public InternalRow get() { + Object[] objects = + Arrays.stream(currentLine.split(",")) + .map(String::trim) + .map(Long::parseLong) + .toArray(); + return new GenericInternalRow(objects); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + }; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static class JavaCSVDataWriterFactory implements DataWriterFactory { + private final String path; + private final String jobId; + private final SerializableConfiguration conf; + + JavaCSVDataWriterFactory(String path, String jobId, SerializableConfiguration conf) { + this.path = path; + 
this.jobId = jobId; + this.conf = conf; + } + + @Override + public DataWriter createWriter(int partitionId, long taskId) { + try { + Path jobPath = new Path(new Path(path, "_temporary"), jobId); + Path filePath = new Path(jobPath, String.format("%s-%d-%d", jobId, partitionId, taskId)); + FileSystem fs = filePath.getFileSystem(conf.value()); + return new JavaCSVDataWriter(fs, filePath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static class JavaCSVDataWriter implements DataWriter { + private final FileSystem fs; + private final Path file; + private final FSDataOutputStream out; + + JavaCSVDataWriter(FileSystem fs, Path file) throws IOException { + this.fs = fs; + this.file = file; + out = fs.create(file); + } + + @Override + public void write(InternalRow record) throws IOException { + out.writeBytes(String.format("%d,%d\n", record.getLong(0), record.getLong(1))); + } + + @Override + public WriterCommitMessage commit() throws IOException { + out.close(); + return null; + } + + @Override + public void abort() throws IOException { + try { + out.close(); + } finally { + fs.delete(file, false); + } + } + + @Override + public void close() {} + } } From 8c080ba80ced4af173b9d2cebc9475e0a722c75e Mon Sep 17 00:00:00 2001 From: Kevin Pis <68981916+kevincmchen@users.noreply.github.com> Date: Thu, 18 Feb 2021 22:57:09 +0800 Subject: [PATCH 3/8] Update JavaSimpleWritableDataSource.java delete duplicated blank Lines in `JavaSimpleWritableDataSource` --- .../apache/spark/sql/connector/JavaSimpleWritableDataSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index 91b2104326ab..2cf64e530fbc 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -50,7 +50,6 @@ */ public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport { - private final StructType tableSchema = new StructType().add("i", "long").add("j", "long"); @Override From 7a1400cab77c8be233e1b1d3dcb8094a276f2ada Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Sat, 20 Feb 2021 16:50:54 +0800 Subject: [PATCH 4/8] change the schema of `JavaSimpleWritableDataSource` to match `TestingV2Source` --- .../JavaSimpleWritableDataSource.java | 33 ++++--------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index 91b2104326ab..51291301101f 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -39,7 +39,6 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.write.*; -import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.SerializableConfiguration; @@ -50,15 +49,12 @@ */ public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport { - - private final StructType tableSchema = new 
StructType().add("i", "long").add("j", "long"); - @Override public String keyPrefix() { return "javaSimpleWritableDataSource"; } - class MyScanBuilder extends JavaSimpleScanBuilder { + static class MyScanBuilder extends JavaSimpleScanBuilder { private final String path; private final Configuration conf; @@ -89,11 +85,6 @@ public InputPartition[] planInputPartitions() { } } - @Override - public StructType readSchema() { - return tableSchema; - } - @Override public PartitionReaderFactory createReaderFactory() { SerializableConfiguration serializableConf = new SerializableConfiguration(conf); @@ -152,7 +143,6 @@ public BatchWrite toBatch() { } static class MyBatchWrite implements BatchWrite { - private final String queryId; private final String path; private final Configuration conf; @@ -209,7 +199,7 @@ public void abort(WriterCommitMessage[] messages) { } } - class MyTable extends JavaSimpleBatchTable implements SupportsWrite { + static class MyTable extends JavaSimpleBatchTable implements SupportsWrite { private final String path; private final Configuration conf = SparkHadoopUtil.get().conf(); @@ -226,11 +216,6 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { return new MyWriteBuilder(path, info); } - - @Override - public StructType schema() { - return tableSchema; - } } @Override @@ -238,11 +223,6 @@ public Table getTable(CaseInsensitiveStringMap options) { return new MyTable(options); } - @Override - public StructType inferSchema(CaseInsensitiveStringMap options) { - return tableSchema; - } - static class JavaCSVInputPartitionReader implements InputPartition { private String path; @@ -260,7 +240,6 @@ public void setPath(String path) { } static class JavaCSVReaderFactory implements PartitionReaderFactory { - private final SerializableConfiguration conf; JavaCSVReaderFactory(SerializableConfiguration conf) { @@ -292,9 +271,9 @@ public boolean next() { @Override public InternalRow get() { Object[] objects = - Arrays.stream(currentLine.split(",")) - .map(String::trim) - .map(Long::parseLong) + Arrays.stream(currentLine.split(",")) + .map(String::trim) + .map(Integer::parseInt) .toArray(); return new GenericInternalRow(objects); } @@ -347,7 +326,7 @@ static class JavaCSVDataWriter implements DataWriter { @Override public void write(InternalRow record) throws IOException { - out.writeBytes(String.format("%d,%d\n", record.getLong(0), record.getLong(1))); + out.writeBytes(String.format("%d,%d\n", record.getInt(0), record.getInt(1))); } @Override From 9178240ee37b9b4abb318b91f6ce2bbf4c0c6f84 Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Sat, 20 Feb 2021 17:25:50 +0800 Subject: [PATCH 5/8] reformat code --- .../JavaSimpleWritableDataSource.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index 51291301101f..0b917654ee10 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -70,13 +70,13 @@ public InputPartition[] planInputPartitions() { FileSystem fs = dataPath.getFileSystem(conf); if (fs.exists(dataPath)) { return Arrays.stream(fs.listStatus(dataPath)) - .filter( - status -> { - String name = status.getPath().getName(); - return 
!name.startsWith("_") && !name.startsWith("."); - }) - .map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())) - .toArray(InputPartition[]::new); + .filter( + status -> { + String name = status.getPath().getName(); + return !name.startsWith("_") && !name.startsWith("."); + }) + .map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())) + .toArray(InputPartition[]::new); } else { return new InputPartition[0]; } @@ -271,9 +271,9 @@ public boolean next() { @Override public InternalRow get() { Object[] objects = - Arrays.stream(currentLine.split(",")) - .map(String::trim) - .map(Integer::parseInt) + Arrays.stream(currentLine.split(",")) + .map(String::trim) + .map(Integer::parseInt) .toArray(); return new GenericInternalRow(objects); } From 3fc7288e1ff49510174545aabbcc028b409cec5c Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Sat, 20 Feb 2021 20:11:48 +0800 Subject: [PATCH 6/8] In order to match SimpleBatchTable, remove BATCH_WRITE and TRUNCATE from JavaSimpleBatchTable and override the function capabilities in MyTable --- .../spark/sql/connector/JavaSimpleBatchTable.java | 9 +++------ .../sql/connector/JavaSimpleWritableDataSource.java | 11 +++++++++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java index 71cf97b56fe5..ee876cad3688 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java @@ -17,7 +17,7 @@ package test.org.apache.spark.sql.connector; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -28,11 +28,8 @@ import org.apache.spark.sql.types.StructType; abstract class JavaSimpleBatchTable implements Table, SupportsRead { - private static final Set CAPABILITIES = new HashSet<>(Arrays.asList( - TableCapability.BATCH_READ, - TableCapability.BATCH_WRITE, - TableCapability.TRUNCATE)); - + private static final Set CAPABILITIES = + new HashSet<>(Collections.singletonList(TableCapability.BATCH_READ)); @Override public StructType schema() { return TestingV2Source.schema(); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index 0b917654ee10..d5f51492904a 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; @@ -34,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.SessionConfigSupport; import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReader; import org.apache.spark.sql.connector.read.PartitionReaderFactory; @@ -216,6 +219,14 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { 
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { return new MyWriteBuilder(path, info); } + + @Override + public Set capabilities() { + return new HashSet<>(Arrays.asList( + TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE, + TableCapability.TRUNCATE)); + } } @Override From c665f683f708a328cc4afdfec57e6e54d04949a7 Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Sun, 21 Feb 2021 21:58:00 +0800 Subject: [PATCH 7/8] reformat code --- .../apache/spark/sql/connector/JavaSimpleBatchTable.java | 2 +- .../spark/sql/connector/JavaSimpleWritableDataSource.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java index ee876cad3688..4d147ac14a52 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleBatchTable.java @@ -29,7 +29,7 @@ abstract class JavaSimpleBatchTable implements Table, SupportsRead { private static final Set CAPABILITIES = - new HashSet<>(Collections.singletonList(TableCapability.BATCH_READ)); + new HashSet<>(Collections.singletonList(TableCapability.BATCH_READ)); @Override public StructType schema() { return TestingV2Source.schema(); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index d5f51492904a..bd791a172666 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -223,9 +223,9 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { @Override public Set capabilities() { return new HashSet<>(Arrays.asList( - TableCapability.BATCH_READ, - TableCapability.BATCH_WRITE, - TableCapability.TRUNCATE)); + TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE, + TableCapability.TRUNCATE)); } } From 0d85f08a9e41322e62398ab45ba78046f8d2a78c Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Sun, 21 Feb 2021 23:00:49 +0800 Subject: [PATCH 8/8] reformat code --- .../sql/connector/JavaSimpleWritableDataSource.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java index bd791a172666..b588d4c06e6e 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java @@ -58,6 +58,7 @@ public String keyPrefix() { } static class MyScanBuilder extends JavaSimpleScanBuilder { + private final String path; private final Configuration conf; @@ -96,6 +97,7 @@ public PartitionReaderFactory createReaderFactory() { } static class MyWriteBuilder implements WriteBuilder, SupportsTruncate { + private final String path; private final String queryId; private boolean needTruncate = false; @@ -118,6 +120,7 @@ public Write build() { } static class MyWrite implements Write { + private final String path; private final String queryId; private final boolean needTruncate; @@ -146,6 +149,7 @@ public BatchWrite toBatch() { } static class 
MyBatchWrite implements BatchWrite { + private final String queryId; private final String path; private final Configuration conf; @@ -203,6 +207,7 @@ public void abort(WriterCommitMessage[] messages) { } static class MyTable extends JavaSimpleBatchTable implements SupportsWrite { + private final String path; private final Configuration conf = SparkHadoopUtil.get().conf(); @@ -235,6 +240,7 @@ public Table getTable(CaseInsensitiveStringMap options) { } static class JavaCSVInputPartitionReader implements InputPartition { + private String path; JavaCSVInputPartitionReader(String path) { @@ -251,6 +257,7 @@ public void setPath(String path) { } static class JavaCSVReaderFactory implements PartitionReaderFactory { + private final SerializableConfiguration conf; JavaCSVReaderFactory(SerializableConfiguration conf) { @@ -301,6 +308,7 @@ public void close() throws IOException { } static class JavaCSVDataWriterFactory implements DataWriterFactory { + private final String path; private final String jobId; private final SerializableConfiguration conf; @@ -325,6 +333,7 @@ public DataWriter createWriter(int partitionId, long taskId) { } static class JavaCSVDataWriter implements DataWriter { + private final FileSystem fs; private final Path file; private final FSDataOutputStream out; @@ -356,6 +365,7 @@ public void abort() throws IOException { } @Override - public void close() {} + public void close() { + } } }
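
Usage note (illustrative, not part of the patch series): the round trip that the updated "simple writable data source" test performs against `JavaSimpleWritableDataSource` can be sketched as a standalone program as follows. This is a minimal sketch under stated assumptions: the object name, the `local[2]` master, the `/tmp` output path, and the explicit casts to `int` are illustrative choices, and it presumes the `sql/core` test classes (`TestingV2Source`, `JavaSimpleBatchTable`, and this new Java source) are on the classpath; the authoritative coverage lives in `DataSourceV2Suite`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object JavaSimpleWritableDataSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")                      // illustrative local master
      .appName("JavaSimpleWritableDataSourceExample")
      .getOrCreate()

    // Fully qualified name of the Java source added by this patch series.
    val source = "test.org.apache.spark.sql.connector.JavaSimpleWritableDataSource"
    // Hypothetical output directory; any empty, writable path works.
    val path = "/tmp/java-simple-writable-example"

    // Before anything is written, planInputPartitions() finds no data files,
    // so the scan returns an empty result.
    assert(spark.read.format(source).option("path", path).load().collect().isEmpty)

    // Append ten rows. Each task writes one CSV file under <path>/_temporary/<queryId>/,
    // and the job-level commit renames those files into <path>.
    spark.range(10)
      .select(col("id").cast("int").as("i"), (-col("id")).cast("int").as("j"))
      .write.format(source).option("path", path).mode("append").save()

    // Read the rows back through the same source.
    spark.read.format(source).option("path", path).load().show()

    spark.stop()
  }
}
```

The sketch relies on the commit protocol described in the class comment: tasks only ever write into the `_temporary/<queryId>` staging directory, `commit()` renames the staged files into the target directory, and `abort()` deletes the staging directory, so a failed job leaves the target untouched.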