Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ script:
- ./testing/startSparkCluster.sh 1.1.1 2.3
- SPARK_HOME=./spark-1.1.1-bin-hadoop2.3 mvn verify -Pspark-1.1 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark'
- ./testing/stopSparkCluster.sh 1.1.1 2.3

# Cassandra Spark connector against Spark 1.3
- mvn clean package -DskipTests -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark'
- mvn package -Pbuild-distr -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B
- ./testing/startSparkCluster.sh 1.3.1 2.3
- SPARK_HOME=./spark-1.3.1-bin-hadoop2.3 mvn verify -Pspark-1.3 -Phadoop-2.3 -Pcassandra-spark-1.3 -B -pl 'zeppelin-interpreter,spark'
- ./testing/stopSparkCluster.sh 1.3.1 2.3

after_failure:
- cat target/rat.txt
Expand Down
47 changes: 43 additions & 4 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<spark.version>1.4.0</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<exclude.tests>**/CassandraSparkSqlInterpreterTest.java</exclude.tests>

<hadoop.version>2.3.0</hadoop.version>
<yarn.version>${hadoop.version}</yarn.version>
Expand All @@ -53,6 +54,7 @@
</properties>

<repositories>

<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
Expand Down Expand Up @@ -494,6 +496,26 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>2.1.3.1</version>
<scope>test</scope>
<exclusions>
<!--Datastax driver is included in the spark connector dependencies-->
<exclusion>
<groupId>com.datastax.cassandra</groupId>
<artifactId>*</artifactId>
</exclusion>
<!--Exclude the default Cassandra version bundled with cassandra-unit;
the desired cassandra-all version is declared explicitly in the cassandra-spark profiles below-->
<exclusion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
Expand All @@ -514,7 +536,7 @@
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
<version>1.1.1</version>
<version>1.1.2</version>
<exclusions>
<exclusion>
<groupId>org.joda</groupId>
Expand All @@ -526,6 +548,7 @@
<properties>
<spark.version>1.1.1</spark.version>
<akka.version>2.2.3-shaded-protobuf</akka.version>
<exclude.tests></exclude.tests>
</properties>
</profile>

Expand All @@ -542,8 +565,14 @@
<id>cassandra-spark-1.2</id>
<properties>
<spark.version>1.2.1</spark.version>
<exclude.tests></exclude.tests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.1.8</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -574,15 +603,22 @@
<id>cassandra-spark-1.3</id>
<properties>
<spark.version>1.3.0</spark.version>
<!--Override the test exclusion so that the Cassandra tests are run-->
<exclude.tests></exclude.tests>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.1.8</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
<!--You need to build your own version of Spark Cassandra connector 1.3.0-SNAPSHOT
because it is not yet released-->
<version>1.3.0-SNAPSHOT</version>
<!--This is currently a milestone release of the connector, will need updating when a GA
release becomes available-->
<version>1.3.0-M2</version>
<exclusions>
<exclusion>
<groupId>org.joda</groupId>
Expand Down Expand Up @@ -893,6 +929,9 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<excludes>
<exclude>${exclude.tests}</exclude>
</excludes>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
Expand Down
67 changes: 43 additions & 24 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public class SparkInterpreter extends Interpreter {
getSystemDefault("SPARK_YARN_JAR", "spark.yarn.jar", ""),
"The location of the Spark jar file. If you use yarn as a cluster, "
+ "we should set this value")
.add("zeppelin.spark.useCassandraContext", "false",
"Use CassandraContext instead of SQLContext if it is true")
.add("zeppelin.spark.useHiveContext",
getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT",
"zeppelin.spark.useHiveContext", "true"),
Expand Down Expand Up @@ -164,30 +166,8 @@ private boolean useHiveContext() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
}

public SQLContext getSQLContext() {
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
}

return sqlc;
private boolean useCassandraContext() {
  // True when the interpreter is configured to build a CassandraSQLContext
  // instead of the default SQLContext; any value other than "true" is false.
  final String flag = getProperty("zeppelin.spark.useCassandraContext");
  return Boolean.parseBoolean(flag);
}

public DependencyResolver getDependencyResolver() {
Expand All @@ -214,6 +194,45 @@ private DepInterpreter getDepInterpreter() {
return null;
}

/**
 * Reflectively instantiates the given SQLContext subclass (e.g. the Hive or
 * Cassandra SQL context) against this interpreter's SparkContext.
 *
 * @param contextName fully-qualified class name of the context to load; the
 *                    class must expose a constructor taking a SparkContext
 * @return the requested context, or a plain {@link SQLContext} when the class
 *         cannot be loaded or instantiated
 */
private SQLContext loadCustomContext(final String contextName) {
  try {
    final Constructor<?> ctor = getClass().getClassLoader()
        .loadClass(contextName)
        .getConstructor(SparkContext.class);
    return (SQLContext) ctor.newInstance(getSparkContext());
  } catch (NoSuchMethodException | SecurityException
      | ClassNotFoundException | InstantiationException
      | IllegalAccessException | IllegalArgumentException
      | InvocationTargetException e) {
    logger.warn("Can't create " + contextName + ". Fallback to SQLContext", e);
    // The context class is missing when its dependency (hive/cassandra) is
    // not on the classpath; a plain SQLContext still works in that case.
    return new SQLContext(getSparkContext());
  }
}

/**
 * Lazily creates and caches the SQLContext used by this interpreter.
 *
 * The concrete type is chosen from the interpreter properties: a Cassandra
 * SQL context, a Hive context, or the standard SQLContext. The two custom
 * flavours are mutually exclusive.
 *
 * @return the (possibly cached) SQLContext
 * @throws InterpreterException if both the Cassandra and Hive contexts are enabled
 */
public SQLContext getSQLContext() {
  if (sqlc == null) {
    // Fail fast on contradictory configuration rather than silently picking one.
    if (useCassandraContext() && useHiveContext()) {
      throw new InterpreterException("Cassandra and Hive context are both enabled, " +
          "please enable only one");
    }

    if (useCassandraContext()) {
      logger.debug("Loading Cassandra SQL Context");
      sqlc = loadCustomContext("org.apache.spark.sql.cassandra.CassandraSQLContext");
    } else if (useHiveContext()) {
      logger.debug("Loading Hive SQL Context");
      sqlc = loadCustomContext("org.apache.spark.sql.hive.HiveContext");
    } else {
      logger.debug("Loading Standard SQL Context");
      sqlc = new SQLContext(getSparkContext());
    }
  }
  return sqlc;
}

public SparkContext createSparkContext() {
System.err.println("------ Create new SparkContext " + getProperty("master") + " -------");

Expand Down
26 changes: 26 additions & 0 deletions spark/src/main/resources/cassandra.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE test (
name text,
age int,
PRIMARY KEY(name));

INSERT INTO test (name, age) values ('moon', 33);
INSERT INTO test (name, age) values ('jobs', 51);
INSERT INTO test (name, age) values ('gates', 51);
INSERT INTO test (name, age) values ('park', 34);
33 changes: 33 additions & 0 deletions spark/src/main/resources/log4j-embedded-cassandra.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# for production, you should probably set the root to INFO
# and the pattern to %c instead of %l. (%l is slower.)

# output messages to stdout via two console appenders
# (no rolling file appender is configured here)

# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{3} - %m%n
log4j.appender.stdout.follow=true

log4j.appender.HColumnFamilyLogger=org.apache.log4j.ConsoleAppender
log4j.appender.HColumnFamilyLogger.layout=org.apache.log4j.PatternLayout
log4j.appender.HColumnFamilyLogger.layout.ConversionPattern=%m%n
log4j.category.HColumnFamilyLogger=DEBUG
#log4j.category.org.apache=INFO, stdout
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.spark;

import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.cassandraunit.CassandraCQLUnit;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

/**
 * Integration test for {@link SparkSqlInterpreter} running against the
 * Cassandra SQL context, backed by an embedded Cassandra instance populated
 * from the cassandra.cql fixture.
 */
public class CassandraSparkSqlInterpreterTest {

  private SparkSqlInterpreter sql;
  private SparkInterpreter repl;
  private InterpreterContext context;
  private InterpreterGroup intpGroup;

  // Starts an embedded Cassandra for each test and loads cassandra.cql into
  // the "sparkkeyspace" keyspace.
  @Rule
  public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("cassandra.cql","sparkkeyspace"));


  @Before
  public void setUp() throws Exception {
    Properties p = new Properties();
    // Enable the Cassandra context and disable Hive — the interpreter treats
    // the two as mutually exclusive.
    p.setProperty("zeppelin.spark.useCassandraContext", "true");
    p.setProperty("zeppelin.spark.useHiveContext", "false");
    // NOTE(review): 9142 is presumably the embedded Cassandra's native
    // transport port — confirm against the cassandra-unit configuration.
    p.setProperty("spark.cassandra.connection.host", "127.0.0.1");
    p.setProperty("spark.cassandra.connection.port", "9142");

    if (repl == null) {

      // Reuse the SparkInterpreter shared via SparkInterpreterTest when one
      // already exists, presumably to avoid creating a second SparkContext in
      // the same JVM — TODO confirm.
      if (SparkInterpreterTest.repl == null) {
        repl = new SparkInterpreter(p);
        repl.open();
        SparkInterpreterTest.repl = repl;
      } else {
        repl = SparkInterpreterTest.repl;
      }

      sql = new SparkSqlInterpreter(p);

      // The SQL interpreter resolves its SparkInterpreter through the group,
      // so both must be registered before open().
      intpGroup = new InterpreterGroup();
      intpGroup.add(repl);
      intpGroup.add(sql);
      sql.setInterpreterGroup(intpGroup);
      sql.open();
    }
    context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
        new AngularObjectRegistry(intpGroup.getId(), null),
        new LinkedList<InterpreterContextRunner>());
  }

  @After
  public void tearDown() throws Exception {
    // Intentionally empty: the embedded Cassandra is torn down by the @Rule,
    // and the shared interpreter is left open for reuse by other tests.
  }

  @Test
  public void test() {
    // Expected rows with age < 40 per the cassandra.cql fixture: moon (33) and park (34).
    InterpreterResult ret = sql.interpret("select name, age from sparkkeyspace.test where age < 40", context);
    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
    assertEquals(Type.TABLE, ret.type());
    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());

    // A self-join against the same Cassandra table should also succeed.
    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select * FROM sparkkeyspace.test as t1 INNER JOIN sparkkeyspace.test as t2 on t1.name = t2.name", context).code());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class SparkSqlInterpreterTest {
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.spark.useHiveContext", "true");

if (repl == null) {

Expand Down