@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data writing ability and save the data to the data source.
*/
@InterfaceStability.Evolving
public interface WriteSupport {

/**
* Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
* sources can return an empty {@link Optional} if no writing needs to be done for the given
* save mode.
*
* @param jobId A unique string for the writing job. It's possible that there are many writing
* jobs running at the same time, and the returned {@link DataSourceV2Writer} should
* use this job id to distinguish itself from writers of other jobs.
* @param schema the schema of the data to be written.
* @param mode the save mode, which determines what to do when data already exists in this data
* source; please refer to {@link SaveMode} for more details.
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
Optional<DataSourceV2Writer> createWriter(
String jobId, StructType schema, SaveMode mode, DataSourceV2Options options);
}
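As an illustration of the contract above (not part of this patch), here is a minimal sketch of a source implementing WriteSupport. SimpleWriteSupport, the targetExists helper, and SimpleDataSourceV2Writer (sketched after the DataSourceV2Writer interface below) are hypothetical names:

import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

public class SimpleWriteSupport implements DataSourceV2, WriteSupport {

  @Override
  public Optional<DataSourceV2Writer> createWriter(
      String jobId, StructType schema, SaveMode mode, DataSourceV2Options options) {
    boolean exists = targetExists(options);  // hypothetical existence check
    if (exists && mode == SaveMode.ErrorIfExists) {
      throw new IllegalStateException("Target of job " + jobId + " already contains data.");
    }
    if (exists && mode == SaveMode.Ignore) {
      // Nothing needs to be written for this save mode: signal it with an empty Optional.
      return Optional.empty();
    }
    // SimpleDataSourceV2Writer is a hypothetical implementation, sketched further below.
    return Optional.of(new SimpleDataSourceV2Writer(jobId, schema, mode, options));
  }

  private boolean targetExists(DataSourceV2Options options) {
    return false;  // placeholder for a real storage lookup
  }
}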
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.sources.Filter;

/**
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.types.StructType;

/**
* A data source writer that is returned by
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
*
* The writing procedure is:
* 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
* partitions of the input data (RDD).
* 2. For each partition, create the data writer, and write the data of the partition with this
* writer. If all the data are written successfully, call {@link DataWriter#commit()}. If an
* exception happens during the writing, call {@link DataWriter#abort()}.
* 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
* some writers are aborted, or the job fails for an unknown reason, call
* {@link #abort(WriterCommitMessage[])}.
*
* Spark won't retry failed writing jobs; users should retry manually in their Spark applications
* if they want to.
*
* Please refer to the document of commit/abort methods for detailed specifications.
*
* Note that, this interface provides a protocol between Spark and data sources for transactional
* data writing, but the transaction here is a Spark-level transaction, which may not be the same
* as the underlying storage transaction. For example, Spark successfully writes data to a
* Cassandra data source, but Cassandra may need some more time to reach consistency at the
* storage level.
*/
@InterfaceStability.Evolving
public interface DataSourceV2Writer {

/**
* Creates a writer factory which will be serialized and sent to executors.
*/
DataWriterFactory<Row> createWriterFactory();

/**
* Commits this writing job with a list of commit messages. The commit messages are collected from
* successful data writers and are produced by {@link DataWriter#commit()}. If this method
* fails (by throwing an exception), this writing job is considered to have failed, and
* {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
* to data source readers if this method succeeds.
*
* Note that, one partition may have multiple committed data writers because of speculative tasks.
* Spark will pick the first successful one and get its commit message. Implementations should be
* aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer
* can commit successfully, or have a way to clean up the data of already-committed writers.
*/
void commit(WriterCommitMessage[] messages);

/**
* Aborts this writing job because some data writers failed to write the records and aborted,
* or the Spark job fails for some unknown reason, or {@link #commit(WriterCommitMessage[])}
* fails. If this method fails (by throwing an exception), the underlying data source may be left
* with garbage that needs to be cleaned up manually, but this garbage should not be visible to
* data source readers.
*
* Unless the abort is triggered by the failure of commit, the given messages will have some
* null slots, as there may be only a few data writers that committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
*/
void abort(WriterCommitMessage[] messages);
}
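A minimal sketch (not part of this patch) of a DataSourceV2Writer following the three-step procedure above. SimpleDataSourceV2Writer, SimpleWriterFactory (sketched after the DataWriterFactory interface below), and the publish/discard helpers are hypothetical names:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class SimpleDataSourceV2Writer implements DataSourceV2Writer {

  private final String jobId;
  private final StructType schema;
  private final SaveMode mode;
  private final DataSourceV2Options options;

  public SimpleDataSourceV2Writer(
      String jobId, StructType schema, SaveMode mode, DataSourceV2Options options) {
    this.jobId = jobId;
    this.schema = schema;
    this.mode = mode;
    this.options = options;
  }

  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // Step 1: this factory is serialized and shipped to every partition of the input RDD.
    return new SimpleWriterFactory(jobId);
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Step 3 (success path): every task committed, so publish the staged data,
    // e.g. move the staged files referenced by the messages to their final location.
    for (WriterCommitMessage message : messages) {
      publish(message);
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Step 3 (failure path): drop whatever was staged. Slots may be null for tasks that
    // never committed or whose messages did not reach the driver.
    for (WriterCommitMessage message : messages) {
      if (message != null) {
        discard(message);
      }
    }
  }

  private void publish(WriterCommitMessage message) { /* storage-specific */ }

  private void discard(WriterCommitMessage message) { /* storage-specific */ }
}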
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;

/**
* A data writer returned by {@link DataWriterFactory#createWriter(int, int)}, responsible for
* writing data for an input RDD partition.
*
* One Spark task has one exclusive data writer, so there is no thread-safety concern.
*
* {@link #write(Object)} is called for each record in the input RDD partition. If one record fails
* to write via {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining
* records will not be processed. If all records are successfully written, {@link #commit()} is
* called.
*
* If this data writer succeeds (all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])} together with commit messages from
* other data writers. If this data writer fails (one record fails to write or {@link #commit()}
* fails), an exception will be sent to the driver side, and Spark will retry this writing task a
* few times; each retry calls {@link DataWriterFactory#createWriter(int, int)} with a different
* `attemptNumber`, and Spark finally calls {@link DataSourceV2Writer#abort(WriterCommitMessage[])}
* if all retries fail.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
* previous one fails, speculative tasks run simultaneously. It's possible that one input
* RDD partition has multiple data writers with different `attemptNumber` running at the same time,
* and data sources should guarantee that these data writers don't conflict and can work together.
* Implementations can coordinate with the driver during {@link #commit()} to make sure only one of
* these data writers can commit successfully. Or implementations can allow all of them to commit
* successfully, and have a way to revert the committed data writers whose commit messages were not
* accepted, because Spark only accepts the commit message that arrives first and ignores others.
Review comment (Member): In the test case, could we implement the above logic?
*
* Note that, currently, the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
* source writers, or {@link org.apache.spark.sql.catalyst.InternalRow} for data source writers
* that mix in {@link SupportsWriteInternalRow}.
*/
@InterfaceStability.Evolving
public interface DataWriter<T> {

/**
* Writes one record.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this data
* writer is considered to have failed.
*/
void write(T record);

/**
* Commits this writer after all records are written successfully, and returns a commit message
* which will be sent back to the driver side and passed to
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
*
* The written data should only be visible to data source readers after
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link DataSourceV2Writer} at the driver side
* to do the final commit via {@link WriterCommitMessage}.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this data
* writer is considered to have failed.
*/
WriterCommitMessage commit();

/**
* Aborts this writer if it fails. Implementations should clean up the data for already
* written records.
*
* This method will only be called if one record failed to write, or {@link #commit()}
* failed.
*
* If this method fails (by throwing an exception), the underlying data source may be left with
* garbage that needs to be cleaned up by {@link DataSourceV2Writer#abort(WriterCommitMessage[])}
* or manually, but this garbage should not be visible to data source readers.
*/
void abort();
}
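A minimal sketch (not part of this patch) of a DataWriter that stages records in a temporary file and defers publishing to the driver. SimpleDataWriter and StagedFileMessage are hypothetical names, and the CSV-to-file approach is just an illustration:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class SimpleDataWriter implements DataWriter<Row> {

  private final String stagingPath;
  private final BufferedWriter out;

  public SimpleDataWriter(String stagingPath) {
    this.stagingPath = stagingPath;
    try {
      Files.createDirectories(Paths.get(stagingPath).getParent());
      this.out = Files.newBufferedWriter(Paths.get(stagingPath));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void write(Row record) {
    try {
      // Append one record to the staging file; a failure here makes Spark call abort().
      out.write(record.mkString(","));
      out.newLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public WriterCommitMessage commit() {
    try {
      out.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The data stays "hidden" in the staging file; only the message goes to the driver,
    // which publishes the file in DataSourceV2Writer#commit.
    return new StagedFileMessage(stagingPath);
  }

  @Override
  public void abort() {
    try {
      out.close();
      // Remove the partially written staging file so no garbage is left behind.
      Files.deleteIfExists(Paths.get(stagingPath));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Commit message carrying the staged file location back to the driver. */
  public static class StagedFileMessage implements WriterCommitMessage {
    public final String stagedPath;

    public StagedFileMessage(String stagedPath) {
      this.stagedPath = stagedPath;
    }
  }
}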
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
* A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
* which is responsible for creating and initializing the actual data writer on the executor side.
*
* Note that the writer factory will be serialized and sent to executors, and the data writer
* will then be created on executors to do the actual writing. So {@link DataWriterFactory} must
* be serializable, while {@link DataWriter} doesn't need to be.
*/
@InterfaceStability.Evolving
public interface DataWriterFactory<T> extends Serializable {

/**
* Returns a data writer to do the actual writing work.
*
* @param partitionId A unique id of the RDD partition that the returned writer will process.
* Usually Spark processes many RDD partitions at the same time;
* implementations should use the partition id to distinguish writers for
* different partitions.
* @param attemptNumber Spark may launch multiple tasks with the same task id. For example, if a
* task fails, Spark launches a new task with the same task id but a different
* attempt number. Or if a task is too slow, Spark launches new tasks with the
* same task id but a different attempt number, which means there are multiple
* tasks with the same task id running at the same time. Implementations can
* use this attempt number to distinguish writers of different task attempts.
*/
DataWriter<T> createWriter(int partitionId, int attemptNumber);
}
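A minimal sketch (not part of this patch) of a factory that bakes partitionId and attemptNumber into a unique staging location. SimpleWriterFactory is a hypothetical name, and SimpleDataWriter refers to the writer sketched after the DataWriter interface above:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

public class SimpleWriterFactory implements DataWriterFactory<Row> {

  private final String jobId;

  public SimpleWriterFactory(String jobId) {
    this.jobId = jobId;
  }

  @Override
  public DataWriter<Row> createWriter(int partitionId, int attemptNumber) {
    // Both ids go into the staging location so that concurrent attempts for the same
    // partition (retries or speculative tasks) never write to the same file.
    String stagingPath = "/tmp/" + jobId + "/" + partitionId + "-" + attemptNumber + ".csv";
    return new SimpleDataWriter(stagingPath);
  }
}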
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

/**
* A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
* interface to write {@link InternalRow} directly and avoid the row conversion on the Spark side.
* This is an experimental and unstable interface, as {@link InternalRow} is not public and may
* change in future Spark versions.
*/

@InterfaceStability.Evolving
@Experimental
@InterfaceStability.Unstable
public interface SupportsWriteInternalRow extends DataSourceV2Writer {

@Override
default DataWriterFactory<Row> createWriterFactory() {
throw new IllegalStateException(
"createWriterFactory should not be called with SupportsWriteInternalRow.");
}

DataWriterFactory<InternalRow> createInternalRowWriterFactory();
}
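A minimal sketch (not part of this patch) of a writer mixing in this interface; InternalRowWriter and InternalRowWriterFactory are hypothetical names:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class InternalRowWriter implements SupportsWriteInternalRow {

  @Override
  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
    // Executors will feed InternalRow instances to writers created by this factory,
    // skipping the InternalRow-to-Row conversion a plain DataSourceV2Writer incurs.
    return new InternalRowWriterFactory();  // hypothetical DataWriterFactory<InternalRow>
  }

  @Override
  public void commit(WriterCommitMessage[] messages) { /* storage-specific */ }

  @Override
  public void abort(WriterCommitMessage[] messages) { /* storage-specific */ }
}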
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
* A commit message returned by {@link DataWriter#commit()}, which will be sent back to the driver
* side as an input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
*
* This is an empty interface; data sources should define their own message class and use it in
* their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
* implementations.
*/
@InterfaceStability.Evolving
public interface WriterCommitMessage extends Serializable {}
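As a minimal illustration of the note above (not part of this patch), a data source could define a message class that carries the staged output location back to the driver; StagedFileCommitMessage is a hypothetical name:

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

/** Records where a task staged its output so the driver can publish it on global commit. */
public class StagedFileCommitMessage implements WriterCommitMessage {
  public final int partitionId;
  public final String stagedPath;

  public StagedFileCommitMessage(int partitionId, String stagedPath) {
    this.partitionId = partitionId;
    this.stagedPath = stagedPath;
  }
}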