core/src/main/scala/org/apache/spark/SparkContext.scala (2 changes: 1 addition & 1 deletion)
@@ -1104,7 +1104,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
val isDir = fs.isDirectory(hadoopPath)
val isDir = fs.getFileStatus(hadoopPath).isDir
Member Author: In case you're wondering: no this wasn't one of those things deprecated in Hadoop 2.x; this was deprecated in 1.0.4 even!

if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
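For context on the change above, here is a minimal standalone sketch of the directory check the diff settles on. It is written purely for illustration (the object and method names are made up, and it assumes Hadoop 1.x-era FileSystem APIs on the classpath); it is not the PR's code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HadoopDirCheckSketch {
  // Returns true if `uri` names an existing directory, using only calls that
  // are not deprecated in Hadoop 1.0.4 (FileSystem.isDirectory(Path) is, per
  // the comment above).
  def isExistingDir(uri: String): Boolean = {
    val path = new Path(uri)
    val fs: FileSystem = path.getFileSystem(new Configuration())
    fs.exists(path) && fs.getFileStatus(path).isDir
  }
}
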
@@ -95,7 +95,7 @@ private[spark] class EventLoggingListener(
* Creates the log file in the configured log directory.
*/
def start() {
if (!fileSystem.isDirectory(new Path(logBaseDir))) {
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) {
throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
}

@@ -23,8 +23,6 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._

import org.apache.spark.util.ParentClassLoader

/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
*/
@@ -153,7 +153,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
def resultHandler(x: Int, y: Unit): Unit = {}
val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd,
OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully,
0 until rdd.partitions.size, resultHandler, 0)
0 until rdd.partitions.size, resultHandler, () => Unit)
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
intercept[TimeoutException] {
@@ -21,8 +21,7 @@ import java.net.URLClassLoader

import org.scalatest.FunSuite

import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkException, TestUtils}

class MutableURLClassLoaderSuite extends FunSuite {

@@ -82,6 +82,7 @@ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
ssc.checkpoint(".");

// Initial RDD input to updateStateByKey
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 1));
JavaPairRDD<String, Integer> initialRDD = ssc.sc().parallelizePairs(tuples);
@@ -18,7 +18,7 @@
package org.apache.spark.examples

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
@@ -36,7 +36,7 @@ object HBaseTest {
// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(args(0))) {
val tableDesc = new HTableDescriptor(args(0))
val tableDesc = new HTableDescriptor(TableName.valueOf(args(0)))
admin.createTable(tableDesc)
}

@@ -20,32 +20,27 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Arrays;

import org.apache.spark.SparkConf;

import scala.Tuple2;

import junit.framework.Assert;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.junit.Test;
import org.junit.After;
import org.junit.Before;

public class JavaDirectKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
@@ -93,7 +88,7 @@ public void testKafkaStream() throws InterruptedException {
).map(
new Function<Tuple2<String, String>, String>() {
@Override
public String call(scala.Tuple2<String, String> kv) throws Exception {
public String call(Tuple2<String, String> kv) throws Exception {
return kv._2();
}
}
@@ -121,7 +116,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
unifiedStream.foreachRDD(
new Function<JavaRDD<String>, Void>() {
@Override
public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
public Void call(JavaRDD<String> rdd) throws Exception {
result.addAll(rdd.collect());
return null;
}
@@ -19,27 +19,22 @@

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;

import org.apache.spark.SparkConf;

import scala.Tuple2;

import junit.framework.Assert;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.junit.Test;
import org.junit.After;
import org.junit.Before;

public class JavaKafkaRDDSuite implements Serializable {
private transient JavaSparkContext sc = null;
private transient KafkaStreamSuiteBase suiteBase = null;
@@ -78,8 +73,8 @@ public void testKafkaRDD() throws InterruptedException {
OffsetRange.create(topic2, 0, 0, 1)
};

HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap();
HashMap<TopicAndPartition, Broker> leaders = new HashMap();
HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
String[] hostAndPort = suiteBase.brokerAddress().split(":");
Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
leaders.put(new TopicAndPartition(topic1, 0), broker);
@@ -96,7 +91,7 @@ public void testKafkaRDD() throws InterruptedException {
).map(
new Function<Tuple2<String, String>, String>() {
@Override
public String call(scala.Tuple2<String, String> kv) throws Exception {
public String call(Tuple2<String, String> kv) throws Exception {
return kv._2();
}
}
@@ -22,27 +22,25 @@
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;

import junit.framework.Assert;

import kafka.serializer.StringDecoder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.junit.Test;
import org.junit.After;
import org.junit.Before;

public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private transient Random random = new Random();
@@ -199,12 +199,12 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
assert(formatVersion == thisFormatVersion)
val rank = (metadata \ "rank").extract[Int]
val userFeatures = sqlContext.parquetFile(userPath(path))
.map { case Row(id: Int, features: Seq[Double]) =>
(id, features.toArray)
.map { case Row(id: Int, features: Seq[_]) =>
(id, features.asInstanceOf[Seq[Double]].toArray)
Member Author: Strangely, this is how the scala compiler wanted it. It doesn't like matching on a type with generics, since they are erased.

Member: Yeah, I think the only way to check the type is to check the schema or to do an inner match-case for the Seq.

Contributor: I think you can also use the @unchecked annotation here, but getting the syntax right usually involves guessing until it works for me.

Member Author: Hm, I tried putting @unchecked just about everywhere it seemed to allow (method, statement, etc.) and it either didn't compile or didn't eliminate the warning. It does seem like this should be possible and right-er but there may be a reason that it would require more elaborate rewriting in order to employ it.

}
val productFeatures = sqlContext.parquetFile(productPath(path))
.map { case Row(id: Int, features: Seq[Double]) =>
(id, features.toArray)
.map { case Row(id: Int, features: Seq[_]) =>
(id, features.asInstanceOf[Seq[Double]].toArray)
}
new MatrixFactorizationModel(rank, userFeatures, productFeatures)
}
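A small self-contained sketch of the alternatives discussed in the thread above (all names here are made up; none of this is the PR's code). The first pattern triggers the compiler's unchecked warning because the Double type argument is erased at runtime; the second is the shape the PR adopts; the third is the @unchecked spelling the thread mentions, which may or may not silence the warning in the actual code.

object ErasureSketch {
  // 1) Warns: "non-variable type argument Double in type pattern Seq[Double]
  //    is unchecked since it is eliminated by erasure".
  def withWarning(value: Any): Array[Double] = value match {
    case features: Seq[Double] => features.toArray
    case other => sys.error(s"unexpected: $other")
  }

  // 2) The pattern adopted above: match only the erased Seq shape, then cast.
  def withCast(value: Any): Array[Double] = value match {
    case features: Seq[_] => features.asInstanceOf[Seq[Double]].toArray
    case other => sys.error(s"unexpected: $other")
  }

  // 3) Annotating the type argument with @unchecked, as suggested in the
  //    thread; shown here only to illustrate the syntax.
  def withUnchecked(value: Any): Array[Double] = value match {
    case features: Seq[Double @unchecked] => features.toArray
    case other => sys.error(s"unexpected: $other")
  }
}
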
@@ -17,6 +17,7 @@

package org.apache.spark.sql.sources

import scala.language.existentials
import scala.language.implicitConversions

import org.apache.spark.Logging
@@ -86,7 +86,7 @@ class ScalaReflectionRelationSuite extends FunSuite {

assert(sql("SELECT * FROM reflectData").collect().head ===
Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
new java.math.BigDecimal(1), new Date(70, 0, 1), // This is 1970-01-01
new java.math.BigDecimal(1), Date.valueOf("1970-01-01"),
new Timestamp(12345), Seq(1,2,3)))
}

@@ -75,7 +75,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
Literal(0.asInstanceOf[Float]) ::
Literal(0.asInstanceOf[Double]) ::
Literal("0") ::
Literal(new java.sql.Date(114, 8, 23)) ::
Literal(java.sql.Date.valueOf("2014-09-23")) ::
Literal(Decimal(BigDecimal(123.123))) ::
Literal(new java.sql.Timestamp(123123)) ::
Literal(Array[Byte](1,2,3)) ::
@@ -316,6 +316,7 @@ public void testReduceByWindowWithoutInverse() {
testReduceByWindow(false);
}

@SuppressWarnings("unchecked")
private void testReduceByWindow(boolean withInverse) {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
@@ -684,6 +685,7 @@ public void testStreamingContextTransform(){
JavaDStream<Long> transformed1 = ssc.transform(
listOfDStreams1,
new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
@Override
public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
Assert.assertEquals(2, listOfRDDs.size());
return null;
@@ -697,6 +699,7 @@ public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
@Override
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
Assert.assertEquals(3, listOfRDDs.size());
JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
@@ -1829,6 +1832,7 @@ private List<List<String>> fileTestPrepare(File testDir) throws IOException {
return expected;
}

@SuppressWarnings("unchecked")
// SPARK-5795: no logic assertions, just testing that intended API invocations compile
private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
pds.saveAsNewAPIHadoopFiles(