Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2385] Make the maximum input file size per optimizer thread configurable #2387

Merged
merged 8 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.netease.arctic.ams.api;

public class PropertyNames {
public class OptimizerProperties {

// Resource properties
public static final String RESOURCE_ID = "resource-id";
public static final String EXPORT_PROPERTY_PREFIX = "export.";

public static final String AMS_OPTIMIZER_URI = "ams-optimizing-uri";
public static final String AMS_HOME = "ams-home";

// Resource container properties
public static final String EXPORT_PROPERTY_PREFIX = "export.";

// Resource group properties
public static final String OPTIMIZER_EXECUTION_PARALLEL = "execution-parallel";
public static final String OPTIMIZER_MEMORY_SIZE = "memory-size";
public static final String OPTIMIZER_GROUP_NAME = "group-name";
Expand All @@ -15,4 +19,6 @@ public class PropertyNames {
public static final boolean OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT = false;
public static final String OPTIMIZER_DISK_STORAGE_PATH = "disk-storage-path";
public static final String OPTIMIZER_MEMORY_STORAGE_SIZE = "memory-storage-size";
// Property key for the per-thread input size limit used when planning optimizing tasks.
// It is looked up in the optimizer group's properties; the planner multiplies the value by the
// available core count to derive the maximum input size of one plan.
public static final String MAX_INPUT_FILE_SIZE_PER_THREAD = "max-input-file-size-per-thread";
// Value passed as the default when the key above is looked up.
// NOTE(review): presumably expressed in bytes (512 * 1024 * 1024) -- confirm against the planner.
public static final Long MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT = 512 * 1024 * 1024L; // 512MB
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.OptimizerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,7 +21,7 @@ public Optimizer(OptimizerConfig config) {
IntStream.range(0, config.getExecutionParallel())
.forEach(i -> executors[i] = new OptimizerExecutor(config, i));
if (config.getResourceId() != null) {
toucher.withRegisterProperty(PropertyNames.RESOURCE_ID, config.getResourceId());
toucher.withRegisterProperty(OptimizerProperties.RESOURCE_ID, config.getResourceId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.OptimizerProperties;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
Expand All @@ -31,14 +31,14 @@ public class OptimizerConfig implements Serializable {

@Option(
name = "-a",
aliases = "--" + PropertyNames.AMS_OPTIMIZER_URI,
aliases = "--" + OptimizerProperties.AMS_OPTIMIZER_URI,
usage = "The ams url",
required = true)
private String amsUrl;

@Option(
name = "-p",
aliases = "--" + PropertyNames.OPTIMIZER_EXECUTION_PARALLEL,
aliases = "--" + OptimizerProperties.OPTIMIZER_EXECUTION_PARALLEL,
usage = "Optimizer execution parallel",
required = true)
private int executionParallel;
Expand All @@ -47,42 +47,42 @@ public class OptimizerConfig implements Serializable {
@Deprecated
@Option(
name = "-m",
aliases = "--" + PropertyNames.OPTIMIZER_MEMORY_SIZE,
aliases = "--" + OptimizerProperties.OPTIMIZER_MEMORY_SIZE,
usage = "Optimizer memory size(MB)")
private int memorySize;

@Option(
name = "-g",
aliases = "--" + PropertyNames.OPTIMIZER_GROUP_NAME,
aliases = "--" + OptimizerProperties.OPTIMIZER_GROUP_NAME,
usage = "Group name optimizer belong",
required = true)
private String groupName;

@Option(
name = "-hb",
aliases = "--" + PropertyNames.OPTIMIZER_HEART_BEAT_INTERVAL,
aliases = "--" + OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL,
usage = "Heart beat interval with ams(ms), default 10s")
private long heartBeat = 10000; // 10 s

@Option(
name = "-eds",
aliases = "--" + PropertyNames.OPTIMIZER_EXTEND_DISK_STORAGE,
aliases = "--" + OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE,
usage = "Whether extend storage to disk, default false")
private boolean extendDiskStorage = false;

@Option(
name = "-dsp",
aliases = "--" + PropertyNames.OPTIMIZER_DISK_STORAGE_PATH,
aliases = "--" + OptimizerProperties.OPTIMIZER_DISK_STORAGE_PATH,
usage = "Disk storage path")
private String diskStoragePath;

@Option(
name = "-msz",
aliases = "--" + PropertyNames.OPTIMIZER_MEMORY_STORAGE_SIZE,
aliases = "--" + OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE,
usage = "Memory storage size limit when extending disk storage(MB), default 512MB")
private long memoryStorageSize = 512; // 512 M

@Option(name = "-id", aliases = "--" + PropertyNames.RESOURCE_ID, usage = "Resource id")
@Option(name = "-id", aliases = "--" + OptimizerProperties.RESOURCE_ID, usage = "Resource id")
private String resourceId;

public OptimizerConfig() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.netease.arctic.ams.api.ArcticTableMetastore;
import com.netease.arctic.ams.api.Constants;
import com.netease.arctic.ams.api.Environments;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.OptimizingService;
import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.server.dashboard.DashboardServer;
import com.netease.arctic.server.dashboard.response.ErrorResponse;
import com.netease.arctic.server.dashboard.utils.AmsUtil;
Expand Down Expand Up @@ -503,9 +503,9 @@ private void initContainerConfig() {
containerConfig.getObject(ArcticManagementConf.CONTAINER_PROPERTIES, Map.class));
}
// put properties in config.yaml first.
containerProperties.put(PropertyNames.AMS_HOME, Environments.getHomePath());
containerProperties.put(OptimizerProperties.AMS_HOME, Environments.getHomePath());
containerProperties.putIfAbsent(
PropertyNames.AMS_OPTIMIZER_URI,
OptimizerProperties.AMS_OPTIMIZER_URI,
AmsUtil.getAMSThriftAddress(serviceConfig, Constants.THRIFT_OPTIMIZING_SERVICE_NAME));
// put addition system properties
container.setProperties(containerProperties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.netease.arctic.server.manager;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.resource.Resource;
import com.netease.arctic.ams.api.resource.ResourceContainer;
import com.netease.arctic.ams.api.resource.ResourceStatus;
Expand Down Expand Up @@ -28,14 +28,14 @@
public void init(String name, Map<String, String> containerProperties) {
this.containerName = name;
this.containerProperties = containerProperties;
this.amsHome = containerProperties.get(PropertyNames.AMS_HOME);
this.amsOptimizingUrl = containerProperties.get(PropertyNames.AMS_OPTIMIZER_URI);
this.amsHome = containerProperties.get(OptimizerProperties.AMS_HOME);
this.amsOptimizingUrl = containerProperties.get(OptimizerProperties.AMS_OPTIMIZER_URI);
Preconditions.checkNotNull(
this.amsHome, "Container Property: %s is required", PropertyNames.AMS_HOME);
this.amsHome, "Container Property: %s is required", OptimizerProperties.AMS_HOME);
Preconditions.checkNotNull(
this.amsOptimizingUrl,
"Container Property: %s is required",
PropertyNames.AMS_OPTIMIZER_URI);
OptimizerProperties.AMS_OPTIMIZER_URI);
}

@Override
Expand All @@ -47,7 +47,8 @@
protected abstract Map<String, String> doScaleOut(Resource resource);

protected String getOptimizingUri(Map<String, String> resourceProperties) {
String optimizingUrl = resourceProperties.getOrDefault(PropertyNames.AMS_OPTIMIZER_URI, null);
String optimizingUrl =
resourceProperties.getOrDefault(OptimizerProperties.AMS_OPTIMIZER_URI, null);

Check warning on line 51 in ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java#L50-L51

Added lines #L50 - L51 were not covered by tests
if (StringUtils.isNotEmpty(optimizingUrl)) {
return optimizingUrl;
}
Expand All @@ -67,22 +68,23 @@
.append(resource.getThreadCount())
.append(" -g ")
.append(resource.getGroupName());
if (resource.getProperties().containsKey(PropertyNames.OPTIMIZER_HEART_BEAT_INTERVAL)) {
if (resource.getProperties().containsKey(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL)) {
stringBuilder
.append(" -hb ")
.append(resource.getProperties().get(PropertyNames.OPTIMIZER_HEART_BEAT_INTERVAL));
.append(resource.getProperties().get(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL));

Check warning on line 74 in ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java#L74

Added line #L74 was not covered by tests
}
if (org.apache.iceberg.util.PropertyUtil.propertyAsBoolean(
resource.getProperties(),
PropertyNames.OPTIMIZER_EXTEND_DISK_STORAGE,
PropertyNames.OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT)) {
OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE,
OptimizerProperties.OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT)) {
stringBuilder
.append(" -eds -dsp ")
.append(resource.getRequiredProperty(PropertyNames.OPTIMIZER_DISK_STORAGE_PATH));
if (resource.getProperties().containsKey(PropertyNames.OPTIMIZER_MEMORY_STORAGE_SIZE)) {
.append(resource.getRequiredProperty(OptimizerProperties.OPTIMIZER_DISK_STORAGE_PATH));

Check warning on line 82 in ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java#L82

Added line #L82 was not covered by tests
if (resource.getProperties().containsKey(OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE)) {
stringBuilder
.append(" -msz ")
.append(resource.getProperties().get(PropertyNames.OPTIMIZER_MEMORY_STORAGE_SIZE));
.append(
resource.getProperties().get(OptimizerProperties.OPTIMIZER_MEMORY_STORAGE_SIZE));

Check warning on line 87 in ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java#L86-L87

Added lines #L86 - L87 were not covered by tests
}
}
if (StringUtils.isNotEmpty(resource.getResourceId())) {
Expand All @@ -95,9 +97,9 @@
List<String> cmds = new ArrayList<>();
if (containerProperties != null) {
for (Map.Entry<String, String> entry : containerProperties.entrySet()) {
if (entry.getKey().startsWith(PropertyNames.EXPORT_PROPERTY_PREFIX)) {
if (entry.getKey().startsWith(OptimizerProperties.EXPORT_PROPERTY_PREFIX)) {
String exportPropertyName =
entry.getKey().substring(PropertyNames.EXPORT_PROPERTY_PREFIX.length());
entry.getKey().substring(OptimizerProperties.EXPORT_PROPERTY_PREFIX.length());

Check warning on line 102 in ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/AbstractResourceContainer.java#L102

Added line #L102 was not covered by tests
String exportValue = entry.getValue();
cmds.add(String.format("export %s=%s", exportPropertyName, exportValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.netease.arctic.server.manager;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.resource.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -380,7 +380,8 @@

private String getFlinkConfDir() {
String flinkConfDir =
getContainerProperties().get(PropertyNames.EXPORT_PROPERTY_PREFIX + ENV_FLINK_CONF_DIR);
getContainerProperties()
.get(OptimizerProperties.EXPORT_PROPERTY_PREFIX + ENV_FLINK_CONF_DIR);

Check warning on line 384 in ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java#L383-L384

Added lines #L383 - L384 were not covered by tests
if (StringUtils.isNotEmpty(flinkConfDir)) {
return flinkConfDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.netease.arctic.server.optimizing;

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.OptimizingTaskId;
import com.netease.arctic.ams.api.resource.ResourceGroup;
import com.netease.arctic.optimizing.RewriteFilesInput;
Expand All @@ -36,6 +37,7 @@
import com.netease.arctic.server.table.TableRuntimeMeta;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.utils.ArcticDataFiles;
import com.netease.arctic.utils.CompatiblePropertyUtil;
import com.netease.arctic.utils.ExceptionUtil;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -248,7 +250,10 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) {
AmoroTable<?> table = tableManager.loadTable(tableRuntime.getTableIdentifier());
OptimizingPlanner planner =
new OptimizingPlanner(
tableRuntime.refresh(table), (ArcticTable) table.originalTable(), getAvailableCore());
tableRuntime.refresh(table),
(ArcticTable) table.originalTable(),
getAvailableCore(),
maxInputSizePerThread());
if (planner.isNecessary()) {
return new TableOptimizingProcess(planner);
} else {
Expand Down Expand Up @@ -301,6 +306,13 @@ private double getAvailableCore() {
return Math.max(quotaProvider.getTotalQuota(optimizerGroup.getName()), 1);
}

/**
 * Resolves the per-thread input size limit configured for this queue's optimizer group.
 *
 * <p>The value is looked up under {@code OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD} in
 * the optimizer group's properties, with {@code
 * OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT} supplied as the default argument.
 *
 * @return the limit handed to the optimizing planner as its max input size per thread
 */
private long maxInputSizePerThread() {
  final long limitPerThread =
      CompatiblePropertyUtil.propertyAsLong(
          optimizerGroup.getProperties(),
          OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD,
          OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT);
  return limitPerThread;
}

@VisibleForTesting
SchedulingPolicy getSchedulingPolicy() {
return scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
public class OptimizingPlanner extends OptimizingEvaluator {
private static final Logger LOG = LoggerFactory.getLogger(OptimizingPlanner.class);

private static final long MAX_INPUT_FILE_SIZE_PER_THREAD = 512 * 1024 * 1024; // 512MB

private final TableFileScanHelper.PartitionFilter partitionFilter;

protected long processId;
Expand All @@ -52,8 +50,13 @@ public class OptimizingPlanner extends OptimizingEvaluator {
private List<TaskDescriptor> tasks;

private List<AbstractPartitionPlan> actualPartitionPlans;
private final long maxInputSizePerThread;

public OptimizingPlanner(TableRuntime tableRuntime, ArcticTable table, double availableCore) {
public OptimizingPlanner(
TableRuntime tableRuntime,
ArcticTable table,
double availableCore,
long maxInputSizePerThread) {
super(tableRuntime, table);
this.partitionFilter =
tableRuntime.getPendingInput() == null
Expand All @@ -63,6 +66,7 @@ public OptimizingPlanner(TableRuntime tableRuntime, ArcticTable table, double av
this.planTime = System.currentTimeMillis();
this.processId = Math.max(tableRuntime.getNewestProcessId() + 1, planTime);
this.partitionPlannerFactory = new PartitionPlannerFactory(arcticTable, tableRuntime, planTime);
this.maxInputSizePerThread = maxInputSizePerThread;
}

@Override
Expand Down Expand Up @@ -128,7 +132,7 @@ public List<TaskDescriptor> planTasks() {
// prioritize partitions with high cost to avoid starvation
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));

double maxInputSize = MAX_INPUT_FILE_SIZE_PER_THREAD * availableCore;
double maxInputSize = maxInputSizePerThread * availableCore;
actualPartitionPlans = Lists.newArrayList();
long actualInputSize = 0;
for (PartitionEvaluator evaluator : evaluators) {
Expand Down Expand Up @@ -157,12 +161,14 @@ public List<TaskDescriptor> planTasks() {
}
long endTime = System.nanoTime();
LOG.info(
"{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms",
"{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms maxInputSize {} actualInputSize {}",
tableRuntime.getTableIdentifier(),
getOptimizingType(),
tasks.size(),
endTime - startTime,
(endTime - startTime) / 1_000_000);
(endTime - startTime) / 1_000_000,
maxInputSize,
actualInputSize);
return cacheAndReturnTasks(tasks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.netease.arctic.server.manager;

import com.netease.arctic.ams.api.PropertyNames;
import com.netease.arctic.ams.api.OptimizerProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -32,9 +32,9 @@ public class TestFlinkOptimizerContainer {
Map<String, String> containerProperties = Maps.newHashMap();

public TestFlinkOptimizerContainer() {
containerProperties.put(PropertyNames.AMS_HOME, "/home/ams");
containerProperties.put(OptimizerProperties.AMS_HOME, "/home/ams");
containerProperties.put(FlinkOptimizerContainer.FLINK_HOME_PROPERTY, "/home/ams");
containerProperties.put(PropertyNames.AMS_OPTIMIZER_URI, "thrift://127.0.0.1:1261");
containerProperties.put(OptimizerProperties.AMS_OPTIMIZER_URI, "thrift://127.0.0.1:1261");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT;
import static com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE;

import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.hive.optimizing.MixFormatRewriteExecutor;
import com.netease.arctic.optimizing.IcebergRewriteExecutor;
Expand Down Expand Up @@ -182,7 +183,11 @@ private OptimizingPlanner planner() {
Mockito.when(tableRuntime.getOptimizingConfig()).thenAnswer(f -> optimizingConfig());
Mockito.when(tableRuntime.getTableIdentifier())
.thenReturn(ServerTableIdentifier.of(1L, "a", "b", "c", table.format()));
return new OptimizingPlanner(tableRuntime, table, availableCore);
return new OptimizingPlanner(
tableRuntime,
table,
availableCore,
OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT);
}

private OptimizingConfig optimizingConfig() {
Expand Down
Loading