From 42991f16e9a1df077ae8f9b346467a39f4092e6c Mon Sep 17 00:00:00 2001 From: Steve Lord Date: Mon, 18 Mar 2024 15:19:57 -0700 Subject: [PATCH] GH-37720: [Java][FlightSQL] Implement stateless prepared statements Part fixed caching of statementContext --- .../adapter/jdbc/JdbcParameterBinder.java | 27 ++----------- .../adapter/jdbc/JdbcParameterBinderTest.java | 25 +++--------- .../flight/sql/example/FlightSqlExample.java | 40 ++++++++++++------- 3 files changed, 35 insertions(+), 57 deletions(-) diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java index 5fd02d08636b3..2dfc0658cb8d1 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java @@ -17,11 +17,6 @@ package org.apache.arrow.adapter.jdbc; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.HashMap; @@ -43,7 +38,6 @@ public class JdbcParameterBinder { private final ColumnBinder[] binders; private final int[] parameterIndices; private int nextRowIndex; - private byte[] bindersAsByteArray; /** * Create a new parameter binder. @@ -57,8 +51,7 @@ private JdbcParameterBinder( final PreparedStatement statement, final VectorSchemaRoot root, final ColumnBinder[] binders, - int[] parameterIndices, - byte[] bindersAsByteArray) { + int[] parameterIndices) { Preconditions.checkArgument( binders.length == parameterIndices.length, "Number of column binders (%s) must equal number of parameter indices (%s)", @@ -68,7 +61,6 @@ private JdbcParameterBinder( this.binders = binders; this.parameterIndices = parameterIndices; this.nextRowIndex = 0; - this.bindersAsByteArray = bindersAsByteArray; } /** @@ -145,7 +137,7 @@ public Builder bind(int parameterIndex, ColumnBinder binder) { } /** Build the binder. */ - public JdbcParameterBinder build() throws IOException { + public JdbcParameterBinder build() { ColumnBinder[] binders = new ColumnBinder[bindings.size()]; int[] parameterIndices = new int[bindings.size()]; int index = 0; @@ -154,20 +146,7 @@ public JdbcParameterBinder build() throws IOException { parameterIndices[index] = entry.getKey(); index++; } - - // Convert parameters to byte array - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - try (ObjectOutputStream outObject = new ObjectOutputStream(outStream)) { - outObject.writeObject(bindings.toString().getBytes(UTF_8)); - outObject.flush(); - } - - // return new JdbcParameterBinder(statement, root, binders, parameterIndices, outStream.toByteArray()); - return new JdbcParameterBinder(statement, root, binders, parameterIndices, outStream.toByteArray()); + return new JdbcParameterBinder(statement, root, binders, parameterIndices); } } - - public byte[] getBindersAsByteArray() { - return bindersAsByteArray; - } } diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java index fc64891cf91e9..15b9ab0386159 100644 --- a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java +++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java @@ -19,7 +19,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.Date; @@ -81,7 +80,6 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.JsonStringHashMap; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -100,7 +98,7 @@ void afterEach() { } @Test - void bindOrder() throws SQLException, IOException { + void bindOrder() throws SQLException { final Schema schema = new Schema( Arrays.asList( @@ -161,7 +159,7 @@ void bindOrder() throws SQLException, IOException { } @Test - void customBinder() throws SQLException, IOException { + void customBinder() throws SQLException { final Schema schema = new Schema(Collections.singletonList( Field.nullable("ints0", new ArrowType.Int(32, true)))); @@ -564,7 +562,7 @@ void testSimpleType(ArrowType arrowType, int jdbcType try (final MockPreparedStatement statement = new MockPreparedStatement(); final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { final JdbcParameterBinder binder = - JdbcParameterBinder.builder(statement, root).bindAll().build(); + JdbcParameterBinder.builder(statement, root).bindAll().build(); assertThat(binder.next()).isFalse(); @SuppressWarnings("unchecked") @@ -607,8 +605,6 @@ void testSimpleType(ArrowType arrowType, int jdbcType assertThat(binder.next()).isTrue(); assertThat(statement.getParamValue(1)).isEqualTo(values.get(1)); assertThat(binder.next()).isFalse(); - } catch (IOException e) { - Assertions.fail("Unexpected binding error."); } // Non-nullable (since some types have a specialized binder) @@ -651,8 +647,6 @@ void testSimpleType(ArrowType arrowType, int jdbcType assertThat(binder.next()).isTrue(); assertThat(statement.getParamValue(1)).isEqualTo(values.get(1)); assertThat(binder.next()).isFalse(); - } catch (IOException e) { - Assertions.fail("Unexpected binding error."); } } @@ -666,10 +660,11 @@ void testListType(ArrowType arrowType, TriConsumer void testListType(ArrowType arrowType, TriConsumer void testListType(ArrowType arrowType, TriConsumer void testMapType(ArrowType arrowType, TriConsumer void testMapType(ArrowType arrowType, TriConsumer 0) { + final DoPutPreparedStatementResult doPutPreparedStatementResult = + DoPutPreparedStatementResult.newBuilder() + .setPreparedStatementHandle(ByteString.copyFrom(ByteBuffer.wrap(out.toByteArray()))) + .build(); + + // Update prepared statement cache by storing with new handle and remove old entry. + preparedStatementLoadingCache.put(doPutPreparedStatementResult.getPreparedStatementHandle(), + statementContext); + // TODO: If we invalidate old cached entry here this invalidates the statement, which is not what is needed. + // We need to re-cache the statementContext with a new key. + // preparedStatementLoadingCache.invalidate(command.getPreparedStatementHandle()); + + try (final ArrowBuf buffer = rootAllocator.buffer(doPutPreparedStatementResult.getSerializedSize())) { + buffer.writeBytes(doPutPreparedStatementResult.toByteArray()); + ackStream.onNext(PutResult.metadata(buffer)); + } + } } } catch (SQLException e) { @@ -939,17 +962,6 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co return; } - if (binder != null && binder.getBindersAsByteArray() != null) { - final byte[] byteArray = binder.getBindersAsByteArray(); - final DoPutPreparedStatementResult build = - DoPutPreparedStatementResult.newBuilder() - .setPreparedStatementHandle(ByteString.copyFrom(ByteBuffer.wrap(byteArray))).build(); - - try (final ArrowBuf buffer = rootAllocator.buffer(build.getSerializedSize())) { - buffer.writeBytes(build.toByteArray()); - ackStream.onNext(PutResult.metadata(buffer)); - } - } ackStream.onCompleted(); }; }