Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3965 [Java] JDBC-To-Arrow Configuration #3133

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public static VectorSchemaRoot sqlToArrow(Connection connection, String query, B
Preconditions.checkArgument(query != null && query.length() > 0, "SQL query can not be null or empty");
Preconditions.checkNotNull(allocator, "Memory allocator object can not be null");

return sqlToArrow(connection, query, allocator, Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT));
JdbcToArrowConfig config =
new JdbcToArrowConfig(allocator, Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT));
return sqlToArrow(connection, query, config);
}

/**
Expand All @@ -115,8 +117,30 @@ public static VectorSchemaRoot sqlToArrow(
Preconditions.checkNotNull(allocator, "Memory allocator object can not be null");
Preconditions.checkNotNull(calendar, "Calendar object can not be null");

return sqlToArrow(connection, query, new JdbcToArrowConfig(allocator, calendar));
}

/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow objects.
* <p>
* A {@link Statement} is created from the connection, the query is executed, and the resulting
* ResultSet is handed to the ResultSet-based <code>sqlToArrow</code> overload together with the configuration.
* </p>
*
* @param connection Database connection to be used. This method will not close the passed connection object.
* Since the caller has passed the connection object it's the responsibility of the caller
* to close or return the connection to the pool.
* @param query The DB Query to fetch the data. Must be neither <code>null</code> nor empty.
* @param config Configuration of the conversion from JDBC to Arrow. Must be non-null and report
* itself as valid via <code>isValid()</code>.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException Propagate any SQL Exceptions to the caller after closing any resources opened such as
* ResultSet and Statement objects.
* @throws IOException Declared because the ResultSet-based conversion this method delegates to declares it.
*/
public static VectorSchemaRoot sqlToArrow(Connection connection, String query, JdbcToArrowConfig config)
throws SQLException, IOException {
Preconditions.checkNotNull(connection, "JDBC connection object can not be null");
Preconditions.checkArgument(query != null && query.length() > 0, "SQL query can not be null or empty");
Preconditions.checkNotNull(config, "The configuration cannot be null");
Preconditions.checkArgument(config.isValid(), "The configuration must be valid");

// try-with-resources closes the Statement on exit; the connection itself stays open for the caller.
try (Statement stmt = connection.createStatement()) {
return sqlToArrow(stmt.executeQuery(query), allocator, calendar);
return sqlToArrow(stmt.executeQuery(query), config);
}
}

Expand Down Expand Up @@ -147,7 +171,9 @@ public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, BaseAllocator all
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object can not be null");
Preconditions.checkNotNull(allocator, "Memory Allocator object can not be null");

return sqlToArrow(resultSet, allocator, Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT));
JdbcToArrowConfig config =
new JdbcToArrowConfig(allocator, Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT));
return sqlToArrow(resultSet, config);
}

/**
Expand All @@ -162,10 +188,7 @@ public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, Calendar calendar
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object can not be null");
Preconditions.checkNotNull(calendar, "Calendar object can not be null");

RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
VectorSchemaRoot root = sqlToArrow(resultSet, rootAllocator, calendar);

return root;
return sqlToArrow(resultSet, new JdbcToArrowConfig(new RootAllocator(Integer.MAX_VALUE), calendar));
}

/**
Expand All @@ -183,9 +206,26 @@ public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, BaseAllocator all
Preconditions.checkNotNull(allocator, "Memory Allocator object can not be null");
Preconditions.checkNotNull(calendar, "Calendar object can not be null");

return sqlToArrow(resultSet, new JdbcToArrowConfig(allocator, calendar));
}

/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow objects.
* <p>
* The Arrow schema is derived from the ResultSet's metadata, a {@link VectorSchemaRoot} is created with the
* allocator held by the configuration, and the rows are then copied into that root.
* </p>
*
* @param resultSet ResultSet to use to fetch the data from underlying database. Must not be <code>null</code>.
* @param config Configuration of the conversion from JDBC to Arrow. Must be non-null and report
* itself as valid via <code>isValid()</code>.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
* @throws IOException Declared because <code>JdbcToArrowUtils.jdbcToArrowVectors</code> declares it.
*/
public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, JdbcToArrowConfig config)
throws SQLException, IOException {
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object can not be null");
Preconditions.checkNotNull(config, "The configuration cannot be null");
Preconditions.checkArgument(config.isValid(), "The configuration must be valid");

// Build an empty root from the JDBC metadata first, then fill it from the ResultSet rows.
VectorSchemaRoot root = VectorSchemaRoot.create(
JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), calendar), allocator);
JdbcToArrowUtils.jdbcToArrowVectors(resultSet, root, calendar);
JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config), config.getAllocator());
JdbcToArrowUtils.jdbcToArrowVectors(resultSet, root, config);
return root;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adapter.jdbc;

import java.util.Calendar;

import org.apache.arrow.memory.BaseAllocator;

import com.google.common.base.Preconditions;

/**
* This class configures the JDBC-to-Arrow conversion process.
* <p>
* The allocator is used to construct the {@link org.apache.arrow.vector.VectorSchemaRoot},
* and the calendar is used to define the time zone of any {@link org.apache.arrow.vector.types.pojo.ArrowType.Timestamp}
* fields that are created during the conversion.
* </p>
* <p>
* Neither field may be <code>null</code>.
* </p>
*/
public final class JdbcToArrowConfig {
private Calendar calendar;
private BaseAllocator allocator;

/**
 * Creates a configuration backed by the supplied allocator and calendar.
 * <p>
 * The <code>allocator</code> is used when constructing the Arrow vectors from the ResultSet; the
 * <code>calendar</code> is used to define Arrow Timestamp fields and to read time-based fields
 * from the JDBC <code>ResultSet</code>.
 * </p>
 *
 * @param allocator The memory allocator to construct the Arrow vectors with; must not be <code>null</code>.
 * @param calendar The calendar to use when constructing Timestamp fields and reading time-based results;
 *                 must not be <code>null</code>.
 * @exception NullPointerException if <code>allocator</code> or <code>calendar</code> is <code>null</code>.
 */
public JdbcToArrowConfig(BaseAllocator allocator, Calendar calendar) {
  // checkNotNull returns its argument, so validation and assignment happen in one step
  // (allocator first, then calendar — the same order as the checks have always run).
  this.allocator = Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
  this.calendar = Preconditions.checkNotNull(calendar, "Calendar object can not be null");
}

/**
 * Returns the calendar used both to define Arrow Timestamp fields
 * and to retrieve time-based fields from the database.
 *
 * @return the configured calendar.
 */
public Calendar getCalendar() {
  return this.calendar;
}

/**
* Sets the {@link Calendar} to use when constructing timestamp fields in the
* Arrow schema, and reading time-based fields from the JDBC <code>ResultSet</code>.
*
* @param calendar the calendar to set.
* @return this configuration, so that calls can be chained.
* @exception NullPointerException if <code>calendar</code> is <code>null</code>.
*/
public JdbcToArrowConfig setCalendar(Calendar calendar) {
Copy link
Contributor

@praveenbingo praveenbingo Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would prefer a builder instead..so that this pojo is immutable..

Preconditions.checkNotNull(calendar, "Calendar object can not be null");
this.calendar = calendar;
return this;
}

/**
 * Returns the Arrow memory allocator held by this configuration.
 *
 * @return the configured allocator.
 */
public BaseAllocator getAllocator() {
  return this.allocator;
}

/**
 * Replaces the memory allocator used when constructing the Arrow vectors from the ResultSet.
 *
 * @param allocator the allocator to set.
 * @return this configuration, so that calls can be chained.
 * @exception NullPointerException if <code>allocator</code> is null.
 */
public JdbcToArrowConfig setAllocator(BaseAllocator allocator) {
  // checkNotNull hands back the validated reference, so it can be stored directly.
  this.allocator = Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
  return this;
}

/**
* Whether this configuration is valid. The configuration is valid when:
* <ul>
* <li>A memory allocator is provided.</li>
* <li>A calendar is provided.</li>
* </ul>
* <p>
* The constructor and both setters of this class already reject <code>null</code> for these
* two fields, so this is expected to return <code>true</code> for any instance built through
* the public API; the check remains as a guard for callers that validate a configuration.
* </p>
*
* @return Whether this configuration is valid.
*/
public boolean isValid() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a builder would ensure this object is always valid after construction..reducing the need to validate it everywhere..

return (calendar != null) && (allocator != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Calendar;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
Expand Down Expand Up @@ -90,6 +91,21 @@ public class JdbcToArrowUtils {
private static final int DEFAULT_STREAM_BUFFER_SIZE = 1024;
private static final int DEFAULT_CLOB_SUBSTRING_READ_SIZE = 256;

/**
 * Builds the Arrow {@link Schema} for the given JDBC {@link ResultSetMetaData}, using only a calendar.
 * <p>
 * Convenience overload: the calendar is wrapped in a {@link JdbcToArrowConfig} and the work is
 * delegated to the configuration-based overload.
 * </p>
 *
 * @param rsmd The ResultSetMetaData containing the results, to read the JDBC metadata from.
 * @param calendar The calendar to use the time zone field of, to construct Timestamp fields from.
 * @return {@link Schema}
 * @throws SQLException on error
 */
public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, Calendar calendar) throws SQLException {
  Preconditions.checkNotNull(rsmd, "JDBC ResultSetMetaData object can't be null");
  Preconditions.checkNotNull(calendar, "Calendar object can't be null");

  // JdbcToArrowConfig refuses a null allocator, so a placeholder RootAllocator(0) accompanies the calendar.
  final JdbcToArrowConfig calendarOnlyConfig = new JdbcToArrowConfig(new RootAllocator(0), calendar);
  return jdbcToArrowSchema(rsmd, calendarOnlyConfig);
}

/**
* Create Arrow {@link Schema} object for the given JDBC {@link ResultSetMetaData}.
*
Expand Down Expand Up @@ -120,14 +136,15 @@ public class JdbcToArrowUtils {
* CLOB --> ArrowType.Utf8
* BLOB --> ArrowType.Binary
*
* @param rsmd ResultSetMetaData
* @param rsmd The ResultSetMetaData containing the results, to read the JDBC metadata from.
* @param config The configuration to use when constructing the schema.
* @return {@link Schema}
* @throws SQLException on error
*/
public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, Calendar calendar) throws SQLException {

public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig config) throws SQLException {
Preconditions.checkNotNull(rsmd, "JDBC ResultSetMetaData object can't be null");
Preconditions.checkNotNull(calendar, "Calendar object can't be null");
Preconditions.checkNotNull(config, "The configuration object must not be null");
Preconditions.checkArgument(config.isValid(), "The configuration object must be valid");

List<Field> fields = new ArrayList<>();
int columnCount = rsmd.getColumnCount();
Expand Down Expand Up @@ -179,7 +196,7 @@ public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, Calendar calendar
break;
case Types.TIMESTAMP:
fields.add(new Field(columnName, FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND,
calendar.getTimeZone().getID())), null));
config.getCalendar().getTimeZone().getID())), null));
break;
case Types.BINARY:
case Types.VARBINARY:
Expand Down Expand Up @@ -222,17 +239,38 @@ private static void allocateVectors(VectorSchemaRoot root, int size) {
* Iterate the given JDBC {@link ResultSet} object to fetch the data and transpose it to populate
* the given Arrow Vector objects.
*
* @param rs ResultSet to use to fetch the data from underlying database
* @param root Arrow {@link VectorSchemaRoot} object to populate
* @param rs ResultSet to use to fetch the data from underlying database
* @param root Arrow {@link VectorSchemaRoot} object to populate
* @param calendar The calendar to use when reading time-based data.
* @throws SQLException on error
*/
public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, Calendar calendar)
throws SQLException, IOException {

Preconditions.checkNotNull(rs, "JDBC ResultSet object can't be null");
Preconditions.checkNotNull(root, "JDBC ResultSet object can't be null");
Preconditions.checkNotNull(root, "Vector Schema cannot be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

Preconditions.checkNotNull(calendar, "Calendar object can't be null");

// Delegate to the configuration-based overload. JdbcToArrowConfig rejects a null allocator, so a
// RootAllocator(0) is supplied; the vectors being filled belong to the caller-supplied root, so this
// allocator is presumably never consulted while populating them (TODO confirm).
jdbcToArrowVectors(rs, root, new JdbcToArrowConfig(new RootAllocator(0), calendar));
}

/**
* Iterate the given JDBC {@link ResultSet} object to fetch the data and transpose it to populate
* the given Arrow Vector objects.
*
* @param rs ResultSet to use to fetch the data from underlying database
* @param root Arrow {@link VectorSchemaRoot} object to populate
* @param config The configuration to use when reading the data.
* @throws SQLException on error
*/
public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, JdbcToArrowConfig config)
throws SQLException, IOException {

Preconditions.checkNotNull(rs, "JDBC ResultSet object can't be null");
Preconditions.checkNotNull(root, "JDBC ResultSet object can't be null");
Preconditions.checkNotNull(config, "JDBC-to-Arrow configuration cannot be null");
Preconditions.checkArgument(config.isValid(), "JDBC-to-Arrow configuration must be valid");

ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();

Expand Down Expand Up @@ -289,16 +327,16 @@ public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, Calen
break;
case Types.DATE:
updateVector((DateMilliVector) root.getVector(columnName),
rs.getDate(i, calendar), !rs.wasNull(), rowCount);
rs.getDate(i, config.getCalendar()), !rs.wasNull(), rowCount);
break;
case Types.TIME:
updateVector((TimeMilliVector) root.getVector(columnName),
rs.getTime(i, calendar), !rs.wasNull(), rowCount);
rs.getTime(i, config.getCalendar()), !rs.wasNull(), rowCount);
break;
case Types.TIMESTAMP:
// TODO: Need to handle precision such as milli, micro, nano
updateVector((TimeStampVector) root.getVector(columnName),
rs.getTimestamp(i, calendar), !rs.wasNull(), rowCount);
rs.getTimestamp(i, config.getCalendar()), !rs.wasNull(), rowCount);
break;
case Types.BINARY:
case Types.VARBINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class AbstractJdbcToArrowTest {
* @return Table object
* @throws IOException on error
*/
protected static Table getTable(String ymlFilePath, Class clss) throws IOException {
protected static Table getTable(String ymlFilePath, @SuppressWarnings("rawtypes") Class clss) throws IOException {
// Open ymlFilePath as a resource through clss's class loader and deserialize the YAML stream into a Table.
return new ObjectMapper(new YAMLFactory()).readValue(
clss.getClassLoader().getResourceAsStream(ymlFilePath), Table.class);
}
Expand Down Expand Up @@ -94,7 +94,7 @@ public void destroy() throws SQLException {
* @throws ClassNotFoundException on error
* @throws IOException on error
*/
public static Object[][] prepareTestData(String[] testFiles, Class clss)
public static Object[][] prepareTestData(String[] testFiles, @SuppressWarnings("rawtypes") Class clss)
throws SQLException, ClassNotFoundException, IOException {
Object[][] tableArr = new Object[testFiles.length][];
int i = 0;
Expand Down
Loading