diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java index a97ef9dc8..2815dfb4c 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java @@ -542,7 +542,11 @@ public TableResult query(String sql) { } } - String createSql(TableId table, ImmutableList requiredColumns, String[] filters) { + String createSql( + TableId table, + ImmutableList requiredColumns, + String[] filters, + OptionalLong snapshotTimeMillis) { String columns = requiredColumns.isEmpty() ? "*" @@ -550,17 +554,25 @@ String createSql(TableId table, ImmutableList requiredColumns, String[] .map(column -> String.format("`%s`", column)) .collect(Collectors.joining(",")); - return createSql(table, columns, filters); + return createSql(table, columns, filters, snapshotTimeMillis); } // assuming the SELECT part is properly formatted, can be used to call functions such as COUNT and // SUM - String createSql(TableId table, String formattedQuery, String[] filters) { + String createSql( + TableId table, String formattedQuery, String[] filters, OptionalLong snapshotTimeMillis) { String tableName = fullTableName(table); String whereClause = createWhereClause(filters).map(clause -> "WHERE " + clause).orElse(""); - return String.format("SELECT %s FROM `%s` %s", formattedQuery, tableName, whereClause); + String snapshotTimeClause = + snapshotTimeMillis.isPresent() + ? String.format( + "FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%d)", snapshotTimeMillis.getAsLong()) + : ""; + + return String.format( + "SELECT %s FROM `%s` %s %s", formattedQuery, tableName, whereClause, snapshotTimeClause); } public static String fullTableName(TableId tableId) { diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java index 963dacbbf..3edf29349 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java @@ -98,10 +98,14 @@ public ReadSessionResponse create( log.info( "|creation a read session for table {}, parameters: " + "|selectedFields=[{}]," - + "|filter=[{}]", + + "|filter=[{}]" + + "|snapshotTimeMillis[{}]", actualTable.getFriendlyName(), String.join(",", selectedFields), - filter.orElse("None")); + filter.orElse("None"), + config.getSnapshotTimeMillis().isPresent() + ? String.valueOf(config.getSnapshotTimeMillis().getAsLong()) + : "None"); String tablePath = toTablePath(actualTable.getTableId()); CreateReadSessionRequest request = @@ -167,17 +171,20 @@ public ReadSessionResponse create( Instant sessionPrepEndTime = Instant.now(); TableModifiers.Builder modifiers = TableModifiers.newBuilder(); - config - .getSnapshotTimeMillis() - .ifPresent( - millis -> { - Instant snapshotTime = Instant.ofEpochMilli(millis); - modifiers.setSnapshotTime( - Timestamp.newBuilder() - .setSeconds(snapshotTime.getEpochSecond()) - .setNanos(snapshotTime.getNano()) - .build()); - }); + + if (!isInputTableAView(tableDetails)) { + config + .getSnapshotTimeMillis() + .ifPresent( + millis -> { + Instant snapshotTime = Instant.ofEpochMilli(millis); + modifiers.setSnapshotTime( + Timestamp.newBuilder() + .setSeconds(snapshotTime.getEpochSecond()) + .setNanos(snapshotTime.getNano()) + .build()); + }); + } CreateReadSessionRequest createReadSessionRequest = request @@ -255,7 +262,9 @@ TableInfo getActualTable( } if (isInputTableAView(table)) { // get it from the view - String querySql = bigQueryClient.createSql(table.getTableId(), requiredColumns, filters); + String querySql = + bigQueryClient.createSql( + table.getTableId(), requiredColumns, filters, config.getSnapshotTimeMillis()); log.debug("querySql is {}", querySql); return bigQueryClient.materializeViewToTable( querySql, table.getTableId(), config.getMaterializationExpirationTimeInMinutes()); diff --git a/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java b/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java index 9c3877137..4f1b7bf59 100644 --- a/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java +++ b/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java @@ -31,6 +31,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions; @@ -79,6 +80,15 @@ public class ReadSessionCreatorTest { .setNumBytes(1L) .build()) .build(); + TableInfo view = + TableInfo.newBuilder( + TableId.of("a", "v"), + StandardTableDefinition.newBuilder() + .setType(TableDefinition.Type.VIEW) + .setSchema(Schema.of(Field.of("name", StandardSQLTypeName.BOOL))) + .setNumBytes(1L) + .build()) + .build(); private static MockBigQueryRead mockBigQueryRead; private static MockServiceHelper mockServiceHelper; @@ -316,6 +326,38 @@ public void testSnapshotTimeMillis() throws Exception { .isEqualTo(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build()); } + @Test + public void testViewSnapshotTimeMillis() throws Exception { + // setting up + String query = "SELECT * FROM `a.v`"; + when(bigQueryClient.getTable(any())).thenReturn(view); + when(bigQueryClient.createSql( + view.getTableId(), ImmutableList.of(), new String[0], OptionalLong.of(1234567890L))) + .thenReturn(query); + when(bigQueryClient.materializeViewToTable(query, view.getTableId(), 120)).thenReturn(table); + mockBigQueryRead.reset(); + mockBigQueryRead.addResponse( + ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build()); + BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class); + when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client); + + ReadSessionCreatorConfig config = + new ReadSessionCreatorConfigBuilder() + .setEnableReadSessionCaching(false) + .setSnapshotTimeMillis(OptionalLong.of(1234567890L)) + .setViewsEnabled(true) + .build(); + ReadSessionCreator creator = + new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory); + ReadSessionResponse readSessionResponse = + creator.create(table.getTableId(), ImmutableList.of(), Optional.empty()); + assertThat(readSessionResponse).isNotNull(); + CreateReadSessionRequest createReadSessionRequest = + (CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0); + assertThat(createReadSessionRequest.getReadSession().getTableModifiers()) + .isEqualTo(TableModifiers.newBuilder().build()); + } + private void testCacheMissScenario( ReadSessionCreator creator, String readSessionName,