diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
similarity index 57%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
index 45f319385b1..1877ec4cd4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
@@ -19,24 +19,17 @@
 import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;

 /**
  * Iterates over the set of batches in a result set, providing
  * a row set reader to iterate over the rows within each batch.
- * Handles schema changes between batches.
- * <p>
- * Designed to handle batches arriving from a single upstream
- * operator. Uses Drill's strict form of schema identity: that
- * not only must the column definitions match; the vectors must
- * be identical from one batch to the next. If the vectors differ,
- * then this class assumes a new schema has occurred, and will
- * rebuild all the underlying readers, which can be costly.
+ * Handles schema changes between batches. A typical use is to
+ * iterate over batches from an upstream operator. Protocol:
  *
  * <h4>Protocol</h4>
  * <ol>
- * <li>Create an instance, passing in a
- * {@link BatchAccessor} to hold the batch and optional
- * selection vector.</li>
+ * <li>Create an instance.</li>
  * <li>For each incoming batch:
  *   <ol>
  *   <li>Call {@link #start()} to attach the batch. The associated
@@ -49,39 +42,59 @@
  *   </ol></li>
  * <li>Call {@link #close()} after all batches are read.</li>
  * </ol>
+ * <p>
+ * The implementation may perform complex tasks behind the scenes:
+ * coordinate with the query runner (if remote), drive an operator
+ * (if within a DAG), etc. The implementation takes an interface
+ * that connects it to the source of batches.
+ * <p>
+ * Designed to handle batches arriving from a single upstream + * operator. Uses Drill's strict form of schema identity: that + * not only must the column definitions match; the vectors must + * be identical from one batch to the next. If the vectors differ, + * then this class assumes a new schema has occurred, and will + * rebuild all the underlying readers, which can be costly. */ -public interface ResultSetReader { +public interface PullResultSetReader { /** - * Start tracking a new batch in the associated - * vector container. + * Advance to the next batch of data. The iterator starts + * positioned before the first batch (but after obtaining + * a schema.) + * @return {@code true} if another batch is available, + * {@code false} if EOF */ - void start(); + boolean next(); /** - * Get the row reader for this batch. The row reader is - * guaranteed to remain the same for the life of the - * result set reader. - * - * @return the row reader to read rows for the current - * batch + * Return the schema for this result set. */ - RowSetReader reader(); + TupleMetadata schema(); - /** - * Detach the batch of data from this reader. Does not - * release the memory for that batch. - */ - void detach(); + int schemaVersion(); /** - * Detach the batch of data from this reader and release - * the memory for that batch. Call this method before - * loading the underlying vector container with more - * data, then call {@link #start()} after new data is - * available. + * Obtain a reader to iterate over the rows of the batch. The return + * value will likely be the same reader each time, so that this call + * is optional after the first batch. */ - void release(); + RowSetReader reader(); /** * Close this reader. Releases any memory still assigned @@ -89,11 +102,4 @@ public interface ResultSetReader { * you want to preserve the batch memory. */ void close(); - - /** - * Convenience method to access the input batch. - * @return the batch bound to the reader at construction - * time - */ - BatchAccessor inputBatch(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java new file mode 100644 index 00000000000..b011cd9f63e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.resultSet; + +import org.apache.drill.exec.physical.rowSet.RowSetReader; + +/** + * Push-based result set reader, in which the caller obtains batches + * and registers them with the implementation. 
The client thus is responsible
+ * for detecting the end of batches and releasing memory. General protocol:
+ * register each incoming batch with the source, call {@link #start()} to
+ * obtain a reader for that batch, then iterate over the batch's rows.
+ * <p>
+ * In Drill, + * batches may have the same or different schemas. Each call to + * {@link #start()} prepares a {@link RowSetReader} to use for + * the available batch. If the batch has the same schema as the previous, + * then the existing reader is simply repositioned at the start of the + * batch. If the schema changed (or this is the first batch), then a + * new reader is created. Thus, the client should not assume that the + * same reader is available across calls. However, if it is useful to + * cache column writers, simply check if the reader returned from + * {@code start()} is the same as the previous one. If so, the column + * writers are also the same. + */ +public interface PushResultSetReader { + RowSetReader start(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java index 5d68daff422..cc18218f546 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java @@ -40,12 +40,13 @@ * each non-schema-change batch. * *

 * <h4>Protocol</h4>
+ *
 * Overall lifecycle:
 * <ol>
 * <li>Create an instance of the
 * {@link org.apache.drill.exec.physical.resultSet.impl.ResultSetCopierImpl
- * ResultSetCopierImpl} class, passing the input batch
- * accessor to the constructor.</li>
+ * ResultSetCopierImpl} class, passing the input row set reader
+ * to the constructor.</li>
 * <li>Loop to process each output batch as shown below. That is, continually
 * process calls to the {@link BatchIterator#next()} method.</li>
 * <li>Call {@link #close()}.</li>
 * </ol>
@@ -57,8 +58,7 @@
 * <pre><code>
      * public IterOutcome next() {
      *   copier.startOutputBatch();
    - *   while (! copier.isFull() {
    - *     copier.freeInput();
+ *   while (!copier.isFull()) {
      *     IterOutcome innerResult = inner.next();
      *     if (innerResult == DONE) { break; }
      *     copier.startInputBatch();
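
The copy loop above is cut off at the hunk boundary. For reference, here is a
complete sketch of the revised loop against the new interface; {@code inner},
{@code DONE}, {@code OK}, and {@code outgoing} are illustrative stand-ins from
the Javadoc pseudo-code, not part of this patch, and the interface's actual
predicates are {@code isOutputFull()} and {@code hasOutputRows()} rather than
the {@code isFull()} shorthand used above:

    public IterOutcome next() {
      copier.startOutputBatch();
      while (!copier.isOutputFull()) {
        // nextInputBatch() advances the underlying PullResultSetReader and
        // releases the prior batch; there is no longer an explicit
        // freeInput()/releaseInputBatch() step.
        if (!copier.nextInputBatch()) {
          break;
        }
        copier.copyAllRows();
      }
      if (!copier.hasOutputRows()) {
        return DONE;  // end of input, no partial output batch pending
      }
      outgoing = copier.harvest();
      return OK;
    }
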
    @@ -92,7 +92,6 @@
      * Because we wish to fill the output batch, we may be able to copy
      * part of a batch, the whole batch, or multiple batches to the output.
      */
    -
     public interface ResultSetCopier {
     
       /**
    @@ -102,9 +101,9 @@ public interface ResultSetCopier {
     
       /**
        * Start the next input batch. The input batch must be held
    -   * by the VectorAccessor passed into the constructor.
+   * by the {@code PullResultSetReader} passed into the constructor.
        */
    -  void startInputBatch();
    +  boolean nextInputBatch();
     
       /**
        * If copying rows one by one, copy the next row from the
    @@ -134,12 +133,6 @@ public interface ResultSetCopier {
        */
       void copyAllRows();
     
    -  /**
    -   * Release the input. Must be called (explicitly, or via
    -   * {@link #copyInput()} before loading another input batch.
    -   */
    -  void releaseInputBatch();
    -
       /**
        * Reports if the output batch has rows. Useful after the end
        * of input to determine if a partial output batch exists to
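
Before the implementation that follows, a minimal sketch of how a caller might
drive the new pull reader end to end. Here {@code source} is assumed to be some
implementation of the {@code UpstreamSource} interface defined in
{@code PullResultSetReaderImpl}, and the column access mirrors the tests later
in this patch:

    PullResultSetReader reader = new PullResultSetReaderImpl(source);
    TupleMetadata schema = reader.schema();  // available up front; does an implicit next()
    while (reader.next()) {
      RowSetReader rows = reader.reader();
      while (rows.next()) {
        int id = rows.scalar("id").getInt();  // process the current row
      }
    }
    reader.close();  // releases any batch still held
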
    diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
    new file mode 100644
    index 00000000000..dc803de58c2
    --- /dev/null
    +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.resultSet.impl;
    +
    +import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
    +import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
    +import org.apache.drill.exec.physical.rowSet.RowSetReader;
    +import org.apache.drill.exec.record.metadata.TupleMetadata;
    +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
    +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
    +
    +/**
+ * <h4>Protocol</h4>
+ * <ol>
+ * <li>Create an instance, passing in a
+ * {@link UpstreamSource} to provide batches and optional
+ * selection vector.</li>
+ * <li>For each incoming batch:
+ *   <ol>
+ *   <li>Call {@link #start()} to attach the batch. The associated
+ *   {@link BatchAccessor} reports if the schema has changed.</li>
+ *   <li>Call {@link #reader()} to obtain a reader.</li>
+ *   <li>Iterate over the batch using the reader.</li>
+ *   <li>Call {@link #release()} to free the memory for the
+ *   incoming batch. Or, call {@link #detach()} to keep
+ *   the batch memory.</li>
+ *   </ol></li>
+ * <li>Call {@link #close()} after all batches are read.</li>
+ * </ol>
    + */ +public class PullResultSetReaderImpl implements PullResultSetReader { + + public interface UpstreamSource extends PushResultSetReaderImpl.UpstreamSource { + boolean next(); + void release(); + } + + @VisibleForTesting + protected enum State { + START, + PENDING, + BATCH, + DETACHED, + EOF, + CLOSED + } + + private final PushResultSetReaderImpl baseReader; + private final UpstreamSource source; + private State state = State.START; + private RowSetReader rowSetReader; + + public PullResultSetReaderImpl(UpstreamSource source) { + this.baseReader = new PushResultSetReaderImpl(source); + this.source = source; + } + + @Override + public TupleMetadata schema() { + switch (state) { + case CLOSED: + return null; + case START: + if (!next()) { + return null; + } + state = State.PENDING; + break; + default: + } + return rowSetReader.tupleSchema(); + } + + @Override + public boolean next() { + switch (state) { + case PENDING: + state = State.BATCH; + return true; + case BATCH: + source.release(); + break; + case CLOSED: + throw new IllegalStateException("Reader is closed"); + case EOF: + return false; + case START: + break; + default: + source.release(); + } + if (!source.next()) { + state = State.EOF; + return false; + } + + rowSetReader = baseReader.start(); + state = State.BATCH; + return true; + } + + @Override + public int schemaVersion() { return source.schemaVersion(); } + + @Override + public RowSetReader reader() { + Preconditions.checkState(state == State.BATCH, "Not in batch-ready state."); + return rowSetReader; + } + + @Override + public void close() { + source.release(); + state = State.CLOSED; + } + + @VisibleForTesting + protected State state() { return state; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java new file mode 100644 index 00000000000..7fc1156313f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.resultSet.impl; + +import org.apache.drill.exec.physical.resultSet.PushResultSetReader; +import org.apache.drill.exec.physical.rowSet.DirectRowSet; +import org.apache.drill.exec.physical.rowSet.IndirectRowSet; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetReader; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +public class PushResultSetReaderImpl implements PushResultSetReader { + + public interface UpstreamSource { + int schemaVersion(); + VectorContainer batch(); + SelectionVector2 sv2(); + } + + public static class BatchHolder implements UpstreamSource { + + private final VectorContainer container; + private int schemaVersion; + + public BatchHolder(VectorContainer container) { + this.container = container; + } + + public void newBatch() { + if (schemaVersion == 0) { + schemaVersion = 1; + } else if (container.isSchemaChanged()) { + schemaVersion++; + } + } + + @Override + public int schemaVersion() { return schemaVersion; } + + @Override + public VectorContainer batch() { return container; } + + @Override + public SelectionVector2 sv2() { return null; } + } + + private final UpstreamSource source; + private int priorSchemaVersion; + private RowSetReader rowSetReader; + + public PushResultSetReaderImpl(UpstreamSource source) { + this.source = source; + } + + @Override + public RowSetReader start() { + int sourceSchemaVersion = source.schemaVersion(); + Preconditions.checkState(sourceSchemaVersion > 0); + Preconditions.checkState(priorSchemaVersion <= sourceSchemaVersion); + + // If new schema, discard the old reader (if any, and create + // a new one that matches the new schema. If not a new schema, + // then the old reader is reused: it points to vectors which + // Drill requires be the same vectors as the previous batch, + // but with different buffers. 
+ boolean newSchema = priorSchemaVersion != sourceSchemaVersion; + if (newSchema) { + rowSetReader = createRowSet().reader(); + priorSchemaVersion = sourceSchemaVersion; + } else { + rowSetReader.newBatch(); + } + return rowSetReader; + } + + // TODO: Build the reader without the need for a row set + private RowSet createRowSet() { + VectorContainer container = source.batch(); + switch (container.getSchema().getSelectionVectorMode()) { + case FOUR_BYTE: + throw new IllegalArgumentException("Build from SV4 not yet supported"); + case NONE: + return DirectRowSet.fromContainer(container); + case TWO_BYTE: + return IndirectRowSet.fromSv2(container, source.sv2()); + default: + throw new IllegalStateException("Invalid selection mode"); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java index 9c596ff7655..1dd8d5498a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java @@ -18,14 +18,12 @@ package org.apache.drill.exec.physical.resultSet.impl; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.impl.protocol.BatchAccessor; +import org.apache.drill.exec.physical.resultSet.PullResultSetReader; import org.apache.drill.exec.physical.resultSet.ResultSetCopier; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; -import org.apache.drill.exec.physical.resultSet.ResultSetReader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ColumnReader; import org.apache.drill.exec.vector.accessor.ColumnWriter; @@ -40,6 +38,7 @@ private enum State { BATCH_ACTIVE, NEW_SCHEMA, SCHEMA_PENDING, + END_OF_INPUT, CLOSED } @@ -69,7 +68,7 @@ protected CopyPair(ColumnWriter writer, ColumnReader reader) { // Input state private int currentSchemaVersion = -1; - private final ResultSetReader resultSetReader; + private final PullResultSetReader resultSetReader; protected RowSetReader rowReader; // Output state @@ -85,14 +84,14 @@ protected CopyPair(ColumnWriter writer, ColumnReader reader) { private CopyPair[] projection; private CopyAll activeCopy; - public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch) { - this(allocator, inputBatch, new ResultSetOptionBuilder()); + public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source) { + this(allocator, source, new ResultSetOptionBuilder()); } - public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch, + public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source, ResultSetOptionBuilder outputOptions) { this.allocator = allocator; - resultSetReader = new ResultSetReaderImpl(inputBatch); + resultSetReader = source; writerOptions = outputOptions; writerOptions.vectorCache(new ResultVectorCacheImpl(allocator)); state = State.START; @@ -104,7 +103,6 @@ public void startOutputBatch() { // No schema yet. Defer real batch start until we see an input // batch. 
- state = State.NO_SCHEMA; return; } @@ -112,7 +110,6 @@ public void startOutputBatch() { if (state == State.SCHEMA_PENDING) { // We have a pending new schema. Create new writers to match. - createProjection(); } resultSetWriter.startBatch(); @@ -120,67 +117,58 @@ public void startOutputBatch() { if (isCopyPending()) { // Resume copying if a copy is active. - copyBlock(); } } @Override - public void startInputBatch() { + public boolean nextInputBatch() { + if (state == State.END_OF_INPUT) { + return false; + } Preconditions.checkState(state == State.NO_SCHEMA || state == State.NEW_SCHEMA || state == State.BATCH_ACTIVE, "Can only start input while in an output batch"); Preconditions.checkState(!isCopyPending(), "Finish the pending copy before changing input"); - bindInput(); + if (!resultSetReader.next()) { + state = State.END_OF_INPUT; + return false; + } + rowReader = resultSetReader.reader(); if (state == State.BATCH_ACTIVE) { // If no schema change, we are ready to copy. - - if (currentSchemaVersion == resultSetReader.inputBatch().schemaVersion()) { - return; + if (currentSchemaVersion == resultSetReader.schemaVersion()) { + return true; } // The schema has changed. Handle it now or later. - if (hasOutputRows()) { // Output batch has rows. Can't switch and bind inputs // until current batch is sent downstream. - state = State.NEW_SCHEMA; - return; + return true; } } // The schema changed: first schema, or a change while a bath // is active, but is empty. - if (state == State.NO_SCHEMA) { state = State.BATCH_ACTIVE; } else { // Discard the unused empty batch - harvest().zeroVectors(); } createProjection(); resultSetWriter.startBatch(); // Stay in the current state. - } - - protected void bindInput() { - resultSetReader.start(); - rowReader = resultSetReader.reader(); - } - - @Override - public void releaseInputBatch() { - Preconditions.checkState(state != State.CLOSED); - resultSetReader.release(); + return true; } private void createProjection() { @@ -190,14 +178,13 @@ private void createProjection() { // will tear down the whole show. But, the vector cache will // ensure that the new writer reuses any matching vectors from // the prior batch to provide vector persistence as Drill expects. - resultSetWriter.close(); } - TupleMetadata schema = MetadataUtils.fromFields(resultSetReader.inputBatch().schema()); + TupleMetadata schema = resultSetReader.schema(); writerOptions.readerSchema(schema); resultSetWriter = new ResultSetLoaderImpl(allocator, writerOptions.build()); rowWriter = resultSetWriter.writer(); - currentSchemaVersion = resultSetReader.inputBatch().schemaVersion(); + currentSchemaVersion = resultSetReader.schemaVersion(); int colCount = schema.size(); projection = new CopyPair[colCount]; @@ -225,6 +212,7 @@ public boolean isOutputFull() { case BATCH_ACTIVE: return rowWriter.isFull(); case NEW_SCHEMA: + case END_OF_INPUT: return true; default: return false; @@ -288,7 +276,8 @@ public boolean isCopyPending() { @Override public VectorContainer harvest() { - Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA); + Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA || + state == State.END_OF_INPUT); VectorContainer output = resultSetWriter.harvest(); state = (state == State.BATCH_ACTIVE) ? 
State.BETWEEN_BATCHES : State.SCHEMA_PENDING; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java deleted file mode 100644 index 6046c973b7a..00000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.resultSet.impl; - -import org.apache.drill.exec.physical.impl.protocol.BatchAccessor; -import org.apache.drill.exec.physical.resultSet.ResultSetReader; -import org.apache.drill.exec.physical.rowSet.RowSetReader; -import org.apache.drill.exec.physical.rowSet.RowSets; -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; - -public class ResultSetReaderImpl implements ResultSetReader { - - @VisibleForTesting - protected enum State { - START, - BATCH, - DETACHED, - CLOSED - } - - private State state = State.START; - private int priorSchemaVersion; - private final BatchAccessor batch; - private RowSetReader rowSetReader; - - public ResultSetReaderImpl(BatchAccessor batch) { - this.batch = batch; - } - - @Override - public void start() { - Preconditions.checkState(state != State.CLOSED, "Reader is closed"); - Preconditions.checkState(state != State.BATCH, - "Call detach/release before starting another batch"); - Preconditions.checkState(state == State.START || - priorSchemaVersion <= batch.schemaVersion()); - boolean newSchema = state == State.START || - priorSchemaVersion != batch.schemaVersion(); - state = State.BATCH; - - // If new schema, discard the old reader (if any, and create - // a new one that matches the new schema. If not a new schema, - // then the old reader is reused: it points to vectors which - // Drill requires be the same vectors as the previous batch, - // but with different buffers. 
- - if (newSchema) { - rowSetReader = RowSets.wrap(batch).reader(); - priorSchemaVersion = batch.schemaVersion(); - } else { - rowSetReader.newBatch(); - } - } - - @Override - public RowSetReader reader() { - Preconditions.checkState(state == State.BATCH, "Call start() before requesting the reader."); - return rowSetReader; - } - - @Override - public void detach() { - if (state != State.START) { - Preconditions.checkState(state == State.BATCH || state == State.DETACHED); - state = State.DETACHED; - } - } - - @Override - public void release() { - if (state != State.START && state != State.DETACHED) { - detach(); - batch.release(); - } - } - - @Override - public void close() { - if (state != State.CLOSED) { - release(); - state = State.CLOSED; - } - } - - @VisibleForTesting - protected State state() { return state; } - - @Override - public BatchAccessor inputBatch() { return batch; } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java index 13780de56f5..8404dc42bad 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java @@ -24,12 +24,11 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.protocol.BatchAccessor; -import org.apache.drill.exec.physical.impl.protocol.IndirectContainerAccessor; -import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor; +import org.apache.drill.exec.physical.resultSet.PullResultSetReader; import org.apache.drill.exec.physical.resultSet.ResultSetCopier; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource; import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; @@ -39,6 +38,7 @@ import org.apache.drill.exec.record.metadata.MetadataUtils; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector2Builder; import org.apache.drill.exec.vector.accessor.ArrayWriter; import org.apache.drill.exec.vector.accessor.TupleWriter; @@ -55,77 +55,95 @@ public class TestResultSetCopier extends SubOperatorTest { .add("name", MinorType.VARCHAR) .build(); - private static class BaseDataGen { - protected final TupleMetadata schema; + private static abstract class BaseDataGen implements UpstreamSource { + protected int schemaVersion = 1; protected final ResultSetLoader rsLoader; - protected final VectorContainerAccessor batch = new VectorContainerAccessor(); + protected VectorContainer batch; + protected int batchCount; + protected int rowCount; + protected int batchSize; + protected int batchLimit; - public BaseDataGen(TupleMetadata schema) { - this.schema = schema; + public BaseDataGen(TupleMetadata schema, int batchSize, int batchLimit) { ResultSetOptions options = new ResultSetOptionBuilder() .readerSchema(schema) .vectorCache(new 
ResultVectorCacheImpl(fixture.allocator())) .build(); rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + this.batchSize = batchSize; + this.batchLimit = batchLimit; } - public TupleMetadata schema() { return schema; } + @Override + public int schemaVersion() { return schemaVersion; } - public BatchAccessor batchAccessor() { - return batch; + @Override + public VectorContainer batch() { return batch; } + + @Override + public boolean next() { + if (batchCount >= batchLimit) { + return false; + } + makeBatch(); + return true; + } + + protected abstract void makeBatch(); + + @Override + public SelectionVector2 sv2() { return null; } + + @Override + public void release() { + if (batch != null) { + batch.zeroVectors(); + } + SelectionVector2 sv2 = sv2(); + if (sv2 != null) { + sv2.clear(); + } } } private static class DataGen extends BaseDataGen { public DataGen() { - super(TEST_SCHEMA); + this(3, 1); + } + + public DataGen(int batchSize, int batchLimit) { + super(TEST_SCHEMA, batchSize, batchLimit); } - public void makeBatch(int start, int end) { + @Override + protected void makeBatch() { rsLoader.startBatch(); - for (int i = start; i <= end; i++) { - rsLoader.writer().addRow(i, "Row " + i); + for (int i = 0; i < batchSize; i++) { + rowCount++; + rsLoader.writer().addRow(rowCount, "Row " + rowCount); } - batch.addBatch(rsLoader.harvest()); + batch = rsLoader.harvest(); + batchCount++; } } - public static class DataGen2 extends DataGen { - private final int batchCount = 2; - private final int batchSize = 5; - private int batchIndex; + public static class SchemaChangeGen extends DataGen { - boolean next() { - if (batchIndex >= batchCount) { - return false; - } - int start = nextRow(); - makeBatch(start, start + batchSize - 1); - batchIndex++; - return true; - } + int schema1Limit; - int nextRow() { - return batchIndex * batchSize + 1; + public SchemaChangeGen(int batchSize, int batchLimit, int schema1Limit) { + super(batchSize, batchLimit); + this.schema1Limit = schema1Limit; } - int targetRowCount( ) { - return batchCount * batchSize; + public SchemaChangeGen(int schema1Limit) { + super(3, 3); + this.schema1Limit = schema1Limit; } - } - public static class SchemaChangeGen extends DataGen { - private int batchIndex; - public final int batchSize = 5; - private int schemaVersion = 1; - - public void makeBatch2(int start, int end) { - rsLoader.startBatch(); - for (int i = start; i <= end; i++) { - rsLoader.writer().addRow(i, "Row " + i, i * 10); - } - batch.addBatch(rsLoader.harvest()); + public SchemaChangeGen() { + this(2); } public TupleMetadata schema2() { @@ -136,20 +154,31 @@ public TupleMetadata schema2() { .build(); } - public void evolveSchema() { - rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED)); - schemaVersion = 2; + @Override + protected void makeBatch() { + if (batchCount < schema1Limit) { + super.makeBatch(); + } else if (batchCount == schema1Limit) { + evolveSchema(); + makeBatch2(); + } else { + makeBatch2(); + } } - public void nextBatch() { - int start = batchIndex * batchSize + 1; - int end = start + batchSize - 1; - if (schemaVersion == 1) { - makeBatch(start, end); - } else { - makeBatch2(start, end); + public void makeBatch2() { + rsLoader.startBatch(); + for (int i = 0; i < batchSize; i++) { + rowCount++; + rsLoader.writer().addRow(rowCount, "Row " + rowCount, rowCount * 10); } - batchIndex++; + batch = rsLoader.harvest(); + batchCount++; + } + + public void evolveSchema() { + 
rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED)); + schemaVersion = 2; } } @@ -160,24 +189,28 @@ public NullableGen() { .add("id", MinorType.INT) .addNullable("name", MinorType.VARCHAR) .addNullable("amount", MinorType.INT) - .build()); + .build(), + 10, 1); } - public void makeBatch(int start, int end) { + @Override + protected void makeBatch() { rsLoader.startBatch(); RowSetLoader writer = rsLoader.writer(); - for (int i = start; i <= end; i++) { + for (int i = 0; i < batchSize; i++) { + rowCount++; writer.start(); - writer.scalar(0).setInt(i); + writer.scalar(0).setInt(rowCount); if (i % 2 == 0) { - writer.scalar(1).setString("Row " + i); + writer.scalar(1).setString("Row " + rowCount); } if (i % 3 == 0) { - writer.scalar(2).setInt(i * 10); + writer.scalar(2).setInt(rowCount * 10); } writer.save(); } - batch.addBatch(rsLoader.harvest()); + batch = rsLoader.harvest(); + batchCount++; } } @@ -187,23 +220,27 @@ public ArrayGen() { super(new SchemaBuilder() .add("id", MinorType.INT) .addArray("name", MinorType.VARCHAR) - .build()); + .build(), + 3, 1); } - public void makeBatch(int start, int end) { + @Override + protected void makeBatch() { rsLoader.startBatch(); RowSetLoader writer = rsLoader.writer(); ArrayWriter aw = writer.array(1); - for (int i = start; i <= end; i++) { + for (int i = 0; i < batchSize; i++) { + rowCount++; writer.start(); - writer.scalar(0).setInt(i); + writer.scalar(0).setInt(rowCount); int n = i % 3; for (int j = 0; j < n; j++) { - aw.scalar().setString("Row " + i + "." + j); + aw.scalar().setString("Row " + rowCount + "." + j); } writer.save(); } - batch.addBatch(rsLoader.harvest()); + batch = rsLoader.harvest(); + batchCount++; } } @@ -216,38 +253,80 @@ public MapGen() { .add("name", MinorType.VARCHAR) .add("amount", MinorType.INT) .resumeSchema() - .build()); + .build(), + 3, 1); } - public void makeBatch(int start, int end) { + @Override + protected void makeBatch() { rsLoader.startBatch(); RowSetLoader writer = rsLoader.writer(); ArrayWriter aw = writer.array(1); TupleWriter mw = aw.entry().tuple(); - for (int i = start; i <= end; i++) { + for (int i = 0; i < batchSize; i++) { + rowCount++; writer.start(); - writer.scalar(0).setInt(i); + writer.scalar(0).setInt(rowCount); int n = i % 3; for (int j = 0; j < n; j++) { - mw.scalar(0).setString("Row " + i + "." + j); - mw.scalar(1).setInt(i * 100 + j); + mw.scalar(0).setString("Row " + rowCount + "." + j); + mw.scalar(1).setInt(rowCount * 100 + j); aw.save(); } writer.save(); } - batch.addBatch(rsLoader.harvest()); + batch = rsLoader.harvest(); + batchCount++; } } + public static class FilteredGen extends DataGen { + + SelectionVector2 sv2; + + public FilteredGen() { + super(10, 1); + } + + @Override + protected void makeBatch() { + super.makeBatch(); + makeSv2(); + } + + // Pick out every other record, in descending + // order. 
+ private void makeSv2() { + SelectionVector2Builder sv2Builder = + new SelectionVector2Builder(fixture.allocator(), batch.getRecordCount()); + for (int i = 0; i < 5; i++) { + sv2Builder.setNext(10 - 2 * i - 1); + } + sv2 = sv2Builder.harvest(batch); + batch.buildSchema(SelectionVectorMode.TWO_BYTE); + } + + @Override + public SelectionVector2 sv2() { return sv2; } + } + + private ResultSetCopierImpl newCopier(UpstreamSource source) { + PullResultSetReader reader = new PullResultSetReaderImpl(source); + return new ResultSetCopierImpl(fixture.allocator(), reader); + } + + private ResultSetCopierImpl newCopier(UpstreamSource source, ResultSetOptionBuilder outputOptions) { + PullResultSetReader reader = new PullResultSetReaderImpl(source); + return new ResultSetCopierImpl(fixture.allocator(), reader, outputOptions); + } + @Test public void testBasics() { DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(dataGen); // Nothing should work yet - try { copier.copyAllRows(); fail(); @@ -262,28 +341,23 @@ public void testBasics() { } // Predicates should work - assertFalse(copier.isCopyPending()); assertFalse(copier.hasOutputRows()); assertFalse(copier.isOutputFull()); // Define a schema and start an output batch. - copier.startOutputBatch(); assertFalse(copier.isCopyPending()); assertFalse(copier.hasOutputRows()); assertFalse(copier.isOutputFull()); - // Provide an input row - - dataGen.makeBatch(1, 3); - copier.startInputBatch(); + // Provide an input batch + assertTrue(copier.nextInputBatch()); assertFalse(copier.isCopyPending()); assertFalse(copier.hasOutputRows()); assertFalse(copier.isOutputFull()); // Now can do some actual copying - while (copier.copyNextRow()) { // empty } @@ -294,72 +368,62 @@ public void testBasics() { // Get and verify the output batch // (Does not free the input batch, we reuse it // in the verify step below.) - RowSet result = fixture.wrap(copier.harvest()); - new RowSetComparison(fixture.wrap(dataGen.batchAccessor().container())) + new RowSetComparison(fixture.wrap(dataGen.batch())) .verifyAndClear(result); - // Copier will release the input batch + // No more input + copier.startOutputBatch(); + assertFalse(copier.nextInputBatch()); + // OK to try multiple times + assertFalse(copier.nextInputBatch()); + + // Copier will release the input batch copier.close(); } @Test public void testImmediateClose() { - DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new DataGen()); // Close OK before things got started - copier.close(); // Second close is benign - copier.close(); } @Test public void testCloseBeforeSchema() { - DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new DataGen()); // Start batch, no data yet. - copier.startOutputBatch(); // Close OK before things data arrives - copier.close(); // Second close is benign - copier.close(); } @Test public void testCloseWithData() { - DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new DataGen()); // Start batch, with data. 
- copier.startOutputBatch(); - dataGen.makeBatch(1, 3); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyNextRow(); // Close OK with input and output batch allocated. - copier.close(); // Second close is benign - copier.close(); } @@ -371,27 +435,25 @@ public void testCloseWithData() { * This copier does not support merging from multiple * streams. */ - @Test public void testMerge() { - DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new DataGen(3, 5)); copier.startOutputBatch(); for (int i = 0; i < 5; i++) { - int start = i * 3 + 1; - dataGen.makeBatch(start, start + 2); - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); assertFalse(copier.isOutputFull()); copier.copyAllRows(); - copier.releaseInputBatch(); assertFalse(copier.isOutputFull()); assertFalse(copier.isCopyPending()); } + assertFalse(copier.nextInputBatch()); RowSet result = fixture.wrap(copier.harvest()); - dataGen.makeBatch(1, 15); - RowSet expected = RowSets.wrap(dataGen.batchAccessor()); + + // Verify with single batch with all rows + DataGen dataGen = new DataGen(15, 1); + dataGen.next(); + RowSet expected = RowSets.wrap(dataGen.batch()); RowSetUtilities.verify(expected, result); copier.close(); @@ -399,81 +461,67 @@ public void testMerge() { @Test public void testMultiOutput() { - DataGen2 dataGen = new DataGen2(); - DataGen validatorGen = new DataGen(); // Equivalent of operator start() method. - + DataGen dataGen = new DataGen(15, 2); ResultSetOptionBuilder options = new ResultSetOptionBuilder() .rowCountLimit(12); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor(), options); + ResultSetCopier copier = newCopier(dataGen, options); // Equivalent of an entire operator run - - int start = 1; + DataGen validatorGen = new DataGen(12, 2); + int outputCount = 0; while (true) { // Equivalent of operator next() method - copier.startOutputBatch(); while (! copier.isOutputFull()) { - copier.releaseInputBatch(); - if (! dataGen.next()) { + if (!copier.nextInputBatch()) { break; } - copier.startInputBatch(); copier.copyAllRows(); } - if (! copier.hasOutputRows()) { + if (!copier.hasOutputRows()) { break; } // Equivalent of sending downstream - RowSet result = fixture.wrap(copier.harvest()); - int nextRow = dataGen.nextRow(); - validatorGen.makeBatch(start, nextRow - 1); - RowSet expected = RowSets.wrap(validatorGen.batchAccessor()); - RowSetUtilities.verify(expected, result); - start = nextRow; + + validatorGen.next(); + RowSet expected = RowSets.wrap(validatorGen.batch()); + RowSetUtilities.verify(expected, result, result.rowCount()); + outputCount++; } // Ensure more than one output batch. - - assertTrue(start > 1); + assertTrue(outputCount > 1); // Ensure all rows generated. 
- - assertEquals(dataGen.targetRowCount(), start - 1); + assertEquals(30, dataGen.rowCount); // Simulate operator close(); - copier.close(); } @Test public void testCopyRecord() { - DataGen dataGen = new DataGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new DataGen(3, 2)); copier.startOutputBatch(); - dataGen.makeBatch(1, 3); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyRow(2); copier.copyRow(0); copier.copyRow(1); - copier.releaseInputBatch(); - dataGen.makeBatch(4, 6); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyRow(1); copier.copyRow(0); copier.copyRow(2); - copier.releaseInputBatch(); - RowSet expected = new RowSetBuilder(fixture.allocator(), dataGen.schema()) + assertFalse(copier.nextInputBatch()); + + RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA) .addRow(3, "Row 3") .addRow(1, "Row 1") .addRow(2, "Row 2") @@ -481,67 +529,49 @@ public void testCopyRecord() { .addRow(4, "Row 4") .addRow(6, "Row 6") .build(); - RowSet result = fixture.wrap(copier.harvest()); - RowSetUtilities.verify(expected, result); + RowSetUtilities.verify(expected, fixture.wrap(copier.harvest())); copier.close(); } @Test public void testSchemaChange() { - SchemaChangeGen dataGen = new SchemaChangeGen(); - SchemaChangeGen verifierGen = new SchemaChangeGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new SchemaChangeGen(3, 4, 2)); // Copy first batch with first schema - copier.startOutputBatch(); - dataGen.nextBatch(); - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); copier.copyAllRows(); assertFalse(copier.isOutputFull()); // Second, same schema - - copier.releaseInputBatch(); - dataGen.nextBatch(); - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); copier.copyAllRows(); assertFalse(copier.isOutputFull()); // Plenty of room. But, change the schema. 
- - copier.releaseInputBatch(); - dataGen.evolveSchema(); - dataGen.nextBatch(); - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); assertTrue(copier.isOutputFull()); // Must harvest partial output - RowSet result = fixture.wrap(copier.harvest()); - verifierGen.makeBatch(1, 2 * dataGen.batchSize - 1); - RowSet expected = RowSets.wrap(verifierGen.batchAccessor()); + SchemaChangeGen verifierGen = new SchemaChangeGen(6, 2, 1); + verifierGen.next(); + RowSet expected = RowSets.wrap(verifierGen.batch()); RowSetUtilities.verify(expected, result); // Start a new batch, implicitly complete pending copy - copier.startOutputBatch(); copier.copyAllRows(); // Add one more of second schema - - copier.releaseInputBatch(); - dataGen.nextBatch(); - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); copier.copyAllRows(); assertFalse(copier.isOutputFull()); result = fixture.wrap(copier.harvest()); - verifierGen.evolveSchema(); - verifierGen.makeBatch2(2 * dataGen.batchSize + 1, 4 * dataGen.batchSize - 1); - expected = RowSets.wrap(verifierGen.batchAccessor()); + verifierGen.next(); + expected = RowSets.wrap(verifierGen.batch()); RowSetUtilities.verify(expected, result); assertFalse(copier.isCopyPending()); @@ -553,31 +583,11 @@ public void testSchemaChange() { @Test public void testSV2() { - DataGen dataGen = new DataGen(); - IndirectContainerAccessor filtered = new IndirectContainerAccessor(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), filtered); + ResultSetCopier copier = newCopier(new FilteredGen()); copier.startOutputBatch(); - dataGen.makeBatch(1, 10); - - // Pick out every other record, in descending - // order. - - VectorContainer container = dataGen.batchAccessor().container(); - SelectionVector2Builder sv2Builder = - new SelectionVector2Builder(fixture.allocator(), container.getRecordCount()); - for (int i = 0; i < 5; i++) { - sv2Builder.setNext(10 - 2 * i - 1); - } - container.buildSchema(SelectionVectorMode.TWO_BYTE); - filtered.addBatch(container); - filtered.setSelectionVector(sv2Builder.harvest(container)); - assertEquals(5, filtered.rowCount()); - - copier.startInputBatch(); + assertTrue(copier.nextInputBatch()); copier.copyAllRows(); - copier.releaseInputBatch(); RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA) .addRow(10, "Row 10") @@ -599,17 +609,16 @@ public void testSV4() { @Test public void testNullable() { - NullableGen dataGen = new NullableGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new NullableGen()); copier.startOutputBatch(); - dataGen.makeBatch(1, 10); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyAllRows(); RowSet result = fixture.wrap(copier.harvest()); - RowSet expected = RowSets.wrap(dataGen.batchAccessor()); + NullableGen verifierGen = new NullableGen(); + verifierGen.next(); + RowSet expected = RowSets.wrap(verifierGen.batch()); RowSetUtilities.verify(expected, result); copier.close(); @@ -617,17 +626,16 @@ public void testNullable() { @Test public void testArrays() { - ArrayGen dataGen = new ArrayGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new ArrayGen()); copier.startOutputBatch(); - dataGen.makeBatch(1, 5); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyAllRows(); RowSet result = fixture.wrap(copier.harvest()); - RowSet expected = 
RowSets.wrap(dataGen.batchAccessor()); + ArrayGen verifierGen = new ArrayGen(); + verifierGen.next(); + RowSet expected = RowSets.wrap(verifierGen.batch()); RowSetUtilities.verify(expected, result); copier.close(); @@ -635,17 +643,16 @@ public void testArrays() { @Test public void testMaps() { - MapGen dataGen = new MapGen(); - ResultSetCopier copier = new ResultSetCopierImpl( - fixture.allocator(), dataGen.batchAccessor()); + ResultSetCopier copier = newCopier(new MapGen()); copier.startOutputBatch(); - dataGen.makeBatch(1, 5); - copier.startInputBatch(); + copier.nextInputBatch(); copier.copyAllRows(); RowSet result = fixture.wrap(copier.harvest()); - RowSet expected = RowSets.wrap(dataGen.batchAccessor()); + MapGen verifierGen = new MapGen(); + verifierGen.next(); + RowSet expected = RowSets.wrap(verifierGen.batch()); RowSetUtilities.verify(expected, result); copier.close(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java index 05c543062c2..08ad5ac6b61 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java @@ -18,98 +18,140 @@ package org.apache.drill.exec.physical.resultSet.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.protocol.BatchAccessor; -import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor; +import org.apache.drill.exec.physical.resultSet.PullResultSetReader; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; -import org.apache.drill.exec.physical.resultSet.ResultSetReader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource; import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions; import org.apache.drill.exec.physical.rowSet.RowSetReader; -import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.test.SubOperatorTest; import org.junit.Test; public class TestResultSetReader extends SubOperatorTest { - public static class BatchGenerator { + private static final TupleMetadata SCHEMA1 = new SchemaBuilder() + .add("id", MinorType.INT) + .add("name", MinorType.VARCHAR) + .build(); + private static final TupleMetadata SCHEMA2 = new SchemaBuilder() + .addAll(SCHEMA1) + .add("amount", MinorType.INT) + .build(); + + public static class BatchGenerator implements UpstreamSource { private enum State { SCHEMA1, SCHEMA2 }; private final ResultSetLoader rsLoader; - private final VectorContainerAccessor batch = new 
VectorContainerAccessor(); + private VectorContainer batch; + private int schemaVersion; private State state; + private int batchCount; + private int rowCount; + private final int schema1Count; + private final int schema2Count; + private final int batchSize; - public BatchGenerator() { - TupleMetadata schema1 = new SchemaBuilder() - .add("id", MinorType.INT) - .add("name", MinorType.VARCHAR) - .build(); + public BatchGenerator(int batchSize, int schema1Count, int schema2Count) { ResultSetOptions options = new ResultSetOptionBuilder() - .readerSchema(schema1) + .readerSchema(SCHEMA1) .vectorCache(new ResultVectorCacheImpl(fixture.allocator())) .build(); - rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); - state = State.SCHEMA1; + this.rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + this.state = State.SCHEMA1; + this.batchSize = batchSize; + this.schemaVersion = 1; + this.schema1Count = schema1Count; + this.schema2Count = schema2Count; } - public void batch1(int start, int end) { + public void batch1() { Preconditions.checkState(state == State.SCHEMA1); rsLoader.startBatch(); RowSetLoader writer = rsLoader.writer(); - for (int i = start; i <= end; i++) { + for (int i = 0; i < batchSize; i++) { + rowCount++; writer.start(); - writer.scalar("id").setInt(i); - writer.scalar("name").setString("Row" + i); + writer.scalar("id").setInt(rowCount); + writer.scalar("name").setString("Row" + rowCount); writer.save(); } - batch.addBatch(rsLoader.harvest()); + batch = rsLoader.harvest(); + batchCount++; } - public void batch2(int start, int end) { + public void batch2() { RowSetLoader writer = rsLoader.writer(); if (state == State.SCHEMA1) { - ColumnMetadata balCol = MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED); - writer.addColumn(balCol); + writer.addColumn(SCHEMA2.metadata("amount")); state = State.SCHEMA2; + schemaVersion++; } rsLoader.startBatch(); - for (int i = start; i <= end; i++) { + for (int i = 0; i < batchSize; i++) { + rowCount++; writer.start(); - writer.scalar("id").setInt(i); - writer.scalar("name").setString("Row" + i); - writer.scalar("amount").setInt(i * 10); + writer.scalar("id").setInt(rowCount); + writer.scalar("name").setString("Row" + rowCount); + writer.scalar("amount").setInt(rowCount * 10); writer.save(); } - batch.addBatch(rsLoader.harvest()); - } - - public BatchAccessor batchAccessor() { - return batch; + batch = rsLoader.harvest(); + batchCount++; } public void close() { rsLoader.close(); } + + @Override + public boolean next() { + if (batchCount == schema1Count + schema2Count) { + return false; + } + if (batchCount < schema1Count) { + batch1(); + } else { + batch2(); + } + return true; + } + + @Override + public int schemaVersion() { return schemaVersion; } + + @Override + public VectorContainer batch() { return batch; } + + @Override + public SelectionVector2 sv2() { return null; } + + @Override + public void release() { + if (batch != null) { + batch.zeroVectors(); + } + } } @Test public void testBasics() { - BatchGenerator gen = new BatchGenerator(); - ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor()); + PullResultSetReader rsReader = new PullResultSetReaderImpl( + new BatchGenerator(10, 2, 1)); // Start state - try { rsReader.reader(); fail(); @@ -117,16 +159,15 @@ public void testBasics() { // Expected } - // OK to detach with no input - rsReader.detach(); - rsReader.release(); + // Ask for schema. Does an implicit next. 
+ assertEquals(SCHEMA1, rsReader.schema()); + assertEquals(1, rsReader.schemaVersion()); - // Make a batch. Verify reader is attached. + // Move to the first batch. // (Don't need to do a full reader test, that is already done // elsewhere.) - - gen.batch1(1, 10); - rsReader.start(); + assertTrue(rsReader.next()); + assertEquals(1, rsReader.schemaVersion()); RowSetReader reader1; { RowSetReader reader = rsReader.reader(); @@ -135,18 +176,10 @@ public void testBasics() { assertEquals(1, reader.scalar("id").getInt()); assertEquals("Row1", reader.scalar("name").getString()); } - rsReader.release(); - try { - rsReader.reader(); - fail(); - } catch (IllegalStateException e) { - // Expected - } - - // Another batch of same schema - gen.batch1(11, 20); - rsReader.start(); + // Second batch, same schema. + assertTrue(rsReader.next()); + assertEquals(1, rsReader.schemaVersion()); { RowSetReader reader = rsReader.reader(); assertSame(reader1, reader); @@ -155,12 +188,10 @@ public void testBasics() { assertEquals(11, reader.scalar("id").getInt()); assertEquals("Row11", reader.scalar("name").getString()); } - rsReader.release(); // Batch with new schema - - gen.batch2(21, 30); - rsReader.start(); + assertTrue(rsReader.next()); + assertEquals(2, rsReader.schemaVersion()); { RowSetReader reader = rsReader.reader(); assertNotSame(reader1, reader); @@ -170,23 +201,50 @@ public void testBasics() { assertEquals("Row21", reader.scalar("name").getString()); assertEquals(210, reader.scalar("amount").getInt()); } - rsReader.release(); + assertFalse(rsReader.next()); rsReader.close(); } @Test public void testCloseAtStart() { - BatchGenerator gen = new BatchGenerator(); - ResultSetReaderImpl rsReader = new ResultSetReaderImpl(gen.batchAccessor()); + PullResultSetReader rsReader = new PullResultSetReaderImpl( + new BatchGenerator(10, 2, 1)); // Close OK in start state + rsReader.close(); + // Second close OK + rsReader.close(); + } + + @Test + public void testCloseDuringRead() { + PullResultSetReader rsReader = new PullResultSetReaderImpl( + new BatchGenerator(10, 2, 1)); + + // Move to first batch + assertTrue(rsReader.next()); + + // Close OK in start state rsReader.close(); - assertEquals(ResultSetReaderImpl.State.CLOSED, rsReader.state()); // Second close OK + rsReader.close(); + } + + @Test + public void testCloseAfterNext() { + PullResultSetReader rsReader = new PullResultSetReaderImpl( + new BatchGenerator(10, 2, 1)); + // Move to first batch + assertTrue(rsReader.next()); + + // Close OK in start state + rsReader.close(); + + // Second close OK rsReader.close(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java index db1ab110642..0458aeea035 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java @@ -29,7 +29,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.physical.rowSet.DirectRowSet; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; @@ -321,8 +320,7 @@ public void testWildcardSortFailure() throws Exception { 
RowSet result = client.queryBuilder().sql(sql).rowSet(); assertEquals(4, result.rowCount()); result.clear(); - } catch (RpcException e) { - assertTrue(e.getCause() instanceof UserRemoteException); + } catch (UserRemoteException e) { sawError = true; break; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java index 6acdc9cc622..c7ddd9eabf8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java @@ -29,8 +29,8 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; -import org.apache.drill.test.QueryBuilder.QuerySummary; import org.apache.drill.test.QueryRowSetIterator; +import org.apache.drill.test.QueryBuilder.QuerySummary; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.junit.BeforeClass; diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java index 74a8e65d53f..7618456c8bd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java @@ -27,6 +27,8 @@ import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.shaded.guava.com.google.common.collect.Queues; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Drill query event listener that buffers rows into a producer-consumer @@ -36,13 +38,10 @@ * Query messages are transformed into events: query ID, batch, * EOF or error. 
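+ * <p>
+ * A typical consumer loop might look like the sketch below; it is
+ * illustrative only (the step that submits the query with this
+ * listener is elided) and uses only members shown in this patch:
+ * <pre>{@code
+ * BufferingQueryEventListener listener = new BufferingQueryEventListener();
+ * // ... submit the query, registering this listener ...
+ * loop:
+ * while (true) {
+ *   QueryEvent event = listener.get();
+ *   switch (event.type) {
+ *     case QUERY_ID: // event.queryId identifies the query
+ *       break;
+ *     case BATCH:    // process, then release, event.batch
+ *       break;
+ *     case ERROR:    // event.error carries the failure
+ *     case EOF:
+ *       break loop;
+ *   }
+ * }
+ * }</pre>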
 */
+public class BufferingQueryEventListener implements UserResultsListener {
+  private static final Logger logger = LoggerFactory.getLogger(BufferingQueryEventListener.class);
 
-public class BufferingQueryEventListener implements UserResultsListener
-{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferingQueryEventListener.class);
-
-  public static class QueryEvent
-  {
+  public static class QueryEvent {
     public enum Type { QUERY_ID, BATCH, EOF, ERROR }
 
     public final Type type;
@@ -72,7 +71,7 @@ public QueryEvent(QueryState state) {
     }
   }
 
-  private BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
+  private final BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
 
   @Override
   public void queryIdArrived(QueryId queryId) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
index df3a217a0a2..c898e0e313b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -18,8 +18,9 @@
 package org.apache.drill.test;
 
 import ch.qos.logback.classic.Level;
-import org.apache.drill.exec.client.LoggingResultsListener;
+
 import org.apache.drill.common.util.function.CheckedSupplier;
+import org.apache.drill.exec.client.LoggingResultsListener;
 import org.apache.drill.exec.util.VectorUtil;
 
 import java.util.function.Supplier;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index 857744893d1..e4dcd98750f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -39,6 +39,8 @@
 
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parses a query profile and provides access to various bits of the profile
@@ -46,7 +48,7 @@
  */
 public class ProfileParser {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileParser.class);
+  private static final Logger logger = LoggerFactory.getLogger(ProfileParser.class);
 
   /**
    * The original JSON profile.
@@ -394,7 +396,7 @@ public void mapOpProfiles(FragInfo major) {
         logger.info("Can't find operator def: {}-{}", major.id, op.opId);
         continue;
       }
-      op.opName = CoreOperatorType.valueOf(op.type).name();
+      op.opName = CoreOperatorType.forNumber(op.type).name();
       op.opName = op.opName.replace("_", " ");
       op.name = opDef.name;
       if (op.name.equalsIgnoreCase(op.opName)) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
new file mode 100644
index 00000000000..2bace961c88
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.test.BufferingQueryEventListener.QueryEvent; + +/** + * Iterator over batches returned from a query. Uses a listener to obtain + * the serialized batches, then decodes them into value vectors held + * in a vector container - the same structure as used in the query + * executor. This format allows us to use the "row set" classes on the + * output of the query. + */ +public class QueryBatchIterator implements UpstreamSource, AutoCloseable { + + private enum State { START, RUN, RETAIN, EOF } + + private final BufferingQueryEventListener listener; + private final RecordBatchLoader loader; + private State state = State.START; + private boolean retainData; + private QueryId queryId; + private QueryState queryState; + private int schemaVersion; + private int recordCount; + private int batchCount; + + public QueryBatchIterator(BufferAllocator allocator, BufferingQueryEventListener listener) { + this.listener = listener; + this.loader = new RecordBatchLoader(allocator); + } + + public QueryId queryId() { return queryId; } + public String queryIdString() { return QueryIdHelper.getQueryId(queryId); } + public QueryState finalState() { return queryState; } + public int batchCount() { return batchCount; } + public int rowCount() { return recordCount; } + + @Override + public boolean next() { + retainData = false; + if (state == State.EOF) { + return false; + } + while (true) { + QueryEvent event = listener.get(); + queryState = event.state; + switch (event.type) { + case BATCH: + + // Skip over null batches + if (loadBatch(event)) { + return true; + } + break; + case EOF: + state = State.EOF; + return false; + case ERROR: + state = State.EOF; + if (event.error instanceof UserException) { + throw (UserException) event.error; + } else { + throw new RuntimeException(event.error); + } + case QUERY_ID: + queryId = event.queryId; + break; + default: + throw new IllegalStateException("Unexpected event: " + event.type); + } + } + } + + private boolean loadBatch(QueryEvent event) { + batchCount++; + recordCount += event.batch.getHeader().getRowCount(); + QueryDataBatch inputBatch = Preconditions.checkNotNull(event.batch); + + // Unload the batch and convert to a row set. 
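+    // RecordBatchLoader rebuilds the value vectors from the batch's
+    // serialized form; the resulting container is owned by this
+    // iterator until release() or close() clears the loader.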
+    loader.load(inputBatch.getHeader().getDef(), inputBatch.getData());
+    inputBatch.release();
+    VectorContainer batch = loader.getContainer();
+    batch.setRecordCount(loader.getRecordCount());
+
+    // Null results? Drill will return a single batch with no rows
+    // and no columns even if the scan (or other) operator returns
+    // no batches at all. For ease of testing, simply map this null
+    // result set to a null output row set that says "nothing at all
+    // was returned." Note that this is different than an empty result
+    // set which has a schema, but no rows.
+    if (batch.getRecordCount() == 0 && batch.getNumberOfColumns() == 0) {
+      release();
+      return false;
+    }
+
+    if (state == State.START || batch.isSchemaChanged()) {
+      schemaVersion++;
+    }
+    state = State.RUN;
+    return true;
+  }
+
+  @Override
+  public int schemaVersion() { return schemaVersion; }
+
+  @Override
+  public VectorContainer batch() { return loader.getContainer(); }
+
+  @Override
+  public SelectionVector2 sv2() { return null; }
+
+  @Override
+  public void release() {
+    loader.clear();
+  }
+
+  public void retainData() {
+    retainData = true;
+  }
+
+  @Override
+  public void close() {
+    if (!retainData) {
+      release();
+    }
+
+    // Consume any pending input
+    while (state != State.EOF) {
+      QueryEvent event = listener.get();
+      if (event.type == QueryEvent.Type.EOF) {
+        state = State.EOF;
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 4828f6cdce7..e451dbb2fcc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.test;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -58,11 +60,8 @@
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
-import org.apache.drill.test.ClientFixture.StatementParser;
 import org.joda.time.Period;
 
-import static org.junit.Assert.assertEquals;
-
 /**
  * Builder for a Drill query. Provides all types of query formats,
  * and a variety of ways to run the query.
@@ -121,21 +120,18 @@ public void queryCompleted(QueryState state) {
    * The future used to wait for the completion of an async query. Returns
    * just the summary of the query.
    */
-
   public static class QuerySummaryFuture implements Future<QuerySummary> {
 
     /**
      * Synchronizes the listener thread and the test thread that
      * launched the query.
      */
-
     private final CountDownLatch lock = new CountDownLatch(1);
     private QuerySummary summary;
 
     /**
      * Unsupported at present.
      */
-
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
       throw new UnsupportedOperationException();
@@ -144,7 +140,6 @@ public boolean cancel(boolean mayInterruptIfRunning) {
     /**
      * Always returns false.
      */
-
     @Override
     public boolean isCancelled() { return false; }
 
@@ -160,7 +155,6 @@ public QuerySummary get() throws InterruptedException {
     /**
      * Not supported at present, just does a non-timeout get.
      */
-
     @Override
     public QuerySummary get(long timeout, TimeUnit unit) throws InterruptedException {
       return get();
@@ -345,56 +339,32 @@ public List<QueryDataBatch> results() throws RpcException {
    */
   public DirectRowSet rowSet() throws RpcException {
 
-    // Ignore all but the first non-empty batch.
-    // Always return the last batch, which may be empty.
-
-    QueryDataBatch resultBatch = null;
-    for (QueryDataBatch batch : results()) {
-      if (resultBatch == null) {
-        resultBatch = batch;
-      } else if (resultBatch.getHeader().getRowCount() == 0) {
-        resultBatch.release();
-        resultBatch = batch;
-      } else if (batch.getHeader().getRowCount() > 0) {
-        throw new IllegalStateException("rowSet() returns a single batch, but this query returned multiple batches. Consider rowSetIterator() instead.");
-      } else {
-        batch.release();
+    VectorContainer batch = null;
+    try (QueryBatchIterator iter = new QueryBatchIterator(client.allocator(), withEventListener())) {
+      while (iter.next()) {
+        batch = iter.batch();
+        if (batch.getRecordCount() != 0) {
+          iter.retainData();
+          break;
+        }
       }
+      iter.retainData();
     }
-
-    // No results?
-
-    if (resultBatch == null) {
-      return null;
-    }
-
-    // Unload the batch and convert to a row set.
-
-    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
-    loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
-    resultBatch.release();
-    VectorContainer container = loader.getContainer();
-    container.setRecordCount(loader.getRecordCount());
-
-    // Null results? Drill will return a single batch with no rows
-    // and no columns even if the scan (or other) operator returns
-    // no batches at all. For ease of testing, simply map this null
-    // result set to a null output row set that says "nothing at all
-    // was returned." Note that this is different than an empty result
-    // set which has a schema, but no rows.
-
-    if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
-      container.clear();
+    if (batch == null) {
       return null;
+    } else {
+      return DirectRowSet.fromContainer(batch);
     }
-
-    return DirectRowSet.fromContainer(container);
   }
 
   public QueryRowSetIterator rowSetIterator() {
     return new QueryRowSetIterator(client.allocator(), withEventListener());
   }
 
+  public QueryRowSetReader rowSetReader() {
+    return QueryRowSetReader.build(client.allocator(), withEventListener());
+  }
+
   /**
    * Run the query which expects to return a vector {@code V} representation
    * of type {@code T} for the column {@code columnName}.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index 3a3fe678de2..d8e2da16e42 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -30,15 +30,14 @@
  * a very easy way for tests to work with query data using the
 * row set tools.
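+ * <p>
+ * Typical use, as an illustrative sketch (construction of the
+ * listener and allocator, and any final cleanup, are elided):
+ * <pre>{@code
+ * QueryResultSet resultSet = new QueryResultSet(listener, allocator);
+ * DirectRowSet rowSet;
+ * while ((rowSet = resultSet.next()) != null) {
+ *   // ... verify the row set contents ...
+ *   rowSet.clear();
+ * }
+ * }</pre>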
 */
-
 public class QueryResultSet {
   private final BufferingQueryEventListener listener;
   private boolean eof;
-  private int recordCount = 0;
-  private int batchCount = 0;
-  private QueryId queryId = null;
+  private int recordCount;
+  private int batchCount;
+  private QueryId queryId;
   @SuppressWarnings("unused")
-  private QueryState state = null;
+  private QueryState state;
   final RecordBatchLoader loader;
 
   public QueryResultSet(BufferingQueryEventListener listener, BufferAllocator allocator) {
@@ -53,7 +52,6 @@ public QueryResultSet(BufferingQueryEventListener listener, BufferAllocator allocator) {
    * @return the next batch as a row set, or null if EOF
    * @throws Exception on a server error
    */
-
   public DirectRowSet next() throws Exception {
     if (eof) {
       return null;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index acc50565cf0..37e7aeb39e3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -24,74 +24,32 @@
 import org.apache.drill.exec.physical.rowSet.RowSetFormatter;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
 
+/**
+ * Converts an incoming set of record batches into an iterator over a
+ * set of row sets. Primarily for testing.
+ */
 public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<DirectRowSet> {
-  private final BufferingQueryEventListener listener;
-  private final BufferAllocator allocator;
-  private int recordCount;
-  private int batchCount;
-  private QueryId queryId;
-  private QueryDataBatch batch;
-  private QueryState state;
+  private final QueryBatchIterator batchIter;
 
-  QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
-    this.allocator = allocator;
-    this.listener = listener;
+  public QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
+    batchIter = new QueryBatchIterator(allocator, listener);
   }
 
-  public QueryId queryId() { return queryId; }
-  public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
-  public QueryState finalState() { return state; }
-  public int batchCount() { return batchCount; }
-  public int rowCount() { return recordCount; }
+  public QueryId queryId() { return batchIter.queryId(); }
+  public String queryIdString() { return batchIter.queryIdString(); }
+  public QueryState finalState() { return batchIter.finalState(); }
+  public int batchCount() { return batchIter.batchCount(); }
+  public int rowCount() { return batchIter.rowCount(); }
 
   @Override
   public boolean hasNext() {
-    while (true) {
-      QueryEvent event = listener.get();
-      state = event.state;
-      batch = null;
-      switch (event.type) {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        batch = event.batch;
-        return true;
-      case EOF:
-        state = event.state;
-        return false;
-      case ERROR:
-        throw new RuntimeException(event.error);
-      case QUERY_ID:
-        queryId = event.queryId;
-        break;
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
-      }
-    }
+    return
batchIter.next(); } @Override public DirectRowSet next() { - - if (batch == null) { - throw new IllegalStateException(); - } - - // Unload the batch and convert to a row set. - - final RecordBatchLoader loader = new RecordBatchLoader(allocator); - loader.load(batch.getHeader().getDef(), batch.getData()); - batch.release(); - batch = null; - VectorContainer container = loader.getContainer(); - container.setRecordCount(loader.getRecordCount()); - return DirectRowSet.fromContainer(container); + return DirectRowSet.fromContainer(batchIter.batch()); } public void printAll() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java new file mode 100644 index 00000000000..8703f286f5e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl; +import org.apache.drill.exec.proto.UserBitShared.QueryId; + +public class QueryRowSetReader extends PullResultSetReaderImpl { + + private final QueryBatchIterator batchIter; + + public QueryRowSetReader(QueryBatchIterator batchIter) { + super(batchIter); + this.batchIter = batchIter; + } + + public static QueryRowSetReader build(BufferAllocator allocator, BufferingQueryEventListener listener) { + return new QueryRowSetReader(new QueryBatchIterator(allocator, listener)); + } + + public QueryId queryId() { return batchIter.queryId(); } + public String queryIdString() { return batchIter.queryIdString(); } + + @Override + public void close() { + batchIter.close(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java new file mode 100644 index 00000000000..70c4260a109 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.test; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; + +/** + * Very simple parser for semi-colon separated lists of SQL statements which + * handles quoted semicolons. Drill can execute only one statement at a time + * (without a trailing semi-colon.) This parser breaks up a statement list + * into single statements. Input:
+ * <pre><code>
+ * USE a.b;
+ * ALTER SESSION SET `foo` = ";";
+ * SELECT * FROM bar WHERE x = "\";";
+ * </code></pre>
+ * Output:
+ * <ul>
+ * <li><code>USE a.b</code></li>
+ * <li><code>ALTER SESSION SET `foo` = ";"</code></li>
+ * <li><code>SELECT * FROM bar WHERE x = "\";"</code></li>
+ * </ul>
+ */
+public class StatementParser {
+  private final Reader in;
+
+  public StatementParser(Reader in) {
+    this.in = in;
+  }
+
+  public StatementParser(String text) {
+    this(new StringReader(text));
+  }
+
+  /**
+   * Return the next statement, without its trailing semi-colon,
+   * or {@code null} once the input is exhausted.
+   */
+  public String parseNext() throws IOException {
+    boolean eof = false;
+    StringBuilder buf = new StringBuilder();
+    while (true) {
+      int c = in.read();
+      if (c == -1) {
+        eof = true;
+        break;
+      }
+      if (c == ';') {
+        break;
+      }
+      buf.append((char) c);
+      if (c == '"' || c == '\'' || c == '`') {
+        int quote = c;
+        boolean escape = false;
+        while (true) {
+          c = in.read();
+          if (c == -1) {
+            throw new IllegalArgumentException("EOF inside a quoted string");
+          }
+          buf.append((char) c);
+          if (! escape && c == quote) {
+            break;
+          }
+          escape = c == '\\';
+        }
+      }
+    }
+    String stmt = buf.toString().trim();
+    if (stmt.isEmpty() && eof) {
+      return null;
+    }
+    return stmt;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 334e90d81ad..925130fc9ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -35,7 +36,6 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
-import org.bouncycastle.util.Arrays;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
@@ -45,7 +45,6 @@
 /**
  * Various utilities useful for working with row sets, especially for testing.
  */
-
 public class RowSetUtilities {
 
   private RowSetUtilities() { }
@@ -55,7 +54,6 @@ private RowSetUtilities() { }
    * and easy way to reverse the sort order of an expected-value row set.
    * @param sv2 the SV2 which is reversed in place
    */
-
   public static void reverse(SelectionVector2 sv2) {
     int count = sv2.getCount();
     for (int i = 0; i < count / 2; i++) {
@@ -164,7 +162,7 @@ public static void assertEqualValues(String msg, ValueType type, Object expectedObj,
       byte[] expected = (byte[]) expectedObj;
       byte[] actual = (byte[]) actualObj;
       assertEquals(msg + " - byte lengths differ", expected.length, actual.length);
-      assertTrue(msg, Arrays.areEqual(expected, actual));
+      assertTrue(msg, Arrays.equals(expected, actual));
       break;
     }
     case DOUBLE:
@@ -280,6 +278,10 @@ public static void verify(RowSet expected, RowSet actual) {
     new RowSetComparison(expected).verifyAndClearAll(actual);
   }
 
+  /**
+   * Compare only the first {@code rowCount} rows of the actual row set
+   * against the expected row set, then free both row sets.
+   */
+  public static void verify(RowSet expected, RowSet actual, int rowCount) {
+    new RowSetComparison(expected).span(rowCount).verifyAndClearAll(actual);
+  }
+
   public static BigDecimal dec(String value) {
     return new BigDecimal(value);
   }