
Commit fbd811b
Merge branch 'master' into jimmy-fixed-join
zuozhiw authored Jan 23, 2017
2 parents 4cb077d + 414d4d9 commit fbd811b
Showing 18 changed files with 452 additions and 31 deletions.
7 changes: 6 additions & 1 deletion textdb/textdb-common/pom.xml
@@ -34,6 +34,11 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>${lucene.version}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
</dependencies>
</project>
@@ -26,6 +26,10 @@
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
import org.json.JSONArray;
import org.json.JSONObject;
import org.json.JSONStringer;
import org.json.JSONWriter;

import edu.uci.ics.textdb.api.common.Attribute;
import edu.uci.ics.textdb.api.common.FieldType;
@@ -222,6 +226,19 @@ public static Schema addAttributeToSchema(Schema schema, Attribute attribute) {
Schema newSchema = new Schema(attributes.toArray(new Attribute[attributes.size()]));
return newSchema;
}

/**
* Removes one or more attributes from the schema and returns the new schema.
*
* @param schema the original schema
* @param attributeName the name(s) of the attribute(s) to remove
* @return a new schema without the given attributes
*/
public static Schema removeAttributeFromSchema(Schema schema, String... attributeName) {
return new Schema(schema.getAttributes().stream()
.filter(attr -> (! Arrays.asList(attributeName).contains(attr.getFieldName())))
.toArray(Attribute[]::new));
}
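
A quick usage sketch (the attribute names mirror the unit test added later in this commit; the snippet itself is illustrative, not part of the diff):

Schema inputSchema = new Schema(SchemaConstants._ID_ATTRIBUTE,
        new Attribute("content", FieldType.TEXT), SchemaConstants.PAYLOAD_ATTRIBUTE);
// strip the internal _id and payload attributes before exposing the schema
Schema cleaned = Utils.removeAttributeFromSchema(inputSchema, SchemaConstants._ID, SchemaConstants.PAYLOAD);
// cleaned now contains only the "content" attribute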

/**
* Tokenizes the query string using the given analyser
@@ -274,6 +291,55 @@ public static ArrayList<String> tokenizeQueryWithStopwords(String query) {

return result;
}


public static JSONArray getTupleListJSON(List<ITuple> tupleList) {
JSONArray jsonArray = new JSONArray();

for (ITuple tuple : tupleList) {
jsonArray.put(getTupleJSON(tuple));
}

return jsonArray;
}

public static JSONObject getTupleJSON(ITuple tuple) {
JSONObject jsonObject = new JSONObject();

for (String attrName : tuple.getSchema().getAttributeNames()) {
if (attrName.equalsIgnoreCase(SchemaConstants.SPAN_LIST)) {
List<Span> spanList = ((ListField<Span>) tuple.getField(SchemaConstants.SPAN_LIST)).getValue();
jsonObject.put(attrName, getSpanListJSON(spanList));
} else {
jsonObject.put(attrName, tuple.getField(attrName).getValue().toString());
}
}

return jsonObject;
}

public static JSONArray getSpanListJSON(List<Span> spanList) {
JSONArray jsonArray = new JSONArray();

for (Span span : spanList) {
jsonArray.put(getSpanJSON(span));
}

return jsonArray;
}

public static JSONObject getSpanJSON(Span span) {
JSONObject jsonObject = new JSONObject();

jsonObject.put("key", span.getKey());
jsonObject.put("value", span.getValue());
jsonObject.put("field", span.getFieldName());
jsonObject.put("start", span.getStart());
jsonObject.put("end", span.getEnd());
jsonObject.put("token offset", span.getTokenOffset());

return jsonObject;
}
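
For illustration, one span serializes to a flat JSON object shaped like the following (the values are made up; org.json does not guarantee key order):

{
  "key": "location",
  "value": "new york",
  "field": "content",
  "start": 10,
  "end": 18,
  "token offset": 2
}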

public static String getTupleListString(List<ITuple> tupleList) {
StringBuilder sb = new StringBuilder();
@@ -0,0 +1,101 @@
package edu.uci.ics.textdb.dataflow.sink;

import java.util.ArrayList;
import java.util.List;

import edu.uci.ics.textdb.api.common.ITuple;
import edu.uci.ics.textdb.api.common.Schema;
import edu.uci.ics.textdb.api.dataflow.IOperator;
import edu.uci.ics.textdb.api.dataflow.ISink;
import edu.uci.ics.textdb.api.exception.TextDBException;
import edu.uci.ics.textdb.common.constants.SchemaConstants;
import edu.uci.ics.textdb.common.utils.Utils;

/**
* TupleStreamSink is a sink that can be used by the caller to get tuples one by one.
*
* @author Zuozhi Wang
*
*/
public class TupleStreamSink implements ISink {

private IOperator inputOperator;

private Schema inputSchema;
private Schema outputSchema;

private boolean isOpen = false;

/**
* TupleStreamSink is a sink that can be used to
* collect tuples to an in-memory list.
*
* TupleStreamSink removes the _id attribute and payload attribute
* from the schema and each tuple.
*
*/
public TupleStreamSink() {
}

public void setInputOperator(IOperator inputOperator) {
this.inputOperator = inputOperator;
}

public IOperator getInputOperator() {
return this.inputOperator;
}

@Override
public Schema getOutputSchema() {
return outputSchema;
}

@Override
public void open() throws TextDBException {
if (isOpen) {
return;
}
inputOperator.open();
inputSchema = inputOperator.getOutputSchema();
outputSchema = Utils.removeAttributeFromSchema(inputSchema, SchemaConstants._ID, SchemaConstants.PAYLOAD);
isOpen = true;
}

@Override
public void processTuples() throws TextDBException {
// no-op: this sink is pull-based; callers retrieve tuples one by one via getNextTuple()
}

@Override
public ITuple getNextTuple() throws TextDBException {
ITuple tuple = inputOperator.getNextTuple();
if (tuple == null) {
return null;
}
return Utils.removeFields(tuple, SchemaConstants._ID, SchemaConstants.PAYLOAD);
}

@Override
public void close() throws TextDBException {
if (! isOpen) {
return;
}
inputOperator.close();
isOpen = false;
}

/**
* Collects ALL the remaining tuples from the input operator into an in-memory list.
*
* @return a list of tuples
* @throws TextDBException if pulling a tuple from the input operator fails
*/
public List<ITuple> collectAllTuples() throws TextDBException {
ArrayList<ITuple> results = new ArrayList<>();
ITuple tuple;
while ((tuple = inputOperator.getNextTuple()) != null) {
results.add(Utils.removeFields(tuple, SchemaConstants._ID, SchemaConstants.PAYLOAD));
}
return results;
}

}
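
A minimal wiring sketch for the new sink (the upstream keywordMatcher operator is hypothetical; any IOperator that produces tuples works):

TupleStreamSink sink = new TupleStreamSink();
sink.setInputOperator(keywordMatcher); // hypothetical upstream IOperator
sink.open();                           // output schema now excludes _id and payload
List<ITuple> results = sink.collectAllTuples();
sink.close();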
@@ -265,10 +265,12 @@ private void checkOperatorInputArity() throws PlanGenException {
*/
private void checkOperatorOutputArity() throws PlanGenException {
for (String vertex : adjacencyList.keySet()) {
String vertexType = operatorTypeMap.get(vertex);

int actualOutputArity = adjacencyList.get(vertex).size();
int expectedOutputArity = OperatorArityConstants.getFixedOutputArity(vertexType);

if (vertexType.toLowerCase().contains("sink")) {
PlanGenUtils.planGenAssert(
actualOutputArity == expectedOutputArity,
String.format("Sink %s should have %d output links, got %d.", vertex, expectedOutputArity, actualOutputArity));
@@ -28,7 +28,9 @@ public class OperatorArityConstants {

put("FileSink".toLowerCase(), 1);
put("IndexSink".toLowerCase(), 1);
put("TupleStreamSink".toLowerCase(), 1);

put("Projection".toLowerCase(), 1);
put("Join".toLowerCase(), 2);
}};

@@ -38,6 +40,7 @@ public class OperatorArityConstants {
public static Map<String, Integer> fixedOutputArityMap = new HashMap<String, Integer>(){{
put("IndexSink".toLowerCase(), 0);
put("FileSink".toLowerCase(), 0);
put("TupleStreamSink".toLowerCase(), 0);

put("KeywordMatcher".toLowerCase(), 1);
put("DictionaryMatcher".toLowerCase(), 1);
@@ -48,6 +51,7 @@ public class OperatorArityConstants {
put("KeywordSource".toLowerCase(), 1);
put("DictionarySource".toLowerCase(), 1);

put("Projection".toLowerCase(), 1);
put("Join".toLowerCase(), 1);
}};
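
Assuming getFixedOutputArity (not shown in this diff) is a plain lookup into fixedOutputArityMap keyed by lower-cased operator type, the new entries behave like:

int sinkArity = OperatorArityConstants.getFixedOutputArity("TupleStreamSink".toLowerCase()); // 0: sinks have no outgoing links
int joinArity = OperatorArityConstants.getFixedOutputArity("Join".toLowerCase());            // 1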

@@ -17,6 +17,7 @@
import edu.uci.ics.textdb.plangen.operatorbuilder.NlpExtractorBuilder;
import edu.uci.ics.textdb.plangen.operatorbuilder.ProjectionBuilder;
import edu.uci.ics.textdb.plangen.operatorbuilder.RegexMatcherBuilder;
import edu.uci.ics.textdb.plangen.operatorbuilder.TupleStreamSinkBuilder;

/**
* This class provides a set of helper functions that are commonly used in plan generation.
@@ -40,6 +41,7 @@ public interface OperatorBuilder {
operatorBuilderMap.put("KeywordSource".toLowerCase(), KeywordSourceBuilder::buildSourceOperator);
operatorBuilderMap.put("DictionarySource".toLowerCase(), DictionarySourceBuilder::buildSourceOperator);
operatorBuilderMap.put("FileSink".toLowerCase(), FileSinkBuilder::buildSink);
operatorBuilderMap.put("TupleStreamSink".toLowerCase(), TupleStreamSinkBuilder::buildTupleStreamSink);
operatorBuilderMap.put("Join".toLowerCase(), JoinBuilder::buildOperator);
operatorBuilderMap.put("Projection".toLowerCase(), ProjectionBuilder::buildOperator);
}
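
With this registration in place, plan generation can dispatch on the operator type string. A sketch, assuming OperatorBuilder is a functional interface whose single method (its name is assumed here) takes the operator's property map:

// hypothetical dispatch for a plan node declared with type "TupleStreamSink"
OperatorBuilder builder = operatorBuilderMap.get("TupleStreamSink".toLowerCase());
IOperator sink = builder.buildOperator(operatorProperties); // returns the TupleStreamSink built above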
@@ -60,6 +60,16 @@ public static KeywordMatcherSourceOperator buildSourceOperator(Map<String, String> operatorProperties)
} catch (DataFlowException | StorageException e) {
throw new PlanGenException(e.getMessage(), e);
}

// set limit and offset
Integer limitInt = OperatorBuilderUtils.findLimit(operatorProperties);
if (limitInt != null) {
sourceOperator.setLimit(limitInt);
}
Integer offsetInt = OperatorBuilderUtils.findOffset(operatorProperties);
if (offsetInt != null) {
sourceOperator.setOffset(offsetInt);
}

return sourceOperator;
}
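
For example, a hypothetical property map for this builder could carry limit and offset alongside the usual keyword-matcher properties (the exact key strings are defined by OperatorBuilderUtils.findLimit/findOffset and are assumed here):

Map<String, String> operatorProperties = new HashMap<>();
// ... keyword, attribute, and data-source properties omitted ...
operatorProperties.put("limit", "10");  // key name assumed; read by findLimit
operatorProperties.put("offset", "5");  // key name assumed; read by findOffset
KeywordMatcherSourceOperator op = KeywordSourceBuilder.buildSourceOperator(operatorProperties);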
@@ -0,0 +1,13 @@
package edu.uci.ics.textdb.plangen.operatorbuilder;

import java.util.Map;

import edu.uci.ics.textdb.dataflow.sink.TupleStreamSink;

public class TupleStreamSinkBuilder {

public static TupleStreamSink buildTupleStreamSink(Map<String, String> operatorProperties) {
return new TupleStreamSink();
}

}
@@ -0,0 +1,72 @@
package edu.uci.ics.textdb.dataflow.sink;

import java.io.FileNotFoundException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import edu.uci.ics.textdb.api.common.Attribute;
import edu.uci.ics.textdb.api.common.FieldType;
import edu.uci.ics.textdb.api.common.ITuple;
import edu.uci.ics.textdb.api.common.Schema;
import edu.uci.ics.textdb.api.dataflow.IOperator;
import edu.uci.ics.textdb.common.constants.SchemaConstants;
import org.junit.Assert;

public class TupleStreamSinkTest {

private TupleStreamSink tupleStreamSink;
private IOperator inputOperator;
private Schema inputSchema = new Schema(
SchemaConstants._ID_ATTRIBUTE, new Attribute("content", FieldType.TEXT), SchemaConstants.PAYLOAD_ATTRIBUTE);

@Before
public void setUp() throws FileNotFoundException {
inputOperator = Mockito.mock(IOperator.class);
Mockito.when(inputOperator.getOutputSchema()).thenReturn(inputSchema);

tupleStreamSink = new TupleStreamSink();
tupleStreamSink.setInputOperator(inputOperator);
}

@After
public void cleanUp() {
}

@Test
public void testOpen() throws Exception {
tupleStreamSink.open();
// verify that inputOperator called open() method
Mockito.verify(inputOperator).open();
// assert that the tuple stream sink removes the _id and payload attributes
Assert.assertEquals(new Schema(new Attribute("content", FieldType.TEXT)), tupleStreamSink.getOutputSchema());
}

@Test
public void testClose() throws Exception {
tupleStreamSink.open();
tupleStreamSink.close();
// verify that inputOperator's close() method was called
Mockito.verify(inputOperator).close();
}

@Test
public void testGetNextTuple() throws Exception {
ITuple sampleTuple = Mockito.mock(ITuple.class);
Mockito.when(sampleTuple.toString()).thenReturn("Sample Tuple");
Mockito.when(sampleTuple.getSchema()).thenReturn(inputSchema);
// Set the behavior for inputOperator,
// first it returns some non-null tuple and second time it returns null
Mockito.when(inputOperator.getNextTuple()).thenReturn(sampleTuple).thenReturn(null);

tupleStreamSink.open();
tupleStreamSink.getNextTuple();

// Verify that input operator's getNextTuple is called
Mockito.verify(inputOperator, Mockito.times(1)).getNextTuple();

tupleStreamSink.close();
}

}