Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Map;
import org.apache.arrow.driver.jdbc.accessor.impl.complex.AbstractArrowFlightJdbcListVectorAccessor;
import org.apache.arrow.driver.jdbc.utils.SqlTypes;
Expand All @@ -42,6 +43,7 @@ public class ArrowFlightJdbcArray implements Array {
private final FieldVector dataVector;
private final long startOffset;
private final long valuesCount;
private final Calendar localCalendar;

/**
* Instantiate an {@link Array} backed up by given {@link FieldVector}, limited by a start offset
Expand All @@ -51,10 +53,12 @@ public class ArrowFlightJdbcArray implements Array {
* @param startOffset offset from FieldVector pointing to this Array's first value.
* @param valuesCount how many items this Array contains.
*/
public ArrowFlightJdbcArray(FieldVector dataVector, long startOffset, long valuesCount) {
public ArrowFlightJdbcArray(
FieldVector dataVector, long startOffset, long valuesCount, Calendar localCalendar) {
this.dataVector = dataVector;
this.startOffset = startOffset;
this.valuesCount = valuesCount;
this.localCalendar = localCalendar;
}

@Override
Expand Down Expand Up @@ -125,7 +129,8 @@ public ResultSet getResultSet(Map<String, Class<?>> map) throws SQLException {
throw new SQLFeatureNotSupportedException();
}

return getResultSetNoBoundariesCheck(this.dataVector, this.startOffset, this.valuesCount);
return getResultSetNoBoundariesCheck(
this.dataVector, this.startOffset, this.valuesCount, this.localCalendar);
}

@Override
Expand All @@ -134,14 +139,15 @@ public ResultSet getResultSet(long index, int count) throws SQLException {
}

private static ResultSet getResultSetNoBoundariesCheck(
ValueVector dataVector, long start, long count) throws SQLException {
ValueVector dataVector, long start, long count, Calendar localCalendar) throws SQLException {
TransferPair transferPair = dataVector.getTransferPair(dataVector.getAllocator());
transferPair.splitAndTransfer(
LargeMemoryUtil.checkedCastToInt(start), LargeMemoryUtil.checkedCastToInt(count));
FieldVector vectorSlice = (FieldVector) transferPair.getTo();

VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.of(vectorSlice);
return ArrowFlightJdbcVectorSchemaRootResultSet.fromVectorSchemaRoot(vectorSchemaRoot);
return ArrowFlightJdbcVectorSchemaRootResultSet.fromVectorSchemaRoot(
vectorSchemaRoot, localCalendar);
}

@Override
Expand All @@ -153,7 +159,10 @@ public ResultSet getResultSet(long index, int count, Map<String, Class<?>> map)

checkBoundaries(index, count);
return getResultSetNoBoundariesCheck(
this.dataVector, LargeMemoryUtil.checkedCastToInt(this.startOffset + index), count);
this.dataVector,
LargeMemoryUtil.checkedCastToInt(this.startOffset + index),
count,
this.localCalendar);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,19 @@ public List<Accessor> createAccessors(

return IntStream.range(0, fieldVectors.size())
.mapToObj(root::getVector)
.map(this::createAccessor)
.map(v -> this.createAccessor(v, localCalendar))
.collect(Collectors.toCollection(() -> new ArrayList<>(fieldVectors.size())));
}

private Accessor createAccessor(FieldVector vector) {
private Accessor createAccessor(FieldVector vector, Calendar localCalendar) {
return ArrowFlightJdbcAccessorFactory.createAccessor(
vector,
this::getCurrentRow,
(boolean wasNull) -> {
// AbstractCursor creates a boolean array of length 1 to hold the wasNull value
this.wasNull[0] = wasNull;
});
},
localCalendar);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand All @@ -28,14 +30,17 @@
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaResultSet;
import org.apache.calcite.avatica.AvaticaResultSetMetaData;
import org.apache.calcite.avatica.AvaticaSite;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.Meta.Frame;
import org.apache.calcite.avatica.Meta.Signature;
import org.apache.calcite.avatica.QueryState;
import org.apache.calcite.avatica.util.Cursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,10 +69,10 @@ public class ArrowFlightJdbcVectorSchemaRootResultSet extends AvaticaResultSet {
* @return a ResultSet which accesses the given VectorSchemaRoot
*/
public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
final VectorSchemaRoot vectorSchemaRoot) throws SQLException {
final VectorSchemaRoot vectorSchemaRoot, Calendar localCalendar) throws SQLException {
// Similar to how org.apache.calcite.avatica.util.ArrayFactoryImpl does

final TimeZone timeZone = TimeZone.getDefault();
final TimeZone timeZone = localCalendar.getTimeZone();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null);
Expand Down Expand Up @@ -102,6 +107,33 @@ void populateData(final VectorSchemaRoot vectorSchemaRoot, final Schema schema)
execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
}

/**
* The default method in AvaticaResultSet does not properly handle TIMESTASMP_WITH_TIMEZONE, so we
* override here to add support.
*
* @param columnIndex the first column is 1, the second is 2, ...
* @return Object
* @throws SQLException if there is an underlying exception
*/
@Override
public Object getObject(int columnIndex) throws SQLException {
this.checkOpen();

Cursor.Accessor accessor;
try {
accessor = accessorList.get(columnIndex - 1);
} catch (IndexOutOfBoundsException e) {
throw AvaticaConnection.HELPER.createException("invalid column ordinal: " + columnIndex);
}

ColumnMetaData metaData = columnMetaDataList.get(columnIndex - 1);
if (metaData.type.id == Types.TIMESTAMP_WITH_TIMEZONE) {
return accessor.getTimestamp(localCalendar);
} else {
return AvaticaSite.get(accessor, metaData.type.id, localCalendar);
}
}

@Override
protected void cancel() {
signature.columns.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.accessor;

import java.util.Calendar;
import java.util.function.IntSupplier;
import org.apache.arrow.driver.jdbc.accessor.impl.ArrowFlightJdbcNullVectorAccessor;
import org.apache.arrow.driver.jdbc.accessor.impl.binary.ArrowFlightJdbcBinaryVectorAccessor;
Expand Down Expand Up @@ -87,7 +88,10 @@ public class ArrowFlightJdbcAccessorFactory {
* @return an instance of one of the accessors.
*/
public static ArrowFlightJdbcAccessor createAccessor(
ValueVector vector, IntSupplier getCurrentRow, WasNullConsumer setCursorWasNull) {
ValueVector vector,
IntSupplier getCurrentRow,
WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
if (vector instanceof UInt1Vector) {
return new ArrowFlightJdbcBaseIntVectorAccessor(
(UInt1Vector) vector, getCurrentRow, setCursorWasNull);
Expand Down Expand Up @@ -138,7 +142,7 @@ public static ArrowFlightJdbcAccessor createAccessor(
(FixedSizeBinaryVector) vector, getCurrentRow, setCursorWasNull);
} else if (vector instanceof TimeStampVector) {
return new ArrowFlightJdbcTimeStampVectorAccessor(
(TimeStampVector) vector, getCurrentRow, setCursorWasNull);
(TimeStampVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof TimeNanoVector) {
return new ArrowFlightJdbcTimeVectorAccessor(
(TimeNanoVector) vector, getCurrentRow, setCursorWasNull);
Expand Down Expand Up @@ -180,22 +184,22 @@ public static ArrowFlightJdbcAccessor createAccessor(
(StructVector) vector, getCurrentRow, setCursorWasNull);
} else if (vector instanceof MapVector) {
return new ArrowFlightJdbcMapVectorAccessor(
(MapVector) vector, getCurrentRow, setCursorWasNull);
(MapVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof ListVector) {
return new ArrowFlightJdbcListVectorAccessor(
(ListVector) vector, getCurrentRow, setCursorWasNull);
(ListVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof LargeListVector) {
return new ArrowFlightJdbcLargeListVectorAccessor(
(LargeListVector) vector, getCurrentRow, setCursorWasNull);
(LargeListVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof FixedSizeListVector) {
return new ArrowFlightJdbcFixedSizeListVectorAccessor(
(FixedSizeListVector) vector, getCurrentRow, setCursorWasNull);
(FixedSizeListVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof UnionVector) {
return new ArrowFlightJdbcUnionVectorAccessor(
(UnionVector) vector, getCurrentRow, setCursorWasNull);
(UnionVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof DenseUnionVector) {
return new ArrowFlightJdbcDenseUnionVectorAccessor(
(DenseUnionVector) vector, getCurrentRow, setCursorWasNull);
(DenseUnionVector) vector, getCurrentRow, setCursorWasNull, localCalendar);
} else if (vector instanceof NullVector || vector == null) {
return new ArrowFlightJdbcNullVectorAccessor(setCursorWasNull);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ArrowFlightJdbcTimeStampVectorAccessor extends ArrowFlightJdbcAcces
private final LongToLocalDateTime longToLocalDateTime;
private final Holder holder;
private final boolean isZoned;
private final Calendar localCalendar;

/** Functional interface used to convert a number (in any time resolution) to LocalDateTime. */
interface LongToLocalDateTime {
Expand All @@ -61,9 +62,11 @@ interface LongToLocalDateTime {
public ArrowFlightJdbcTimeStampVectorAccessor(
TimeStampVector vector,
IntSupplier currentRowSupplier,
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
super(currentRowSupplier, setCursorWasNull);
this.holder = new Holder();
this.localCalendar = localCalendar;
this.getter = createGetter(vector);

// whether the vector included TZ info
Expand All @@ -89,7 +92,7 @@ public <T> T getObject(final Class<T> type) throws SQLException {
} else if (type == OffsetDateTime.class) {
value = getOffsetDateTime();
} else if (type == LocalDateTime.class) {
value = getLocalDateTime(null);
value = getLocalDateTime(localCalendar);
} else if (type == ZonedDateTime.class) {
value = getZonedDateTime();
} else if (type == Instant.class) {
Expand All @@ -105,7 +108,7 @@ public <T> T getObject(final Class<T> type) throws SQLException {

@Override
public Object getObject() {
return this.getTimestamp(null);
return this.getTimestamp(localCalendar);
}

private ZonedDateTime getZonedDateTime() {
Expand Down Expand Up @@ -154,7 +157,7 @@ private LocalDateTime getLocalDateTime(Calendar calendar) {
long millis = this.timeUnit.toMillis(value);
localDateTime =
localDateTime.minus(
timeZone.getOffset(millis) - this.timeZone.getOffset(millis), ChronoUnit.MILLIS);
this.timeZone.getOffset(millis) - timeZone.getOffset(millis), ChronoUnit.MILLIS);
}
return localDateTime;
}
Expand Down Expand Up @@ -202,7 +205,7 @@ private Timestamp getTimestampWithOffset(Calendar calendar, LocalDateTime localD
long millis = Timestamp.valueOf(localDateTime).getTime();
localDateTime =
localDateTime.minus(
timeZone.getOffset(millis) - this.timeZone.getOffset(millis), ChronoUnit.MILLIS);
this.timeZone.getOffset(millis) - timeZone.getOffset(millis), ChronoUnit.MILLIS);
}
return Timestamp.valueOf(localDateTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.arrow.driver.jdbc.accessor.impl.complex;

import java.sql.Array;
import java.util.Calendar;
import java.util.List;
import java.util.function.IntSupplier;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcArray;
Expand All @@ -33,10 +34,14 @@
*/
public abstract class AbstractArrowFlightJdbcListVectorAccessor extends ArrowFlightJdbcAccessor {

private final Calendar localCalendar;

protected AbstractArrowFlightJdbcListVectorAccessor(
IntSupplier currentRowSupplier,
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
super(currentRowSupplier, setCursorWasNull);
this.localCalendar = localCalendar;
}

@Override
Expand Down Expand Up @@ -67,6 +72,6 @@ public final Array getArray() {
long endOffset = getEndOffset(index);

long valuesCount = endOffset - startOffset;
return new ArrowFlightJdbcArray(dataVector, startOffset, valuesCount);
return new ArrowFlightJdbcArray(dataVector, startOffset, valuesCount, localCalendar);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ public abstract class AbstractArrowFlightJdbcUnionVectorAccessor extends ArrowFl
*/
private final ArrowFlightJdbcAccessor[] accessors = new ArrowFlightJdbcAccessor[128];

protected final Calendar localCalendar;

private final ArrowFlightJdbcNullVectorAccessor nullAccessor =
new ArrowFlightJdbcNullVectorAccessor((boolean wasNull) -> {});

protected AbstractArrowFlightJdbcUnionVectorAccessor(
IntSupplier currentRowSupplier,
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
super(currentRowSupplier, setCursorWasNull);
this.localCalendar = localCalendar;
}

protected abstract ArrowFlightJdbcAccessor createAccessorForVector(ValueVector vector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.accessor.impl.complex;

import java.util.Calendar;
import java.util.function.IntSupplier;
import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
Expand All @@ -38,15 +39,19 @@ public class ArrowFlightJdbcDenseUnionVectorAccessor
public ArrowFlightJdbcDenseUnionVectorAccessor(
DenseUnionVector vector,
IntSupplier currentRowSupplier,
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
super(currentRowSupplier, setCursorWasNull);
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
super(currentRowSupplier, setCursorWasNull, localCalendar);
this.vector = vector;
}

@Override
protected ArrowFlightJdbcAccessor createAccessorForVector(ValueVector vector) {
return ArrowFlightJdbcAccessorFactory.createAccessor(
vector, () -> this.vector.getOffset(this.getCurrentRow()), (boolean wasNull) -> {});
vector,
() -> this.vector.getOffset(this.getCurrentRow()),
(boolean wasNull) -> {},
localCalendar);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.arrow.driver.jdbc.accessor.impl.complex;

import java.util.Calendar;
import java.util.List;
import java.util.function.IntSupplier;
import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
Expand All @@ -31,8 +32,9 @@ public class ArrowFlightJdbcFixedSizeListVectorAccessor
public ArrowFlightJdbcFixedSizeListVectorAccessor(
FixedSizeListVector vector,
IntSupplier currentRowSupplier,
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
super(currentRowSupplier, setCursorWasNull);
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull,
Calendar localCalendar) {
super(currentRowSupplier, setCursorWasNull, localCalendar);
this.vector = vector;
}

Expand Down
Loading