Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
<artifactId>zeppelin-cassandra</artifactId>
<packaging>jar</packaging>
<version>0.6.0-SNAPSHOT</version>
<name>Zeppelin: Cassandra</name>
<name>Zeppelin: Apache Cassandra interpreter</name>
<description>Zeppelin cassandra support</description>
<url>http://zeppelin.apache.org</url>

<properties>
<cassandra.driver.version>3.0.0-rc1</cassandra.driver.version>
<cassandra.driver.version>3.0.1</cassandra.driver.version>
<snappy.version>1.0.5.4</snappy.version>
<lz4.version>1.3.0</lz4.version>
<scala.version>2.10.4</scala.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class CassandraInterpreter extends Interpreter {
public static final String DEFAULT_PORT = "9042";
public static final String DEFAULT_CLUSTER = "Test Cluster";
public static final String DEFAULT_KEYSPACE = "system";
public static final String DEFAULT_PROTOCOL_VERSION = "3";
public static final String DEFAULT_PROTOCOL_VERSION = "4";
public static final String DEFAULT_COMPRESSION = "NONE";
public static final String DEFAULT_CREDENTIAL = "none";
public static final String DEFAULT_POLICY = "DEFAULT";
Expand Down Expand Up @@ -159,7 +159,7 @@ public CassandraInterpreter(Properties properties) {
"IP address). Default = localhost. Ex: '192.168.0.12,node2,node3'")
.add(CASSANDRA_PORT, DEFAULT_PORT, "Cassandra native port. Default = 9042")
.add(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION,
"Cassandra protocol version. Default = 3")
"Cassandra protocol version. Default = 4")
.add(CASSANDRA_CLUSTER_NAME, DEFAULT_CLUSTER, "Cassandra cluster name. " +
"Default = 'Test Cluster'")
.add(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE, "Cassandra keyspace name. " +
Expand Down Expand Up @@ -311,7 +311,7 @@ public void cancel(InterpreterContext context) {

@Override
public FormType getFormType() {
return FormType.NATIVE;
return FormType.SIMPLE;
}

@Override
Expand Down
126 changes: 89 additions & 37 deletions cassandra/src/main/resources/scalate/helpMenu.ssp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.DriverException
import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
import org.apache.zeppelin.display.AngularObjectRegistry
import org.apache.zeppelin.display.Input.ParamOption
import org.apache.zeppelin.interpreter.InterpreterResult.Code
import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
Expand All @@ -41,17 +42,20 @@ import scala.collection.mutable.ArrayBuffer

/**
* Value object to store runtime query parameters
* @param consistency consistency level
*
* @param consistency consistency level
* @param serialConsistency serial consistency level
* @param timestamp timestamp
* @param retryPolicy retry policy
* @param fetchSize query fetch size
* @param requestTimeOut request time out in millisecs
*/
case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
serialConsistency:Option[ConsistencyLevel],
timestamp: Option[Long],
retryPolicy: Option[RetryPolicy],
fetchSize: Option[Int])
fetchSize: Option[Int],
requestTimeOut: Option[Int])

/**
* Singleton object to store constants
Expand All @@ -71,7 +75,7 @@ object InterpreterLogic {
val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
val loggingFallThroughRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)

val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala

Expand Down Expand Up @@ -273,7 +277,13 @@ class InterpreterLogic(val session: Session) {
.flatMap(x => Option(x.value))
.headOption

CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize)
val requestTimeOut: Option[Int] = parameters
.filter(_.paramType == RequestTimeOutParam)
.map(_.getParam[RequestTimeOut])
.flatMap(x => Option(x.value))
.headOption

CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize, requestTimeOut)
}

def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
Expand Down Expand Up @@ -305,19 +315,38 @@ class InterpreterLogic(val session: Session) {

def maybeExtractVariables(statement: String, context: InterpreterContext): String = {

def findInAngularRepository(variable: String): Option[AnyRef] = {
val registry = context.getAngularObjectRegistry
val noteId = context.getNoteId
val paragraphId = context.getParagraphId
val paragraphScoped: Option[AnyRef] = Option(registry.get(variable, noteId, paragraphId)).map[AnyRef](_.get())

paragraphScoped
}

def extractVariableAndDefaultValue(statement: String, exp: String):String = {
exp match {
case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable,choices) => {
case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => {
val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
statement.replaceAll(escapedExp,selected.toString)
findInAngularRepository(variable) match {
case Some(value) => statement.replaceAll(escapedExp,value.toString)
case None => {
val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
statement.replaceAll(escapedExp,selected.toString)
}
}
}
case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
val value = context.getGui.input(variable,defaultVal)
statement.replaceAll(escapedExp,value.toString)
findInAngularRepository(variable) match {
case Some(value) => statement.replaceAll(escapedExp,value.toString)
case None => {
val value = context.getGui.input(variable,defaultVal)
statement.replaceAll(escapedExp,value.toString)
}
}
}
case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
}
Expand All @@ -336,10 +365,11 @@ class InterpreterLogic(val session: Session) {
case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy)
case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy)
case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy)
case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThrougRetryPolicy)
case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThroughRetryPolicy)
case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
}
options.fetchSize.foreach(statement.setFetchSize(_))
options.requestTimeOut.foreach(statement.setReadTimeoutMillis(_))
}

private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ class JavaDriverConfig {
DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
return ProtocolVersion.V3
case "4" =>
DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
return ProtocolVersion.V4
case _ =>
DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object ParagraphParser {
LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
.mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
val REQUEST_TIMEOUT_PATTERN = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r

val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
Expand All @@ -69,7 +70,7 @@ object ParagraphParser {
val UDF_PATTERN = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r

val GENERIC_STATEMENT_PREFIX =
"""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|UPDATE|
"""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r

val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
Expand Down Expand Up @@ -146,6 +147,7 @@ class ParagraphParser extends RegexParsers{
def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)}
def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)}
def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)}
def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {case x => extractRequestTimeOut(x.trim)}

//Statements
def createFunctionStatement: Parser[SimpleStm] = UDF_PATTERN ^^{case x => extractUdfStatement(x.trim)}
Expand Down Expand Up @@ -188,7 +190,7 @@ class ParagraphParser extends RegexParsers{
case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}

def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster |
timestamp | retryPolicy | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
describeKeyspace | describeKeyspaces |
describeTable | describeTables |
describeType | describeTypes |
Expand Down Expand Up @@ -244,6 +246,14 @@ class ParagraphParser extends RegexParsers{
}
}

def extractRequestTimeOut(text: String): RequestTimeOut = {
text match {
case REQUEST_TIMEOUT_PATTERN(requestTimeOut) => RequestTimeOut(requestTimeOut.trim.toInt)
case _ => throw new InterpreterException(s"Invalid syntax for @requestTimeOut. " +
s"It should comply to the pattern ${REQUEST_TIMEOUT_PATTERN.toString}")
}
}

def extractSimpleStatement(text: String): SimpleStm = {
text match {
case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object TextBlockHierarchy {
object TimestampParam extends ParameterType
object RetryPolicyParam extends ParameterType
object FetchSizeParam extends ParameterType
object RequestTimeOutParam extends ParameterType


abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) {
Expand All @@ -60,6 +61,8 @@ object TextBlockHierarchy {

case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam)

case class RequestTimeOut(value: Int) extends QueryParameters(RequestTimeOutParam)

abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)

object DefaultRetryPolicy extends RetryPolicy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;

import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
Expand All @@ -45,7 +46,6 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -63,7 +63,7 @@ public class CassandraInterpreterTest {
.withScript("prepare_data.cql")
.withProtocolVersion(ProtocolVersion.V3)
.buildNativeSessionOnly();
// public static Session session = null;

private static CassandraInterpreter interpreter;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
Expand All @@ -73,7 +73,7 @@ public class CassandraInterpreterTest {
public static void setUp() {
Properties properties = new Properties();
final Cluster cluster = session.getCluster();
// final Cluster cluster = null;

properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
Expand Down Expand Up @@ -289,6 +289,19 @@ public void should_execute_statement_with_retry_policy() throws Exception {
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
}

@Test
public void should_execute_statement_with_request_timeout() throws Exception {
//Given
String statement = "@requestTimeOut=10000000\n" +
"SELECT * FROM zeppelin.artists;";

//When
final InterpreterResult actual = interpreter.interpret(statement, intrContext);

//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
}

@Test
public void should_execute_prepared_and_bound_statements() throws Exception {
//Given
Expand Down Expand Up @@ -354,6 +367,8 @@ public void should_exception_when_executing_unknown_bound_statement() throws Exc
@Test
public void should_extract_variable_from_statement() throws Exception {
//Given
AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
when(intrContext.getGui().input("age", "27")).thenReturn("27");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand Down Expand Up @@ -101,6 +103,8 @@ public void should_exception_while_parsing_input() throws Exception {
@Test
public void should_extract_variable_and_default_value() throws Exception {
//Given
AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");

Expand All @@ -114,6 +118,8 @@ public void should_extract_variable_and_default_value() throws Exception {
@Test
public void should_extract_variable_and_choices() throws Exception {
//Given
AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture())).thenReturn("'Jack'");

//When
Expand Down Expand Up @@ -141,6 +147,23 @@ public void should_extract_no_variable() throws Exception {
assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo");
}

@Test
public void should_extract_variable_from_angular_object_registry() throws Exception {
//Given
AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getNoteId()).thenReturn("noteId");
when(intrContext.getParagraphId()).thenReturn("paragraphId");

//When
final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);

//Then
assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
verify(intrContext, never()).getGui();
}

@Test
public void should_error_if_incorrect_variable_definition() throws Exception {
//Given
Expand Down Expand Up @@ -203,6 +226,18 @@ public void should_extract_retry_policy_option() throws Exception {
assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
}

@Test
public void should_extract_request_timeout_option() throws Exception {
//Given
List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));

//When
final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));

//Then
assertThat(actual.requestTimeOut().get()).isEqualTo(100);
}

@Test
public void should_generate_simple_statement() throws Exception {
//Given
Expand All @@ -211,6 +246,7 @@ public void should_generate_simple_statement() throws Exception {
Option.<ConsistencyLevel>empty(),
Option.empty(),
Option.<RetryPolicy>empty(),
Option.empty(),
Option.empty());

//When
Expand All @@ -232,6 +268,7 @@ public void should_generate_batch_statement() throws Exception {
Option.<ConsistencyLevel>empty(),
Option.empty(),
Option.<RetryPolicy>empty(),
Option.empty(),
Option.empty());

//When
Expand Down
2 changes: 1 addition & 1 deletion cassandra/src/test/resources/scalate/Help.html

Large diffs are not rendered by default.

Loading