Skip to content

Commit

Permalink
Refactor QueryDataDecoder to return Iterator
Browse files Browse the repository at this point in the history
Iterable<List<Object>> was an initial sin, as the assumption was that you can iterate over data
multiple times, which isn't true anymore for spooled segments, which are downloaded and acknowledged
once the data is fully read.

Two new classes are introduced: SpooledSegmentIterator which is responsible for loading, acknowledging
spooled segments, closing of the input stream for the spooled segment. This encapsulates this logic
now in a single place and makes this logic reusable for other use cases. This iterator is lazy,
the segment is loaded on the first row read.
  • Loading branch information
wendigo committed Feb 7, 2025
1 parent a007626 commit 7162785
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static com.fasterxml.jackson.core.JsonParser.Feature.AUTO_CLOSE_SOURCE;
Expand Down Expand Up @@ -136,28 +137,16 @@ private void close()
}
}

public static ResultRows forJsonParser(JsonParser parser, List<Column> columns)
public static Iterator<List<Object>> forJsonParser(JsonParser parser, List<Column> columns)
throws IOException
{
return () -> {
try {
return new RowWiseIterator(parser, createTypeDecoders(columns));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
return new RowWiseIterator(parser, createTypeDecoders(columns));
}

public static ResultRows forInputStream(InputStream stream, TypeDecoder[] decoders)
public static Iterator<List<Object>> forInputStream(InputStream stream, TypeDecoder[] decoders)
throws IOException
{
return () -> {
try {
return new RowWiseIterator(stream, decoders);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
return new RowWiseIterator(stream, decoders);
}

@SuppressModernizer // There is no JsonFactory in the client module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import okhttp3.Request;
import okhttp3.Response;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
Expand Down Expand Up @@ -65,7 +64,7 @@ public InputStream load(SpooledSegment segment)
}

if (response.isSuccessful()) {
return delegatingInputStream(response, response.body().byteStream(), segment);
return response.body().byteStream();
}
throw new IOException(format("Could not open segment for streaming, got error '%s' with code %d", response.message(), response.code()));
}
Expand Down Expand Up @@ -95,21 +94,6 @@ public void onResponse(Call call, Response response)
});
}

private InputStream delegatingInputStream(Response response, InputStream delegate, SpooledSegment segment)
{
return new FilterInputStream(delegate)
{
@Override
public void close()
throws IOException
{
try (Response ignored = response; InputStream ignored2 = delegate) {
acknowledge(segment);
}
}
};
}

private static Headers toHeaders(Map<String, List<String>> headers)
{
Headers.Builder builder = new Headers.Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;

public interface QueryDataDecoder
Expand All @@ -39,7 +40,7 @@ interface Factory
*
* @throws IOException if an I/O error occurs
*/
ResultRows decode(InputStream input, DataAttributes segmentAttributes)
Iterator<List<Object>> decode(InputStream input, DataAttributes segmentAttributes)
throws IOException;

String encoding();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Iterator<List<Object>> iterator()
}
};

static ResultRows fromIterableRows(Iterable<List<Object>> values)
static ResultRows wrapList(List<List<Object>> values)
{
return values::iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,25 @@
*/
package io.trino.client;

import com.google.common.collect.Iterables;
import io.trino.client.spooling.DataAttributes;
import io.trino.client.spooling.EncodedQueryData;
import io.trino.client.spooling.InlineSegment;
import io.trino.client.spooling.Segment;
import io.trino.client.spooling.SegmentLoader;
import io.trino.client.spooling.SpooledSegment;
import io.trino.client.spooling.encoding.QueryDataDecoders;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Iterators.transform;
import static io.trino.client.ResultRows.NULL_ROWS;
import static io.trino.client.ResultRows.fromIterableRows;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -100,41 +96,33 @@ public ResultRows toRows(List<Column> columns, QueryData data)
if (jsonData.isNull()) {
return NULL_ROWS;
}
return () -> JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns).iterator();
return () -> {
try {
return JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}

if (data instanceof EncodedQueryData) {
EncodedQueryData encodedData = (EncodedQueryData) data;
setEncoding(columns, encodedData.getEncoding());
return concat(transform(encodedData.getSegments(), this::segmentToRows));
return () -> concat(transform(encodedData.getSegments().iterator(), this::segmentToRows));
}

throw new UnsupportedOperationException("Unsupported data type: " + data.getClass().getName());
}

private ResultRows segmentToRows(Segment segment)
private Iterator<List<Object>> segmentToRows(Segment segment)
{
if (segment instanceof InlineSegment) {
InlineSegment inlineSegment = (InlineSegment) segment;
try {
return decoder.decode(new ByteArrayInputStream(inlineSegment.getData()), inlineSegment.getMetadata());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return ((InlineSegment) segment).toIterator(decoder);
}

if (segment instanceof SpooledSegment) {
SpooledSegment spooledSegment = (SpooledSegment) segment;

try {
// The returned rows are lazy which means that decoder is responsible for closing input stream
InputStream stream = loader.load(spooledSegment);
return decoder.decode(stream, spooledSegment.getMetadata());
}
catch (IOException e) {
throw new RuntimeException(e);
}
return ((SpooledSegment) segment).toIterator(loader, decoder);
}

throw new UnsupportedOperationException("Unsupported segment type: " + segment.getClass().getName());
Expand All @@ -152,10 +140,4 @@ public void close()
{
loader.close();
}

@SuppressModernizer
private static ResultRows concat(Iterable<ResultRows> resultRows)
{
return fromIterableRows(Iterables.concat(filter(resultRows, rows -> !rows.isNull())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.client.QueryDataDecoder;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static java.lang.String.format;
Expand Down Expand Up @@ -48,4 +51,9 @@ public String toString()
{
return format("InlineSegment{offset=%d, rows=%d, size=%d}", getOffset(), getRowsCount(), getSegmentSize());
}

public Iterator<List<Object>> toIterator(QueryDataDecoder decoder)
{
return new InlineSegmentIterator(this, decoder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.client.spooling;

import com.google.common.collect.AbstractIterator;
import io.trino.client.QueryDataDecoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;

import static java.util.Objects.requireNonNull;

// Accessible through the InlineSegment.toIterator
class InlineSegmentIterator
extends AbstractIterator<List<Object>>
{
private InlineSegment segment;
private final QueryDataDecoder decoder;
private Iterator<List<Object>> iterator;

public InlineSegmentIterator(InlineSegment segment, QueryDataDecoder decoder)
{
this.segment = requireNonNull(segment, "segment is null");
this.decoder = requireNonNull(decoder, "decoder is null");
}

@Override
protected List<Object> computeNext()
{
if (iterator == null) {
try {
iterator = decoder.decode(new ByteArrayInputStream(segment.getData()), segment.getMetadata());
segment = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

if (iterator.hasNext()) {
return iterator.next();
}
return endOfData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.trino.client.QueryDataDecoder;

import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -75,4 +77,9 @@ public String toString()
{
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
}

public Iterator<List<Object>> toIterator(SegmentLoader loader, QueryDataDecoder decoder)
{
return new SpooledSegmentIterator(this, loader, decoder);
}
}
Loading

0 comments on commit 7162785

Please sign in to comment.