Skip to content

Commit

Permalink
[AMORO-3312]Separate the shared iceberg-worker-pool for planning and …
Browse files Browse the repository at this point in the history
…committing (#3313)

* [AMORO-3312]Separate the shared iceberg-worker-pool for planning and committing

* fix UT Error

* Modify configuration names

---------

Co-authored-by: Xavier Bai <xuba@apache.org>
  • Loading branch information
rfyu and XBaith authored Nov 18, 2024
1 parent 933c1ed commit 7b0d808
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ public class AmoroManagementConf {
"Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing "
+ "manifests in the base table implementation across all concurrent planning or commit operations.");

public static final ConfigOption<Integer> TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT =
ConfigOptions.key("self-optimizing.plan-manifest-io-thread-count")
.intType()
.defaultValue(10)
.withDescription(
"Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing "
+ "manifests in the base table implementation across all concurrent planning operations.");

public static final ConfigOption<Integer> TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT =
ConfigOptions.key("self-optimizing.commit-manifest-io-thread-count")
.intType()
.defaultValue(10)
.withDescription(
"Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing "
+ "manifests in the base table implementation across all concurrent commit operations.");

public static final ConfigOption<Long> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
ConfigOptions.key("refresh-external-catalogs.interval")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.JacksonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.SystemProperties;
Expand Down Expand Up @@ -449,10 +450,20 @@ private Map<String, Object> initEnvConfig() {
private void setIcebergSystemProperties() {
int workerThreadPoolSize =
Math.max(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() / 2,
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_THREAD_COUNT));
System.setProperty(
SystemProperties.WORKER_THREAD_POOL_SIZE_PROP, String.valueOf(workerThreadPoolSize));
int planningThreadPoolSize =
Math.max(
Runtime.getRuntime().availableProcessors() / 2,
serviceConfig.getInteger(
AmoroManagementConf.TABLE_MANIFEST_IO_PLANNING_THREAD_COUNT));
int commitThreadPoolSize =
Math.max(
Runtime.getRuntime().availableProcessors() / 2,
serviceConfig.getInteger(AmoroManagementConf.TABLE_MANIFEST_IO_COMMIT_THREAD_COUNT));
IcebergThreadPools.init(planningThreadPoolSize, commitThreadPoolSize);
}

private void initContainerConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.amoro.utils.TablePropertyUtil;
Expand Down Expand Up @@ -262,7 +263,8 @@ private void rewriteFiles(
return;
}

RewriteFiles rewriteFiles = transaction.newRewrite();
RewriteFiles rewriteFiles =
transaction.newRewrite().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) {
long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber();
rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber);
Expand All @@ -282,7 +284,8 @@ private void rewriteFiles(
}

private void addDeleteFiles(Transaction transaction, Set<DeleteFile> addDeleteFiles) {
RowDelta rowDelta = transaction.newRowDelta();
RowDelta rowDelta =
transaction.newRowDelta().scanManifestsWith(IcebergThreadPools.getCommitExecutor());
addDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
rowDelta.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -42,7 +43,12 @@ public CloseableIterable<FileScanResult> scan() {
return CloseableIterable.empty();
}
return CloseableIterable.transform(
table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(),
table
.newScan()
.planWith(IcebergThreadPools.getPlanningExecutor())
.useSnapshot(snapshotId)
.filter(partitionFilter)
.planFiles(),
this::buildFileScanResult);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.utils;

import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;

public class IcebergThreadPools {

private static final Logger LOG = LoggerFactory.getLogger(IcebergThreadPools.class);
private static volatile ExecutorService planningExecutor;
private static volatile ExecutorService commitExecutor;

public static void init(int planningThreadPoolSize, int commitThreadPoolSize) {
if (planningExecutor == null) {
synchronized (IcebergThreadPools.class) {
if (planningExecutor == null) {
planningExecutor =
ThreadPools.newWorkerPool("iceberg-planning-pool", planningThreadPoolSize);
}
}
}
if (commitExecutor == null) {
synchronized (IcebergThreadPools.class) {
if (commitExecutor == null) {
commitExecutor = ThreadPools.newWorkerPool("iceberg-commit-pool", commitThreadPoolSize);
}
}
}

LOG.info(
"init iceberg thread pool success, planningExecutor size:{},commitExecutor size:{}",
planningThreadPoolSize,
commitThreadPoolSize);
}

public static ExecutorService getPlanningExecutor() {
if (planningExecutor == null) {
synchronized (IcebergThreadPools.class) {
if (planningExecutor == null) {
planningExecutor =
ThreadPools.newWorkerPool(
"iceberg-planning-pool", Runtime.getRuntime().availableProcessors());
}
}
}
return planningExecutor;
}

public static ExecutorService getCommitExecutor() {
if (commitExecutor == null) {
synchronized (IcebergThreadPools.class) {
if (commitExecutor == null) {
commitExecutor =
ThreadPools.newWorkerPool(
"iceberg-commit-pool", Runtime.getRuntime().availableProcessors());
}
}
}
return commitExecutor;
}
}

0 comments on commit 7b0d808

Please sign in to comment.