Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;

public abstract class GRPCMetrics {
// Grpc server internal executor metrics
Expand All @@ -36,6 +39,8 @@ public abstract class GRPCMetrics {
private boolean isRegister = false;
protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
protected Map<String, Summary> transportTimeSummaryMap = Maps.newConcurrentMap();
protected Map<String, Summary> processTimeSummaryMap = Maps.newConcurrentMap();
protected Gauge gaugeGrpcOpen;
protected Counter counterGrpcTotal;
protected MetricsManager metricsManager;
Expand Down Expand Up @@ -100,6 +105,20 @@ public void decCounter(String methodName) {
}
}

public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
Summary summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
Summary summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public CollectorRegistry getCollectorRegistry() {
return metricsManager.getCollectorRegistry();
}
Expand All @@ -119,4 +138,12 @@ public Gauge getGaugeGrpcOpen() {
public Counter getCounterGrpcTotal() {
return counterGrpcTotal;
}

public Map<String, Summary> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}

public Map<String, Summary> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;

public class MetricsManager {
private CollectorRegistry collectorRegistry;
private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
private static final double QUANTILE_ERROR = 0.01;

public MetricsManager() {
this(null);
Expand Down Expand Up @@ -64,4 +67,12 @@ public Histogram addHistogram(String name, double[] buckets, String... labels) {
public Histogram addHistogram(String name, String help, double[] buckets, String[] labels) {
return Histogram.build().name(name).buckets(buckets).labelNames(labels).help(help).register(collectorRegistry);
}

public Summary addSummary(String name) {
Summary.Builder builder = Summary.build().name(name).help("Summary " + name);
for (int i = 0; i < QUANTILES.length; i++) {
builder = builder.quantile(QUANTILES[i], QUANTILE_ERROR);
}
return builder.register(collectorRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ public class Constants {
public static int MR_REDUCE_LIMIT_DEFAULT_VALUE = 0;
public static final String MR_SLOW_START = "mapreduce.job.reduce.slowstart.completedmaps";
public static double MR_SLOW_START_DEFAULT_VALUE = 0.05;

public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
throw new RssException(String.format(
"requirePreAllocation failed! size[%s], host[%s], port[%s]", allocateSize, host, port));
}
long start = System.currentTimeMillis();
SendShuffleDataRequest rpcRequest = SendShuffleDataRequest.newBuilder()
.setAppId(appId)
.setShuffleId(stb.getKey())
.setRequireBufferId(requireId)
.addAllShuffleData(shuffleData)
.setTimestamp(start)
.build();
long start = System.currentTimeMillis();
SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
LOG.info("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - start)
+ " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port);
Expand Down Expand Up @@ -522,6 +523,7 @@ public RssGetShuffleResultResponse getShuffleResultForMultiPart(RssGetShuffleRes

@Override
public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request) {
long start = System.currentTimeMillis();
GetLocalShuffleDataRequest rpcRequest = GetLocalShuffleDataRequest
.newBuilder()
.setAppId(request.getAppId())
Expand All @@ -531,8 +533,8 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request
.setPartitionNum(request.getPartitionNum())
.setOffset(request.getOffset())
.setLength(request.getLength())
.setTimestamp(start)
.build();
long start = System.currentTimeMillis();
GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
Expand Down Expand Up @@ -595,16 +597,17 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ
@Override
public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData(
RssGetInMemoryShuffleDataRequest request) {
long start = System.currentTimeMillis();
GetMemoryShuffleDataRequest rpcRequest = GetMemoryShuffleDataRequest
.newBuilder()
.setAppId(request.getAppId())
.setShuffleId(request.getShuffleId())
.setPartitionId(request.getPartitionId())
.setLastBlockId(request.getLastBlockId())
.setReadBufferSize(request.getReadBufferSize())
.setTimestamp(start)
.build();

long start = System.currentTimeMillis();
GetMemoryShuffleDataResponse rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
Expand Down
3 changes: 3 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message GetLocalShuffleDataRequest {
int32 partitionNum = 5;
int64 offset = 6;
int32 length = 7;
int64 timestamp = 8;
}

message GetLocalShuffleDataResponse {
Expand All @@ -90,6 +91,7 @@ message GetMemoryShuffleDataRequest {
int32 partitionId = 3;
int64 lastBlockId = 4;
int32 readBufferSize = 5;
int64 timestamp = 6;
}

message GetMemoryShuffleDataResponse {
Expand Down Expand Up @@ -195,6 +197,7 @@ message SendShuffleDataRequest {
int32 shuffleId = 2;
int64 requireBufferId = 3;
repeated ShuffleData shuffleData = 4;
int64 timestamp = 5;
}

message SendShuffleDataResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
"grpc_get_in_memory_shuffle_data_total";
private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = "grpc_get_local_shuffle_index_total";

private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_send_shuffle_data_transport_latency";
private static final String GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_get_local_shuffle_data_transport_latency";
private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_get_in_memory_shuffle_data_transport_latency";

private static final String GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_send_shuffle_data_process_latency";
private static final String GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_get_local_shuffle_data_process_latency";
private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_in_memory_shuffle_data_process_latency";

@Override
public void registerMetrics() {
gaugeGrpcOpen = metricsManager.addGauge(GRPC_OPEN);
Expand Down Expand Up @@ -110,6 +122,20 @@ public void registerMetrics() {
metricsManager.addCounter(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
metricsManager.addCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));

transportTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));

processTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public void sendShuffleData(SendShuffleDataRequest req,
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireBufferId();
long timestamp = req.getTimestamp();
if (timestamp > 0) {
/*
* Here we record the transport time, but we don't consider the impact of data size on transport time.
* The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
* and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
* In addition, we need to pay attention to that the time of the client machine and the machine
* time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
* */
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
}
}
int requireSize = shuffleServer
.getShuffleTaskManager().getRequireBufferSize(requireBufferId);

Expand Down Expand Up @@ -240,8 +255,10 @@ public void sendShuffleData(SendShuffleDataRequest req,
}
}
reply = SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build();
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
LOG.debug("Cache Shuffle Data for appId[" + appId + "], shuffleId[" + shuffleId
+ "], cost " + (System.currentTimeMillis() - start)
+ "], cost " + costTime
+ " ms with " + shufflePartitionedData.size() + " blocks and " + requireSize + " bytes");
} else {
reply = SendShuffleDataResponse
Expand Down Expand Up @@ -476,6 +493,14 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request,
int partitionNum = request.getPartitionNum();
long offset = request.getOffset();
int length = request.getLength();
long timestamp = request.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
}
}
String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
Expand All @@ -497,6 +522,8 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request,
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
shuffleServer.getGrpcMetrics().recordProcessTime(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
LOG.info("Successfully getShuffleData cost {} ms for shuffle"
+ " data with {}", readTime, requestInfo);
reply = GetLocalShuffleDataResponse.newBuilder()
Expand Down Expand Up @@ -607,6 +634,14 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request,
int partitionId = request.getPartitionId();
long blockId = request.getLastBlockId();
int readBufferSize = request.getReadBufferSize();
long timestamp = request.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
}
}
long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
Expand All @@ -627,8 +662,11 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request,
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
}
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, costTime);
LOG.info("Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle"
+ " data for {}", (System.currentTimeMillis() - start), data.length, requestInfo);
+ " data for {}", costTime, data.length, requestInfo);

reply = GetMemoryShuffleDataResponse.newBuilder()
.setStatus(valueOf(status))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server;

import java.util.Map;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ShuffleServerGrpcMetricsTest {
@Test
public void testLatencyMetrics() {
ShuffleServerGrpcMetrics metrics = new ShuffleServerGrpcMetrics();
metrics.register(new CollectorRegistry(true));
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200);
Map<String, Summary> sendTimeSummaryTime = metrics.getTransportTimeSummaryMap();
Map<String, Summary> processTimeSummaryTime = metrics.getProcessTimeSummaryMap();
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());

assertEquals(1D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.5D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.2D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum);

assertEquals(1D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.5D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.2D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testGrpcMetrics() throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
assertEquals(27, actualObj.get("metrics").size());
assertEquals(69, actualObj.get("metrics").size());
}

@Test
Expand Down