@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils


@@ -132,7 +133,7 @@ abstract class FileCommitProtocol {
}


object FileCommitProtocol {
object FileCommitProtocol extends Logging {
class TaskCommitMessage(val obj: Any) extends Serializable

object EmptyTaskCommitMessage extends TaskCommitMessage(null)
@@ -145,15 +146,23 @@ object FileCommitProtocol {
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {

logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" dynamic=$dynamicPartitionOverwrite")
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: String, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
logDebug("Using (String, String, Boolean) constructor")
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
logDebug("Falling back to (String, String) constructor")
require(!dynamicPartitionOverwrite,
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
Member

BTW, why don't we warn and continue? Just wanted to make sure that we took this case into account. For example,
wouldn't this invalidate the case below?

private class CommitProtocol(arg1: String, arg2: String)
  extends HadoopMapReduceCommitProtocol(arg1, arg2, true) {
}

Contributor Author

The problem is that the dynamic partition logic in InsertIntoHadoopFsRelationCommand assumes that rename() is a fast, reliable operation you can do with any implementation of the FileCommitProtocol. It sets itself up for that when the feature is enabled, then instantiates the inner committer and carries on with the dynamic partitioning, irrespective of whether or not the committer supports it. rename() doesn't always work like that, and when it doesn't, the rest of the algorithm breaks.

If the committer doesn't have that 3-arg constructor, you can't be confident that you can do that. To silently log and continue is to run the risk that the underlying committer's commit algorithm isn't compatible with the dynamic partition algorithm.

A fail-fast ensures that when the outcome is going to be unknown, you aren't left trying to work out what's happened.

Regarding your example: yes, it's in trouble. The problem is how to differentiate that from subclasses which don't know anything at all about the new feature. You can't even look for an interface on the newly created object if the base class implements it; you are left with some dynamic probe of the instance.
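
For readers following along, the fail-fast fallback described above can be sketched outside Spark roughly as follows. This is a minimal illustration, not the PR's code: `Committer`, `TwoArgCommitter`, `ThreeArgCommitter` and `FailFastInstantiation` are hypothetical names, and only standard Java reflection calls are used.

```scala
// Hypothetical stand-ins for commit protocol classes; none of these names come from Spark.
trait Committer { def argCount: Int }

// Knows nothing about dynamic partition overwrite: only the classic constructor.
class TwoArgCommitter(jobId: String, path: String) extends Committer {
  val argCount = 2
}

// Declares the 3-arg constructor, implicitly saying "I understand this feature".
class ThreeArgCommitter(jobId: String, path: String, dynamic: Boolean) extends Committer {
  val argCount = 3
}

object FailFastInstantiation {
  def instantiate(className: String, jobId: String, path: String, dynamic: Boolean): Committer = {
    val clazz = Class.forName(className)
    try {
      // Prefer the (String, String, Boolean) constructor when it exists.
      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
      ctor.newInstance(jobId, path, java.lang.Boolean.valueOf(dynamic)).asInstanceOf[Committer]
    } catch {
      case _: NoSuchMethodException =>
        // Fail fast: a 2-arg-only class cannot be assumed to support the feature.
        require(!dynamic,
          s"Dynamic Partition Overwrite is enabled but $className lacks the 3-arg constructor")
        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
        ctor.newInstance(jobId, path).asInstanceOf[Committer]
    }
  }
}
```

Calling `instantiate` with `dynamic = true` on the two-arg class throws an `IllegalArgumentException` from `require`, rather than logging and carrying on with a committer whose behaviour is unknown.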

Member

HyukjinKwon, Mar 16, 2018

Hm .. actually we could do .. for example ..

abstract class FileCommitProtocol {
  ...
  def dynamicPartitionOverwrite: Boolean = false
}
class HadoopMapReduceCommitProtocol(
    jobId: String,
    path: String,
    override val dynamicPartitionOverwrite: Boolean = false)

(^ it's not double checked closely, for example, if the signature is safe or not. Was just an idea)

and use committer.dynamicPartitionOverwrite in InsertIntoHadoopFsRelationCommand to check whether the commit protocol supports it, if I understood everything correctly, and then produce a warning (or throw an error?) saying dynamic partition overwrite will be ignored (or not).

However, sure. I think this case is kind of a made-up case and should be a corner case I guess. I don't want to suggest an overkill (maybe) and I think we don't have to make this complicated too much for now.

I am okay as is. Just wanted to make sure that we considered and checked other possible stories.
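
As a rough, self-contained illustration of the flag idea in this comment, the sketch below models the caller-side check. It assumes simplified stand-in classes (`CommitProtocolSketch`, `AwareProtocol`, `LegacyProtocol`, `WrappedProtocol`, `InsertSketch` are all hypothetical names, not Spark's classes), and the choice between warning and throwing is left to the caller by returning an `Either`.

```scala
// Hypothetical simplified model of the suggestion; not Spark's actual classes.
abstract class CommitProtocolSketch {
  // Defaults to false, so implementations unaware of the feature report no support.
  def dynamicPartitionOverwrite: Boolean = false
}

// An implementation that knows about the feature exposes the flag it was given.
class AwareProtocol(
    jobId: String,
    path: String,
    override val dynamicPartitionOverwrite: Boolean = false) extends CommitProtocolSketch

// An older implementation with only the two-arg constructor.
class LegacyProtocol(jobId: String, path: String) extends CommitProtocolSketch

// The corner case from the thread: a two-arg subclass that hard-codes true.
class WrappedProtocol(jobId: String, path: String) extends AwareProtocol(jobId, path, true)

object InsertSketch {
  // Left carries a message the caller may log as a warning or turn into an error.
  def checkSupport(requested: Boolean, committer: CommitProtocolSketch): Either[String, Boolean] =
    if (requested && !committer.dynamicPartitionOverwrite) {
      Left(s"${committer.getClass.getName} does not support dynamic partition overwrite")
    } else {
      Right(requested)
    }
}
```

With this shape, `WrappedProtocol` reports `true` through the inherited flag, so the caller can tell it apart from `LegacyProtocol` without inspecting constructors.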

Contributor Author

Something like that would work, though it'd be a bit more convoluted: InsertIntoFSRelation would have to check, and then handle the situation of missing support.

One thing to consider in any form is: all implementations of FileCommitProtocol should be aware of the new Dynamic Partition overwrite feature. Adding a new 3-arg constructor is an implicit way of saying "I understand this". Where it's weak is that there's no way for it to say "I understand this and will handle it myself". Essentially that's what's being done in the [Netflix Partitioned committer](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java#L142), which purges all parts for which the new job has data. With that committer, if the insert asks for the feature then the FileCommitProtocol binding to it could (somehow) turn this on and so handle everything internally.

Like I said, a more complex model. It'd need changes a fair way through things, and then there's the usual complexity of getting commit logic right.
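
To make the "more complex model" concrete, one hypothetical way to express the three states discussed here (unaware, "I understand this", "I understand this and will handle it myself") is a small capability type that the insert logic matches on. Nothing below exists in Spark or Hadoop; `OverwriteSupport`, `Plan` and `OverwritePlanner` are invented purely for illustration.

```scala
// Hypothetical capability levels; not part of Spark or Hadoop.
sealed trait OverwriteSupport
case object Unaware extends OverwriteSupport            // knows nothing about the feature
case object DelegatesToCaller extends OverwriteSupport  // "I understand this"
case object HandlesInternally extends OverwriteSupport  // "I understand this and will handle it myself"

// Hypothetical outcomes for the insert command.
sealed trait Plan
case object PlainCommit extends Plan               // feature not requested
case object CallerManagedOverwrite extends Plan    // caller stages output and renames
case object CommitterManagedOverwrite extends Plan // committer replaces partitions itself

object OverwritePlanner {
  def plan(requested: Boolean, support: OverwriteSupport): Either[String, Plan] =
    (requested, support) match {
      case (false, _)                => Right(PlainCommit)
      // Fail fast rather than guess how an unaware committer will behave.
      case (true, Unaware)           => Left("committer does not declare dynamic partition overwrite support")
      case (true, DelegatesToCaller) => Right(CallerManagedOverwrite)
      case (true, HandlesInternally) => Right(CommitterManagedOverwrite)
    }
}
```

The extra state is what the constructor-based signal cannot carry, which is why this approach would need changes further up the call chain.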

val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

import org.apache.spark.SparkFunSuite

/**
* Unit tests for instantiation of FileCommitProtocol implementations.
*/
class FileCommitProtocolInstantiationSuite extends SparkFunSuite {

test("Dynamic partitions require appropriate constructor") {

// you cannot instantiate a two-arg client with dynamic partitions
// enabled.
val ex = intercept[IllegalArgumentException] {
instantiateClassic(true)
}
// check the contents of the message and rethrow if unexpected.
// this preserves the stack trace of the unexpected
// exception.
if (!ex.toString.contains("Dynamic Partition Overwrite")) {
fail(s"Wrong text in caught exception $ex", ex)
}
}

test("Standard partitions work with classic constructor") {
instantiateClassic(false)
}

test("Three arg constructors have priority") {
assert(3 == instantiateNew(false).argCount,
"Wrong constructor argument count")
}

test("Three arg constructors have priority when dynamic") {
assert(3 == instantiateNew(true).argCount,
"Wrong constructor argument count")
}

test("The protocol must be of the correct class") {
intercept[ClassCastException] {
FileCommitProtocol.instantiate(
classOf[Other].getCanonicalName,
"job",
"path",
false)
}
}

test("If there is no matching constructor, class hierarchy is irrelevant") {
intercept[NoSuchMethodException] {
FileCommitProtocol.instantiate(
classOf[NoMatchingArgs].getCanonicalName,
"job",
"path",
false)
}
}

/**
* Create a classic two-arg protocol instance.
* @param dynamic dynamic partitioning mode
* @return the instance
*/
private def instantiateClassic(dynamic: Boolean): ClassicConstructorCommitProtocol = {
FileCommitProtocol.instantiate(
classOf[ClassicConstructorCommitProtocol].getCanonicalName,
"job",
"path",
dynamic).asInstanceOf[ClassicConstructorCommitProtocol]
}

/**
* Create a three-arg protocol instance.
* @param dynamic dynamic partitioning mode
* @return the instance
*/
private def instantiateNew(
dynamic: Boolean): FullConstructorCommitProtocol = {
FileCommitProtocol.instantiate(
classOf[FullConstructorCommitProtocol].getCanonicalName,
"job",
"path",
dynamic).asInstanceOf[FullConstructorCommitProtocol]
}

}

/**
* This protocol implementation does not have the new three-arg
* constructor.
*/
private class ClassicConstructorCommitProtocol(arg1: String, arg2: String)
extends HadoopMapReduceCommitProtocol(arg1, arg2) {
}

/**
* This protocol implementation does have the new three-arg constructor
* alongside the original, and a 4 arg one for completeness.
* The final value of the real constructor is the number of arguments
* used in the 2- and 3- constructor, for test assertions.
*/
private class FullConstructorCommitProtocol(
arg1: String,
arg2: String,
b: Boolean,
val argCount: Int)
extends HadoopMapReduceCommitProtocol(arg1, arg2, b) {

def this(arg1: String, arg2: String) = {
this(arg1, arg2, false, 2)
}

def this(arg1: String, arg2: String, b: Boolean) = {
this(arg1, arg2, b, 3)
}
}

/**
* This has the 2-arity constructor, but isn't the right class.
*/
private class Other(arg1: String, arg2: String) {

}

/**
* This has no matching arguments as well as being the wrong class.
*/
private class NoMatchingArgs() {

}