Skip to content

Commit

Permalink
Pass more information to LowMemoryKiller
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed May 12, 2022
1 parent c3566b5 commit d8a965a
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
Expand All @@ -28,10 +29,13 @@
import io.trino.execution.LocationFactory;
import io.trino.execution.QueryExecution;
import io.trino.execution.QueryIdGenerator;
import io.trino.execution.QueryInfo;
import io.trino.execution.StageInfo;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.memory.LowMemoryKiller.ForQueryLowMemoryKiller;
import io.trino.memory.LowMemoryKiller.ForTaskLowMemoryKiller;
import io.trino.memory.LowMemoryKiller.QueryMemoryInfo;
import io.trino.memory.LowMemoryKiller.RunningQueryInfo;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.RetryPolicy;
Expand Down Expand Up @@ -251,7 +255,7 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli

private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
{
List<QueryMemoryInfo> queryMemoryInfoList = Streams.stream(runningQueries)
List<RunningQueryInfo> runningQueryInfos = Streams.stream(runningQueries)
.map(this::createQueryMemoryInfo)
.collect(toImmutableList());

Expand All @@ -263,7 +267,7 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)

for (LowMemoryKiller lowMemoryKiller : lowMemoryKillers) {
List<MemoryInfo> nodeMemoryInfos = ImmutableList.copyOf(nodeMemoryInfosByNode.values());
Optional<KillTarget> killTarget = lowMemoryKiller.chooseTargetToKill(queryMemoryInfoList, nodeMemoryInfos);
Optional<KillTarget> killTarget = lowMemoryKiller.chooseTargetToKill(runningQueryInfos, nodeMemoryInfos);

if (killTarget.isPresent()) {
if (killTarget.get().isWholeQuery()) {
Expand Down Expand Up @@ -414,14 +418,28 @@ private boolean isClusterOutOfMemory()
return pool.getBlockedNodes() > 0;
}

private QueryMemoryInfo createQueryMemoryInfo(QueryExecution query)
private RunningQueryInfo createQueryMemoryInfo(QueryExecution query)
{
return new QueryMemoryInfo(
QueryInfo queryInfo = query.getQueryInfo();
ImmutableMap.Builder<TaskId, TaskInfo> taskInfosBuilder = ImmutableMap.builder();
queryInfo.getOutputStage().ifPresent(stage -> getTaskInfos(stage, taskInfosBuilder));
return new RunningQueryInfo(
query.getQueryId(),
query.getTotalMemoryReservation().toBytes(),
taskInfosBuilder.buildOrThrow(),
getRetryPolicy(query.getSession()));
}

private void getTaskInfos(StageInfo stageInfo, ImmutableMap.Builder<TaskId, TaskInfo> taskInfosBuilder)
{
for (TaskInfo taskInfo : stageInfo.getTasks()) {
taskInfosBuilder.put(taskInfo.getTaskStatus().getTaskId(), taskInfo);
}
for (StageInfo subStage : stageInfo.getSubStages()) {
getTaskInfos(subStage, taskInfosBuilder);
}
}

private long getQueryMemoryReservation(QueryExecution query)
{
return query.getTotalMemoryReservation().toBytes();
Expand Down
23 changes: 20 additions & 3 deletions core/trino-main/src/main/java/io/trino/memory/LowMemoryKiller.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package io.trino.memory;

import com.google.common.collect.ImmutableMap;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;

Expand All @@ -22,6 +25,7 @@
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -33,18 +37,25 @@

public interface LowMemoryKiller
{
Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes);
Optional<KillTarget> chooseTargetToKill(List<RunningQueryInfo> runningQueries, List<MemoryInfo> nodes);

class QueryMemoryInfo
class RunningQueryInfo
{
private final QueryId queryId;
private final long memoryReservation;
private final Map<TaskId, TaskInfo> taskInfos;
private final RetryPolicy retryPolicy;

public QueryMemoryInfo(QueryId queryId, long memoryReservation, RetryPolicy retryPolicy)
public RunningQueryInfo(
QueryId queryId,
long memoryReservation,
Map<TaskId, TaskInfo> taskInfos,
RetryPolicy retryPolicy)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.memoryReservation = memoryReservation;
requireNonNull(taskInfos, "taskInfos is null");
this.taskInfos = ImmutableMap.copyOf(taskInfos);
this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null");
}

Expand All @@ -58,6 +69,11 @@ public long getMemoryReservation()
return memoryReservation;
}

public Map<TaskId, TaskInfo> getTaskInfos()
{
return taskInfos;
}

public RetryPolicy getRetryPolicy()
{
return retryPolicy;
Expand All @@ -69,6 +85,7 @@ public String toString()
return toStringHelper(this)
.add("queryId", queryId)
.add("memoryReservation", memoryReservation)
.add("taskStats", taskInfos)
.add("retryPolicy", retryPolicy)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class NoneLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseTargetToKill(List<RunningQueryInfo> runningQueries, List<MemoryInfo> nodes)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public class TotalReservationLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseTargetToKill(List<RunningQueryInfo> runningQueries, List<MemoryInfo> nodes)
{
Optional<QueryId> biggestQuery = Optional.empty();
long maxMemory = 0;
for (QueryMemoryInfo query : runningQueries) {
for (RunningQueryInfo query : runningQueries) {
long bytesUsed = query.getMemoryReservation();
if (bytesUsed > maxMemory) {
biggestQuery = Optional.of(query.getQueryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class TotalReservationOnBlockedNodesQueryLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseTargetToKill(List<RunningQueryInfo> runningQueries, List<MemoryInfo> nodes)
{
Map<QueryId, QueryMemoryInfo> queriesById = Maps.uniqueIndex(runningQueries, QueryMemoryInfo::getQueryId);
Map<QueryId, RunningQueryInfo> queriesById = Maps.uniqueIndex(runningQueries, RunningQueryInfo::getQueryId);
Map<QueryId, Long> memoryReservationOnBlockedNodes = new HashMap<>();
for (MemoryInfo node : nodes) {
MemoryPoolInfo memoryPool = node.getPool();
Expand All @@ -44,7 +44,7 @@ public Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQuer
}
Map<QueryId, Long> queryMemoryReservations = memoryPool.getQueryMemoryReservations();
queryMemoryReservations.forEach((queryId, memoryReservation) -> {
QueryMemoryInfo queryMemoryInfo = queriesById.get(queryId);
RunningQueryInfo queryMemoryInfo = queriesById.get(queryId);
if (queryMemoryInfo != null && queryMemoryInfo.getRetryPolicy() == RetryPolicy.TASK) {
// Do not kill whole queries which run with task retries enabled
// Most of the time if query with task retries enabled is a root cause of cluster out-of-memory error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public class TotalReservationOnBlockedNodesTaskLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<KillTarget> chooseTargetToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
public Optional<KillTarget> chooseTargetToKill(List<RunningQueryInfo> runningQueries, List<MemoryInfo> nodes)
{
Set<QueryId> queriesWithTaskRetryPolicy = runningQueries.stream()
.filter(query -> query.getRetryPolicy() == RetryPolicy.TASK)
.map(QueryMemoryInfo::getQueryId)
.map(RunningQueryInfo::getQueryId)
.collect(toImmutableSet());

if (queriesWithTaskRetryPolicy.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,23 @@ static TaskId taskId(String query, int partition)
return new TaskId(new StageId(QueryId.valueOf(query), 0), partition, 0);
}

static List<LowMemoryKiller.QueryMemoryInfo> toQueryMemoryInfoList(Map<String, Map<String, Long>> queries)
static List<LowMemoryKiller.RunningQueryInfo> toRunningQueryInfoList(Map<String, Map<String, Long>> queries)
{
return toQueryMemoryInfoList(queries, ImmutableSet.of());
return toRunningQueryInfoList(queries, ImmutableSet.of());
}

static List<LowMemoryKiller.QueryMemoryInfo> toQueryMemoryInfoList(Map<String, Map<String, Long>> queries, Set<String> queriesWithTaskLevelRetries)
static List<LowMemoryKiller.RunningQueryInfo> toRunningQueryInfoList(Map<String, Map<String, Long>> queries, Set<String> queriesWithTaskLevelRetries)
{
ImmutableList.Builder<LowMemoryKiller.QueryMemoryInfo> result = ImmutableList.builder();
ImmutableList.Builder<LowMemoryKiller.RunningQueryInfo> result = ImmutableList.builder();
for (Map.Entry<String, Map<String, Long>> entry : queries.entrySet()) {
String queryId = entry.getKey();
long totalReservation = entry.getValue().values().stream()
.mapToLong(x -> x)
.sum();
result.add(new LowMemoryKiller.QueryMemoryInfo(
result.add(new LowMemoryKiller.RunningQueryInfo(
new QueryId(queryId),
totalReservation,
ImmutableMap.of(),
queriesWithTaskLevelRetries.contains(queryId) ? RetryPolicy.TASK : RetryPolicy.NONE));
}
return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Optional;

import static io.trino.memory.LowMemoryKillerTestingUtils.toNodeMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toQueryMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toRunningQueryInfoList;
import static io.trino.testing.assertions.Assert.assertEquals;

public class TestTotalReservationLowMemoryKiller
Expand All @@ -38,7 +38,7 @@ public void testMemoryPoolHasNoReservation()
.buildOrThrow();
assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.empty());
}
Expand All @@ -56,7 +56,7 @@ public void testSkewedQuery()
.buildOrThrow();
assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.of(KillTarget.wholeQuery(new QueryId("q_2"))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Optional;

import static io.trino.memory.LowMemoryKillerTestingUtils.toNodeMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toQueryMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toRunningQueryInfoList;
import static io.trino.testing.assertions.Assert.assertEquals;

public class TestTotalReservationOnBlockedNodesQueryLowMemoryKiller
Expand All @@ -40,7 +40,7 @@ public void testMemoryPoolHasNoReservation()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.empty());
}
Expand All @@ -55,7 +55,7 @@ public void testMemoryPoolNotBlocked()
.buildOrThrow();
assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.empty());
}
Expand All @@ -73,7 +73,7 @@ public void testSkewedQuery()
.buildOrThrow();
assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.of(KillTarget.wholeQuery(new QueryId("q_1"))));
}
Expand Down Expand Up @@ -101,7 +101,7 @@ public void testWillNotKillWholeQueryWithTaskRetries()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries, ImmutableSet.of("q_2")),
toRunningQueryInfoList(queries, ImmutableSet.of("q_2")),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.of(KillTarget.wholeQuery(new QueryId("q_1"))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static io.trino.memory.LowMemoryKillerTestingUtils.taskId;
import static io.trino.memory.LowMemoryKillerTestingUtils.toNodeMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toQueryMemoryInfoList;
import static io.trino.memory.LowMemoryKillerTestingUtils.toRunningQueryInfoList;
import static io.trino.testing.assertions.Assert.assertEquals;

public class TestTotalReservationOnBlockedNodesTaskLowMemoryKiller
Expand All @@ -40,7 +40,7 @@ public void testMemoryPoolHasNoReservation()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.empty());
}
Expand All @@ -55,7 +55,7 @@ public void testMemoryPoolNotBlocked()
.buildOrThrow();
assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries)),
Optional.empty());
}
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testWillNotKillTaskForQueryWithoutTaskRetriesEnabled()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries),
toRunningQueryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.empty());
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testPreferKillingTasks()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries, ImmutableSet.of("q_2")),
toRunningQueryInfoList(queries, ImmutableSet.of("q_2")),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.of(KillTarget.selectedTasks(
ImmutableSet.of(
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testKillsBiggestTasks()

assertEquals(
lowMemoryKiller.chooseTargetToKill(
toQueryMemoryInfoList(queries, ImmutableSet.of("q_1", "q_2")),
toRunningQueryInfoList(queries, ImmutableSet.of("q_1", "q_2")),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.of(KillTarget.selectedTasks(
ImmutableSet.of(
Expand Down

0 comments on commit d8a965a

Please sign in to comment.