diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java index a68e0d7dd..3f065adcc 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java @@ -600,14 +600,18 @@ public static String fullTableName(TableId tableId) { } } - public long calculateTableSize(TableId tableId, Optional filter) { - return calculateTableSize(getTable(tableId), filter); + public long calculateTableSize( + TableId tableId, Optional filter, OptionalLong snapshotTimeMillis) { + return calculateTableSize(getTable(tableId), filter, snapshotTimeMillis); } - public long calculateTableSize(TableInfo tableInfo, Optional filter) { + public long calculateTableSize( + TableInfo tableInfo, Optional filter, OptionalLong snapshotTimeMillis) { TableDefinition.Type type = tableInfo.getDefinition().getType(); + if ((type == TableDefinition.Type.EXTERNAL || type == TableDefinition.Type.TABLE) - && !filter.isPresent()) { + && !filter.isPresent() + && !snapshotTimeMillis.isPresent()) { if (isBigQueryNativeTable(tableInfo) && tableInfo.getRequirePartitionFilter() != null && tableInfo.getRequirePartitionFilter()) { @@ -628,11 +632,17 @@ public long calculateTableSize(TableInfo tableInfo, Optional filter) { } else if (type == TableDefinition.Type.VIEW || type == TableDefinition.Type.MATERIALIZED_VIEW || ((type == TableDefinition.Type.TABLE || type == TableDefinition.Type.EXTERNAL) - && filter.isPresent())) { + && (filter.isPresent() || snapshotTimeMillis.isPresent()))) { // run a query String table = fullTableName(tableInfo.getTableId()); + String timeTravelClause = + snapshotTimeMillis.isPresent() + ? String.format( + "FOR SYSTEM TIME AS OF TIMESTAMP_MILLIS(%d)", snapshotTimeMillis.getAsLong()) + : ""; String whereClause = filter.map(f -> "WHERE " + f).orElse(""); - return getNumberOfRows(String.format("SELECT COUNT(*) from `%s` %s", table, whereClause)); + return getNumberOfRows( + String.format("SELECT COUNT(*) from `%s` %s %s", table, timeTravelClause, whereClause)); } else { throw new IllegalArgumentException( String.format( diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java index 3edf29349..e98de9389 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java @@ -36,6 +36,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.slf4j.Logger; @@ -98,14 +99,16 @@ public ReadSessionResponse create( log.info( "|creation a read session for table {}, parameters: " + "|selectedFields=[{}]," - + "|filter=[{}]" - + "|snapshotTimeMillis[{}]", + + "|filter=[{}]," + + "|snapshotTimeMillis[{}]," + + "|view=[{}]", actualTable.getFriendlyName(), String.join(",", selectedFields), filter.orElse("None"), config.getSnapshotTimeMillis().isPresent() ? String.valueOf(config.getSnapshotTimeMillis().getAsLong()) - : "None"); + : "None", + isInputTableAView(tableDetails)); String tablePath = toTablePath(actualTable.getTableId()); CreateReadSessionRequest request = @@ -264,7 +267,7 @@ TableInfo getActualTable( // get it from the view String querySql = bigQueryClient.createSql( - table.getTableId(), requiredColumns, filters, config.getSnapshotTimeMillis()); + table.getTableId(), requiredColumns, filters, OptionalLong.empty()); log.debug("querySql is {}", querySql); return bigQueryClient.materializeViewToTable( querySql, table.getTableId(), config.getMaterializationExpirationTimeInMinutes()); diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java index 7636129d6..9a4fe7b3a 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.stream.Collectors; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; @@ -130,7 +131,11 @@ public RDD buildScan(String[] requiredColumns, Filter[] filters) { BigQueryUtil.emptyIfNeeded(compiledFilter)); return (RDD) generateEmptyRowRDD( - actualTable, readSessionCreator.isInputTableAView(table) ? "" : compiledFilter); + actualTable, + readSessionCreator.isInputTableAView(table) ? "" : compiledFilter, + readSessionCreator.isInputTableAView(table) + ? OptionalLong.empty() + : options.getSnapshotTimeMillis()); } else if (requiredColumns.length == 0) { log.debug("Not using optimized empty projection"); } @@ -182,11 +187,13 @@ String getCompiledFilter(Filter[] filters) { } } - private RDD generateEmptyRowRDD(TableInfo tableInfo, String filter) { + private RDD generateEmptyRowRDD( + TableInfo tableInfo, String filter, OptionalLong snapshotTimeMillis) { emptyRowRDDsCreated += 1; Optional optionalFilter = (filter.length() == 0) ? Optional.empty() : Optional.of(filter); - long numberOfRows = bigQueryClient.calculateTableSize(tableInfo, optionalFilter); + long numberOfRows = + bigQueryClient.calculateTableSize(tableInfo, optionalFilter, snapshotTimeMillis); Function1 objectToInternalRowConverter = new ObjectToInternalRowConverter(); diff --git a/spark-bigquery-dsv2/spark-bigquery-dsv2-common/src/main/java/com/google/cloud/spark/bigquery/v2/context/BigQueryDataSourceReaderContext.java b/spark-bigquery-dsv2/spark-bigquery-dsv2-common/src/main/java/com/google/cloud/spark/bigquery/v2/context/BigQueryDataSourceReaderContext.java index d35a34841..8c9803024 100644 --- a/spark-bigquery-dsv2/spark-bigquery-dsv2-common/src/main/java/com/google/cloud/spark/bigquery/v2/context/BigQueryDataSourceReaderContext.java +++ b/spark-bigquery-dsv2/spark-bigquery-dsv2-common/src/main/java/com/google/cloud/spark/bigquery/v2/context/BigQueryDataSourceReaderContext.java @@ -339,7 +339,9 @@ private ReadSessionResponse createReadSession() { Stream> createEmptyProjectionPartitions() { Optional filter = getCombinedFilter(); - long rowCount = bigQueryClient.calculateTableSize(tableId, filter); + long rowCount = + bigQueryClient.calculateTableSize( + tableId, filter, readSessionCreatorConfig.getSnapshotTimeMillis()); logger.info("Used optimized BQ count(*) path. Count: " + rowCount); int partitionsCount = readSessionCreatorConfig.getDefaultParallelism(); int partitionSize = (int) (rowCount / partitionsCount);