From 7b0d8085eaab8865e695d11f1da117374533825f Mon Sep 17 00:00:00 2001 From: rfyu <39233058+rfyu@users.noreply.github.com> Date: Mon, 18 Nov 2024 19:12:45 +0800 Subject: [PATCH 1/2] [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing (#3313) * [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing * fix UT Error * Modify configuration names --------- Co-authored-by: Xavier Bai --- .../amoro/server/AmoroManagementConf.java | 16 ++++ .../amoro/server/AmoroServiceContainer.java | 13 ++- .../server/optimizing/UnKeyedTableCommit.java | 7 +- .../scan/IcebergTableFileScanHelper.java | 8 +- .../amoro/utils/IcebergThreadPools.java | 81 +++++++++++++++++++ 5 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index f52a0113f2..c21a2ca1b4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -59,6 +59,22 @@ public class AmoroManagementConf { "Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing " + "manifests in the base table implementation across all concurrent planning or commit operations."); + public static final ConfigOption TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT = + ConfigOptions.key("self-optimizing.plan-manifest-io-thread-count") + .intType() + .defaultValue(10) + .withDescription( + "Sets the size of the worker pool. 
The worker pool limits the number of tasks concurrently processing " + + "manifests in the base table implementation across all concurrent planning operations."); + + public static final ConfigOption TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT = + ConfigOptions.key("self-optimizing.commit-manifest-io-thread-count") + .intType() + .defaultValue(10) + .withDescription( + "Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing " + + "manifests in the base table implementation across all concurrent commit operations."); + public static final ConfigOption REFRESH_EXTERNAL_CATALOGS_INTERVAL = ConfigOptions.key("refresh-external-catalogs.interval") .longType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 3d70c0acbf..400fc62354 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -59,6 +59,7 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport; +import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.JacksonUtil; import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.SystemProperties; @@ -449,10 +450,20 @@ private Map initEnvConfig() { private void setIcebergSystemProperties() { int workerThreadPoolSize = Math.max( - Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors() / 2, serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_THREAD_COUNT)); System.setProperty( SystemProperties.WORKER_THREAD_POOL_SIZE_PROP, String.valueOf(workerThreadPoolSize)); + int planningThreadPoolSize = + Math.max( + 
Runtime.getRuntime().availableProcessors() / 2, + serviceConfig.getInteger( + AmoroManagementConf.TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT)); + int commitThreadPoolSize = + Math.max( + Runtime.getRuntime().availableProcessors() / 2, + serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT)); + IcebergThreadPools.init(planningThreadPoolSize, commitThreadPoolSize); } private void initContainerConfig() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java index 957aa3762a..dbeb37a582 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java @@ -39,6 +39,7 @@ import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.UnkeyedTable; import org.apache.amoro.utils.ContentFiles; +import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.MixedTableUtil; import org.apache.amoro.utils.TableFileUtil; import org.apache.amoro.utils.TablePropertyUtil; @@ -262,7 +263,8 @@ private void rewriteFiles( return; } - RewriteFiles rewriteFiles = transaction.newRewrite(); + RewriteFiles rewriteFiles = + transaction.newRewrite().scanManifestsWith(IcebergThreadPools.getCommitExecutor()); if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) { long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber(); rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber); @@ -282,7 +284,8 @@ private void rewriteFiles( } private void addDeleteFiles(Transaction transaction, Set addDeleteFiles) { - RowDelta rowDelta = transaction.newRowDelta(); + RowDelta rowDelta = + transaction.newRowDelta().scanManifestsWith(IcebergThreadPools.getCommitExecutor()); addDeleteFiles.forEach(rowDelta::addDeletes); 
rowDelta.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); rowDelta.commit(); diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java index 2f9c7ec307..856a89ebc1 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/scan/IcebergTableFileScanHelper.java @@ -20,6 +20,7 @@ import org.apache.amoro.iceberg.Constants; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.utils.IcebergThreadPools; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; @@ -42,7 +43,12 @@ public CloseableIterable scan() { return CloseableIterable.empty(); } return CloseableIterable.transform( - table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(), + table + .newScan() + .planWith(IcebergThreadPools.getPlanningExecutor()) + .useSnapshot(snapshotId) + .filter(partitionFilter) + .planFiles(), this::buildFileScanResult); } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java new file mode 100644 index 0000000000..1c19968bc0 --- /dev/null +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/IcebergThreadPools.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.utils; + +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; + +public class IcebergThreadPools { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergThreadPools.class); + private static volatile ExecutorService planningExecutor; + private static volatile ExecutorService commitExecutor; + + public static void init(int planningThreadPoolSize, int commitThreadPoolSize) { + if (planningExecutor == null) { + synchronized (IcebergThreadPools.class) { + if (planningExecutor == null) { + planningExecutor = + ThreadPools.newWorkerPool("iceberg-planning-pool", planningThreadPoolSize); + } + } + } + if (commitExecutor == null) { + synchronized (IcebergThreadPools.class) { + if (commitExecutor == null) { + commitExecutor = ThreadPools.newWorkerPool("iceberg-commit-pool", commitThreadPoolSize); + } + } + } + + LOG.info( + "init iceberg thread pool success, planningExecutor size:{},commitExecutor size:{}", + planningThreadPoolSize, + commitThreadPoolSize); + } + + public static ExecutorService getPlanningExecutor() { + if (planningExecutor == null) { + synchronized (IcebergThreadPools.class) { + if (planningExecutor == null) { + planningExecutor = + ThreadPools.newWorkerPool( + "iceberg-planning-pool", Runtime.getRuntime().availableProcessors()); + } + } + } + return planningExecutor; + } + + public static ExecutorService getCommitExecutor() { + if (commitExecutor == null) { + synchronized (IcebergThreadPools.class) { 
+ if (commitExecutor == null) { + commitExecutor = + ThreadPools.newWorkerPool( + "iceberg-commit-pool", Runtime.getRuntime().availableProcessors()); + } + } + } + return commitExecutor; + } +} From 243d2894c6bbba59b74be97a1ea93f935f0bd9ad Mon Sep 17 00:00:00 2001 From: Congxian Qiu Date: Wed, 20 Nov 2024 14:07:36 +0800 Subject: [PATCH 2/2] [AMORO-3332] Add homepage info and maillist in the README (#3334) --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 878d90aa06..d5bf8506aa 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,8 @@ Apache Amoro (incubating) is a Lakehouse management system built on open data la Working with compute engines including Flink, Spark, and Trino, Amoro brings pluggable and self-managed features for Lakehouse to provide out-of-the-box data warehouse experience, and helps data platforms or products easily build infra-decoupled, stream-and-batch-fused and lake-native architecture. +Learn more about Amoro at https://amoro.apache.org/. If you need any help, contact the developers and community on the [mailing list](https://amoro.apache.org/join-community/#mailing-lists). + ## Architecture Here is the architecture diagram of Amoro: