Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
Expand All @@ -34,11 +33,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;

Expand All @@ -51,21 +46,12 @@
public class Summary extends EvalFunc<String> implements Algebraic {

private static final TupleFactory TF = TupleFactory.getInstance();
private Schema inputSchema;
private String signature;

public static class Initial extends EvalFunc<Tuple> {

private Schema inputSchema;

@Override
public void setUDFContextSignature(String signature) {
inputSchema = Summary.getInputSchema(signature);
}

@Override
public Tuple exec(Tuple t) throws IOException {
return new JSONTuple(sumUp(inputSchema, t));
return new JSONTuple(sumUp(getInputSchema(), t));
}
}

Expand Down Expand Up @@ -179,18 +165,6 @@ public Iterator<Object> iterator() {

}

private static Properties getProperties(String signature) {
return UDFContext.getUDFContext().getUDFProperties(Summary.class, new String[] { signature });
}

private static Schema getInputSchema(String signature) {
try {
return Utils.getSchemaFromString(getProperties(signature).getProperty("inputSchema"));
} catch (ParserException e) {
throw new RuntimeException(e);
}
}

private static TupleSummaryData getData(Tuple tuple) throws ExecException {
if (tuple instanceof JSONTuple) {
return ((JSONTuple) tuple).data;
Expand Down Expand Up @@ -236,7 +210,7 @@ private static TupleSummaryData sumUp(Schema schema, Tuple t) throws ExecExcepti

@Override
public String exec(Tuple t) throws IOException {
return SummaryData.toPrettyJSON(sumUp(inputSchema, t));
return SummaryData.toPrettyJSON(sumUp(getInputSchema(), t));
}

@Override
Expand All @@ -254,30 +228,4 @@ public String getFinal() {
return Final.class.getName();
}

@Override
public void setInputSchema(Schema input) {
try {
// relation.bag.tuple
this.inputSchema = input.getField(0).schema.getField(0).schema;
saveSchemaToUDFContext();
} catch (FrontendException e) {
throw new RuntimeException("Usage: B = FOREACH (GROUP A ALL) GENERATE Summary(A); Can not get schema from " + input, e);
} catch (RuntimeException e) {
throw new RuntimeException("Usage: B = FOREACH (GROUP A ALL) GENERATE Summary(A); Can not get schema from "+input, e);
}
}

@Override
public void setUDFContextSignature(String signature) {
this.signature = signature;
saveSchemaToUDFContext();
}

private void saveSchemaToUDFContext() {
if (signature != null && inputSchema != null) {
String schemaString = inputSchema.toString();
getProperties(signature).put("inputSchema", schemaString.substring(1, schemaString.length() - 1));
}
}

}