diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java index 8b71f95bb6..80abd18560 100644 --- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java +++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcDriver.java @@ -21,6 +21,11 @@ /** A handle to an ADBC database driver. */ public interface AdbcDriver { + /** The standard parameter name for a connection URL (type String). */ + String PARAM_URL = "adbc.url"; + /** The standard parameter name for SQL quirks configuration (type SqlQuirks). */ + String PARAM_SQL_QUIRKS = "adbc.sql.quirks"; + /** * Open a database via this driver. * diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java index 66de0778e9..c865f2ec83 100644 --- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java +++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcException.java @@ -48,6 +48,11 @@ public static AdbcException invalidArgument(String message) { return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INVALID_ARGUMENT, null, 0); } + /** Create a new exception with code {@link AdbcStatusCode#IO}. */ + public static AdbcException io(String message) { + return new AdbcException(message, /*cause*/ null, AdbcStatusCode.IO, null, 0); + } + /** Create a new exception with code {@link AdbcStatusCode#INVALID_STATE}. */ public static AdbcException invalidState(String message) { return new AdbcException(message, /*cause*/ null, AdbcStatusCode.INVALID_STATE, null, 0); @@ -70,6 +75,13 @@ public int getVendorCode() { return vendorCode; } + /** + * Copy this exception with a different cause (a convenience for use with the static factories). + */ + public AdbcException withCause(Throwable cause) { + return new AdbcException(this.getMessage(), cause, status, sqlState, vendorCode); + } + @Override public String toString() { return "AdbcException{" diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java b/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java index b625bf69e1..3f2047801c 100644 --- a/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java +++ b/java/core/src/main/java/org/apache/arrow/adbc/core/PartitionDescriptor.java @@ -21,11 +21,9 @@ /** An opaque descriptor for a part of a potentially distributed or partitioned result set. */ public final class PartitionDescriptor { - private final String friendlyName; private final ByteBuffer descriptor; - public PartitionDescriptor(final String friendlyName, final ByteBuffer descriptor) { - this.friendlyName = friendlyName; + public PartitionDescriptor(final ByteBuffer descriptor) { this.descriptor = Objects.requireNonNull(descriptor); } @@ -42,23 +40,16 @@ public boolean equals(Object o) { return false; } PartitionDescriptor that = (PartitionDescriptor) o; - return Objects.equals(friendlyName, that.friendlyName) - && getDescriptor().equals(that.getDescriptor()); + return descriptor.equals(that.descriptor); } @Override public int hashCode() { - return Objects.hash(friendlyName, getDescriptor()); + return Objects.hash(descriptor); } @Override public String toString() { - return "PartitionDescriptor{" - + "friendlyName='" - + friendlyName - + '\'' - + ", descriptor=" - + descriptor - + '}'; + return "PartitionDescriptor{" + "descriptor=" + descriptor + '}'; } } diff --git a/java/driver/flight-sql-validation/pom.xml b/java/driver/flight-sql-validation/pom.xml new file mode 100644 index 0000000000..3ad44e21dc --- /dev/null +++ b/java/driver/flight-sql-validation/pom.xml @@ -0,0 +1,55 @@ + + + + 4.0.0 + + arrow-adbc-java-root + org.apache.arrow.adbc + 9.0.0-SNAPSHOT + ../../pom.xml + + + adbc-driver-flight-sql-validation + jar + Arrow ADBC Driver Flight SQL Validation + Tests validating the Flight SQL driver. + + + + org.apache.arrow.adbc + adbc-core + test + + + org.apache.arrow.adbc + adbc-driver-flight-sql + test + + + + + org.assertj + assertj-core + test + + + org.junit.jupiter + junit-jupiter + test + + + org.apache.arrow.adbc + adbc-driver-validation + test + + + diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java new file mode 100644 index 0000000000..da56280e15 --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionMetadataTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionMetadataTest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; + +public class FlightSqlConnectionMetadataTest extends AbstractConnectionMetadataTest { + @BeforeAll + public static void beforeAll() { + quirks = new FlightSqlQuirks(); + } + + @Override + @Disabled("Not yet implemented") + public void getObjectsColumns() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void getObjectsCatalogs() throws Exception {} + + @Override + @Disabled("Not yet implemented") + public void getObjectsDbSchemas() throws Exception { + super.getObjectsDbSchemas(); + } + + @Override + @Disabled("Not yet implemented") + public void getObjectsTables() throws Exception { + super.getObjectsTables(); + } + + @Override + @Disabled("Not yet implemented") + public void getTableSchema() throws Exception { + super.getTableSchema(); + } + + @Override + @Disabled("Not yet implemented") + public void getTableTypes() throws Exception { + super.getTableTypes(); + } +} diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java new file mode 100644 index 0000000000..6137d4e21d --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnectionTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionTest; +import org.junit.jupiter.api.BeforeAll; + +public class FlightSqlConnectionTest extends AbstractConnectionTest { + @BeforeAll + public static void beforeAll() { + quirks = new FlightSqlQuirks(); + } +} diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java new file mode 100644 index 0000000000..3bd2969cb2 --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlPartitionDescriptorTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import org.apache.arrow.adbc.driver.testsuite.AbstractPartitionDescriptorTest; +import org.junit.jupiter.api.BeforeAll; + +class FlightSqlPartitionDescriptorTest extends AbstractPartitionDescriptorTest { + @BeforeAll + public static void beforeAll() { + quirks = new FlightSqlQuirks(); + } +} diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java new file mode 100644 index 0000000000..d2cd614d9d --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlQuirks.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import java.util.HashMap; +import java.util.Map; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.jupiter.api.Assumptions; + +public class FlightSqlQuirks extends SqlValidationQuirks { + static final String FLIGHT_SQL_LOCATION_ENV_VAR = "ADBC_FLIGHT_SQL_LOCATION"; + + static String getFlightLocation() { + final String location = System.getenv(FLIGHT_SQL_LOCATION_ENV_VAR); + Assumptions.assumeFalse( + location == null || location.isEmpty(), + "Flight SQL server not found, set " + FLIGHT_SQL_LOCATION_ENV_VAR); + return location; + } + + @Override + public AdbcDatabase initDatabase() throws AdbcException { + String url = getFlightLocation(); + + final Map parameters = new HashMap<>(); + parameters.put(AdbcDriver.PARAM_URL, url); + return FlightSqlDriver.INSTANCE.open(parameters); + } + + @Override + public void cleanupTable(String name) throws Exception { + try (final BufferAllocator allocator = new RootAllocator(); + final FlightSqlClient client = + new FlightSqlClient( + FlightClient.builder(allocator, new Location(getFlightLocation())).build())) { + client.executeUpdate("DROP TABLE " + name); + } catch (FlightRuntimeException e) { + // Ignored + } + } + + @Override + public String caseFoldTableName(String name) { + return name.toUpperCase(); + } + + @Override + public String caseFoldColumnName(String name) { + return name.toUpperCase(); + } +} diff --git a/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java new file mode 100644 index 0000000000..306f69e44f --- /dev/null +++ b/java/driver/flight-sql-validation/src/test/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatementTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import org.apache.arrow.adbc.driver.testsuite.AbstractStatementTest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; + +class FlightSqlStatementTest extends AbstractStatementTest { + @BeforeAll + public static void beforeAll() { + quirks = new FlightSqlQuirks(); + } + + @Override + @Disabled("Requires spec clarification") + public void prepareQueryWithParameters() {} +} diff --git a/java/driver/flight-sql/pom.xml b/java/driver/flight-sql/pom.xml new file mode 100644 index 0000000000..02033c7af4 --- /dev/null +++ b/java/driver/flight-sql/pom.xml @@ -0,0 +1,70 @@ + + + + 4.0.0 + + arrow-adbc-java-root + org.apache.arrow.adbc + 9.0.0-SNAPSHOT + ../../pom.xml + + + adbc-driver-flight-sql + jar + Arrow ADBC Driver Flight SQL + An ADBC driver wrapping Flight SQL. + + + + com.github.ben-manes.caffeine + caffeine + + 2.9.3 + + + com.google.protobuf + protobuf-java + 3.21.3 + + + + + org.apache.arrow + arrow-memory-core + + + org.apache.arrow + arrow-vector + + + org.apache.arrow + flight-core + + + org.apache.arrow + flight-sql + + + + org.apache.arrow.adbc + adbc-core + + + org.apache.arrow.adbc + adbc-driver-manager + + + org.apache.arrow.adbc + adbc-sql + + + diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java new file mode 100644 index 0000000000..a7a678710b --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FixedRootStatement.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import java.util.Collections; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; + +/** An AdbcStatement implementation that returns fixed data from a root. */ +class FixedRootStatement implements AdbcStatement { + private final BufferAllocator allocator; + private final Schema overrideSchema; + private ArrowRecordBatch recordBatch; + + public FixedRootStatement(BufferAllocator allocator, VectorSchemaRoot root) { + this.allocator = allocator; + this.overrideSchema = root.getSchema(); + // Unload the root to preserve the data + recordBatch = new VectorUnloader(root).getRecordBatch(); + } + + @Override + public void execute() throws AdbcException { + throw AdbcException.invalidState("[Flight SQL] Cannot execute() this statement"); + } + + @Override + public ArrowReader getArrowReader() throws AdbcException { + final ArrowReader reader = + new RootArrowReader(allocator, overrideSchema, Collections.singletonList(recordBatch)); + recordBatch = null; + return reader; + } + + @Override + public void prepare() throws AdbcException { + throw AdbcException.invalidState("[Flight SQL] Cannot execute() this statement"); + } + + @Override + public void close() throws Exception { + AutoCloseables.close(recordBatch); + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java new file mode 100644 index 0000000000..9b0cda91dc --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightInfoReader.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatusCode; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; + +/** An ArrowReader that wraps a FlightInfo. */ +public class FlightInfoReader extends ArrowReader { + private final Schema schema; + private final FlightSqlClient client; + private final LoadingCache clientCache; + private final List flightEndpoints; + private int nextEndpointIndex; + private FlightStream currentStream; + private long bytesRead; + + FlightInfoReader( + BufferAllocator allocator, + FlightSqlClient client, + LoadingCache clientCache, + List flightEndpoints) + throws AdbcException { + super(allocator); + this.client = client; + this.clientCache = clientCache; + this.flightEndpoints = flightEndpoints; + this.nextEndpointIndex = 0; + this.bytesRead = 0; + + try { + this.currentStream = + client.getStream(flightEndpoints.get(this.nextEndpointIndex++).getTicket()); + this.schema = this.currentStream.getSchema(); + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } + + try { + this.ensureInitialized(); + } catch (IOException e) { + throw new AdbcException( + FlightSqlDriverUtil.prefixExceptionMessage(e.getMessage()), + e, + AdbcStatusCode.IO, + null, + 0); + } + } + + @Override + public boolean loadNextBatch() throws IOException { + if (!currentStream.next()) { + if (nextEndpointIndex >= flightEndpoints.size()) { + return false; + } else { + try { + currentStream.close(); + FlightEndpoint endpoint = flightEndpoints.get(nextEndpointIndex++); + currentStream = tryLoadNextStream(endpoint); + if (!schema.equals(currentStream.getSchema())) { + throw new IOException( + "Stream has inconsistent schema. Expected: " + + schema + + "\nFound: " + + currentStream.getSchema()); + } + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } + } + final VectorSchemaRoot root = currentStream.getRoot(); + final VectorUnloader unloader = new VectorUnloader(root); + final ArrowRecordBatch recordBatch = unloader.getRecordBatch(); + bytesRead += recordBatch.computeBodyLength(); + loadRecordBatch(recordBatch); + return true; + } + + private FlightStream tryLoadNextStream(FlightEndpoint endpoint) throws IOException { + if (endpoint.getLocations().isEmpty()) { + return client.getStream(endpoint.getTicket()); + } else { + List locations = new ArrayList<>(endpoint.getLocations()); + Collections.shuffle(locations); + IOException failure = null; + for (final Location location : locations) { + try { + return Objects.requireNonNull(clientCache.get(location)).getStream(endpoint.getTicket()); + } catch (RuntimeException e) { + // Also handles CompletionException (from clientCache#get), FlightRuntimeException + if (failure == null) { + failure = + new IOException("Failed to get stream from location " + location + ": " + e, e); + } else { + failure.addSuppressed( + new IOException("Failed to get stream from location " + location + ": " + e, e)); + } + } + } + throw Objects.requireNonNull(failure); + } + } + + @Override + public long bytesRead() { + return bytesRead; + } + + @Override + protected void closeReadSource() throws IOException { + try { + currentStream.close(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected Schema readSchema() { + return schema; + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java new file mode 100644 index 0000000000..5e9c415e96 --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlConnection.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.protobuf.InvalidProtocolBufferException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.core.BulkIngestMode; +import org.apache.arrow.adbc.sql.SqlQuirks; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; + +public class FlightSqlConnection implements AdbcConnection { + private final BufferAllocator allocator; + private final FlightSqlClient client; + private final SqlQuirks quirks; + private final LoadingCache clientCache; + + FlightSqlConnection(BufferAllocator allocator, FlightClient client, SqlQuirks quirks) { + this.allocator = allocator; + this.client = new FlightSqlClient(client); + this.quirks = quirks; + this.clientCache = + Caffeine.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .removalListener( + (Location key, FlightClient value, RemovalCause cause) -> { + if (value == null) return; + try { + value.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }) + .build(location -> FlightClient.builder(allocator, location).build()); + } + + @Override + public void commit() throws AdbcException { + throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported"); + } + + @Override + public AdbcStatement createStatement() throws AdbcException { + return new FlightSqlStatement(allocator, client, clientCache, quirks); + } + + @Override + public AdbcStatement deserializePartitionDescriptor(ByteBuffer descriptor) throws AdbcException { + final FlightEndpoint endpoint; + try { + final Flight.FlightEndpoint protoEndpoint = Flight.FlightEndpoint.parseFrom(descriptor); + Location[] locations = new Location[protoEndpoint.getLocationCount()]; + int index = 0; + for (Flight.Location protoLocation : protoEndpoint.getLocationList()) { + Location location = new Location(protoLocation.getUri()); + locations[index++] = location; + } + + endpoint = + new FlightEndpoint( + new Ticket(protoEndpoint.getTicket().getTicket().toByteArray()), locations); + } catch (InvalidProtocolBufferException | URISyntaxException e) { + throw AdbcException.invalidArgument( + "[Flight SQL] Partition descriptor is invalid: " + e.getMessage()) + .withCause(e); + } + + return FlightSqlStatement.fromDescriptor( + allocator, client, clientCache, quirks, Collections.singletonList(endpoint)); + } + + @Override + public AdbcStatement bulkIngest(String targetTableName, BulkIngestMode mode) + throws AdbcException { + return FlightSqlStatement.ingestRoot( + allocator, client, clientCache, quirks, targetTableName, mode); + } + + @Override + public AdbcStatement getInfo(int[] infoCodes) throws AdbcException { + final VectorSchemaRoot root = new InfoMetadataBuilder(allocator, client, infoCodes).build(); + return new FixedRootStatement(allocator, root); + } + + @Override + public void rollback() throws AdbcException { + throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported"); + } + + @Override + public boolean getAutoCommit() throws AdbcException { + return true; + } + + @Override + public void setAutoCommit(boolean enableAutoCommit) throws AdbcException { + if (!enableAutoCommit) { + throw AdbcException.notImplemented("[Flight SQL] Transaction methods are not supported"); + } + } + + @Override + public void close() throws Exception { + client.close(); + } + + @Override + public String toString() { + return "FlightSqlConnection{" + "client=" + client + '}'; + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java new file mode 100644 index 0000000000..11ca360579 --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDatabase.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.sql.SqlQuirks; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; + +/** An instance of a database (e.g. a handle to an in-memory database). */ +public final class FlightSqlDatabase implements AdbcDatabase { + private final BufferAllocator allocator; + private final Location location; + private final SqlQuirks quirks; + private final FlightClient client; + private final AtomicInteger counter; + + FlightSqlDatabase(BufferAllocator allocator, Location location, SqlQuirks quirks) + throws AdbcException { + this.allocator = allocator; + this.location = location; + this.quirks = quirks; + try { + this.client = FlightClient.builder(allocator, location).build(); + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } + this.counter = new AtomicInteger(); + } + + @Override + public AdbcConnection connect() throws AdbcException { + final FlightClient client; + try { + client = FlightClient.builder(allocator, location).build(); + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } + final int count = counter.getAndIncrement(); + return new FlightSqlConnection( + allocator.newChildAllocator("adbc-jdbc-connection-" + count, 0, allocator.getLimit()), + client, + quirks); + } + + @Override + public void close() throws Exception { + client.close(); + } + + @Override + public String toString() { + return "FlightSqlDatabase{" + "target='" + location + '\'' + '}'; + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java new file mode 100644 index 0000000000..045d728b1f --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriver.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import java.net.URISyntaxException; +import java.util.Map; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.drivermanager.AdbcDriverManager; +import org.apache.arrow.adbc.sql.SqlQuirks; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; + +/** An ADBC driver wrapping Arrow Flight SQL. */ +public enum FlightSqlDriver implements AdbcDriver { + INSTANCE; + + private final BufferAllocator allocator; + + FlightSqlDriver() { + allocator = new RootAllocator(); + AdbcDriverManager.getInstance().registerDriver("org.apache.arrow.adbc.driver.flightsql", this); + } + + @Override + public AdbcDatabase open(Map parameters) throws AdbcException { + Object target = parameters.get("adbc.url"); + if (!(target instanceof String)) { + throw AdbcException.invalidArgument( + "[Flight SQL] Must provide String " + PARAM_URL + " parameter"); + } + Location location; + try { + location = new Location((String) target); + } catch (URISyntaxException e) { + throw AdbcException.invalidArgument( + String.format("[Flight SQL] Location %s is invalid: %s", target, e)) + .withCause(e); + } + Object quirks = parameters.get(PARAM_SQL_QUIRKS); + if (quirks != null) { + Preconditions.checkArgument( + quirks instanceof SqlQuirks, + String.format( + "[Flight SQL] %s must be a SqlQuirks instance, not %s", + PARAM_SQL_QUIRKS, quirks.getClass().getName())); + } else { + quirks = new SqlQuirks(); + } + return new FlightSqlDatabase(allocator, location, (SqlQuirks) quirks); + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java new file mode 100644 index 0000000000..cb6b3038f8 --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlDriverUtil.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import java.sql.SQLException; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatusCode; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStatusCode; + +final class FlightSqlDriverUtil { + private FlightSqlDriverUtil() { + throw new AssertionError("Do not instantiate this class"); + } + + static String prefixExceptionMessage(final String s) { + return "[Flight SQL] " + s; + } + + static AdbcException fromSqlException(SQLException e) { + return new AdbcException( + prefixExceptionMessage(e.getMessage()), + e.getCause(), + AdbcStatusCode.UNKNOWN, + e.getSQLState(), + e.getErrorCode()); + } + + static AdbcStatusCode fromFlightStatusCode(FlightStatusCode code) { + switch (code) { + case OK: + throw new IllegalArgumentException("Cannot convert OK status"); + case UNKNOWN: + return AdbcStatusCode.UNKNOWN; + case INTERNAL: + return AdbcStatusCode.INTERNAL; + case INVALID_ARGUMENT: + return AdbcStatusCode.INVALID_ARGUMENT; + case TIMED_OUT: + return AdbcStatusCode.TIMEOUT; + case NOT_FOUND: + return AdbcStatusCode.NOT_FOUND; + case ALREADY_EXISTS: + return AdbcStatusCode.ALREADY_EXISTS; + case CANCELLED: + return AdbcStatusCode.CANCELLED; + case UNAUTHENTICATED: + return AdbcStatusCode.UNAUTHENTICATED; + case UNAUTHORIZED: + return AdbcStatusCode.UNAUTHORIZED; + case UNIMPLEMENTED: + return AdbcStatusCode.NOT_IMPLEMENTED; + case UNAVAILABLE: + return AdbcStatusCode.IO; + default: + return AdbcStatusCode.UNKNOWN; + } + } + + static AdbcException fromFlightException(FlightRuntimeException e) { + return new AdbcException( + e.getMessage(), e.getCause(), fromFlightStatusCode(e.status().code()), null, 0); + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java new file mode 100644 index 0000000000..3a3e072a4e --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/FlightSqlStatement.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.flightsql; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.core.AdbcStatusCode; +import org.apache.arrow.adbc.core.BulkIngestMode; +import org.apache.arrow.adbc.core.PartitionDescriptor; +import org.apache.arrow.adbc.sql.SqlQuirks; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.impl.Flight; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.Field; + +public class FlightSqlStatement implements AdbcStatement { + private final BufferAllocator allocator; + private final FlightSqlClient client; + private final LoadingCache clientCache; + private final SqlQuirks quirks; + + // State for SQL queries + private String sqlQuery; + private FlightSqlClient.PreparedStatement preparedStatement; + private List flightEndpoints; + private ArrowReader reader; + // State for bulk ingest + private BulkState bulkOperation; + private VectorSchemaRoot bindRoot; + + FlightSqlStatement( + BufferAllocator allocator, + FlightSqlClient client, + LoadingCache clientCache, + SqlQuirks quirks) { + this.allocator = allocator; + this.client = client; + this.clientCache = clientCache; + this.quirks = quirks; + this.sqlQuery = null; + } + + static FlightSqlStatement ingestRoot( + BufferAllocator allocator, + FlightSqlClient client, + LoadingCache clientCache, + SqlQuirks quirks, + String targetTableName, + BulkIngestMode mode) { + Objects.requireNonNull(targetTableName); + final FlightSqlStatement statement = + new FlightSqlStatement(allocator, client, clientCache, quirks); + statement.bulkOperation = new BulkState(); + statement.bulkOperation.mode = mode; + statement.bulkOperation.targetTable = targetTableName; + return statement; + } + + public static AdbcStatement fromDescriptor( + BufferAllocator allocator, + FlightSqlClient client, + LoadingCache clientCache, + SqlQuirks quirks, + List flightEndpoints) { + final FlightSqlStatement statement = + new FlightSqlStatement(allocator, client, clientCache, quirks); + statement.flightEndpoints = flightEndpoints; + return statement; + } + + @Override + public void setSqlQuery(String query) throws AdbcException { + if (bulkOperation != null) { + throw AdbcException.invalidState( + "[Flight SQL] Statement is configured for a bulk ingest/append operation"); + } + sqlQuery = query; + } + + @Override + public void bind(VectorSchemaRoot root) { + bindRoot = root; + } + + @Override + public void execute() throws AdbcException { + if (bulkOperation != null) { + executeBulk(); + } else if (sqlQuery != null) { + executeSqlQuery(); + } else { + throw AdbcException.invalidState("[Flight SQL] Must setSqlQuery() first"); + } + } + + private void createBulkTable() throws AdbcException { + final StringBuilder create = new StringBuilder("CREATE TABLE "); + create.append(bulkOperation.targetTable); + create.append(" ("); + for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) { + if (col > 0) { + create.append(", "); + } + final Field field = bindRoot.getVector(col).getField(); + create.append(field.getName()); + create.append(' '); + String typeName = quirks.getArrowToSqlTypeNameMapping().apply(field.getType()); + if (typeName == null) { + throw AdbcException.notImplemented( + "[Flight SQL] Cannot generate CREATE TABLE statement for field " + field); + } + create.append(typeName); + } + create.append(")"); + + try { + client.executeUpdate(create.toString()); + } catch (FlightRuntimeException e) { + throw new AdbcException( + "[Flight SQL] Could not create table for bulk ingestion: " + bulkOperation.targetTable, + e, + AdbcStatusCode.ALREADY_EXISTS, + null, + 0); + } + } + + private void executeBulk() throws AdbcException { + if (bindRoot == null) { + throw AdbcException.invalidState("[Flight SQL] Must call bind() before bulk insert"); + } + + if (bulkOperation.mode == BulkIngestMode.CREATE) { + createBulkTable(); + } + + // XXX: potential injection + final StringBuilder insert = new StringBuilder("INSERT INTO "); + insert.append(bulkOperation.targetTable); + insert.append(" VALUES ("); + for (int col = 0; col < bindRoot.getFieldVectors().size(); col++) { + if (col > 0) { + insert.append(", "); + } + insert.append("?"); + } + insert.append(")"); + + final FlightSqlClient.PreparedStatement statement; + try { + statement = client.prepare(insert.toString()); + } catch (FlightRuntimeException e) { + throw new AdbcException( + "[Flight SQL] Could not prepare statement for bulk ingestion into " + + bulkOperation.targetTable, + e, + AdbcStatusCode.NOT_FOUND, + null, + 0); + } + try { + try { + statement.setParameters(new NonOwningRoot(bindRoot)); + statement.executeUpdate(); + } finally { + statement.close(); + } + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } + } + + private void executeSqlQuery() throws AdbcException { + try { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + throw new AdbcException( + "[Flight SQL] Failed to close unread result set", + e, + AdbcStatusCode.IO, + null, /*vendorCode*/ + 0); + } + } + + if (preparedStatement != null) { + // TODO: This binds only the LAST row + if (bindRoot != null) { + preparedStatement.setParameters(new NonOwningRoot(bindRoot)); + } + // XXX(ARROW-17199): why does this throw SQLException? + flightEndpoints = preparedStatement.execute().getEndpoints(); + } else { + flightEndpoints = client.execute(sqlQuery).getEndpoints(); + } + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } catch (SQLException e) { + throw FlightSqlDriverUtil.fromSqlException(e); + } + } + + @Override + public ArrowReader getArrowReader() throws AdbcException { + if (reader != null) { + ArrowReader result = reader; + reader = null; + return result; + } + if (flightEndpoints == null) { + throw AdbcException.invalidState("[Flight SQL] Must call execute() before getArrowReader()"); + } + final ArrowReader reader = + new FlightInfoReader(allocator, client, clientCache, flightEndpoints); + flightEndpoints = null; + return reader; + } + + @Override + public List getPartitionDescriptors() throws AdbcException { + if (flightEndpoints == null) { + throw AdbcException.invalidState( + "[Flight SQL] Must call execute() before getPartitionDescriptors()"); + } + final List result = new ArrayList<>(); + for (final FlightEndpoint endpoint : flightEndpoints) { + // FlightEndpoint doesn't expose its serializer, so do it manually + Flight.FlightEndpoint.Builder protoEndpoint = + Flight.FlightEndpoint.newBuilder() + .setTicket( + Flight.Ticket.newBuilder() + .setTicket(ByteString.copyFrom(endpoint.getTicket().getBytes()))); + for (final Location location : endpoint.getLocations()) { + protoEndpoint.addLocation( + Flight.Location.newBuilder().setUri(location.getUri().toString()).build()); + } + result.add( + new PartitionDescriptor(protoEndpoint.build().toByteString().asReadOnlyByteBuffer())); + } + return result; + } + + @Override + public void prepare() throws AdbcException { + try { + if (sqlQuery == null) { + throw AdbcException.invalidArgument( + "[Flight SQL] Must call setSqlQuery(String) before prepare()"); + } + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + throw new AdbcException( + "[Flight SQL] Failed to close unread result set", + e, + AdbcStatusCode.IO, + null, /*vendorCode*/ + 0); + } + } + + preparedStatement = client.prepare(sqlQuery); + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } + } + + @Override + public void close() throws Exception { + AutoCloseables.close(reader, preparedStatement); + } + + private static final class BulkState { + public BulkIngestMode mode; + String targetTable; + } + + /** A VectorSchemaRoot which does not own its data. */ + private static final class NonOwningRoot extends VectorSchemaRoot { + public NonOwningRoot(VectorSchemaRoot parent) { + super(parent.getSchema(), parent.getFieldVectors(), parent.getRowCount()); + } + + @Override + public void close() {} + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java new file mode 100644 index 0000000000..318405d6ce --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/InfoMetadataBuilder.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.core.AdbcInfoCode; +import org.apache.arrow.adbc.core.StandardSchemas; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.flight.sql.impl.FlightSql; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.DenseUnionVector; + +/** Helper class to track state needed to build up the info structure. */ +final class InfoMetadataBuilder implements AutoCloseable { + private static final byte STRING_VALUE_TYPE_ID = (byte) 0; + private static final Map ADBC_TO_FLIGHT_SQL_CODES = new HashMap<>(); + private static final Map SUPPORTED_CODES = new HashMap<>(); + + private final Collection requestedCodes; + private final FlightSqlClient client; + private VectorSchemaRoot root; + + private final UInt4Vector infoCodes; + private final DenseUnionVector infoValues; + private final VarCharVector stringValues; + + @FunctionalInterface + interface AddInfo { + void accept(InfoMetadataBuilder builder, DenseUnionVector sqlInfo, int srcIndex, int dstIndex); + } + + static { + ADBC_TO_FLIGHT_SQL_CODES.put( + AdbcInfoCode.VENDOR_NAME.getValue(), FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME.getNumber()); + ADBC_TO_FLIGHT_SQL_CODES.put( + AdbcInfoCode.VENDOR_VERSION.getValue(), + FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION.getNumber()); + + SUPPORTED_CODES.put( + FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME.getNumber(), + (b, sqlInfo, srcIndex, dstIndex) -> { + b.infoCodes.setSafe(dstIndex, AdbcInfoCode.VENDOR_NAME.getValue()); + b.setStringValue(dstIndex, sqlInfo.getVarCharVector(STRING_VALUE_TYPE_ID).get(srcIndex)); + }); + SUPPORTED_CODES.put( + FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION.getNumber(), + (b, sqlInfo, srcIndex, dstIndex) -> { + b.infoCodes.setSafe(dstIndex, AdbcInfoCode.VENDOR_VERSION.getValue()); + b.setStringValue(dstIndex, sqlInfo.getVarCharVector(STRING_VALUE_TYPE_ID).get(srcIndex)); + }); + } + + InfoMetadataBuilder(BufferAllocator allocator, FlightSqlClient client, int[] infoCodes) { + if (infoCodes == null) { + this.requestedCodes = new ArrayList<>(SUPPORTED_CODES.keySet()); + this.requestedCodes.add(AdbcInfoCode.DRIVER_NAME.getValue()); + this.requestedCodes.add(AdbcInfoCode.DRIVER_VERSION.getValue()); + } else { + this.requestedCodes = IntStream.of(infoCodes).boxed().collect(Collectors.toList()); + } + this.client = client; + this.root = VectorSchemaRoot.create(StandardSchemas.GET_INFO_SCHEMA, allocator); + this.infoCodes = (UInt4Vector) root.getVector(0); + this.infoValues = (DenseUnionVector) root.getVector(1); + this.stringValues = this.infoValues.getVarCharVector((byte) 0); + } + + void setStringValue(int index, byte[] value) { + infoValues.setValueCount(index + 1); + infoValues.setTypeId(index, STRING_VALUE_TYPE_ID); + stringValues.setSafe(index, value); + infoValues + .getOffsetBuffer() + .setInt((long) index * DenseUnionVector.OFFSET_WIDTH, stringValues.getLastSet()); + } + + VectorSchemaRoot build() throws AdbcException { + // XXX: rather hacky, we need a better way to do this + int dstIndex = 0; + + List translatedCodes = new ArrayList<>(); + for (int code : requestedCodes) { + Integer translatedCode = ADBC_TO_FLIGHT_SQL_CODES.get(code); + if (translatedCode != null) { + translatedCodes.add(translatedCode); + } else if (code == AdbcInfoCode.DRIVER_NAME.getValue()) { + infoCodes.setSafe(dstIndex, code); + setStringValue(dstIndex++, "ADBC Flight SQL Driver".getBytes(StandardCharsets.UTF_8)); + } else if (code == AdbcInfoCode.DRIVER_VERSION.getValue()) { + infoCodes.setSafe(dstIndex, code); + // TODO: actual version + setStringValue(dstIndex++, "0.0.1".getBytes(StandardCharsets.UTF_8)); + } + } + final FlightInfo info = client.getSqlInfo(translatedCodes); + + for (final FlightEndpoint endpoint : info.getEndpoints()) { + // TODO: this should account for locations property + try (final FlightStream stream = client.getStream(endpoint.getTicket())) { + final VectorSchemaRoot root = stream.getRoot(); + final UInt4Vector sqlCode = (UInt4Vector) root.getVector(0); + final DenseUnionVector sqlInfo = (DenseUnionVector) root.getVector(1); + for (int srcIndex = 0; srcIndex < root.getRowCount(); srcIndex++) { + final AddInfo addInfo = SUPPORTED_CODES.get(sqlCode.get(srcIndex)); + if (addInfo != null) { + addInfo.accept(this, sqlInfo, srcIndex, dstIndex++); + } + } + } catch (FlightRuntimeException e) { + throw FlightSqlDriverUtil.fromFlightException(e); + } catch (Exception e) { + throw AdbcException.io("[Flight SQL] " + e.getMessage()).withCause(e); + } + } + + root.setRowCount(dstIndex); + VectorSchemaRoot result = root; + root = null; + return result; + } + + @Override + public void close() throws Exception { + AutoCloseables.close(root); + } +} diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java new file mode 100644 index 0000000000..0325fd7948 --- /dev/null +++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/RootArrowReader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.adbc.driver.flightsql; + +import java.io.IOException; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; + +/** An ArrowReader that wraps a list of ArrowRecordBatches. */ +class RootArrowReader extends ArrowReader { + private final Schema schema; + private final List batches; + int nextIndex; + + public RootArrowReader(BufferAllocator allocator, Schema schema, List batches) { + super(allocator); + this.schema = schema; + this.batches = batches; + this.nextIndex = 0; + } + + @Override + public boolean loadNextBatch() throws IOException { + if (nextIndex < batches.size()) { + new VectorLoader(getVectorSchemaRoot()).load(batches.get(nextIndex++)); + return true; + } + return false; + } + + @Override + public long bytesRead() { + return 0; + } + + @Override + protected void closeReadSource() throws IOException { + try { + AutoCloseables.close(batches); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected Schema readSchema() { + return schema; + } +} diff --git a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java b/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java index 877f51244b..b8a4def06e 100644 --- a/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java +++ b/java/driver/jdbc-validation-derby/src/test/java/org/apache/arrow/adbc/driver/jdbc/derby/DerbyQuirks.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; import org.apache.arrow.adbc.core.AdbcException; import org.apache.arrow.adbc.driver.jdbc.JdbcDriver; import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks; @@ -39,7 +40,7 @@ public DerbyQuirks(Path databaseRoot) { @Override public AdbcDatabase initDatabase() throws AdbcException { final Map parameters = new HashMap<>(); - parameters.put("adbc.jdbc.url", jdbcUrl); + parameters.put(AdbcDriver.PARAM_URL, jdbcUrl); return JdbcDriver.INSTANCE.open(parameters); } diff --git a/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java b/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java index a0a1885207..126adac777 100644 --- a/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java +++ b/java/driver/jdbc-validation-postgresql/src/test/java/org/apache/arrow/adbc/driver/jdbc/postgresql/PostgresqlQuirks.java @@ -24,10 +24,11 @@ import java.util.HashMap; import java.util.Map; import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcDriver; import org.apache.arrow.adbc.core.AdbcException; import org.apache.arrow.adbc.driver.jdbc.JdbcDriver; -import org.apache.arrow.adbc.driver.jdbc.JdbcDriverQuirks; import org.apache.arrow.adbc.driver.testsuite.SqlValidationQuirks; +import org.apache.arrow.adbc.sql.SqlQuirks; import org.apache.arrow.vector.types.pojo.ArrowType; import org.junit.jupiter.api.Assumptions; @@ -52,17 +53,16 @@ public AdbcDatabase initDatabase() throws AdbcException { String url = makeJdbcUrl(); final Map parameters = new HashMap<>(); - parameters.put("adbc.jdbc.url", url); + parameters.put(AdbcDriver.PARAM_URL, url); parameters.put( - "adbc.jdbc.quirks", - JdbcDriverQuirks.builder() + AdbcDriver.PARAM_SQL_QUIRKS, + SqlQuirks.builder() .arrowToSqlTypeNameMapping( (arrowType -> { if (arrowType.getTypeID() == ArrowType.ArrowTypeID.Utf8) { return "TEXT"; } - return JdbcDriverQuirks.DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING.apply( - arrowType); + return SqlQuirks.DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING.apply(arrowType); })) .build()); return JdbcDriver.INSTANCE.open(parameters); diff --git a/java/driver/jdbc/pom.xml b/java/driver/jdbc/pom.xml index 0ea8d43f2e..05b0e9f375 100644 --- a/java/driver/jdbc/pom.xml +++ b/java/driver/jdbc/pom.xml @@ -50,37 +50,9 @@ org.apache.arrow.adbc adbc-driver-manager - - - - - org.apache.derby - derby - 10.14.2.0 - test - - - org.apache.derby - derbytools - 10.14.2.0 - test - - - - - org.assertj - assertj-core - test - - - org.junit.jupiter - junit-jupiter - test - org.apache.arrow.adbc - adbc-driver-validation - test + adbc-sql diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java index 5bbb3b0d1a..5982236312 100644 --- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java +++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java @@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcStatement; import org.apache.arrow.adbc.core.BulkIngestMode; import org.apache.arrow.adbc.core.StandardSchemas; +import org.apache.arrow.adbc.sql.SqlQuirks; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -39,9 +40,9 @@ public class JdbcConnection implements AdbcConnection { private final BufferAllocator allocator; private final Connection connection; - private final JdbcDriverQuirks quirks; + private final SqlQuirks quirks; - JdbcConnection(BufferAllocator allocator, Connection connection, JdbcDriverQuirks quirks) { + JdbcConnection(BufferAllocator allocator, Connection connection, SqlQuirks quirks) { this.allocator = allocator; this.connection = connection; this.quirks = quirks; diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java index 4cef0e44eb..7c8a2a2f20 100644 --- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java +++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDatabase.java @@ -24,17 +24,18 @@ import org.apache.arrow.adbc.core.AdbcConnection; import org.apache.arrow.adbc.core.AdbcDatabase; import org.apache.arrow.adbc.core.AdbcException; +import org.apache.arrow.adbc.sql.SqlQuirks; import org.apache.arrow.memory.BufferAllocator; /** An instance of a database (e.g. a handle to an in-memory database). */ public final class JdbcDatabase implements AdbcDatabase { private final BufferAllocator allocator; private final String target; - private final JdbcDriverQuirks quirks; + private final SqlQuirks quirks; private final Connection connection; private final AtomicInteger counter; - JdbcDatabase(BufferAllocator allocator, final String target, JdbcDriverQuirks quirks) + JdbcDatabase(BufferAllocator allocator, final String target, SqlQuirks quirks) throws AdbcException { this.allocator = allocator; this.target = target; diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java index 341c8493dc..67db75a90f 100644 --- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java +++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriver.java @@ -21,10 +21,12 @@ import org.apache.arrow.adbc.core.AdbcDriver; import org.apache.arrow.adbc.core.AdbcException; import org.apache.arrow.adbc.drivermanager.AdbcDriverManager; +import org.apache.arrow.adbc.sql.SqlQuirks; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.Preconditions; +/** An ADBC driver wrapping the JDBC API. */ public enum JdbcDriver implements AdbcDriver { INSTANCE; @@ -37,18 +39,20 @@ public enum JdbcDriver implements AdbcDriver { @Override public AdbcDatabase open(Map parameters) throws AdbcException { - Object target = parameters.get("adbc.jdbc.url"); + Object target = parameters.get(PARAM_URL); if (!(target instanceof String)) { - throw AdbcException.invalidArgument("[JDBC] Must provide String adbc.jdbc.url parameter"); + throw AdbcException.invalidArgument("[JDBC] Must provide String " + PARAM_URL + " parameter"); } - Object quirks = parameters.get("adbc.jdbc.quirks"); + Object quirks = parameters.get(PARAM_SQL_QUIRKS); if (quirks != null) { Preconditions.checkArgument( - quirks instanceof JdbcDriverQuirks, - "[JDBC] adbc.jdbc.quirks must be a JdbcDriverQuirks instance"); + quirks instanceof SqlQuirks, + String.format( + "[JDBC] %s must be a SqlQuirks instance, not %s", + PARAM_SQL_QUIRKS, quirks.getClass().getName())); } else { - quirks = new JdbcDriverQuirks(); + quirks = new SqlQuirks(); } - return new JdbcDatabase(allocator, (String) target, (JdbcDriverQuirks) quirks); + return new JdbcDatabase(allocator, (String) target, (SqlQuirks) quirks); } } diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java index df523d4f9a..1d243bdb93 100644 --- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java +++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java @@ -29,6 +29,7 @@ import org.apache.arrow.adbc.core.AdbcStatusCode; import org.apache.arrow.adbc.core.BulkIngestMode; import org.apache.arrow.adbc.driver.jdbc.util.JdbcParameterBinder; +import org.apache.arrow.adbc.sql.SqlQuirks; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.VectorSchemaRoot; @@ -38,7 +39,7 @@ public class JdbcStatement implements AdbcStatement { private final BufferAllocator allocator; private final Connection connection; - private final JdbcDriverQuirks quirks; + private final SqlQuirks quirks; // State for SQL queries private Statement statement; @@ -49,7 +50,7 @@ public class JdbcStatement implements AdbcStatement { private BulkState bulkOperation; private VectorSchemaRoot bindRoot; - JdbcStatement(BufferAllocator allocator, Connection connection, JdbcDriverQuirks quirks) { + JdbcStatement(BufferAllocator allocator, Connection connection, SqlQuirks quirks) { this.allocator = allocator; this.connection = connection; this.quirks = quirks; @@ -59,7 +60,7 @@ public class JdbcStatement implements AdbcStatement { static JdbcStatement ingestRoot( BufferAllocator allocator, Connection connection, - JdbcDriverQuirks quirks, + SqlQuirks quirks, String targetTableName, BulkIngestMode mode) { Objects.requireNonNull(targetTableName); @@ -179,7 +180,11 @@ private void executeSqlQuery() throws AdbcException { reader.close(); } catch (IOException e) { throw new AdbcException( - "Failed to close unread result set", e, AdbcStatusCode.IO, null, /*vendorCode*/ 0); + "[JDBC] Failed to close unread result set", + e, + AdbcStatusCode.IO, + null, /*vendorCode*/ + 0); } } if (resultSet != null) { @@ -225,6 +230,10 @@ public ArrowReader getArrowReader() throws AdbcException { @Override public void prepare() throws AdbcException { try { + if (sqlQuery == null) { + throw AdbcException.invalidArgument( + "[Flight SQL] Must call setSqlQuery(String) before prepare()"); + } if (resultSet != null) { resultSet.close(); } diff --git a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java index ecb0f4ea13..46ae770d03 100644 --- a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java +++ b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java @@ -79,7 +79,7 @@ public void afterEach() throws Exception { } @Test - void getInfo() throws Exception { + public void getInfo() throws Exception { try (final AdbcStatement stmt = connection.getInfo()) { try (final ArrowReader reader = stmt.getArrowReader()) { assertThat(reader.getVectorSchemaRoot().getSchema()) @@ -91,7 +91,7 @@ void getInfo() throws Exception { } @Test - void getInfoByCode() throws Exception { + public void getInfoByCode() throws Exception { try (final AdbcStatement stmt = connection.getInfo(new AdbcInfoCode[] {AdbcInfoCode.DRIVER_NAME})) { try (final ArrowReader reader = stmt.getArrowReader()) { @@ -112,7 +112,7 @@ void getInfoByCode() throws Exception { } @Test - void getObjectsColumns() throws Exception { + public void getObjectsColumns() throws Exception { final Schema schema = util.ingestTableIntsStrs(allocator, connection, tableName); boolean tableFound = false; try (final AdbcStatement stmt = @@ -153,7 +153,7 @@ void getObjectsColumns() throws Exception { } @Test - void getObjectsCatalogs() throws Exception { + public void getObjectsCatalogs() throws Exception { util.ingestTableIntsStrs(allocator, connection, tableName); try (final AdbcStatement stmt = connection.getObjects( @@ -171,7 +171,7 @@ void getObjectsCatalogs() throws Exception { } @Test - void getObjectsDbSchemas() throws Exception { + public void getObjectsDbSchemas() throws Exception { util.ingestTableIntsStrs(allocator, connection, tableName); try (final AdbcStatement stmt = connection.getObjects( @@ -186,7 +186,7 @@ void getObjectsDbSchemas() throws Exception { } @Test - void getObjectsTables() throws Exception { + public void getObjectsTables() throws Exception { util.ingestTableIntsStrs(allocator, connection, tableName); try (final AdbcStatement stmt = connection.getObjects( @@ -208,7 +208,7 @@ void getObjectsTables() throws Exception { } @Test - void getTableSchema() throws Exception { + public void getTableSchema() throws Exception { final Schema schema = new Schema( Arrays.asList( @@ -226,7 +226,7 @@ void getTableSchema() throws Exception { } @Test - void getTableTypes() throws Exception { + public void getTableTypes() throws Exception { try (final AdbcStatement stmt = connection.getTableTypes()) { try (final ArrowReader reader = stmt.getArrowReader()) { assertThat(reader.getVectorSchemaRoot().getSchema()) diff --git a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java new file mode 100644 index 0000000000..72467321a4 --- /dev/null +++ b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractPartitionDescriptorTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adbc.driver.testsuite; + +import static org.apache.arrow.adbc.driver.testsuite.ArrowAssertions.assertRoot; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import org.apache.arrow.adbc.core.AdbcConnection; +import org.apache.arrow.adbc.core.AdbcDatabase; +import org.apache.arrow.adbc.core.AdbcStatement; +import org.apache.arrow.adbc.core.BulkIngestMode; +import org.apache.arrow.adbc.core.PartitionDescriptor; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class AbstractPartitionDescriptorTest { + /** Must be initialized by the subclass. */ + protected static SqlValidationQuirks quirks; + + protected AdbcDatabase database; + protected AdbcConnection connection; + protected BufferAllocator allocator; + protected SqlTestUtil util; + protected String tableName; + protected Schema schema; + + @BeforeEach + public void beforeEach() throws Exception { + Preconditions.checkNotNull(quirks, "Must initialize quirks in subclass with @BeforeAll"); + database = quirks.initDatabase(); + connection = database.connect(); + allocator = new RootAllocator(); + util = new SqlTestUtil(quirks); + tableName = quirks.caseFoldTableName("bulktable"); + schema = + new Schema( + Arrays.asList( + Field.nullable( + quirks.caseFoldColumnName("ints"), new ArrowType.Int(32, /*signed=*/ true)), + Field.nullable(quirks.caseFoldColumnName("strs"), new ArrowType.Utf8()))); + quirks.cleanupTable(tableName); + } + + @AfterEach + public void afterEach() throws Exception { + quirks.cleanupTable(tableName); + AutoCloseables.close(connection, database, allocator); + } + + @Test + public void serializeDeserializeQuery() throws Exception { + try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final IntVector ints = (IntVector) root.getVector(0); + final VarCharVector strs = (VarCharVector) root.getVector(1); + + ints.allocateNew(4); + ints.setSafe(0, 0); + ints.setSafe(1, 1); + ints.setSafe(2, 2); + ints.setNull(3); + strs.allocateNew(4); + strs.setNull(0); + strs.setSafe(1, "foo".getBytes(StandardCharsets.UTF_8)); + strs.setSafe(2, "".getBytes(StandardCharsets.UTF_8)); + strs.setSafe(3, "asdf".getBytes(StandardCharsets.UTF_8)); + root.setRowCount(4); + + try (final AdbcStatement stmt = connection.bulkIngest(tableName, BulkIngestMode.CREATE)) { + stmt.bind(root); + stmt.execute(); + } + final List descriptors; + try (final AdbcStatement stmt = connection.createStatement()) { + stmt.setSqlQuery("SELECT * FROM " + tableName); + stmt.execute(); + descriptors = stmt.getPartitionDescriptors(); + // For convenience, assume database won't shard 4 rows over more than 1 partition… + assertThat(descriptors).hasSize(1); + } + + // The serialized partition descriptor should be executable on a separate connection + try (final AdbcConnection connection2 = database.connect(); + final AdbcStatement stmt = + connection2.deserializePartitionDescriptor(descriptors.get(0).getDescriptor()); + final ArrowReader reader = stmt.getArrowReader()) { + assertThat(reader.loadNextBatch()).isTrue(); + assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(root.getSchema()); + assertRoot(reader.getVectorSchemaRoot()).isEqualTo(root); + } + } + } +} diff --git a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java index 947cd3d1eb..c6fc97cd2d 100644 --- a/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java +++ b/java/driver/validation/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractStatementTest.java @@ -71,6 +71,7 @@ public void beforeEach() throws Exception { Field.nullable( quirks.caseFoldColumnName("ints"), new ArrowType.Int(32, /*signed=*/ true)), Field.nullable(quirks.caseFoldColumnName("strs"), new ArrowType.Utf8()))); + quirks.cleanupTable(tableName); } @AfterEach @@ -80,7 +81,7 @@ public void afterEach() throws Exception { } @Test - public void bulkInsertAppend() throws Exception { + public void bulkIngestAppend() throws Exception { try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { final IntVector ints = (IntVector) root.getVector(0); final VarCharVector strs = (VarCharVector) root.getVector(1); @@ -140,6 +141,7 @@ public void bulkIngestAppendConflict() throws Exception { Collections.singletonList( Field.nullable(quirks.caseFoldColumnName("ints"), new ArrowType.Utf8()))); try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.setRowCount(1); try (final AdbcStatement stmt = connection.bulkIngest(tableName, BulkIngestMode.CREATE)) { stmt.bind(root); stmt.execute(); @@ -169,6 +171,7 @@ public void bulkIngestAppendNotFound() throws Exception { @Test public void bulkIngestCreateConflict() throws Exception { try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.setRowCount(1); try (final AdbcStatement stmt = connection.bulkIngest(tableName, BulkIngestMode.CREATE)) { stmt.bind(root); stmt.execute(); @@ -178,7 +181,7 @@ public void bulkIngestCreateConflict() throws Exception { try (final AdbcStatement stmt = connection.bulkIngest(tableName, BulkIngestMode.CREATE)) { stmt.bind(root); final AdbcException e = assertThrows(AdbcException.class, stmt::execute); - assertThat(e.getStatus()).isEqualTo(AdbcStatusCode.ALREADY_EXISTS); + assertThat(e.getStatus()).describedAs("%s", e).isEqualTo(AdbcStatusCode.ALREADY_EXISTS); } } } @@ -193,7 +196,9 @@ public void prepareQuery() throws Exception { assertThat(reader.getVectorSchemaRoot().getSchema()).isEqualTo(expectedSchema); assertThat(reader.loadNextBatch()).isTrue(); assertThat(reader.getVectorSchemaRoot().getRowCount()).isEqualTo(4); - assertThat(reader.loadNextBatch()).isFalse(); + while (reader.loadNextBatch()) { + assertThat(reader.getVectorSchemaRoot().getRowCount()).isEqualTo(0); + } } } } diff --git a/java/pom.xml b/java/pom.xml index 11987e17cd..fc9bceac97 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -77,12 +77,15 @@ core + driver/flight-sql + driver/flight-sql-validation driver/jdbc driver/jdbc-util driver/jdbc-validation-derby driver/jdbc-validation-postgresql driver/validation driver-manager + sql @@ -103,12 +106,28 @@ arrow-vector ${dep.arrow.version} + + org.apache.arrow + flight-core + ${dep.arrow.version} + + + org.apache.arrow + flight-sql + ${dep.arrow.version} + + org.apache.arrow.adbc adbc-core ${adbc.version} + + org.apache.arrow.adbc + adbc-driver-flight-sql + ${adbc.version} + org.apache.arrow.adbc adbc-driver-jdbc @@ -129,6 +148,11 @@ adbc-driver-manager ${adbc.version} + + org.apache.arrow.adbc + adbc-sql + ${adbc.version} + diff --git a/java/sql/pom.xml b/java/sql/pom.xml new file mode 100644 index 0000000000..5009d673df --- /dev/null +++ b/java/sql/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + arrow-adbc-java-root + org.apache.arrow.adbc + 9.0.0-SNAPSHOT + + + adbc-sql + jar + Arrow ADBC SQL + Common utilities for SQL-based ADBC drivers. + + + + org.apache.arrow + arrow-vector + + + + + org.assertj + assertj-core + test + + + org.junit.jupiter + junit-jupiter + test + + + diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java b/java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java similarity index 89% rename from java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java rename to java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java index 99c87e6dfd..007f699c0f 100644 --- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverQuirks.java +++ b/java/sql/src/main/java/org/apache/arrow/adbc/sql/SqlQuirks.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.arrow.adbc.driver.jdbc; +package org.apache.arrow.adbc.sql; import java.util.function.Function; import org.apache.arrow.vector.types.pojo.ArrowType; -/** Parameters to pass to the ADBC JDBC driver to account for driver/vendor-specific quirks. */ -public final class JdbcDriverQuirks { +/** Parameters to pass to SQL-based drivers to account for driver/vendor-specific SQL quirks. */ +public final class SqlQuirks { public static final Function DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING = (arrowType) -> { switch (arrowType.getTypeID()) { @@ -58,7 +58,7 @@ public final class JdbcDriverQuirks { }; Function arrowToSqlTypeNameMapping; - public JdbcDriverQuirks() { + public SqlQuirks() { this.arrowToSqlTypeNameMapping = DEFAULT_ARROW_TYPE_TO_SQL_TYPE_NAME_MAPPING; } @@ -80,8 +80,8 @@ public Builder arrowToSqlTypeNameMapping(Function mapper) { return this; } - public JdbcDriverQuirks build() { - final JdbcDriverQuirks quirks = new JdbcDriverQuirks(); + public SqlQuirks build() { + final SqlQuirks quirks = new SqlQuirks(); if (arrowToSqlTypeNameMapping != null) { quirks.arrowToSqlTypeNameMapping = arrowToSqlTypeNameMapping; } diff --git a/java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java b/java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java new file mode 100644 index 0000000000..7ed2c8b69e --- /dev/null +++ b/java/sql/src/main/java/org/apache/arrow/adbc/sql/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This module contains common utilities for drivers working with SQL. + * + *

ADBC is currently experimental. + */ +package org.apache.arrow.adbc.sql;