From fb27ababd04519308d1f0e70daa8c700db53c80a Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Wed, 27 May 2015 23:30:58 -0700 Subject: [PATCH 1/8] Added support for CassandraSQLContext for %sql queries --- .../zeppelin/spark/SparkInterpreter.java | 86 ++++++++++++------- 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 731068e48ed..2de95f26b5d 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -26,11 +26,7 @@ import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import org.apache.spark.HttpServer; import org.apache.spark.SparkConf; @@ -163,32 +159,6 @@ private boolean useHiveContext() { return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } - public SQLContext getSQLContext() { - if (sqlc == null) { - if (useHiveContext()) { - String name = "org.apache.spark.sql.hive.HiveContext"; - Constructor hc; - try { - hc = getClass().getClassLoader().loadClass(name) - .getConstructor(SparkContext.class); - sqlc = (SQLContext) hc.newInstance(getSparkContext()); - } catch (NoSuchMethodException | SecurityException - | ClassNotFoundException | InstantiationException - | IllegalAccessException | IllegalArgumentException - | InvocationTargetException e) { - logger.warn("Can't create HiveContext. Fallback to SQLContext", e); - // when hive dependency is not loaded, it'll fail. - // in this case SQLContext can be used. - sqlc = new SQLContext(getSparkContext()); - } - } else { - sqlc = new SQLContext(getSparkContext()); - } - } - - return sqlc; - } - public DependencyResolver getDependencyResolver() { if (dep == null) { dep = new DependencyResolver(intp, sc, getProperty("zeppelin.dep.localrepo")); @@ -213,6 +183,45 @@ private DepInterpreter getDepInterpreter() { return null; } + private boolean useCassandraContext() { + return Boolean.parseBoolean(getProperty("zeppelin.spark.useCassandraContext")); + } + + private SQLContext loadCustomContext(final String contextName) { + Constructor hc; + SQLContext context; + try { + hc = getClass().getClassLoader().loadClass(contextName) + .getConstructor(SparkContext.class); + context = (SQLContext) hc.newInstance(getSparkContext()); + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + logger.warn("Can't create " + contextName + ". Fallback to SQLContext", e); + // when hive dependency is not loaded, it'll fail. + // in this case SQLContext can be used. + context = new SQLContext(getSparkContext()); + } + return context; + } + + public SQLContext getSQLContext() { + if (sqlc == null) { + if (useCassandraContext()) { + sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext"); + logger.debug("Loading Cassandra SQL Context"); + } else if (useHiveContext()) { + sqlc = loadCustomContext("org.apache.spark.sql.hive.HiveContext"); + logger.debug("Loading Hive SQL Context"); + } else { + sqlc = new SQLContext(getSparkContext()); + logger.debug("Loading Standard SQL Context"); + } + } + return sqlc; + } + public SparkContext createSparkContext() { System.err.println("------ Create new SparkContext " + getProperty("master") + " -------"); @@ -240,12 +249,23 @@ public SparkContext createSparkContext() { } } - SparkConf conf = - new SparkConf() + SparkConf conf = new SparkConf() .setMaster(getProperty("master")) .setAppName(getProperty("spark.app.name")) .set("spark.repl.class.uri", classServerUri); + if (useCassandraContext()) { + conf.set("spark.cassandra.connection.host", getProperty("spark.cassandra.connection.host")); + + if (getProperty("spark.cassandra.auth.username") != null) { + conf.set("spark.cassandra.auth.username", getProperty("spark.cassandra.auth.username")); + } + + if (getProperty("spark.cassandra.auth.password") != null) { + conf.set("spark.cassandra.auth.password", getProperty("spark.cassandra.auth.password")); + } + } + if (jars.length > 0) { conf.setJars(jars); } From 4808830661228e071db0f04545545b8169ac7a12 Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Wed, 1 Jul 2015 12:09:07 -0700 Subject: [PATCH 2/8] No longer set spark.cassandra props explicitly as this is done already for all spark.* props Set default use of Cassandra Context to false --- .../apache/zeppelin/spark/SparkInterpreter.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 2de95f26b5d..53ec37fdd24 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -103,6 +103,8 @@ public class SparkInterpreter extends Interpreter { + "we should set this value") .add("zeppelin.spark.useHiveContext", "true", "Use HiveContext instead of SQLContext if it is true.") + .add("zeppelin.spark.useCassandraContext", "false", + "Use CassandraContext instead of SQLContext if it is true") .add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.") .add("args", "", "spark commandline args").build()); @@ -254,18 +256,6 @@ public SparkContext createSparkContext() { .setAppName(getProperty("spark.app.name")) .set("spark.repl.class.uri", classServerUri); - if (useCassandraContext()) { - conf.set("spark.cassandra.connection.host", getProperty("spark.cassandra.connection.host")); - - if (getProperty("spark.cassandra.auth.username") != null) { - conf.set("spark.cassandra.auth.username", getProperty("spark.cassandra.auth.username")); - } - - if (getProperty("spark.cassandra.auth.password") != null) { - conf.set("spark.cassandra.auth.password", getProperty("spark.cassandra.auth.password")); - } - } - if (jars.length > 0) { conf.setJars(jars); } From b400ee927ead473ec0958f0c4196dc89c33178e7 Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Thu, 9 Jul 2015 15:23:30 -0700 Subject: [PATCH 3/8] Adding integration tests for Cassandra SQL context Maven will now only run Cassandra tests when a Cassandra profile is enabled Add some default values for the CassandraContext Raise exception if Cassandra and Hive contexts are enabled at the same time --- spark/pom.xml | 46 ++++++++- .../zeppelin/spark/SparkInterpreter.java | 66 ++++++++----- spark/src/main/resources/cassandra.cql | 9 ++ .../log4j-embedded-cassandra.properties | 33 +++++++ .../CassandraSparkSqlInterpreterTest.java | 95 +++++++++++++++++++ 5 files changed, 221 insertions(+), 28 deletions(-) create mode 100644 spark/src/main/resources/cassandra.cql create mode 100644 spark/src/main/resources/log4j-embedded-cassandra.properties create mode 100644 spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java diff --git a/spark/pom.xml b/spark/pom.xml index 782670e256f..b5719b91359 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -38,6 +38,7 @@ 1.4.0 2.10.4 2.10 + **/CassandraSparkSqlInterpreterTest.java 2.3.0 ${hadoop.version} @@ -51,6 +52,7 @@ + cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ @@ -499,6 +501,26 @@ junit test + + + org.cassandraunit + cassandra-unit + 2.1.3.1 + test + + + + com.datastax.cassandra + * + + + + org.apache.cassandra + cassandra-all + + + @@ -519,7 +541,7 @@ com.datastax.spark spark-cassandra-connector_${scala.binary.version} - 1.1.1 + 1.1.2 org.joda @@ -547,8 +569,14 @@ cassandra-spark-1.2 1.2.1 + + + org.apache.cassandra + cassandra-all + 2.1.8 + com.datastax.spark spark-cassandra-connector_${scala.binary.version} @@ -579,15 +607,22 @@ cassandra-spark-1.3 1.3.0 + + + + org.apache.cassandra + cassandra-all + 2.1.8 + com.datastax.spark spark-cassandra-connector_${scala.binary.version} - - 1.3.0-SNAPSHOT + + 1.3.0-M2 org.joda @@ -830,6 +865,9 @@ maven-surefire-plugin 2.17 + + ${exclude.tests} + 1 false -Xmx1024m -XX:MaxPermSize=256m diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index ab3609ab422..97c19711a7f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -105,6 +105,8 @@ public class SparkInterpreter extends Interpreter { getSystemDefault("SPARK_YARN_JAR", "spark.yarn.jar", ""), "The location of the Spark jar file. If you use yarn as a cluster, " + "we should set this value") + .add("zeppelin.spark.useCassandraContext", "false", + "Use CassandraContext instead of SQLContext if it is true") .add("zeppelin.spark.useHiveContext", getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT", "zeppelin.spark.useHiveContext", "true"), @@ -167,30 +169,8 @@ private boolean useHiveContext() { return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } - public SQLContext getSQLContext() { - if (sqlc == null) { - if (useHiveContext()) { - String name = "org.apache.spark.sql.hive.HiveContext"; - Constructor hc; - try { - hc = getClass().getClassLoader().loadClass(name) - .getConstructor(SparkContext.class); - sqlc = (SQLContext) hc.newInstance(getSparkContext()); - } catch (NoSuchMethodException | SecurityException - | ClassNotFoundException | InstantiationException - | IllegalAccessException | IllegalArgumentException - | InvocationTargetException e) { - logger.warn("Can't create HiveContext. Fallback to SQLContext", e); - // when hive dependency is not loaded, it'll fail. - // in this case SQLContext can be used. - sqlc = new SQLContext(getSparkContext()); - } - } else { - sqlc = new SQLContext(getSparkContext()); - } - } - - return sqlc; + private boolean useCassandraContext() { + return Boolean.parseBoolean(getProperty("zeppelin.spark.useCassandraContext")); } public DependencyResolver getDependencyResolver() { @@ -217,6 +197,44 @@ private DepInterpreter getDepInterpreter() { return null; } + private SQLContext loadCustomContext(final String contextName) { + Constructor hc; + SQLContext context; + try { + hc = getClass().getClassLoader().loadClass(contextName) + .getConstructor(SparkContext.class); + context = (SQLContext) hc.newInstance(getSparkContext()); + } catch (NoSuchMethodException | SecurityException + | ClassNotFoundException | InstantiationException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + logger.warn("Can't create " + contextName + ". Fallback to SQLContext", e); + // when hive dependency is not loaded, it'll fail. + // in this case SQLContext can be used. + context = new SQLContext(getSparkContext()); + } + return context; + } + + public SQLContext getSQLContext() { + if (sqlc == null) { + if(useCassandraContext() && useHiveContext()) + throw new InterpreterException("Cassandra and Hive context are both enabled, please enable only one"); + + if (useCassandraContext()) { + sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext"); + logger.debug("Loading Cassandra SQL Context"); + } else if (useHiveContext()) { + sqlc = loadCustomContext("org.apache.spark.sql.hive.HiveContext"); + logger.debug("Loading Hive SQL Context"); + } else { + sqlc = new SQLContext(getSparkContext()); + logger.debug("Loading Standard SQL Context"); + } + } + return sqlc; + } + public SparkContext createSparkContext() { System.err.println("------ Create new SparkContext " + getProperty("master") + " -------"); diff --git a/spark/src/main/resources/cassandra.cql b/spark/src/main/resources/cassandra.cql new file mode 100644 index 00000000000..7688682b30a --- /dev/null +++ b/spark/src/main/resources/cassandra.cql @@ -0,0 +1,9 @@ +CREATE TABLE test ( + name text, + age int, + PRIMARY KEY(name)); + +INSERT INTO test (name, age) values ('moon', 33); +INSERT INTO test (name, age) values ('jobs', 51); +INSERT INTO test (name, age) values ('gates', 51); +INSERT INTO test (name, age) values ('park', 34); \ No newline at end of file diff --git a/spark/src/main/resources/log4j-embedded-cassandra.properties b/spark/src/main/resources/log4j-embedded-cassandra.properties new file mode 100644 index 00000000000..b2195ad809e --- /dev/null +++ b/spark/src/main/resources/log4j-embedded-cassandra.properties @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# for production, you should probably set the root to INFO +# and the pattern to %c instead of %l. (%l is slower.) + +# output messages into a rolling log file as well as stdout +log4j.rootLogger=ERROR,stdout,HColumnFamilyLogger + +# stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{3} - %m%n +log4j.appender.stdout.follow=true + +log4j.appender.HColumnFamilyLogger=org.apache.log4j.ConsoleAppender +log4j.appender.HColumnFamilyLogger.layout=org.apache.log4j.PatternLayout +log4j.appender.HColumnFamilyLogger.layout.ConversionPattern=%m%n +log4j.category.HColumnFamilyLogger=DEBUG +#log4j.category.org.apache=INFO, stdout \ No newline at end of file diff --git a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java new file mode 100644 index 00000000000..8ce10706fba --- /dev/null +++ b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class CassandraSparkSqlInterpreterTest { + + private SparkSqlInterpreter sql; + private SparkInterpreter repl; + private InterpreterContext context; + private InterpreterGroup intpGroup; + + @Rule + public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("cassandra.cql","sparkkeyspace")); + + + @Before + public void setUp() throws Exception { + Properties p = new Properties(); + p.setProperty("zeppelin.spark.useCassandraContext", "true"); + p.setProperty("spark.cassandra.connection.host", "127.0.0.1"); + p.setProperty("spark.cassandra.connection.port", "9142"); + + if (repl == null) { + + if (SparkInterpreterTest.repl == null) { + repl = new SparkInterpreter(p); + repl.open(); + SparkInterpreterTest.repl = repl; + } else { + repl = SparkInterpreterTest.repl; + } + + sql = new SparkSqlInterpreter(p); + + intpGroup = new InterpreterGroup(); + intpGroup.add(repl); + intpGroup.add(sql); + sql.setInterpreterGroup(intpGroup); + sql.open(); + } + context = new InterpreterContext("id", "title", "text", new HashMap(), new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList()); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void test() { + InterpreterResult ret = sql.interpret("select name, age from sparkkeyspace.test where age < 40", context); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(Type.TABLE, ret.type()); + assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message()); + + assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select * FROM sparkkeyspace.test as t1 INNER JOIN sparkkeyspace.test as t2 on t1.name = t2.name", context).code()); + } +} From 7fef967dd5bfc58b4063145886a8100940290bcc Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Fri, 17 Jul 2015 11:06:57 -0700 Subject: [PATCH 4/8] Minor style changes --- .../java/org/apache/zeppelin/spark/SparkInterpreter.java | 5 +++-- spark/src/main/resources/log4j-embedded-cassandra.properties | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 97c19711a7f..eb794c3c6f7 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -218,8 +218,9 @@ private SQLContext loadCustomContext(final String contextName) { public SQLContext getSQLContext() { if (sqlc == null) { - if(useCassandraContext() && useHiveContext()) - throw new InterpreterException("Cassandra and Hive context are both enabled, please enable only one"); + if (useCassandraContext() && useHiveContext()) + throw new InterpreterException("Cassandra and Hive context are both enabled, " + + "please enable only one"); if (useCassandraContext()) { sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext"); diff --git a/spark/src/main/resources/log4j-embedded-cassandra.properties b/spark/src/main/resources/log4j-embedded-cassandra.properties index b2195ad809e..df58384071a 100644 --- a/spark/src/main/resources/log4j-embedded-cassandra.properties +++ b/spark/src/main/resources/log4j-embedded-cassandra.properties @@ -30,4 +30,4 @@ log4j.appender.HColumnFamilyLogger=org.apache.log4j.ConsoleAppender log4j.appender.HColumnFamilyLogger.layout=org.apache.log4j.PatternLayout log4j.appender.HColumnFamilyLogger.layout.ConversionPattern=%m%n log4j.category.HColumnFamilyLogger=DEBUG -#log4j.category.org.apache=INFO, stdout \ No newline at end of file +#log4j.category.org.apache=INFO, stdout From 3f39f93f6052d5c540d9585fb6563d5ceb9c7f62 Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Fri, 17 Jul 2015 11:38:33 -0700 Subject: [PATCH 5/8] Changed SqarkInterpreter to only use the Hive context when explicitly set, plain old spark should be the default with an opt-in to hive/cassandra/etc ... Change SparkSqlInterpreterTest to use Hive context as the tests use Hive specific syntax. Note: This changes the default SparkInterpreter behaviour and those upgrading will need to change their configuration. --- .../main/java/org/apache/zeppelin/spark/SparkInterpreter.java | 2 +- .../java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index eb794c3c6f7..8d8c7af6d55 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -109,7 +109,7 @@ public class SparkInterpreter extends Interpreter { "Use CassandraContext instead of SQLContext if it is true") .add("zeppelin.spark.useHiveContext", getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT", - "zeppelin.spark.useHiveContext", "true"), + "zeppelin.spark.useHiveContext", "false"), "Use HiveContext instead of SQLContext if it is true.") .add("zeppelin.spark.maxResult", getSystemDefault("ZEPPELIN_SPARK_MAXRESULT", "zeppelin.spark.maxResult", "1000"), diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index eaa0a8af693..13ad6f571f2 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -44,6 +44,7 @@ public class SparkSqlInterpreterTest { @Before public void setUp() throws Exception { Properties p = new Properties(); + p.setProperty("zeppelin.spark.useHiveContext", "true"); if (repl == null) { From 4c8e638b9c5ed8b6c4e97ec06a7989780a519f7b Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Fri, 17 Jul 2015 15:24:47 -0700 Subject: [PATCH 6/8] Added CI test for Cassandra and Spark 1.3 Allow Cassandra-spark-1.1 profile to run Cassandra tests Apache license for cassandra.cql file Add extra params to InterpreterContext Removed wrong syntax test --- .travis.yml | 7 +++++++ spark/pom.xml | 1 + spark/src/main/resources/cassandra.cql | 17 +++++++++++++++++ .../spark/CassandraSparkSqlInterpreterTest.java | 3 +-- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index ac80e8a6959..24a55c583ac 100644 --- a/.travis.yml +++ b/.travis.yml @@ -51,6 +51,13 @@ script: - ./testing/startSparkCluster.sh 1.1.1 2.3 - SPARK_HOME=./spark-1.1.1-bin-hadoop2.3 mvn verify -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark' - ./testing/stopSparkCluster.sh 1.1.1 2.3 + +# CassandraSpark 1.3 + - mvn clean package -DskipTests -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark' + - mvn package -Pbuild-distr -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B + - ./testing/startSparkCluster.sh 1.3.1 2.3 + - SPARK_HOME=./spark-1.3.1-bin-hadoop2.3 mvn verify -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark' + - ./testing/stopSparkCluster.sh 1.3.1 2.3 after_failure: - cat target/rat.txt diff --git a/spark/pom.xml b/spark/pom.xml index fb4abbcbb4e..cf9e8e84de6 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -548,6 +548,7 @@ 1.1.1 2.2.3-shaded-protobuf + diff --git a/spark/src/main/resources/cassandra.cql b/spark/src/main/resources/cassandra.cql index 7688682b30a..ab79cc2069f 100644 --- a/spark/src/main/resources/cassandra.cql +++ b/spark/src/main/resources/cassandra.cql @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + CREATE TABLE test ( name text, age int, diff --git a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java index 8ce10706fba..1f1c64d19a2 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java @@ -73,7 +73,7 @@ public void setUp() throws Exception { sql.setInterpreterGroup(intpGroup); sql.open(); } - context = new InterpreterContext("id", "title", "text", new HashMap(), new GUI(), + context = new InterpreterContext("note", "id", "title", "text", new HashMap(), new GUI(), new AngularObjectRegistry(intpGroup.getId(), null), new LinkedList()); } @@ -89,7 +89,6 @@ public void test() { assertEquals(Type.TABLE, ret.type()); assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message()); - assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select * FROM sparkkeyspace.test as t1 INNER JOIN sparkkeyspace.test as t2 on t1.name = t2.name", context).code()); } } From c4534266a79c1d02346162daac2db587e54d93ac Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Wed, 22 Jul 2015 14:57:53 -0700 Subject: [PATCH 7/8] Reverted change to default spark context. Now uses HiveContext as per spark-shell --- .../main/java/org/apache/zeppelin/spark/SparkInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index d2eb906c8a9..030f28dd29a 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -106,7 +106,7 @@ public class SparkInterpreter extends Interpreter { "Use CassandraContext instead of SQLContext if it is true") .add("zeppelin.spark.useHiveContext", getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT", - "zeppelin.spark.useHiveContext", "false"), + "zeppelin.spark.useHiveContext", "true"), "Use HiveContext instead of SQLContext if it is true.") .add("zeppelin.spark.maxResult", getSystemDefault("ZEPPELIN_SPARK_MAXRESULT", "zeppelin.spark.maxResult", "1000"), From 056f6dde776c62fe00c7325cc289a1baf9a69114 Mon Sep 17 00:00:00 2001 From: Ben Bromhead Date: Wed, 22 Jul 2015 16:06:43 -0700 Subject: [PATCH 8/8] turn off hive for Cassandra test --- .../apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java index 1f1c64d19a2..1d0519628f3 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/CassandraSparkSqlInterpreterTest.java @@ -52,6 +52,7 @@ public class CassandraSparkSqlInterpreterTest { public void setUp() throws Exception { Properties p = new Properties(); p.setProperty("zeppelin.spark.useCassandraContext", "true"); + p.setProperty("zeppelin.spark.useHiveContext", "false"); p.setProperty("spark.cassandra.connection.host", "127.0.0.1"); p.setProperty("spark.cassandra.connection.port", "9142");