Skip to content

Commit

Permalink
DRILL-7734: Revise the result set reader
Browse files Browse the repository at this point in the history
Revised into two forms: push (for streaming JSON results) and
pull (for one operator reading from another).

closes apache#2077
  • Loading branch information
paul-rogers authored and vvysotskyi committed May 25, 2020
1 parent 64b40be commit 953280b
Show file tree
Hide file tree
Showing 21 changed files with 1,060 additions and 605 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,17 @@

import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.record.metadata.TupleMetadata;

/**
* Iterates over the set of batches in a result set, providing
* a row set reader to iterate over the rows within each batch.
* Handles schema changes between batches.
* <p>
* Designed to handle batches arriving from a single upstream
* operator. Uses Drill's strict form of schema identity: that
* not only must the column definitions match; the vectors must
* be identical from one batch to the next. If the vectors differ,
* then this class assumes a new schema has occurred, and will
* rebuild all the underlying readers, which can be costly.
* Handles schema changes between batches. A typical use is to
* iterate over batches from an upstream operator. Protocol:
*
* <h4>Protocol</h4>
* <ol>
* <li>Create an instance, passing in a
* {@link BatchAccessor} to hold the batch and optional
* selection vector.</li>
* <li>Create an instance.</li>
* <li>For each incoming batch:
* <ol>
* <li>Call {@link #start()} to attach the batch. The associated
Expand All @@ -49,51 +42,64 @@
* </ol>
* <li>Call {@link #close()} after all batches are read.</li>
* </ol>
* <ul>
* <li>Create the result set reader via a specific subclass.
* If a query has a null result (no rows,
* no schema), the code which creates this class should instead
* indicate that no results are available. This class is only for
* the cases in which a result set (at least a schema) is available.</li>
* <li>Call {@link #schema()}, if desired, to obtain the schema
* for this result set.</li>
* <li>Call {@link #next()} to advance to the first batch.</li>
* <li>If {@code next()} returns {@code true}, then call
* {@link #reader()} to obtain a reader over rows. This reader also
* provides the batch schema.</li>
* <li>Use the reader to iterate over rows in the batch.</li>
* <li>Call {@code next()} to advance to the next batch and
* repeat.</li>
* </ul>
* <p>
* The implementation may perform complex tasks behind the scenes:
* coordinate with the query runner (if remote), drive an operator
* (if within a DAG), etc. The implementation takes an interface
* that interfaces with the source of batches.
* <p>
* Designed to handle batches arriving from a single upstream
* operator. Uses Drill's strict form of schema identity: that
* not only must the column definitions match; the vectors must
* be identical from one batch to the next. If the vectors differ,
* then this class assumes a new schema has occurred, and will
* rebuild all the underlying readers, which can be costly.
*/
public interface ResultSetReader {
public interface PullResultSetReader {

/**
* Start tracking a new batch in the associated
* vector container.
* Advance to the next batch of data. The iterator starts
* positioned before the first batch (but after obtaining
* a schema).
* @return {@code true} if another batch is available,
* {@code false} if EOF
*/
void start();
boolean next();

/**
* Get the row reader for this batch. The row reader is
* guaranteed to remain the same for the life of the
* result set reader.
*
* @return the row reader to read rows for the current
* batch
* Return the schema for this result set.
*/
RowSetReader reader();
TupleMetadata schema();

/**
* Detach the batch of data from this reader. Does not
* release the memory for that batch.
*/
void detach();
int schemaVersion();

/**
* Detach the batch of data from this reader and release
* the memory for that batch. Call this method before
* loading the underlying vector container with more
* data, then call {@link #start()} after new data is
* available.
* Obtain a reader to iterate over the rows of the batch. The return
* value will likely be the same reader each time, so that this call
* is optional after the first batch.
*/
void release();
RowSetReader reader();

/**
* Close this reader. Releases any memory still assigned
* to any attached batch. Call {@link #detach()} first if
* you want to preserve the batch memory.
*/
void close();

/**
* Convenience method to access the input batch.
* @return the batch bound to the reader at construction
* time
*/
BatchAccessor inputBatch();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet;

import org.apache.drill.exec.physical.rowSet.RowSetReader;

/**
* Push-based result set reader, in which the caller obtains batches
* and registers them with the implementation. The client thus is responsible
* for detecting the end of batches and releasing memory. General protocol:
* <p>
* <ul>
* <li>Create an instance and bind it to a batch source.</li>
* <li>Obtain a batch, typically by having it passed in.</li>
* <li>Call {@link #start()} to obtain a reader for that batch.</li>
* <li>Iterate over the rows.</li>
* <li>Release memory for the batch.</li>
* </ul>
* <p>
* In Drill,
* batches may have the same or different schemas. Each call to
* {@link #start()} prepares a {@link RowSetReader} to use for
* the available batch. If the batch has the same schema as the previous,
* then the existing reader is simply repositioned at the start of the
* batch. If the schema changed (or this is the first batch), then a
* new reader is created. Thus, the client should not assume that the
* same reader is available across calls. However, if it is useful to
* cache column readers, simply check if the reader returned from
* {@code start()} is the same as the previous one. If so, the column
* readers are also the same.
*/
public interface PushResultSetReader {
/**
* Prepare a reader for the batch currently available from the
* bound batch source. Call once per batch, after the batch has
* been obtained and before iterating over its rows.
*
* @return the row set reader to use for the current batch; the
* same instance as the previous call when the schema is unchanged,
* a new instance on the first batch or after a schema change
*/
RowSetReader start();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@
* each non-schema-change batch.
*
* <h4>Protocol</h4>
*
* Overall lifecycle:
* <ol>
* <li>Create an instance of the
* {@link org.apache.drill.exec.physical.resultSet.impl.ResultSetCopierImpl
* ResultSetCopierImpl} class, passing the input batch
* accessor to the constructor.</li>
* ResultSetCopierImpl} class, passing the input row set reader
* to the constructor.</li>
* <li>Loop to process each output batch as shown below. That is, continually
* process calls to the {@link BatchIterator#next()} method.</li>
* <li>Call {@link #close()}.</li>
Expand All @@ -57,8 +58,7 @@
* <pre><code>
* public IterOutcome next() {
* copier.startOutputBatch();
* while (! copier.isFull()) {
* copier.freeInput();
* while (!copier.isFull()) {
* IterOutcome innerResult = inner.next();
* if (innerResult == DONE) { break; }
* copier.startInputBatch();
Expand Down Expand Up @@ -92,7 +92,6 @@
* Because we wish to fill the output batch, we may be able to copy
* part of a batch, the whole batch, or multiple batches to the output.
*/

public interface ResultSetCopier {

/**
Expand All @@ -102,9 +101,9 @@ public interface ResultSetCopier {

/**
* Start the next input batch. The input batch must be held
* by the VectorAccessor passed into the constructor.
* by the {@code PullResultSetReader} passed into the constructor.
*/
void startInputBatch();
boolean nextInputBatch();

/**
* If copying rows one by one, copy the next row from the
Expand Down Expand Up @@ -134,12 +133,6 @@ public interface ResultSetCopier {
*/
void copyAllRows();

/**
* Release the input. Must be called (explicitly, or via
* {@link #copyInput()} before loading another input batch.
*/
void releaseInputBatch();

/**
* Reports if the output batch has rows. Useful after the end
* of input to determine if a partial output batch exists to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;

import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

/**
 * Pull-style result set reader: the reader itself asks an
 * {@link UpstreamSource} for each batch and hands out a
 * {@link RowSetReader} positioned on that batch.
 *
 * <h4>Protocol</h4>
 * <ol>
 * <li>Create an instance, passing in an {@link UpstreamSource}
 * that provides the batches.</li>
 * <li>Optionally call {@link #schema()}. If no batch has been loaded
 * yet, this fetches the first batch and holds it so that the
 * following {@link #next()} call returns that same batch.</li>
 * <li>For each incoming batch:
 * <ol>
 * <li>Call {@link #next()}. A {@code true} result means a batch is
 * ready; {@code false} means end of data. The previous batch, if any,
 * is released back to the source before the new one is requested.</li>
 * <li>Call {@link #reader()} to obtain a reader.</li>
 * <li>Iterate over the batch using the reader.</li>
 * </ol>
 * <li>Call {@link #close()} after all batches are read.</li>
 * </ol>
 * <p>
 * NOTE(review): the earlier form of this protocol described a
 * {@link BatchAccessor} as the component that reports schema changes;
 * in this class that information is read through
 * {@link #schemaVersion()}, which delegates to the upstream source.
 */
public class PullResultSetReaderImpl implements PullResultSetReader {

  /**
   * Source of batches for the pull reader. Adds batch iteration and
   * batch release to the source contract used by the push reader.
   */
  public interface UpstreamSource extends PushResultSetReaderImpl.UpstreamSource {

    /**
     * Advance the source to its next batch.
     *
     * @return {@code true} if a batch is available, {@code false}
     * if there are no more batches
     */
    boolean next();

    /**
     * Release the current batch. Invoked by the reader before it asks
     * for another batch and when the reader is closed.
     */
    void release();
  }

  /**
   * Reader lifecycle states.
   */
  @VisibleForTesting
  protected enum State {
    /** Initial state: no batch has been requested yet. */
    START,
    /** First batch was fetched by {@code schema()} and awaits {@code next()}. */
    PENDING,
    /** A batch is loaded and the row set reader may be used. */
    BATCH,
    /** Declared but never assigned within this class. */
    DETACHED,
    /** The source reported that no more batches are available. */
    EOF,
    /** {@code close()} has been called. */
    CLOSED
  }

  private final PushResultSetReaderImpl baseReader;
  private final UpstreamSource source;
  private State state = State.START;
  // Remains null until the first batch is loaded successfully.
  private RowSetReader rowSetReader;

  /**
   * Create a reader over the given source.
   *
   * @param source supplier of batches; also used to build the
   * underlying push-style reader
   */
  public PullResultSetReaderImpl(UpstreamSource source) {
    this.baseReader = new PushResultSetReaderImpl(source);
    this.source = source;
  }

  /**
   * Return the schema of the current batch, loading the first batch
   * if none has been requested yet.
   *
   * @return the schema reported by the row set reader, or {@code null}
   * if the reader is closed or no batch was ever loaded (empty result)
   */
  @Override
  public TupleMetadata schema() {
    switch (state) {
      case CLOSED:
        return null;
      case START:
        if (!next()) {
          return null;
        }
        // Hold the batch just fetched so the caller's first next()
        // returns it rather than skipping ahead.
        state = State.PENDING;
        break;
      default:
    }
    // Guards the empty-result case: once EOF is reached without any
    // batch, there is no reader to ask, so report "no schema" instead
    // of failing with a NullPointerException on repeated calls.
    return rowSetReader == null ? null : rowSetReader.tupleSchema();
  }

  /**
   * Advance to the next batch.
   *
   * @return {@code true} if a batch is ready to read, {@code false}
   * at end of data
   * @throws IllegalStateException if the reader has been closed
   */
  @Override
  public boolean next() {
    switch (state) {
      case PENDING:
        // Batch was already loaded by schema(); just expose it.
        state = State.BATCH;
        return true;
      case BATCH:
        source.release();
        break;
      case CLOSED:
        throw new IllegalStateException("Reader is closed");
      case EOF:
        return false;
      case START:
        break;
      default:
        source.release();
    }
    if (!source.next()) {
      state = State.EOF;
      return false;
    }

    rowSetReader = baseReader.start();
    state = State.BATCH;
    return true;
  }

  /**
   * @return the schema version reported by the upstream source
   */
  @Override
  public int schemaVersion() { return source.schemaVersion(); }

  /**
   * Return the reader for the current batch.
   *
   * @return the row set reader for the loaded batch
   * @throws IllegalStateException if no batch is currently loaded,
   * that is, {@code next()} has not just returned {@code true}
   */
  @Override
  public RowSetReader reader() {
    Preconditions.checkState(state == State.BATCH, "Not in batch-ready state.");
    return rowSetReader;
  }

  /**
   * Close the reader and release the upstream source's current batch.
   * Calling this method again after the reader is closed has no effect,
   * so the source is not released twice.
   */
  @Override
  public void close() {
    if (state == State.CLOSED) {
      return;
    }
    source.release();
    state = State.CLOSED;
  }

  /**
   * @return the current lifecycle state, for tests
   */
  @VisibleForTesting
  protected State state() { return state; }
}
Loading

0 comments on commit 953280b

Please sign in to comment.