Parallel fetch of materialized view freshness in Iceberg
chenjian2664 committed Jan 17, 2025
1 parent 1b1359c commit 8f02e8c
Showing 9 changed files with 116 additions and 40 deletions.
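At its core, the commit replaces the sequential per-base-table freshness check in IcebergMetadata.getMaterializedViewFreshness with a set of Callable tasks dispatched to a dedicated executor. A rough, self-contained sketch of that pattern follows; it uses plain java.util.concurrent, the class and method names (FreshnessCheck, checkOne) are made up for illustration, and the real task body that loads the Iceberg table and inspects its snapshot is elided.

// Sketch only: sequential loop vs. parallel dispatch of independent freshness checks.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FreshnessCheck
{
    public static void main(String[] args)
            throws Exception
    {
        List<String> dependsOnTables = List.of("schema.t1=123", "schema.t2=456", "schema.t3=789");

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Before: one blocking check at a time
            for (String entry : dependsOnTables) {
                System.out.println(checkOne(entry));
            }

            // After: build one Callable per base table and let the executor run them concurrently
            List<Callable<String>> tasks = new ArrayList<>();
            for (String entry : dependsOnTables) {
                tasks.add(() -> checkOne(entry));
            }
            List<Future<String>> futures = executor.invokeAll(tasks);
            for (Future<String> future : futures) {
                System.out.println(future.get());
            }
        }
        finally {
            executor.shutdown();
        }
    }

    private static String checkOne(String entry)
    {
        // Placeholder for the real work: parse "schema.table=snapshotId" and load the table's current snapshot
        return entry + " -> fresh";
    }
}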
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergMaterializedView {}
@@ -426,6 +426,7 @@ public class IcebergMetadata
private final Predicate<String> allowedExtraProperties;
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;
private final Executor materializedViewExecutor;

private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();

@@ -443,7 +444,8 @@ public IcebergMetadata(
boolean addFilesProcedureEnabled,
Predicate<String> allowedExtraProperties,
ExecutorService icebergScanExecutor,
Executor metadataFetchingExecutor)
Executor metadataFetchingExecutor,
Executor materializedViewExecutor)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
@@ -456,6 +458,7 @@ public IcebergMetadata(
this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null");
this.icebergScanExecutor = requireNonNull(icebergScanExecutor, "icebergScanExecutor is null");
this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
this.materializedViewExecutor = requireNonNull(materializedViewExecutor, "materializedViewExecutor is null");
}

@Override
@@ -3684,49 +3687,34 @@ public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession s
}

boolean hasUnknownTables = false;
boolean hasStaleIcebergTables = false;
Optional<Long> firstTableChange = Optional.of(Long.MAX_VALUE);

Iterable<String> tableToSnapshotIds = Splitter.on(',').split(dependsOnTables);
for (String entry : tableToSnapshotIds) {
if (entry.equals(UNKNOWN_SNAPSHOT_TOKEN)) {
// This is a "federated" materialized view (spanning across connectors). Trust user's choice and assume "fresh or fresh enough".
ImmutableList.Builder<Callable<TableChangeInfo>> tableChangeInfoTasks = ImmutableList.builder();
for (String tableToSnapShot : Splitter.on(',').split(dependsOnTables)) {
if (tableToSnapShot.equals(UNKNOWN_SNAPSHOT_TOKEN)) {
hasUnknownTables = true;
firstTableChange = Optional.empty();
continue;
}
List<String> keyValue = Splitter.on("=").splitToList(entry);
if (keyValue.size() != 2) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid entry in '%s' property: %s'", DEPENDS_ON_TABLES, entry));
}
String tableName = keyValue.get(0);
String value = keyValue.get(1);
List<String> strings = Splitter.on(".").splitToList(tableName);
if (strings.size() == 3) {
strings = strings.subList(1, 3);
}
else if (strings.size() != 2) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid table name in '%s' property: %s'", DEPENDS_ON_TABLES, strings));
}
String schema = strings.get(0);
String name = strings.get(1);
SchemaTableName schemaTableName = new SchemaTableName(schema, name);
ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());

if (tableHandle == null || tableHandle instanceof CorruptedIcebergTableHandle) {
// Base table is gone or table is corrupted
return new MaterializedViewFreshness(STALE, Optional.empty());
}
Optional<Long> snapshotAtRefresh;
if (value.isEmpty()) {
snapshotAtRefresh = Optional.empty();
}
else {
snapshotAtRefresh = Optional.of(Long.parseLong(value));
}
switch (getTableChangeInfo(session, (IcebergTableHandle) tableHandle, snapshotAtRefresh)) {

tableChangeInfoTasks.add(() -> getTableChangeInfo(session, tableToSnapShot));
}

boolean hasStaleIcebergTables = false;
List<TableChangeInfo> tableChangeInfos;

try {
tableChangeInfos = processWithAdditionalThreads(tableChangeInfoTasks.build(), materializedViewExecutor);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}

verifyNotNull(tableChangeInfos);

for (TableChangeInfo tableChangeInfo : tableChangeInfos) {
switch (tableChangeInfo) {
case NoTableChange() -> {
// Fresh
}
case FirstChangeSnapshot(Snapshot snapshot) -> {
hasStaleIcebergTables = true;
@@ -3737,6 +3725,9 @@ case UnknownTableChange() -> {
hasStaleIcebergTables = true;
firstTableChange = Optional.empty();
}
case CorruptedTableChange() -> {
return new MaterializedViewFreshness(STALE, Optional.empty());
}
}
}

@@ -3752,6 +3743,40 @@ case UnknownTableChange() -> {
return new MaterializedViewFreshness(FRESH, Optional.empty());
}

private TableChangeInfo getTableChangeInfo(ConnectorSession session, String entry)
{
List<String> keyValue = Splitter.on("=").splitToList(entry);
if (keyValue.size() != 2) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid entry in '%s' property: %s'", DEPENDS_ON_TABLES, entry));
}
String tableName = keyValue.get(0);
String value = keyValue.get(1);
List<String> strings = Splitter.on(".").splitToList(tableName);
if (strings.size() == 3) {
strings = strings.subList(1, 3);
}
else if (strings.size() != 2) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Invalid table name in '%s' property: %s'", DEPENDS_ON_TABLES, strings));
}
String schema = strings.get(0);
String name = strings.get(1);
SchemaTableName schemaTableName = new SchemaTableName(schema, name);
ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty());

if (tableHandle == null || tableHandle instanceof CorruptedIcebergTableHandle) {
// Base table is gone or table is corrupted
return new CorruptedTableChange();
}
Optional<Long> snapshotAtRefresh;
if (value.isEmpty()) {
snapshotAtRefresh = Optional.empty();
}
else {
snapshotAtRefresh = Optional.of(Long.parseLong(value));
}
return getTableChangeInfo(session, (IcebergTableHandle) tableHandle, snapshotAtRefresh);
}

private TableChangeInfo getTableChangeInfo(ConnectorSession session, IcebergTableHandle table, Optional<Long> snapshotAtRefresh)
{
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
@@ -3866,7 +3891,7 @@ private static IcebergTableHandle checkValidTableHandle(ConnectorTableHandle tab
}

private sealed interface TableChangeInfo
permits NoTableChange, FirstChangeSnapshot, UnknownTableChange {}
permits NoTableChange, FirstChangeSnapshot, UnknownTableChange, CorruptedTableChange {}

private record NoTableChange()
implements TableChangeInfo {}
@@ -3883,6 +3908,9 @@ private record FirstChangeSnapshot(Snapshot snapshot)
private record UnknownTableChange()
implements TableChangeInfo {}

private record CorruptedTableChange()
implements TableChangeInfo {}

private static TableStatistics getIncrementally(
Map<IcebergTableHandle, AtomicReference<TableStatistics>> cache,
IcebergTableHandle key,
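The new code hands the per-table tasks to processWithAdditionalThreads, a helper that is not part of this diff. Judging from the call site alone, it takes a list of Callables plus the materialized-view executor, returns results the caller iterates in order, and surfaces failures as ExecutionException. A hypothetical stand-in with that contract is sketched below; the actual Trino utility may well differ in detail.

// Hypothetical stand-in for the processWithAdditionalThreads helper used above; written from the
// call site's apparent contract, not from the real Trino implementation.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

final class ParallelTaskRunner
{
    private ParallelTaskRunner() {}

    static <T> List<T> processWithAdditionalThreads(List<? extends Callable<T>> tasks, Executor executor)
            throws ExecutionException
    {
        // Submit every task so independent base tables are inspected concurrently
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, executor));
        }

        // Preserve task order in the results and surface the first failure as ExecutionException,
        // matching the catch block in getMaterializedViewFreshness
        List<T> results = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            try {
                results.add(future.get());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while fetching materialized view freshness", e);
            }
        }
        return results;
    }
}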
@@ -46,6 +46,7 @@ public class IcebergMetadataFactory
private final Predicate<String> allowedExtraProperties;
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;
private final Executor materializedViewExecutor;

@Inject
public IcebergMetadataFactory(
@@ -58,6 +59,7 @@ public IcebergMetadataFactory(
@RawHiveMetastoreFactory Optional<HiveMetastoreFactory> metastoreFactory,
@ForIcebergScanPlanning ExecutorService icebergScanExecutor,
@ForIcebergMetadata ExecutorService metadataExecutorService,
@ForIcebergMaterializedView ExecutorService materializedViewExecutor,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -82,6 +84,8 @@ public IcebergMetadataFactory(
else {
this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism());
}

this.materializedViewExecutor = requireNonNull(materializedViewExecutor, "materializedViewExecutor is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -97,6 +101,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
addFilesProcedureEnabled,
allowedExtraProperties,
icebergScanExecutor,
metadataFetchingExecutor);
metadataFetchingExecutor,
materializedViewExecutor);
}
}
@@ -150,6 +150,7 @@ public void configure(Binder binder)
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergMetadata.class));
closingBinder(binder).registerExecutor(Key.get(ListeningExecutorService.class, ForIcebergSplitManager.class));
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergScanPlanning.class));
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergMaterializedView.class));

binder.bind(IcebergConnector.class).in(Scopes.SINGLETON);
}
@@ -170,6 +171,14 @@ public ListeningExecutorService createSplitSourceExecutor(CatalogName catalogNam
return listeningDecorator(newCachedThreadPool(daemonThreadsNamed("iceberg-split-source-" + catalogName + "-%s")));
}

@Provides
@Singleton
@ForIcebergMaterializedView
public ListeningExecutorService createMaterializedViewExecutor(CatalogName catalogName)
{
return listeningDecorator(newCachedThreadPool(daemonThreadsNamed("iceberg-materialized-view-" + catalogName + "-%s")));
}

@Provides
@Singleton
@ForIcebergScanPlanning
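The module changes wire the new executor the same way as the existing ones: a @Provides method creates a named cached thread pool qualified by @ForIcebergMaterializedView, and closingBinder registers it so it is shut down with the connector. A small standalone Guice sketch of that binding pattern follows, assuming the annotation from the first file is on the classpath; ExampleModule and FreshnessService are made up for this sketch and are not Trino code.

// Illustrative Guice wiring of a qualified ExecutorService binding.
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.trino.plugin.iceberg.ForIcebergMaterializedView;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorBindingExample
{
    static class FreshnessService
    {
        final ExecutorService executor;

        @Inject
        FreshnessService(@ForIcebergMaterializedView ExecutorService executor)
        {
            // The qualifier selects the materialized-view pool rather than any other bound ExecutorService
            this.executor = executor;
        }
    }

    static class ExampleModule
            extends AbstractModule
    {
        @Provides
        @Singleton
        @ForIcebergMaterializedView
        ExecutorService createMaterializedViewExecutor()
        {
            // Mirrors the shape of the diff's provider; the real one also names the threads and makes them daemons
            return Executors.newCachedThreadPool();
        }
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new ExampleModule());
        FreshnessService service = injector.getInstance(FreshnessService.class);
        service.executor.shutdown();
    }
}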
@@ -133,6 +133,7 @@ public void testNonLowercaseNamespace()
false,
_ -> false,
newDirectExecutorService(),
directExecutor(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isFalse();
@@ -143,6 +143,7 @@ public void testNonLowercaseGlueDatabase()
false,
_ -> false,
newDirectExecutorService(),
directExecutor(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
.isFalse();
@@ -193,6 +193,7 @@ public void testNonLowercaseNamespace()
false,
_ -> false,
newDirectExecutorService(),
directExecutor(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
@@ -122,6 +122,7 @@ public void testNonLowercaseNamespace()
false,
_ -> false,
newDirectExecutorService(),
directExecutor(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
@@ -227,6 +227,7 @@ public void testNonLowercaseNamespace()
false,
_ -> false,
newDirectExecutorService(),
directExecutor(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();