Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3279] Get the total result when retrieve the optimizing tables from db #3283

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.BadRequestException;

Expand All @@ -52,6 +55,8 @@

/** The controller that handles optimizer requests. */
public class OptimizerGroupController {
private static final Logger LOG = LoggerFactory.getLogger(OptimizerGroupController.class);

private static final String ALL_GROUP = "all";
private final TableService tableService;
private final DefaultOptimizingService optimizerManager;
Expand Down Expand Up @@ -82,25 +87,40 @@ public void getOptimizerTables(Context ctx) {

String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
List<Integer> statusCodes = new ArrayList<>(actionFilter.size());
for (String action : actionFilter) {
OptimizingStatus status = OptimizingStatus.ofDisplayValue(action);
if (status == null) {
LOG.warn("Can't find optimizer status for action:{}, skip it.", action);
} else {
statusCodes.add(status.getCode());
}
}

// use null to mark the filter as null filter
if (statusCodes.isEmpty()) {
statusCodes = null;
}
Pair<List<TableRuntimeMeta>, Integer> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);
optimizerGroupUsedInDbFilter,
dbFilterStr,
tableFilterStr,
statusCodes,
pageSize,
offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
tableRuntimeBeans.getLeft().stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.filter(
tableRuntime ->
actionFilter.isEmpty()
|| actionFilter.contains(tableRuntime.getOptimizingStatus().displayValue()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(
tableRuntimes.stream()
.map(OptimizingUtil::buildTableOptimizeInfo)
.collect(Collectors.toList()),
tableService.listRuntimes(dbFilterStr, tableFilterStr).size());
tableRuntimeBeans.getRight());
ctx.json(OkResponse.of(amsPageResult));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ public static OptimizingStatus ofCode(int code) {
}
return null;
}

public static OptimizingStatus ofDisplayValue(String displayValue) {
for (OptimizingStatus status : values()) {
if (status.displayValue.equals(displayValue)) {
return status;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,12 @@ List<ServerTableIdentifier> selectTableIdentifiersByCatalog(
+ "<if test='fuzzyDbName != null and (isPostgreSQL or isDerby)'> AND db_name like '%' || #{fuzzyDbName, jdbcType=VARCHAR} || '%' </if>"
+ "<if test='fuzzyTableName != null and isMySQL'> AND table_name like CONCAT('%', #{fuzzyTableName, jdbcType=VARCHAR}, '%') </if>"
+ "<if test='fuzzyTableName != null and (isPostgreSQL or isDerby)'> AND table_name like '%' || #{fuzzyTableName, jdbcType=VARCHAR} || '%' </if>"
+ "<if test='statusCodeFilter != null and statusCodeFilter.size() > 0'>"
+ "AND optimizing_status_code IN ("
+ "<foreach item='item' collection='statusCodeFilter' separator=','>"
+ "#{item}"
+ "</foreach> ) </if>"
+ "ORDER BY optimizing_status_code, optimizing_status_start_time DESC "
+ "<if test='isMySQL or isPostgreSQL'> LIMIT #{limitCount} OFFSET #{offsetNum} </if>"
+ "<if test='isDerby'> OFFSET #{offsetNum} ROWS FETCH FIRST #{limitCount} ROWS ONLY </if>"
+ "</script>")
@Results({
@Result(property = "tableId", column = "table_id"),
Expand Down Expand Up @@ -523,6 +526,7 @@ List<TableRuntimeMeta> selectTableRuntimesForOptimizerGroup(
@Param("optimizerGroup") String optimizerGroup,
@Param("fuzzyDbName") String fuzzyDbName,
@Param("fuzzyTableName") String fuzzyTableName,
@Param("limitCount") int limitCount,
@Param("offsetNum") int offset);
@Param("statusCodeFilter") List<Integer> statusCodeFilter,
@Param("pageNum") int pageNum,
@Param("pageSize") int pageSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.amoro.server.table;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.NoSuchTableException;
import org.apache.amoro.ServerTableIdentifier;
Expand Down Expand Up @@ -56,15 +59,14 @@
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.utils.TablePropertyUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -344,18 +346,35 @@ public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
}

@Override
public List<TableRuntimeMeta> getTableRuntimes(
public Pair<List<TableRuntimeMeta>, Integer> getTableRuntimes(
klion26 marked this conversation as resolved.
Show resolved Hide resolved
String optimizerGroup,
@Nullable String fuzzyDbName,
@Nullable String fuzzyTableName,
@Nullable List<Integer> statusCodeFilters,
int limit,
int offset) {
checkStarted();
return getAs(
TableMetaMapper.class,
mapper ->
mapper.selectTableRuntimesForOptimizerGroup(
optimizerGroup, fuzzyDbName, fuzzyTableName, limit, offset));

// page helper is 1-based
int pageNumber = (offset / limit) + 1;

try (Page<?> ignore = PageHelper.startPage(pageNumber, limit, true)) {
int total = 0;
List<TableRuntimeMeta> ret =
getAs(
TableMetaMapper.class,
mapper ->
mapper.selectTableRuntimesForOptimizerGroup(
optimizerGroup,
fuzzyDbName,
fuzzyTableName,
statusCodeFilters,
limit,
offset));
PageInfo<TableRuntimeMeta> pageInfo = new PageInfo<>(ret);
total = (int) pageInfo.getTotal();
return Pair.of(ret, total);
}
}

public InternalCatalog getInternalCatalog(String catalogName) {
Expand Down Expand Up @@ -473,33 +492,6 @@ public TableRuntime getRuntime(Long tableId) {
return tableRuntimeMap.get(tableId);
}

public Map<Long, TableRuntime> listRuntimes(
@Nullable String dbFilter, @Nullable String tableFilter) {
checkStarted();
// no filter, will return all the table runtime.
if (dbFilter == null && tableFilter == null) {
return Collections.unmodifiableMap(tableRuntimeMap);
}

Map<Long, TableRuntime> filteredRuntimes = new HashMap<>();
for (Map.Entry<Long, TableRuntime> entry : tableRuntimeMap.entrySet()) {
ServerTableIdentifier identifier = entry.getValue().getTableIdentifier();
// skip the runtime which fails the db filter.
if (dbFilter != null && !identifier.getDatabase().contains(dbFilter)) {
continue;
}

// skip the runtime which fails the table filter.
if (tableFilter != null && !identifier.getTableName().contains(tableFilter)) {
continue;
}

filteredRuntimes.put(entry.getKey(), entry.getValue());
}

return filteredRuntimes;
}

@Override
public boolean contains(Long tableId) {
checkStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;

import javax.annotation.Nullable;

import java.util.Map;

public interface TableManager extends TableRuntimeHandler {

/**
Expand All @@ -37,9 +33,6 @@ public interface TableManager extends TableRuntimeHandler {

TableRuntime getRuntime(Long tableId);

/** Return the table runtimes associated to the given filter. */
Map<Long, TableRuntime> listRuntimes(@Nullable String dbFilter, @Nullable String tableFilter);

default boolean contains(Long tableId) {
return getRuntime(tableId) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.server.catalog.CatalogService;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.commons.lang3.tuple.Pair;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -100,13 +101,18 @@ Blocker block(
* @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set.
* @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter
* set.
* @param statusCodeFilters the status code used to filter the result, wil be null if no filter
* set.
* @param limit How many entries we want to retrieve.
* @param offset The entries we'll skip when retrieving the entries.
* @return A pair with the first entry is the actual list under the filters with the offset and
* limit, and second value will be the number of total entries under the filters.
*/
List<TableRuntimeMeta> getTableRuntimes(
Pair<List<TableRuntimeMeta>, Integer> getTableRuntimes(
String optimizerGroup,
@Nullable String fuzzyDbName,
@Nullable String fuzzyTableName,
@Nullable List<Integer> statusCodeFilters,
int limit,
int offset);
}
Loading
Loading