diff --git a/bin/interpreter.sh b/bin/interpreter.sh index a81c8f21067..4fb4b269265 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -149,6 +149,28 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then else echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded" fi +elif [[ "${INTERPRETER_ID}" == "pig" ]]; then + # autodetect HADOOP_CONF_HOME by heuristic + if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then + if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then + export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" + elif [[ -d "/etc/hadoop/conf" ]]; then + export HADOOP_CONF_DIR="/etc/hadoop/conf" + fi + fi + + if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" + fi + + # autodetect TEZ_CONF_DIR + if [[ -n "${TEZ_CONF_DIR}" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}" + elif [[ -d "/etc/tez/conf" ]]; then + ZEPPELIN_INTP_CLASSPATH+=":/etc/tez/conf" + else + echo "TEZ_CONF_DIR is not set, configuration might not be loaded" + fi fi addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}" diff --git a/conf/interpreter-list b/conf/interpreter-list index 098b3c6c188..38cb386d8cd 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -32,6 +32,7 @@ kylin org.apache.zeppelin:zeppelin-kylin:0.6.1 Kylin in lens org.apache.zeppelin:zeppelin-lens:0.6.1 Lens interpreter livy org.apache.zeppelin:zeppelin-livy:0.6.1 Livy interpreter md org.apache.zeppelin:zeppelin-markdown:0.6.1 Markdown support +pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter postgresql org.apache.zeppelin:zeppelin-postgresql:0.6.1 Postgresql interpreter python org.apache.zeppelin:zeppelin-python:0.6.1 Python interpreter shell org.apache.zeppelin:zeppelin-shell:0.6.1 Shell command diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 05bd7195277..c4b369c301c 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ 
-190,7 +190,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter + 
org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index a0e4485db88..9abcdb177d1 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -62,6 +62,7 @@
  • Lens
  • Livy
  • Markdown
  • +
  • Pig
  • Python
  • Postgresql, HAWQ
  • R
  • diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md new file mode 100644 index 00000000000..227656bba78 --- /dev/null +++ b/docs/interpreter/pig.md @@ -0,0 +1,97 @@ +--- +layout: page +title: "Pig Interpreter for Apache Zeppelin" +description: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." +group: manual +--- +{% include JB/setup %} + + +# Pig Interpreter for Apache Zeppelin + +
    + +## Overview +[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets. + +## Supported interpreter type + - `%pig.script` (default) + + All Pig scripts can run in this type of interpreter, and the display type is plain text. + + - `%pig.query` + + Almost the same as `%pig.script`. The only difference is that you don't need to add an alias in the last statement, and the display type is table. + +## Supported runtime mode + - Local + - MapReduce + - Tez (Only Tez 0.7 is supported) + +## How to use + +### How to set up Pig + +- Local Mode + + Nothing needs to be done for local mode. + +- MapReduce Mode + + HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. + +- Tez Mode + + HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR need to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. + +### How to configure interpreter + +At the Interpreters menu, you have to create a new Pig interpreter. The Pig interpreter has the properties below by default. + + + + + + + + + + + + + + + + + + + + + + +
    PropertyDefaultDescription
    zeppelin.pig.execTypemapreduceExecution mode for pig runtime. local | mapreduce | tez
    zeppelin.pig.includeJobStatsfalseWhether to display jobStats info in %pig.script
    zeppelin.pig.maxResult1000Maximum number of rows displayed in %pig.query
    + +### Example + +##### pig + +``` +%pig + +raw_data = load 'dataset/sf_crime/train.csv' using PigStorage(',') as (Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y); +b = group raw_data all; +c = foreach b generate COUNT($1); +dump c; +``` + +##### pig.query + +``` +%pig.query + +b = foreach raw_data generate Category; +c = group b by Category; +foreach c generate group as category, COUNT($1) as count; +``` + +Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`. diff --git a/pig/pom.xml b/pig/pom.xml new file mode 100644 index 00000000000..a4e5cbb38e1 --- /dev/null +++ b/pig/pom.xml @@ -0,0 +1,184 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + + + org.apache.zeppelin + zeppelin-pig + jar + 0.7.0-SNAPSHOT + Zeppelin: Apache Pig Interpreter + Zeppelin interpreter for Apache Pig + http://zeppelin.apache.org + + + 0.16.0 + 2.6.0 + 0.7.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.pig + pig + h2 + ${pig.version} + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + + org.apache.tez + tez-api + ${tez.version} + + + + org.apache.tez + tez-common + ${tez.version} + + + + org.apache.tez + tez-dag + ${tez.version} + + + + org.apache.tez + tez-runtime-library + ${tez.version} + + + + org.apache.tez + tez-runtime-internals + ${tez.version} + + + + org.apache.tez + tez-mapreduce + ${tez.version} + + + + org.apache.tez + tez-yarn-timeline-history-with-acls + ${tez.version} + + + + junit + junit + test + + + + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/pig + + false + false + true + runtime + + + + copy-artifact + package + + copy 
+ + + ${project.build.directory}/../../interpreter/pig + + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java new file mode 100644 index 00000000000..0aa8a20f7ce --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Common base of the Pig interpreters. It implements cancellation, progress reporting,
+ * the form type and the scheduler on top of the {@link PigServer} supplied by the subclass.
+ */
+public abstract class BasePigInterpreter extends Interpreter {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class);
+
+  /** Listener of the script currently running for a paragraph, keyed by paragraph id. */
+  protected ConcurrentHashMap<String, PigScriptListener> listenerMap =
+      new ConcurrentHashMap<>();
+
+  public BasePigInterpreter(Properties property) {
+    super(property);
+  }
+
+  /**
+   * Kills every job that the listener registered for this paragraph has seen start.
+   * Nothing is killed when no listener is registered or no job has started yet.
+   *
+   * @param context context of the paragraph to cancel
+   */
+  @Override
+  public void cancel(InterpreterContext context) {
+    String paragraphId = context.getParagraphId();
+    LOGGER.info("Cancel paragraph:" + paragraphId);
+    PigScriptListener listener = listenerMap.get(paragraphId);
+    if (listener == null) {
+      LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
+          + paragraphId);
+      return;
+    }
+    Set<String> jobIds = listener.getJobIds();
+    if (jobIds.isEmpty()) {
+      LOGGER.info("No job is started, so can not cancel paragraph:" + paragraphId);
+      return;
+    }
+
+    // The launcher is a private field of HExecutionEngine, so it is read reflectively.
+    // It is the same object for every job, hence it is looked up once, before the loop.
+    Launcher launcher;
+    try {
+      HExecutionEngine engine =
+          (HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
+      Field launcherField = HExecutionEngine.class.getDeclaredField("launcher");
+      launcherField.setAccessible(true);
+      launcher = (Launcher) launcherField.get(engine);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      LOGGER.error("Fail to cancel paragraph:" + paragraphId, e);
+      return;
+    }
+
+    for (String jobId : jobIds) {
+      LOGGER.info("Kill jobId:" + jobId);
+      try {
+        // It doesn't work for Tez Engine due to PIG-5035
+        launcher.killJob(jobId, new Configuration());
+      } catch (BackendException e) {
+        // keep going: one job that cannot be killed must not protect the remaining ones
+        LOGGER.error("Fail to cancel paragraph:" + paragraphId, e);
+      }
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  /**
+   * @param context context of the paragraph being polled
+   * @return the last progress value reported to the paragraph's listener, or 0 when no
+   *         listener is registered for the paragraph
+   */
+  @Override
+  public int getProgress(InterpreterContext context) {
+    PigScriptListener listener = listenerMap.get(context.getParagraphId());
+    if (listener != null) {
+      return listener.getProgress();
+    }
+    return 0;
+  }
+
+  /**
+   * @return a FIFO scheduler whose name is built from the PigInterpreter class name and
+   *         this instance's hash code
+   */
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+        PigInterpreter.class.getName() + this.hashCode());
+  }
+
+  /**
+   * @return the PigServer that scripts of this interpreter run on
+   */
+  public abstract PigServer getPigServer();
+}
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
new file mode 100644
index 00000000000..8cd1efc929e
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigstats.*;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Pig interpreter for Zeppelin. Runs a whole pig script and returns whatever the script
+ * printed to stdout, optionally preceded by the job statistics.
+ */
+public class PigInterpreter extends BasePigInterpreter {
+  private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
+
+  private PigServer pigServer;
+  private boolean includeJobStats = false;
+
+  public PigInterpreter(Properties property) {
+    super(property);
+  }
+
+  /**
+   * Creates the PigServer. The exec type comes from zeppelin.pig.execType ("mapreduce"
+   * when unset); zeppelin.pig.includeJobStats controls whether job statistics are output.
+   *
+   * @throws RuntimeException when the PigServer can not be created
+   */
+  @Override
+  public void open() {
+    String execType = getProperty("zeppelin.pig.execType");
+    if (execType == null) {
+      execType = "mapreduce";
+    }
+    String includeJobStats = getProperty("zeppelin.pig.includeJobStats");
+    if (includeJobStats != null) {
+      this.includeJobStats = Boolean.parseBoolean(includeJobStats);
+    }
+    try {
+      pigServer = new PigServer(execType);
+    } catch (IOException e) {
+      LOGGER.error("Fail to initialize PigServer", e);
+      throw new RuntimeException("Fail to initialize PigServer", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    pigServer = null;
+  }
+
+
+  /**
+   * Writes the script to a temporary file, registers it with the PigServer and captures
+   * everything written to System.out while it runs.
+   *
+   * @param cmd the pig script
+   * @param contextInterpreter context of the paragraph being run
+   * @return SUCCESS with the captured output (after the job statistics when enabled), or
+   *         ERROR with the extracted job statistics or the exception stack trace
+   */
+  @Override
+  public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+    // remember the original stdout, because we will redirect stdout to capture
+    // the pig dump output.
+    // NOTE(review): System.setOut is process-wide, so output printed by anything else in
+    // this JVM while the script runs is captured too.
+    PrintStream originalStdOut = System.out;
+    ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
+    File tmpFile = null;
+    try {
+      tmpFile = PigUtils.createTempPigScript(cmd);
+      System.setOut(new PrintStream(bytesOutput));
+      // each thread should have its own ScriptState & PigStats
+      ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of last job in the same thread
+      // because PigStats is ThreadLocal variable
+      PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpFile.getAbsolutePath());
+    } catch (IOException e) {
+      if (e instanceof FrontendException) {
+        // If the error message contains "Backend error :", that means the exception is from
+        // backend and the job statistics below describe it better. A missing message can
+        // not carry that marker, so it is reported as a frontend error instead of failing
+        // with a NullPointerException.
+        String message = e.getMessage();
+        if (message == null || !message.contains("Backend error :")) {
+          LOGGER.error("Fail to run pig script.", e);
+          return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          LOGGER.error("Fail to run pig script, " + errorMsg);
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig script.", e);
+      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      System.setOut(originalStdOut);
+      listenerMap.remove(contextInterpreter.getParagraphId());
+      if (tmpFile != null) {
+        tmpFile.delete();
+      }
+    }
+    StringBuilder outputBuilder = new StringBuilder();
+    PigStats stats = PigStats.get();
+    if (stats != null && includeJobStats) {
+      String jobStats = PigUtils.extactJobStats(stats);
+      if (jobStats != null) {
+        outputBuilder.append(jobStats);
+      }
+    }
+    outputBuilder.append(bytesOutput.toString());
+    return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
+  }
+
+
+  /**
+   * @return the PigServer created by open(), or null before open() and after close()
+   */
+  @Override
+  public PigServer getPigServer() {
+    return pigServer;
+  }
+
+}
+
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
new file mode 100644
index 00000000000..c763b7f9654
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Pig interpreter whose last statement is a query: the statement is bound to a generated
+ * alias and the alias content is returned as a Zeppelin table. It runs on the PigServer of
+ * the PigInterpreter in the same session.
+ */
+public class PigQueryInterpreter extends BasePigInterpreter {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class);
+  /** Used when zeppelin.pig.maxResult is not set; matches the documented default. */
+  private static final int DEFAULT_MAX_RESULT = 1000;
+
+  private PigServer pigServer;
+  private int maxResult;
+
+  public PigQueryInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  /**
+   * Picks up the PigServer of the PigInterpreter in the same session and reads
+   * zeppelin.pig.maxResult.
+   *
+   * @throws NumberFormatException when zeppelin.pig.maxResult is set but is not an integer
+   */
+  @Override
+  public void open() {
+    pigServer = getPigInterpreter().getPigServer();
+    String maxResultProperty = getProperty("zeppelin.pig.maxResult");
+    // an unset property must not fail with NumberFormatException("null")
+    maxResult = StringUtils.isBlank(maxResultProperty)
+        ? DEFAULT_MAX_RESULT : Integer.parseInt(maxResultProperty.trim());
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  /**
+   * Runs the statements and renders at most maxResult rows of the last one as a table.
+   *
+   * @param st pig statements separated by newlines; the last line must be an expression
+   *           without alias, it is turned into "alias = expression"
+   * @param context context of the paragraph being run
+   * @return SUCCESS with a "%table" payload (header line, then one tab separated line per
+   *         row), or ERROR with the extracted job statistics or the exception stack trace
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    // '-' is invalid for pig alias
+    String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
+    String[] lines = st.split("\n");
+    List<String> queries = new ArrayList<String>();
+    for (int i = 0; i < lines.length; ++i) {
+      if (i == lines.length - 1) {
+        lines[i] = alias + " = " + lines[i];
+      }
+      queries.add(lines[i]);
+    }
+
+    StringBuilder resultBuilder = new StringBuilder("%table ");
+    File tmpScriptFile = null;
+    try {
+      tmpScriptFile = PigUtils.createTempPigScript(queries);
+      // each thread should have its own ScriptState & PigStats
+      ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of last job in the same thread
+      // because PigStats is ThreadLocal variable
+      PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(context.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpScriptFile.getAbsolutePath());
+      Schema schema = pigServer.dumpSchema(alias);
+      boolean schemaKnown = (schema != null);
+      if (schemaKnown) {
+        for (int i = 0; i < schema.size(); ++i) {
+          Schema.FieldSchema field = schema.getField(i);
+          resultBuilder.append(field.alias);
+          if (i != schema.size() - 1) {
+            resultBuilder.append("\t");
+          }
+        }
+        resultBuilder.append("\n");
+      }
+      Iterator<Tuple> iter = pigServer.openIterator(alias);
+      boolean firstRow = true;
+      int index = 0;
+      // strictly less than: "<=" would emit maxResult + 1 rows
+      while (iter.hasNext() && index < maxResult) {
+        index++;
+        Tuple tuple = iter.next();
+        if (firstRow && !schemaKnown) {
+          // without schema the columns are named c_0, c_1, ...; no tab after the last
+          // one, otherwise the header has one column more than the rows
+          for (int i = 0; i < tuple.size(); ++i) {
+            resultBuilder.append("c_" + i);
+            if (i != tuple.size() - 1) {
+              resultBuilder.append("\t");
+            }
+          }
+          resultBuilder.append("\n");
+          firstRow = false;
+        }
+        resultBuilder.append(StringUtils.join(tuple, "\t"));
+        resultBuilder.append("\n");
+      }
+      if (index >= maxResult && iter.hasNext()) {
+        resultBuilder.append("\nResults are limited by " + maxResult + ".");
+      }
+    } catch (IOException e) {
+      // Extract error in the following order
+      // 1. catch FrontendException, FrontendException happens in the query compilation phase.
+      // 2. PigStats, This is execution error
+      // 3. Other errors.
+      if (e instanceof FrontendException) {
+        // a missing message can not carry the backend marker, treat it as frontend error
+        String message = e.getMessage();
+        if (message == null || !message.contains("Backend error :")) {
+          LOGGER.error("Fail to run pig query.", e);
+          return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          LOGGER.error("Fail to run pig query, " + errorMsg);
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig query.", e);
+      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      listenerMap.remove(context.getParagraphId());
+      // the script has been registered (or has failed) by now, do not leave the file behind
+      if (tmpScriptFile != null && !tmpScriptFile.delete()) {
+        LOGGER.warn("Fail to delete pig script file:" + tmpScriptFile.getAbsolutePath());
+      }
+    }
+    return new InterpreterResult(Code.SUCCESS, resultBuilder.toString());
+  }
+
+  @Override
+  public PigServer getPigServer() {
+    return this.pigServer;
+  }
+
+  /**
+   * Unwraps the PigInterpreter of the same session and opens it through its
+   * LazyOpenInterpreter wrapper when there is one.
+   * NOTE(review): the result is null when the session has no PigInterpreter, and open()
+   * then fails with a NullPointerException - confirm whether that can happen.
+   */
+  private PigInterpreter getPigInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    PigInterpreter pig = null;
+    Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    pig = (PigInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return pig;
+  }
+}
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
new file mode 100644
index 00000000000..1f88b2ee6cc
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Progress listener that remembers the ids of the jobs that have started and the last
+ * progress value reported for a script. All other notifications are ignored.
+ *
+ * The interpreters look the listener up by paragraph id to cancel jobs and to report
+ * progress while the script is still running - presumably from another thread than the one
+ * delivering the notifications - so the job ids live in a synchronized set and the
+ * progress field is volatile.
+ */
+public class PigScriptListener implements PigProgressNotificationListener {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class);
+
+  private final Set<String> jobIds = Collections.synchronizedSet(new HashSet<String>());
+  private volatile int progress;
+
+  @Override
+  public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
+
+  }
+
+  @Override
+  public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
+
+  }
+
+  @Override
+  public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
+
+  }
+
+  /** Records the id of the started job so that it can be killed on cancel. */
+  @Override
+  public void jobStartedNotification(String scriptId, String assignedJobId) {
+    this.jobIds.add(assignedJobId);
+  }
+
+  @Override
+  public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void jobFailedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
+
+  }
+
+  /** Stores the reported progress value. */
+  @Override
+  public void progressUpdatedNotification(String scriptId, int progress) {
+    LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress);
+    this.progress = progress;
+  }
+
+  @Override
+  public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
+
+  }
+
+  /**
+   * @return a snapshot of the ids of all jobs started so far; iterating over it is safe
+   *         while further jobs are being started
+   */
+  public Set<String> getJobIds() {
+    // copying under the set's own lock is what Collections.synchronizedSet requires
+    synchronized (jobIds) {
+      return new HashSet<String>(jobIds);
+    }
+  }
+
+  /**
+   * @return the last progress value reported, 0 before the first report
+   */
+  public int getProgress() {
+    return progress;
+  }
+}
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
new file mode 100644
index 00000000000..d444e0279d8
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.zeppelin.pig; + + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.pig.PigRunner; +import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; +import org.apache.pig.tools.pigstats.InputStats; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; +import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; +import org.apache.pig.tools.pigstats.tez.TezDAGStats; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Field; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Static helpers for the Pig interpreters: writes pig scripts to temp files and renders job statistics / job ids out of PigStats (private PigStats fields are read via reflection). + */ +public class PigUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(PigUtils.class); + + protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static File createTempPigScript(String content) throws IOException { + File tmpFile = File.createTempFile("zeppelin", "pig"); + LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath()); + try (FileWriter writer = new FileWriter(tmpFile)) { + IOUtils.write(content, writer); + } + return tmpFile.getAbsoluteFile(); + } + + public static File createTempPigScript(List<String> lines) throws IOException { + return createTempPigScript(StringUtils.join(lines, "\n")); + } + + public static String extactJobStats(PigStats stats) { + if (stats instanceof SimplePigStats) { + return extractFromSimplePigStats((SimplePigStats) stats); + } else if (stats instanceof TezPigScriptStats) { + return extractFromTezPigStats((TezPigScriptStats) stats); + } else { +
throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName()); + } + } + + public static String extractFromSimplePigStats(SimplePigStats stats) { + + try { + Field userIdField = PigStats.class.getDeclaredField("userId"); + userIdField.setAccessible(true); + String userId = (String) (userIdField.get(stats)); + Field startTimeField = PigStats.class.getDeclaredField("startTime"); + startTimeField.setAccessible(true); + long startTime = (Long) (startTimeField.get(stats)); + Field endTimeField = PigStats.class.getDeclaredField("endTime"); + endTimeField.setAccessible(true); + long endTime = (Long) (endTimeField.get(stats)); + + if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { + LOGGER.warn("unknown return code, can't display the results"); + return null; + } + if (stats.getPigContext() == null) { + LOGGER.warn("unknown exec type, don't display the results"); + return null; + } + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + StringBuilder sb = new StringBuilder(); + sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n"); + sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t") + .append(userId).append("\t") + .append(sdf.format(new Date(startTime))).append("\t") + .append(sdf.format(new Date(endTime))).append("\t") + .append(stats.getFeatures()).append("\n"); + sb.append("\n"); + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { + sb.append("Success!\n"); + } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { +
sb.append("Some jobs have failed! Stop running all dependent jobs\n"); + } else { + sb.append("Failed!\n"); + } + sb.append("\n"); + + Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); + jobPlanField.setAccessible(true); + PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats); + + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Job Stats (time in seconds):\n"); + sb.append(MRJobStats.SUCCESS_HEADER).append("\n"); + List<JobStats> arr = jobPlan.getSuccessfulJobs(); + for (JobStats js : arr) { + sb.append(js.getDisplayString()); + } + sb.append("\n"); + } + if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + sb.append("Failed Jobs:\n"); + sb.append(MRJobStats.FAILURE_HEADER).append("\n"); + List<JobStats> arr = jobPlan.getFailedJobs(); + for (JobStats js : arr) { + sb.append(js.getDisplayString()); + } + sb.append("\n"); + } + sb.append("Input(s):\n"); + for (InputStats is : stats.getInputStats()) { + sb.append(is.getDisplayString()); + } + sb.append("\n"); + sb.append("Output(s):\n"); + for (OutputStats ds : stats.getOutputStats()) { + sb.append(ds.getDisplayString()); + } + + sb.append("\nCounters:\n"); + sb.append("Total records written : " + stats.getRecordWritten()).append("\n"); + sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n"); + sb.append("Spillable Memory Manager spill count : " + + stats.getSMMSpillCount()).append("\n"); + sb.append("Total bags proactively spilled: " + + stats.getProactiveSpillCountObjects()).append("\n"); + sb.append("Total records proactively spilled: " + + stats.getProactiveSpillCountRecords()).append("\n"); + sb.append("\nJob DAG:\n").append(jobPlan.toString()); + + return "Script Statistics: \n" + sb.toString(); + } catch (Exception e) { + LOGGER.error("Can not extract message from SimplePigStats", e); + return "Can not extract message from SimplePigStats," +
ExceptionUtils.getStackTrace(e); + } + } + + private static String extractFromTezPigStats(TezPigScriptStats stats) { + + try { + Field userIdField = PigStats.class.getDeclaredField("userId"); + userIdField.setAccessible(true); + String userId = (String) (userIdField.get(stats)); + Field startTimeField = PigStats.class.getDeclaredField("startTime"); + startTimeField.setAccessible(true); + long startTime = (Long) (startTimeField.get(stats)); + Field endTimeField = PigStats.class.getDeclaredField("endTime"); + endTimeField.setAccessible(true); + long endTime = (Long) (endTimeField.get(stats)); + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + StringBuilder sb = new StringBuilder(); + sb.append("\n"); + sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion())); + sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId)); + sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName())); + sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime)))); + sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime)))); + sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures())); + sb.append("\n"); + if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { + sb.append("Success!\n"); + } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { +
sb.append("Some tasks have failed! Stop running all dependent tasks\n"); + } else { + sb.append("Failed!\n"); + } + sb.append("\n"); + + // Print diagnostic info in case of failure + if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE + || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { + if (stats.getErrorMessage() != null) { + String[] lines = stats.getErrorMessage().split("\n"); + for (int i = 0; i < lines.length; i++) { + String s = lines[i].trim(); + if (i == 0 || !StringUtils.isEmpty(s)) { + sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s)); + } + } + sb.append("\n"); + } + } + + Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); + tezDAGStatsMapField.setAccessible(true); + Map<String, TezDAGStats> tezDAGStatsMap = + (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats); + int count = 0; + for (TezDAGStats dagStats : tezDAGStatsMap.values()) { + sb.append("\n"); + sb.append("DAG " + count++ + ":\n"); + sb.append(dagStats.getDisplayString()); + sb.append("\n"); + } + + sb.append("Input(s):\n"); + for (InputStats is : stats.getInputStats()) { + sb.append(is.getDisplayString().trim()).append("\n"); + } + sb.append("\n"); + sb.append("Output(s):\n"); + for (OutputStats os : stats.getOutputStats()) { + sb.append(os.getDisplayString().trim()).append("\n"); + } + return "Script Statistics:\n" + sb.toString(); + } catch (Exception e) { + LOGGER.error("Can not extract message from TezPigScriptStats", e); + return "Can not extract message from TezPigScriptStats," + ExceptionUtils.getStackTrace(e); + } + } + + public static List<String> extractJobIds(PigStats stat) { + if (stat instanceof SimplePigStats) { + return extractJobIdsFromSimplePigStats((SimplePigStats) stat); + } else if (stat instanceof TezPigScriptStats) { + return extractJobIdsFromTezPigStats((TezPigScriptStats) stat); + } else { + throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName()); + } + } + +
public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) { + List<String> jobIds = new ArrayList<>(); + try { + Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); + jobPlanField.setAccessible(true); + PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat); + List<JobStats> arr = jobPlan.getJobList(); + for (JobStats js : arr) { + jobIds.add(js.getJobId()); + } + return jobIds; + } catch (Exception e) { + LOGGER.error("Can not extract jobIds from SimplePigStats", e); + throw new RuntimeException("Can not extract jobIds from SimplePigStats", e); + } + } + + public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) { + List<String> jobIds = new ArrayList<>(); + try { + Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); + tezDAGStatsMapField.setAccessible(true); + Map<String, TezDAGStats> tezDAGStatsMap = + (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat); + for (TezDAGStats dagStats : tezDAGStatsMap.values()) { + LOGGER.debug("Tez JobId:" + dagStats.getJobId()); + jobIds.add(dagStats.getJobId()); + } + return jobIds; + } catch (Exception e) { + LOGGER.error("Can not extract jobIds from TezPigScriptStats", e); + throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e); + } + } +} diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json new file mode 100644 index 00000000000..27918ede1bf --- /dev/null +++ b/pig/src/main/resources/interpreter-setting.json @@ -0,0 +1,46 @@ +[ + { + "group": "pig", + "name": "script", + "className": "org.apache.zeppelin.pig.PigInterpreter", + "properties": { + "zeppelin.pig.execType": { + "envName": null, + "propertyName": "zeppelin.pig.execType", + "defaultValue": "mapreduce", + "description": "local | mapreduce | tez" + }, + "zeppelin.pig.includeJobStats": { + "envName": null, + "propertyName": "zeppelin.pig.includeJobStats", + "defaultValue": "false", + "description": "flag to include job stats in output" + } + }, + "editor": { + "language":
"pig" + } + }, + { + "group": "pig", + "name": "query", + "className": "org.apache.zeppelin.pig.PigQueryInterpreter", + "properties": { + "zeppelin.pig.execType": { + "envName": null, + "propertyName": "zeppelin.pig.execType", + "defaultValue": "mapreduce", + "description": "local | mapreduce | tez" + }, + "zeppelin.pig.maxResult": { + "envName": null, + "propertyName": "zeppelin.pig.maxResult", + "defaultValue": "1000", + "description": "max row number for %pig.query" + } + }, + "editor": { + "language": "pig" + } + } +] diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java new file mode 100644 index 00000000000..3d062d61579 --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PigInterpreterTest { + + private PigInterpreter pigInterpreter; + private InterpreterContext context; + + @Before + public void setUp() { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, + null, null); + } + + @After + public void tearDown() { + pigInterpreter.close(); + } + + @Test + public void testBasics() throws IOException { + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); +
assertTrue(result.message().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + assertTrue(result.message().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Input path does not exist")); + } + + + @Test + public void testIncludeJobStats() throws IOException { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + properties.put("zeppelin.pig.includeJobStats", "true"); + // release the interpreter opened in setUp() before replacing it, tearDown() only closes the new one + pigInterpreter.close(); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + assertTrue(result.message().contains("Counters:")); + assertTrue(result.message().contains("(1,andy)\n(2,peter)")); + + // describe +
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.SUCCESS, result.code()); + // no job is launched, so no jobStats + assertTrue(!result.message().contains("Counters:")); + assertTrue(result.message().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + // no job is launched, so no jobStats + assertTrue(!result.message().contains("Counters:")); + assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(Type.TEXT, result.type()); + assertEquals(Code.ERROR, result.code()); + assertTrue(result.message().contains("Counters:")); + assertTrue(result.message().contains("Input path does not exist")); + } +} diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java new file mode 100644 index 00000000000..00ece440542 --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests PigQueryInterpreter running in the same InterpreterGroup as a PigInterpreter (local exec type, maxResult 20). + */ +public class PigQueryInterpreterTest { + + private PigInterpreter pigInterpreter; + private PigQueryInterpreter pigQueryInterpreter; + private InterpreterContext context; + + @Before + public void setUp() { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + properties.put("zeppelin.pig.maxResult", "20"); + + pigInterpreter = new PigInterpreter(properties); + pigQueryInterpreter = new PigQueryInterpreter(properties); + List<Interpreter> interpreters = new ArrayList<Interpreter>(); + interpreters.add(pigInterpreter); + interpreters.add(pigQueryInterpreter); + InterpreterGroup group = new InterpreterGroup(); + group.put("note_id", interpreters); + pigInterpreter.setInterpreterGroup(group); + pigQueryInterpreter.setInterpreterGroup(group); + pigInterpreter.open(); + pigQueryInterpreter.open(); + + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, + null, null); + } + + @After + public void tearDown()
{ + pigInterpreter.close(); + pigQueryInterpreter.close(); + } + + @Test + public void testBasics() throws IOException { + String content = "andy\tmale\t10\n" + + "peter\tmale\t20\n" + + "amy\tfemale\t14\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // run script in PigInterpreter + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n" + + "a2 = load 'invalid_path' as (name, gender, age);\n" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)")); + + // run single line query in PigQueryInterpreter + String query = "foreach a generate name, age;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message()); + + // run multiple line query in PigQueryInterpreter + query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message()); + + // syntax error in PigQueryInterpreter + query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); +
assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema")); + + // execution error in PigQueryInterpreter + query = "foreach a2 generate name, age;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().contains("Input path does not exist")); + } + + @Test + public void testMaxResult() throws IOException { + StringBuilder content = new StringBuilder(); + for (int i=0;i<30;++i) { + content.append(i + "\tname_" + i + "\n"); + } + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // run script in PigInterpreter + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // empty output + assertTrue(result.message().isEmpty()); + + // run single line query in PigQueryInterpreter + String query = "foreach a generate id;"; + result = pigQueryInterpreter.interpret(query, context); + assertEquals(InterpreterResult.Type.TABLE, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("id\n0\n1\n2")); + assertTrue(result.message().contains("Results are limited by 20")); + } +} diff --git a/pig/src/test/resources/log4j.properties b/pig/src/test/resources/log4j.properties new file mode 100644 index 00000000000..8daee59d60d --- /dev/null +++ b/pig/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n diff --git a/pom.xml b/pom.xml index 03b226341e1..558ce0624a1 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ shell livy hbase + pig postgresql jdbc file diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index 2ee668a130e..e39a2ad0fc3 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -156,7 +156,15 @@ The following components are provided under Apache License. 
(Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/) (Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/) (Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/) - + (Apache 2.0) Apache Pig (org.apache.pig:pig:0.16.0 - http://pig.apache.org) + (Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-runtime-library (org.apache.tez:tez-runtime-library:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org) + (Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org) + ======================================================================== MIT licenses ======================================================================== diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index d819869f8fd..414aed2a5bb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -521,6 +521,8 @@ public static enum ConfVars { + "org.apache.zeppelin.alluxio.AlluxioInterpreter," + "org.apache.zeppelin.file.HDFSFileInterpreter," + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter," + + "org.apache.zeppelin.pig.PigInterpreter," + + "org.apache.zeppelin.pig.PigQueryInterpreter," + "org.apache.zeppelin.flink.FlinkInterpreter," +
"org.apache.zeppelin.python.PythonInterpreter," + "org.apache.zeppelin.python.PythonInterpreterPandasSql," @@ -543,7 +545,7 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery,beam"), + + "scalding,jdbc,hbase,bigquery,beam,pig"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen