Skip to content

Commit

Permalink
Add all_manifests metadata table to Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 3, 2024
1 parent 0023d31 commit db88992
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;

import java.util.List;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS;

public class AllManifestsTable
extends BaseSystemTable
{
public AllManifestsTable(SchemaTableName tableName, Table icebergTable)
{
super(requireNonNull(icebergTable, "icebergTable is null"),
new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("path", VARCHAR))
.add(new ColumnMetadata("length", BIGINT))
.add(new ColumnMetadata("partition_spec_id", INTEGER))
.add(new ColumnMetadata("added_snapshot_id", BIGINT))
.add(new ColumnMetadata("added_data_files_count", INTEGER))
.add(new ColumnMetadata("existing_data_files_count", INTEGER))
.add(new ColumnMetadata("deleted_data_files_count", INTEGER))
.add(new ColumnMetadata("partition_summaries", new ArrayType(RowType.rowType(
RowType.field("contains_null", BOOLEAN),
RowType.field("contains_nan", BOOLEAN),
RowType.field("lower_bound", VARCHAR),
RowType.field("upper_bound", VARCHAR)))))
.build()),
ALL_MANIFESTS);
}

@Override
protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey)
{
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(row.get("path", String.class));
pagesBuilder.appendBigint(row.get("length", Long.class));
pagesBuilder.appendInteger(row.get("partition_spec_id", Integer.class));
pagesBuilder.appendBigint(row.get("added_snapshot_id", Long.class));
pagesBuilder.appendInteger(row.get("added_data_files_count", Integer.class));
pagesBuilder.appendInteger(row.get("existing_data_files_count", Integer.class));
pagesBuilder.appendInteger(row.get("deleted_data_files_count", Integer.class));
//noinspection unchecked
appendPartitionSummaries((ArrayBlockBuilder) pagesBuilder.nextColumn(), row.get("partition_summaries", List.class));
pagesBuilder.endRow();
}

private static void appendPartitionSummaries(ArrayBlockBuilder arrayBuilder, List<StructLike> partitionSummaries)
{
arrayBuilder.buildEntry(elementBuilder -> {
for (StructLike partitionSummary : partitionSummaries) {
((RowBlockBuilder) elementBuilder).buildEntry(fieldBuilders -> {
BOOLEAN.writeBoolean(fieldBuilders.get(0), partitionSummary.get(0, Boolean.class)); // required contains_null
Boolean containsNan = partitionSummary.get(1, Boolean.class);
if (containsNan == null) {
// This usually occurs when reading from V1 table, where contains_nan is not populated.
fieldBuilders.get(1).appendNull();
}
else {
BOOLEAN.writeBoolean(fieldBuilders.get(1), containsNan);
}
VARCHAR.writeString(fieldBuilders.get(2), partitionSummary.get(2, String.class)); // optional lower_bound (human-readable)
VARCHAR.writeString(fieldBuilders.get(3), partitionSummary.get(3, String.class)); // optional upper_bound (human-readable)
});
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table));
case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table));
case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum TableType
HISTORY,
METADATA_LOG_ENTRIES,
SNAPSHOTS,
ALL_MANIFESTS,
MANIFESTS,
PARTITIONS,
FILES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,45 @@ public void testSnapshotsTable()
assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '0', '3', '6'");
}

@Test
void testAllManifests()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_manifests", "AS SELECT 1 x")) {
assertThat(query("SHOW COLUMNS FROM \"" + table.getName() + "$all_manifests\""))
.skippingTypesCheck()
.matches("VALUES " +
"('path', 'varchar', '', '')," +
"('length', 'bigint', '', '')," +
"('partition_spec_id', 'integer', '', '')," +
"('added_snapshot_id', 'bigint', '', '')," +
"('added_data_files_count', 'integer', '', '')," +
"('existing_data_files_count', 'integer', '', '')," +
"('deleted_data_files_count', 'integer', '', '')," +
"('partition_summaries', 'array(row(contains_null boolean, contains_nan boolean, lower_bound varchar, upper_bound varchar))', '', '')");

assertThat((String) computeScalar("SELECT path FROM \"" + table.getName() + "$all_manifests\"")).endsWith("-m0.avro");
assertThat((Long) computeScalar("SELECT length FROM \"" + table.getName() + "$all_manifests\"")).isPositive();
assertThat((Integer) computeScalar("SELECT partition_spec_id FROM \"" + table.getName() + "$all_manifests\"")).isZero();
assertThat((Long) computeScalar("SELECT added_snapshot_id FROM \"" + table.getName() + "$all_manifests\"")).isPositive();
assertThat((Integer) computeScalar("SELECT added_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isEqualTo(1);
assertThat((Integer) computeScalar("SELECT existing_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isZero();
assertThat((Integer) computeScalar("SELECT deleted_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isZero();
assertThat((List<?>) computeScalar("SELECT partition_summaries FROM \"" + table.getName() + "$all_manifests\"")).isEmpty();

assertUpdate("DELETE FROM " + table.getName(), 1);
assertThat((Long) computeScalar("SELECT count(1) FROM \"" + table.getName() + "$all_manifests\"")).isEqualTo(2);
}
}

@Test
void testAllManifestsWithPartitionTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_manifests", "WITH (partitioning = ARRAY['dt']) AS SELECT 1 x, DATE '2021-01-01' dt")) {
assertThat(query("SELECT partition_summaries FROM \"" + table.getName() + "$all_manifests\""))
.matches("VALUES CAST(ARRAY[ROW(false, false, VARCHAR '2021-01-01', VARCHAR '2021-01-01')] AS array(row(contains_null boolean, contains_nan boolean, lower_bound varchar, upper_bound varchar)))");
}
}

@Test
public void testManifestsTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_TABLES;
import static io.trino.plugin.hive.metastore.MetastoreMethod.REPLACE_TABLE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE;
import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.FILES;
import static io.trino.plugin.iceberg.TableType.HISTORY;
Expand Down Expand Up @@ -313,6 +314,12 @@ public void testSelectSystemTable()
.addCopies(GET_TABLE, 1)
.build());

// select from $all_manifests
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$all_manifests\"",
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, 1)
.build());

// select from $manifests
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$manifests\"",
ImmutableMultiset.<MetastoreMethod>builder()
Expand Down Expand Up @@ -342,7 +349,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}

@Test
Expand Down

0 comments on commit db88992

Please sign in to comment.