Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a metric for task duration in the pending queue #12492

Merged
merged 4 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run a task.|dataSource, taskId, taskType, taskStatus.|Varies.|
|`task/pending/time`|Milliseconds a task spends waiting in the pending queue before it starts running.|dataSource, taskId, taskType.|Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.|dataSource, taskId, taskType|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.|dataSource, taskId, taskType|Varies from subsecond to a few seconds, based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
Expand All @@ -74,6 +75,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
Expand Down Expand Up @@ -179,6 +182,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private final ServiceEmitter emitter;
private ProvisioningService provisioningService;

public RemoteTaskRunner(
Expand All @@ -189,7 +193,8 @@ public RemoteTaskRunner(
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
ServiceEmitter emitter
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -213,6 +218,7 @@ public RemoteTaskRunner(
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
);
this.emitter = emitter;
}

@Override
Expand Down Expand Up @@ -934,6 +940,13 @@ private boolean announceTask(
return false;
}

final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(workItem.getQueueInsertionTime(), DateTimes.nowUtc()).getMillis())
);

RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
Expand Down Expand Up @@ -1516,6 +1529,12 @@ Map<String, String> getWorkersWithUnacknowledgedTask()
return workersWithUnacknowledgedTask;
}

/**
 * Exposes the provisioning strategy held by this runner so that tests can inspect it.
 *
 * @return the value of the {@code provisioningStrategy} field, exactly as stored
 */
@VisibleForTesting
ProvisioningStrategy<WorkerTaskRunner> getProvisioningStrategy()
{
  return this.provisioningStrategy;
}

@Override
public Map<String, Long> getTotalTaskSlotCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;

Expand All @@ -46,6 +47,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ServiceEmitter emitter;

@Inject
public RemoteTaskRunnerFactory(
Expand All @@ -56,7 +58,8 @@ public RemoteTaskRunnerFactory(
@EscalatedGlobal final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
final ProvisioningStrategy provisioningStrategy,
final ServiceEmitter emitter
)
{
this.curator = curator;
Expand All @@ -67,6 +70,7 @@ public RemoteTaskRunnerFactory(
this.workerConfigRef = workerConfigRef;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.emitter = emitter;
}

@Override
Expand All @@ -80,7 +84,8 @@ public RemoteTaskRunner build()
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>()
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>(),
emitter
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
Expand All @@ -76,13 +77,16 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.joda.time.Period;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -182,6 +186,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final HttpRemoteTaskRunnerConfig config;

private final TaskStorage taskStorage;
private final ServiceEmitter emitter;

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
private static final Joiner JOINER = Joiner.on("/");
Expand All @@ -203,7 +208,8 @@ public HttpRemoteTaskRunner(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
TaskStorage taskStorage,
@Nullable CuratorFramework cf,
IndexerZkConfig indexerZkConfig
IndexerZkConfig indexerZkConfig,
ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
Expand All @@ -212,6 +218,7 @@ public HttpRemoteTaskRunner(
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.workerConfigRef = workerConfigRef;
this.emitter = emitter;

this.pendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
Expand Down Expand Up @@ -1548,6 +1555,14 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());

final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
);

// fall through
case RUNNING:
if (worker.getHost().equals(taskItem.getWorker().getHost())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;

Expand All @@ -54,6 +55,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
private final ProvisioningStrategy provisioningStrategy;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final TaskStorage taskStorage;
private final ServiceEmitter emitter;

// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
@Nullable //Null if zk is disabled
Expand All @@ -72,7 +74,8 @@ public HttpRemoteTaskRunnerFactory(
final TaskStorage taskStorage,
final Provider<CuratorFramework> cfProvider,
final IndexerZkConfig indexerZkConfig,
final ZkEnablementConfig zkEnablementConfig
final ZkEnablementConfig zkEnablementConfig,
final ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
Expand All @@ -84,6 +87,7 @@ public HttpRemoteTaskRunnerFactory(
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.indexerZkConfig = indexerZkConfig;
this.emitter = emitter;

if (zkEnablementConfig.isEnabled()) {
this.cf = cfProvider.get();
Expand All @@ -104,7 +108,8 @@ public HttpRemoteTaskRunner build()
druidNodeDiscoveryProvider,
taskStorage,
cf,
indexerZkConfig
indexerZkConfig,
emitter
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord;

import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Checks which provisioning strategy ends up on the runner produced by
 * {@link RemoteTaskRunnerFactory#build()}, depending on whether the scheduler config
 * reports autoscaling as enabled.
 */
public class RemoteTaskRunnerFactoryTest
{
  /**
   * With autoscaling reported as enabled, the built runner exposes a {@code null} strategy;
   * the factory under test is itself constructed with a {@code null} strategy.
   */
  @Test
  public void testBuildWithAutoScale()
  {
    final RemoteTaskRunnerFactory factory = newFactory(mockSchedulerConfig(true));

    Assert.assertNull(factory.build().getProvisioningStrategy());
  }

  /**
   * With autoscaling reported as disabled, the built runner exposes a
   * {@link NoopProvisioningStrategy}.
   */
  @Test
  public void testBuildWithoutAutoScale()
  {
    final RemoteTaskRunnerFactory factory = newFactory(mockSchedulerConfig(false));

    final boolean isNoop = factory.build().getProvisioningStrategy() instanceof NoopProvisioningStrategy;
    Assert.assertTrue(isNoop);
  }

  /**
   * Creates a mocked scheduler config whose {@code isDoAutoscale()} returns the given flag.
   */
  private ProvisioningSchedulerConfig mockSchedulerConfig(boolean doAutoscale)
  {
    final ProvisioningSchedulerConfig schedulerConfig = Mockito.mock(ProvisioningSchedulerConfig.class);
    Mockito.when(schedulerConfig.isDoAutoscale()).thenReturn(doAutoscale);
    return schedulerConfig;
  }

  /**
   * Builds a factory around a mocked curator and default configs; every collaborator the
   * tests do not inspect is passed as {@code null}.
   */
  private RemoteTaskRunnerFactory newFactory(ProvisioningSchedulerConfig provisioningSchedulerConfig)
  {
    final CuratorFramework mockCurator = Mockito.mock(CuratorFramework.class);
    Mockito.when(mockCurator.newWatcherRemoveCuratorFramework()).thenReturn(null);

    final RemoteTaskRunnerConfig runnerConfig = new RemoteTaskRunnerConfig();
    final IndexerZkConfig zkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null);

    return new RemoteTaskRunnerFactory(
        mockCurator,
        runnerConfig,
        zkConfig,
        null,
        null, // httpClient
        null, // workerConfigRef
        provisioningSchedulerConfig,
        null, // provisioningStrategy
        null  // emitter
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -270,7 +271,8 @@ public TestableRemoteTaskRunner(
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
provisioningStrategy
provisioningStrategy,
new NoopServiceEmitter()
);
}

Expand Down
Loading