Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
public static final String PUBLISH_CHANGES = "publish_changes";
public static final String REWRITE_MANIFESTS = "rewrite_manifests";

/**
* Create an Iceberg-specific ExecuteAction instance.
Expand Down Expand Up @@ -84,6 +85,9 @@ public static ExecuteAction createAction(String actionType, Map<String, String>
case PUBLISH_CHANGES:
return new IcebergPublishChangesAction(properties, partitionNamesInfo,
whereCondition);
case REWRITE_MANIFESTS:
return new IcebergRewriteManifestsAction(properties, partitionNamesInfo,
whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " + actionType
+ ". Supported procedures: " + String.join(", ", getSupportedActions()));
Expand All @@ -104,7 +108,8 @@ public static String[] getSupportedActions() {
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
REWRITE_DATA_FILES,
PUBLISH_CHANGES
PUBLISH_CHANGES,
REWRITE_MANIFESTS
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.expressions.Expression;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Action for rewriting Iceberg manifest files to optimize metadata layout
*/
public class IcebergRewriteManifestsAction extends BaseIcebergAction {
private static final Logger LOG = LogManager.getLogger(IcebergRewriteManifestsAction.class);
public static final String SPEC_ID = "spec_id";

public IcebergRewriteManifestsAction(Map<String, String> properties,
Optional<PartitionNamesInfo> partitionNamesInfo,
Optional<Expression> whereCondition) {
super("rewrite_manifests", properties, partitionNamesInfo, whereCondition);
}

@Override
protected void registerIcebergArguments() {
namedArguments.registerOptionalArgument(SPEC_ID,
"Spec id of the manifests to rewrite (defaults to current spec id)",
null,
ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE));
}

@Override
protected void validateIcebergAction() throws UserException {
validateNoPartitions();
validateNoWhereCondition();
}

@Override
protected List<String> executeAction(TableIf table) throws UserException {
try {
Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
Snapshot current = icebergTable.currentSnapshot();
if (current == null) {
// No current snapshot means the table is empty, no manifests to rewrite
return Lists.newArrayList("0", "0");
}

// Get optional spec_id parameter
Integer specId = namedArguments.getInt(SPEC_ID);

// Execute rewrite operation
RewriteManifestExecutor executor = new RewriteManifestExecutor();
RewriteManifestExecutor.Result result = executor.execute(
icebergTable,
(ExternalTable) table,
specId);

return result.toStringList();
} catch (Exception e) {
LOG.warn("Failed to rewrite manifests for table: {}", table.getName(), e);
throw new UserException("Rewrite manifests failed: " + e.getMessage(), e);
}
}

@Override
protected List<Column> getResultSchema() {
return Lists.newArrayList(
new Column("rewritten_manifests_count", Type.INT, false,
"Number of manifests which were re-written by this command"),
new Column("added_manifests_count", Type.INT, false,
"Number of new manifest files which were written by this command")
);
}

@Override
public String getDescription() {
return "Rewrite Iceberg manifest files to optimize metadata layout";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.rewrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
* Executor for manifest rewrite operations
*/
public class RewriteManifestExecutor {
private static final Logger LOG = LogManager.getLogger(RewriteManifestExecutor.class);

public static class Result {
private final int rewrittenCount;
private final int addedCount;

public Result(int rewrittenCount, int addedCount) {
this.rewrittenCount = rewrittenCount;
this.addedCount = addedCount;
}

public java.util.List<String> toStringList() {
return java.util.Arrays.asList(String.valueOf(rewrittenCount),
String.valueOf(addedCount));
}
}

/**
* Execute manifest rewrite using Iceberg RewriteManifests API
*/
public Result execute(Table table, ExternalTable extTable, Integer specId) throws UserException {
try {
// Get current snapshot and return early if table is empty
Snapshot currentSnapshot = table.currentSnapshot();
if (currentSnapshot == null) {
return new Result(0, 0);
}

// Collect manifests before rewrite and filter by specId if provided
List<ManifestFile> manifestsBefore = currentSnapshot.dataManifests(table.io());
List<ManifestFile> manifestsBeforeTargeted = filterBySpecId(manifestsBefore, specId);

int rewrittenCount = manifestsBeforeTargeted.size();

if (rewrittenCount == 0) {
return new Result(0, 0);
}

// Configure rewrite operation, optionally restricting manifests by specId
RewriteManifests rm = table.rewriteManifests();

if (specId != null) {
final int targetSpecId = specId;
rm.rewriteIf(manifest -> manifest.partitionSpecId() == targetSpecId);
}

// Commit manifest rewrite
rm.commit();

// Refresh snapshot after rewrite
Snapshot snapshotAfter = table.currentSnapshot();
if (snapshotAfter == null) {
return new Result(rewrittenCount, 0);
}

// Collect manifests after rewrite and filter by specId
List<ManifestFile> manifestsAfter = snapshotAfter.dataManifests(table.io());
List<ManifestFile> manifestsAfterTargeted = filterBySpecId(manifestsAfter, specId);

// Compute addedCount as newly produced manifests (path not in before set)
java.util.Set<String> beforePaths = manifestsBeforeTargeted.stream()
.map(ManifestFile::path)
.collect(java.util.stream.Collectors.toSet());

int addedCount = (int) manifestsAfterTargeted.stream()
.map(ManifestFile::path)
.filter(path -> !beforePaths.contains(path))
.count();

// Invalidate table cache to ensure metadata is refreshed
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);

return new Result(rewrittenCount, addedCount);
} catch (Exception e) {
LOG.warn("Failed to execute manifest rewrite for table: {}", extTable.getName(), e);
throw new UserException("Failed to rewrite manifests: " + e.getMessage(), e);
}
}

private List<ManifestFile> filterBySpecId(List<ManifestFile> manifests, Integer specId) {
if (specId == null) {
return manifests;
}
final int targetSpecId = specId;
return manifests.stream()
.filter(manifest -> manifest.partitionSpecId() == targetSpecId)
.collect(java.util.stream.Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !before_basic_rewrite --
1 item1 electronics 100 2024-01-01
2 item2 electronics 200 2024-01-02
3 item3 books 300 2024-01-03
4 item4 books 400 2024-01-04
5 item5 clothing 500 2024-01-05
6 item6 clothing 600 2024-01-06
7 item7 electronics 700 2024-01-07
8 item8 electronics 800 2024-01-08

-- !after_basic_rewrite --
1 item1 electronics 100 2024-01-01
2 item2 electronics 200 2024-01-02
3 item3 books 300 2024-01-03
4 item4 books 400 2024-01-04
5 item5 clothing 500 2024-01-05
6 item6 clothing 600 2024-01-06
7 item7 electronics 700 2024-01-07
8 item8 electronics 800 2024-01-08

-- !before_partitioned_rewrite --
1 item1 100 2024 1
2 item2 200 2024 1
3 item3 300 2024 2
4 item4 400 2024 2
5 item5 500 2024 3
6 item6 600 2024 3

-- !after_partitioned_rewrite --
1 item1 100 2024 1
2 item2 200 2024 1
3 item3 300 2024 2
4 item4 400 2024 2
5 item5 500 2024 3
6 item6 600 2024 3

-- !after_spec_id_rewrite --
1 item1 100 2024 1 15
2 item2 200 2024 1 16
3 item3 300 2024 2 17
4 item4 400 2024 3 18
5 item5 500 2024 3 19

Loading
Loading