Skip to content

Commit

Permalink
Merge remote-tracking branch 'amoro/master' into AMORO-2842
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Sep 24, 2024
2 parents d816d41 + 3f627ec commit 7b27e16
Show file tree
Hide file tree
Showing 36 changed files with 553 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -215,7 +214,7 @@ public void dispose() {
MetricManager.dispose();
}

private void initConfig() throws IOException {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
}
Expand Down Expand Up @@ -407,14 +406,14 @@ private class ConfigurationHelper {

private JsonNode yamlConfig;

public void init() throws IOException {
public void init() throws Exception {
Map<String, Object> envConfig = initEnvConfig();
initServiceConfig(envConfig);
setIcebergSystemProperties();
initContainerConfig();
}

private void initServiceConfig(Map<String, Object> envConfig) throws IOException {
private void initServiceConfig(Map<String, Object> envConfig) throws Exception {
LOG.info("initializing service configuration...");
String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME;
LOG.info("load config from path: {}", configPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,24 +120,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
group,
this,
planExecutor,
Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new),
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
Expand Down Expand Up @@ -456,9 +455,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
long totalTableSize = 0L;

// table size
List<MetricKey> metricKeys = metricDefineMap.get(TABLE_SUMMARY_TOTAL_FILES_SIZE);
List<MetricKey> metricKeys =
metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES_SIZE, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -197,7 +198,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
}

// file count
metricKeys = metricDefineMap.get(TABLE_SUMMARY_TOTAL_FILES);
metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -209,7 +210,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
}

// health score
metricKeys = metricDefineMap.get(TABLE_SUMMARY_HEALTH_SCORE);
metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_HEALTH_SCORE, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -221,7 +222,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri

this.totalDataSize.set(totalTableSize);
this.totalCatalog.set(allCatalogs.size());
this.totalTableCount.set(metricKeys.size());
this.totalTableCount.set(topTableItemMap.size());
this.allTopTableItem = new ArrayList<>(topTableItemMap.values());
addAndCheck(new OverviewDataSizeItem(ts, totalTableSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
Expand All @@ -30,13 +29,13 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

import javax.ws.rs.BadRequestException;

Expand Down Expand Up @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
continue;
}
if ((ALL_GROUP.equals(optimizerGroup)
|| tableRuntime.getOptimizerGroup().equals(optimizerGroup))
&& (StringUtils.isEmpty(dbFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr))
&& (StringUtils.isEmpty(tableFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) {
tableRuntimes.add(tableRuntime);
}
}
tableRuntimes.sort(
(o1, o2) -> {
// first we compare the status , and then we compare the start time when status are equal;
int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus());
// status order is asc, startTime order is desc
if (statDiff == 0) {
long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime();
return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1;
} else {
return statDiff;
}
});
String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier()));
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get());
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
Expand Down Expand Up @@ -659,7 +659,9 @@ public void cancelOptimizingProcess(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null;
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;

Preconditions.checkArgument(
tableRuntime != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -106,14 +105,12 @@ public OptimizingQueue(
new OptimizerGroupMetrics(
optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this);
this.metrics.register();
tableRuntimeMetaList.forEach(this::initTableRuntime);
tableRuntimeList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
private void initTableRuntime(TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +119,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -387,21 +384,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
package org.apache.amoro.server.optimizing;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true),
MAJOR_OPTIMIZING("major", true),
MINOR_OPTIMIZING("minor", true),
COMMITTING("committing", true),
PLANNING("planning", false),
PENDING("pending", false),
IDLE("idle", false);
FULL_OPTIMIZING("full", true, 100),
MAJOR_OPTIMIZING("major", true, 200),
MINOR_OPTIMIZING("minor", true, 300),
COMMITTING("committing", true, 400),
PLANNING("planning", false, 500),
PENDING("pending", false, 600),
IDLE("idle", false, 700);
private final String displayValue;

private final boolean isProcessing;

OptimizingStatus(String displayValue, boolean isProcessing) {
private final int code;

OptimizingStatus(String displayValue, boolean isProcessing, int code) {
this.displayValue = displayValue;
this.isProcessing = isProcessing;
this.code = code;
}

public boolean isProcessing() {
Expand All @@ -42,4 +45,17 @@ public boolean isProcessing() {
public String displayValue() {
return displayValue;
}

public int getCode() {
return code;
}

public static OptimizingStatus ofCode(int code) {
for (OptimizingStatus status : values()) {
if (status.getCode() == code) {
return status;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public List<TaskDescriptor> planTasks() {
return cacheAndReturnTasks(Collections.emptyList());
}

List<PartitionEvaluator> evaluators = new ArrayList<>(partitionPlanMap.values());
List<PartitionEvaluator> evaluators = new ArrayList<>(needOptimizingPlanMap.values());
// prioritize partitions with high cost to avoid starvation
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.ibatis.jdbc.ScriptRunner;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
Expand All @@ -53,8 +55,10 @@
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Properties;

public class SqlSessionFactoryProvider {
private static final Logger LOG = LoggerFactory.getLogger(SqlSessionFactoryProvider.class);
Expand All @@ -73,7 +77,7 @@ public static SqlSessionFactoryProvider getInstance() {

private volatile SqlSessionFactory sqlSessionFactory;

public void init(Configurations config) {
public void init(Configurations config) throws SQLException {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
Expand Down Expand Up @@ -111,6 +115,14 @@ public void init(Configurations config) {
configuration.addMapper(PlatformFileMapper.class);
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);

DatabaseIdProvider provider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
properties.setProperty("PostgreSQL", "postgres");
properties.setProperty("Derby", "derby");
provider.setProperties(properties);
configuration.setDatabaseId(provider.getDatabaseId(dataSource));
if (sqlSessionFactory == null) {
synchronized (this) {
if (sqlSessionFactory == null) {
Expand Down
Loading

0 comments on commit 7b27e16

Please sign in to comment.