Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional
return HudiUtils.getHudiMvccSnapshot(tableSnapshot, this);
} else if (getDlaType() == DLAType.ICEBERG) {
return new IcebergMvccSnapshot(
IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, this, scanParams));
IcebergUtils.getSnapshotCacheValue(tableSnapshot, this, scanParams));
} else {
return new EmptyMvccSnapshot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
Expand Down Expand Up @@ -52,9 +51,7 @@ public IcebergDlaTable(HMSExternalTable table) {

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return Maps.newHashMap(
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable)
.getPartitionInfo().getNameToPartitionItem());
return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, hmsTable));
}

@Override
Expand All @@ -69,19 +66,13 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
hmsTable,
snapshotValue.getSnapshot().getSchemaId());
return schemaValue.getPartitionColumns();
return IcebergUtils.getIcebergPartitionColumns(snapshot, hmsTable);
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
Expand All @@ -102,16 +93,14 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
hmsTable.makeSureInitialized();
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}

@Override
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
hmsTable.makeSureInitialized();
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
Expand Down Expand Up @@ -97,15 +96,6 @@ public void checkProperties() throws DdlException {
+ tableMetaCacheTtlSecond);
}

// check iceberg.snapshot.meta.cache.ttl-second parameter
String partitionCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
if (Objects.nonNull(partitionCacheTtlSecond) && NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
< CACHE_TTL_DISABLE_CACHE) {
throw new DdlException(
"The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
+ partitionCacheTtlSecond);
}

String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
if (Objects.nonNull(manifestCacheEnable)
&& !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) {
Expand Down Expand Up @@ -135,8 +125,7 @@ public void checkProperties() throws DdlException {
public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
super.notifyPropertiesUpdated(updatedProps);
String tableMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
String snapshotMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
if (Objects.nonNull(tableMetaCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,12 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return Maps.newHashMap(
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
.getPartitionInfo().getNameToPartitionItem());
return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, this));
}

@Override
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
.getPartitionInfo().getNameToPartitionItem();
return IcebergUtils.getIcebergPartitionItems(snapshot, this);
}

@Override
Expand All @@ -158,18 +155,13 @@ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) thro

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
this, snapshotValue.getSnapshot().getSchemaId());
return schemaValue.getPartitionColumns();
return IcebergUtils.getIcebergPartitionColumns(snapshot, this);
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
IcebergSnapshotCacheValue snapshotValue =
IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
Expand All @@ -195,13 +187,13 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<Mvcc
@Override
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
makeSureInitialized();
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}

@Override
public long getNewestUpdateVersionOrTime() {
return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), this, Optional.empty())
return IcebergUtils.getLatestSnapshotCacheValue(this)
.getPartitionInfo().getNameToIcebergPartition().values().stream()
.mapToLong(IcebergPartition::getLastUpdateTime).max().orElse(0);
}
Expand Down Expand Up @@ -256,7 +248,7 @@ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional
if (isView()) {
return new EmptyMvccSnapshot();
} else {
return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
return new IcebergMvccSnapshot(IcebergUtils.getSnapshotCacheValue(
tableSnapshot, this, scanParams));
}
}
Expand Down
Loading
Loading