Skip to content

Commit

Permalink
[ARCTIC-946][AMS] Add balanced schedule policy for optimizer (#993)
Browse files Browse the repository at this point in the history
* Add balanced schedule policy for optimizer

* Fix timezone error of int96 predicate push down

* Fix timezone error of int96 predicate push down
  • Loading branch information
shidayang authored Jan 13, 2023
1 parent 1dfd723 commit afe12e5
Show file tree
Hide file tree
Showing 22 changed files with 539 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import com.netease.arctic.ams.server.utils.ThreadPool;
import com.netease.arctic.ams.server.utils.YamlUtils;
import com.netease.arctic.utils.ConfigurationFileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
Expand Down Expand Up @@ -332,7 +332,7 @@ private static void startMetaStoreThreads(
}
} catch (Throwable t1) {
LOG.error("Failure when starting the worker threads, compact、checker、clean may not happen, " +
StringUtils.stringifyException(t1));
org.apache.hadoop.util.StringUtils.stringifyException(t1));
} finally {
startLock.unlock();
}
Expand Down Expand Up @@ -618,6 +618,20 @@ private static void initOptimizeGroupConfig() throws MetaException, NoSuchObject
optimizeQueueMeta.name = optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_NAME);
optimizeQueueMeta.container = optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER);

//init schedule policy
String schedulePolicy =
StringUtils.trim(optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY));
if (StringUtils.isBlank(schedulePolicy)) {
schedulePolicy = ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_QUOTA;
} else if (
!(ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_QUOTA.equalsIgnoreCase(schedulePolicy) ||
ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_BALANCED.equalsIgnoreCase(schedulePolicy))) {
throw new IllegalArgumentException(String.format("Scheduling policy only can be %s and %s",
ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_QUOTA,
ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_BALANCED));
}
optimizeQueueMeta.setSchedulingPolicy(schedulePolicy);

List<Container> containers = ServiceContainer.getOptimizeQueueService().getContainers();

boolean checkContainer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class ConfigFileProperties {
public static final String OPTIMIZE_GROUP_NAME = "name";
public static final String OPTIMIZE_GROUP_CONTAINER = "container";
public static final String OPTIMIZE_GROUP_PROPERTIES = "properties";
public static final String OPTIMIZE_SCHEDULING_POLICY = "scheduling_policy";
public static final String OPTIMIZE_SCHEDULING_POLICY_QUOTA = "quota";
public static final String OPTIMIZE_SCHEDULING_POLICY_BALANCED = "balanced";

public static final String OPTIMIZE_GROUP_PARALLELISM = "parallelism";
public static final String OPTIMIZE_GROUP_MEMORY = "memory";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;

import java.sql.Timestamp;
import java.util.List;

public interface OptimizeHistoryMapper {
Expand Down Expand Up @@ -86,6 +87,11 @@ public interface OptimizeHistoryMapper {
})
List<OptimizeHistory> selectOptimizeHistory(@Param("tableIdentifier") TableIdentifier tableIdentifier);

@Select("select max(commit_time) from " + TABLE_NAME + " where " +
"catalog_name = #{tableIdentifier.catalog} and db_name = #{tableIdentifier.database} " +
"and table_name = #{tableIdentifier.tableName}")
Timestamp latestCommitTime(@Param("tableIdentifier") TableIdentifier tableIdentifier);

@Delete("delete from " + TABLE_NAME + " where catalog_name = #{tableIdentifier.catalog} and " +
"db_name = #{tableIdentifier.database} and table_name = #{tableIdentifier.tableName}")
void deleteOptimizeRecord(@Param("tableIdentifier") TableIdentifier tableIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,30 @@
public interface OptimizeQueueMapper {
String TABLE_NAME = "optimize_group";

@Select("select group_id, name, properties, container from " + TABLE_NAME)
@Select("select group_id, name, scheduling_policy, properties, container from " + TABLE_NAME)
@Results({
@Result(property = "queueId", column = "group_id"),
@Result(property = "name", column = "name"),
@Result(property = "schedulingPolicy", column = "scheduling_policy"),
@Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class),
@Result(property = "container", column = "container")
})
List<OptimizeQueueMeta> selectOptimizeQueues();

@Select("select group_id, name, properties, container from " + TABLE_NAME + " where name = #{queueName}")
@Select("select group_id, name, scheduling_policy, properties, container from " + TABLE_NAME + " where name = " +
"#{queueName}")
@Results({
@Result(property = "queueId", column = "group_id"),
@Result(property = "name", column = "name"),
@Result(property = "schedulingPolicy", column = "scheduling_policy"),
@Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class),
@Result(property = "container", column = "container")
})
OptimizeQueueMeta selectOptimizeQueue(@Param("queueName") String queueName);

@Insert("insert into " + TABLE_NAME + " (name,properties,container) values " +
"(#{optimizeQueue.name}, #{optimizeQueue.properties, typeHandler=com.netease.arctic" +
@Insert("insert into " + TABLE_NAME + " (name,scheduling_policy,properties,container) values " +
"(#{optimizeQueue.name}, #{optimizeQueue.schedulingPolicy}, " +
"#{optimizeQueue.properties, typeHandler=com.netease.arctic" +
".ams.server.mybatis.Map2StringConverter}, #{optimizeQueue.container})")
void insertQueue(@Param("optimizeQueue") OptimizeQueueMeta optimizeQueue);

Expand All @@ -66,6 +70,7 @@ public interface OptimizeQueueMapper {
@Update("update " + TABLE_NAME + " set" +
" properties = #{optimizeQueue.properties, typeHandler=com.netease.arctic.ams.server.mybatis" +
".Map2StringConverter}," +
" scheduling_policy = #{optimizeQueue.schedulingPolicy}," +
" container = #{optimizeQueue.container}" +
" where group_id = #{optimizeQueue.queueId}")
void updateQueue(@Param("optimizeQueue") OptimizeQueueMeta optimizeQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class OptimizeQueueMeta {
public int queueId;
public String name;
public String container;
private String schedulingPolicy;
public Map<String, String> properties;

public int getQueueId() {
Expand All @@ -50,6 +51,14 @@ public void setContainer(String container) {
this.container = container;
}

public String getSchedulingPolicy() {
return schedulingPolicy;
}

public void setSchedulingPolicy(String schedulingPolicy) {
this.schedulingPolicy = schedulingPolicy;
}

public Map<String, String> getProperties() {
return properties;
}
Expand All @@ -64,6 +73,7 @@ public String toString() {
"queueId=" + queueId +
", name='" + name + '\'' +
", container='" + container + '\'' +
", schedulingPolicy='" + schedulingPolicy + '\'' +
", properties=" + properties +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public interface IOptimizeService {
*/
List<OptimizeHistory> getOptimizeHistory(TableIdentifier tableIdentifier);

/**
* Get the latest commit time of a table
* @param identifier
* @return
*/
Long getLatestCommitTime(TableIdentifier identifier);

/**
* Get max optimize history id.
* @return max optimize history id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -438,6 +439,20 @@ public List<OptimizeHistory> getOptimizeHistory(TableIdentifier identifier) {
}
}

@Override
public Long getLatestCommitTime(TableIdentifier identifier) {
try (SqlSession sqlSession = getSqlSession(true)) {
OptimizeHistoryMapper optimizeHistoryMapper =
getMapper(sqlSession, OptimizeHistoryMapper.class);

Timestamp latestCommitTime = optimizeHistoryMapper.latestCommitTime(identifier);
if (latestCommitTime == null) {
return 0L;
}
return latestCommitTime.getTime();
}
}

@Override
public long maxOptimizeHistoryId() {
try (SqlSession sqlSession = getSqlSession(true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ public boolean apply(@Nullable Long snapshotId) {
}
};

/**
* -1: not initialized
* 0: not committed
*/
private volatile long latestCommitTime = -1L;

public TableOptimizeItem(ArcticTable arcticTable, TableMetadata tableMetadata) {
this.arcticTable = arcticTable;
this.metaRefreshTime = -1;
Expand Down Expand Up @@ -253,6 +259,13 @@ public double getQuotaCache() {
return quotaCache;
}

public long getLatestCommitTime() {
if (latestCommitTime == -1L) {
latestCommitTime = ServiceContainer.getOptimizeService().getLatestCommitTime(tableIdentifier);
}
return latestCommitTime;
}

public String getGroupNameCache() {
return groupNameCache;
}
Expand Down Expand Up @@ -697,6 +710,9 @@ private void optimizeTasksCommitted(BaseOptimizeCommit optimizeCommit,
// persist optimize task history
OptimizeHistory record = buildOptimizeRecord(tasks, commitTime);
optimizeHistoryMapper.insertOptimizeHistory(record);

// update the latest commit time in memory
latestCommitTime = Math.max(latestCommitTime, commitTime);
} catch (Throwable t) {
LOG.warn("failed to persist optimize history after commit, ignore. " + getTableIdentifier(), t);
sqlSession.rollback(true);
Expand Down Expand Up @@ -823,17 +839,15 @@ public void clearOptimizeTasks() {
* GetOptimizeTasksToExecute
* include Init, Failed.
*
* @param maxCnt - max task cnt to pool
* @return List of OptimizeTaskItem
*/
public List<OptimizeTaskItem> getOptimizeTasksToExecute(int maxCnt) {
public List<OptimizeTaskItem> getOptimizeTasksToExecute() {
// lock for conflict with add new tasks, because files with be removed from OptimizeTask after tasks added
tasksLock.lock();
try {
return optimizeTasks.values().stream()
.filter(taskItem -> taskItem.canExecute(this::optimizeMaxRetry))
.sorted(Comparator.comparingLong(o -> o.getOptimizeTask().getCreateTime()))
.limit(maxCnt)
.collect(Collectors.toList());
} finally {
tasksLock.unlock();
Expand Down
Loading

0 comments on commit afe12e5

Please sign in to comment.