Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server][dvc] Add Blob Transfer Related Metrics #1352

Merged
merged 11 commits into from
Dec 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
import com.linkedin.davinci.stats.MetadataUpdateStats;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class DaVinciBackend implements Closeable {
private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats;
private final boolean useDaVinciSpecificExecutionStatusForError;
private BlobTransferManager<Void> blobTransferManager;
private AggVersionedBlobTransferStats aggVersionedBlobTransferStats;
private final boolean writeBatchingPushStatus;

public DaVinciBackend(
Expand Down Expand Up @@ -294,6 +296,9 @@ public DaVinciBackend(
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

aggVersionedBlobTransferStats =
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());

blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
Expand All @@ -304,8 +309,10 @@ public DaVinciBackend(
storageService.getStorageEngineRepository(),
backendConfig.getMaxConcurrentSnapshotUser(),
backendConfig.getSnapshotRetentionTimeInMin(),
backendConfig.getBlobTransferMaxTimeoutInMin());
backendConfig.getBlobTransferMaxTimeoutInMin(),
aggVersionedBlobTransferStats);
} else {
aggVersionedBlobTransferStats = null;
blobTransferManager = null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci.blobtransfer;

import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
import java.io.InputStream;
Expand Down Expand Up @@ -46,4 +47,10 @@ CompletionStage<? extends InputStream> get(String storeName, int version, int pa
* Close the blob transfer manager and related resources
*/
void close() throws Exception;

/**
* Get the blob transfer stats
* @return the blob transfer stats
*/
AggVersionedBlobTransferStats getAggVersionedBlobTransferStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
Expand Down Expand Up @@ -39,7 +40,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
StorageEngineRepository storageEngineRepository,
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin) {
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) {
return getP2PBlobTransferManagerForDVCAndStart(
p2pTransferPort,
p2pTransferPort,
Expand All @@ -50,7 +52,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
storageEngineRepository,
maxConcurrentSnapshotUser,
snapshotRetentionTimeInMin,
blobTransferMaxTimeoutInMin);
blobTransferMaxTimeoutInMin,
aggVersionedBlobTransferStats);
}

public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
Expand All @@ -63,7 +66,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
StorageEngineRepository storageEngineRepository,
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin) {
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) {
try {
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
readOnlyStoreRepository,
Expand All @@ -77,7 +81,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new DaVinciBlobFinder(storeClient),
baseDir);
baseDir,
aggVersionedBlobTransferStats);
manager.start();
return manager;
} catch (Exception e) {
Expand Down Expand Up @@ -105,7 +110,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
StorageEngineRepository storageEngineRepository,
int maxConcurrentSnapshotUser,
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin) {
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) {
try {
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
readOnlyStoreRepository,
Expand All @@ -117,7 +123,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new ServerBlobFinder(customizedViewFuture),
baseDir);
baseDir,
aggVersionedBlobTransferStats);
manager.start();
return manager;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.venice.store.rocksdb.RocksDBUtils.composePartitionDbDir;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

import com.linkedin.venice.meta.Version;
import io.netty.handler.codec.http.HttpResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;


public class BlobTransferUtils {
Expand Down Expand Up @@ -46,4 +50,55 @@ public static String generateFileChecksum(Path filePath) throws IOException {
}
return md5Digest;
}

/**
 * Calculate throughput in MB/sec for a given partition directory.
 *
 * @param partitionDir the directory holding the partition's transferred files
 * @param transferTimeInSec the transfer duration in seconds; must be positive
 * @return the throughput in MB/sec (decimal megabytes: 1 MB = 1,000,000 bytes)
 * @throws IllegalArgumentException if the directory is missing/not a directory, or the time is not positive
 * @throws IOException if the directory tree cannot be traversed
 */
private static double calculateThroughputInMBPerSec(File partitionDir, double transferTimeInSec) throws IOException {
  if (!partitionDir.exists() || !partitionDir.isDirectory()) {
    throw new IllegalArgumentException(
        "Partition directory does not exist or is not a directory: " + partitionDir.getAbsolutePath());
  }
  if (transferTimeInSec <= 0) {
    // Guard against division by zero: Duration.getSeconds() truncates, so a sub-second
    // transfer yields 0 and would otherwise report an Infinity/NaN throughput metric.
    throw new IllegalArgumentException("Transfer time must be positive, but was: " + transferTimeInSec);
  }
  // Calculate total size of all files in the directory
  long totalSizeInBytes = getTotalSizeOfFiles(partitionDir);
  // Convert bytes to MB
  double totalSizeInMB = totalSizeInBytes / (1000.0 * 1000.0);
  // Calculate throughput in MB/sec
  return totalSizeInMB / transferTimeInSec;
}

/**
 * Get the total size, in bytes, of all regular files under the given directory (recursively).
 *
 * @param dir the directory to measure
 * @return the sum of all regular-file sizes in bytes
 * @throws IOException if the directory tree cannot be walked
 */
private static long getTotalSizeOfFiles(File dir) throws IOException {
  // Files.walk returns a Stream backed by open directory handles; it must be closed
  // (try-with-resources) or it leaks file descriptors — see the Files.walk javadoc.
  try (Stream<Path> files = Files.walk(dir.toPath())) {
    return files.filter(Files::isRegularFile).mapToLong(path -> path.toFile().length()).sum();
  }
}

/**
 * Calculate throughput per partition in MB/sec.
 * Best effort: any failure while resolving or measuring the partition directory
 * is swallowed and reported as a throughput of 0, so metrics never break a transfer.
 * @param baseDir the base directory of the underlying storage
 * @param storeName the store name
 * @param version the version of the store
 * @param partition the partition number
 * @param transferTimeInSec the transfer time in seconds
 * @return the throughput in MB/sec, or 0 if it cannot be computed
 */
static double getThroughputPerPartition(
    String baseDir,
    String storeName,
    int version,
    int partition,
    double transferTimeInSec) {
  String dbDirPath = composePartitionDbDir(baseDir, Version.composeKafkaTopic(storeName, version), partition);
  try {
    return calculateThroughputInMBPerSec(Paths.get(dbDirPath).toFile(), transferTimeInSec);
  } catch (Exception e) {
    // Metrics are not a correctness concern; degrade to 0 on any failure.
    return 0;
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.getThroughputPerPartition;

import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
Expand Down Expand Up @@ -44,6 +47,8 @@ public class NettyP2PBlobTransferManager implements P2PBlobTransferManager<Void>
private final P2PBlobTransferService blobTransferService;
// netty client is responsible to make requests against other peers for blob fetching
protected final NettyFileTransferClient nettyClient;
// blob transfer stats to record all blob transfer related stats
protected final AggVersionedBlobTransferStats aggVersionedBlobTransferStats;
// peer finder is responsible to find the peers that have the requested blob
protected final BlobFinder peerFinder;
private final String baseDir;
Expand All @@ -52,11 +57,13 @@ public NettyP2PBlobTransferManager(
P2PBlobTransferService blobTransferService,
NettyFileTransferClient nettyClient,
BlobFinder peerFinder,
String baseDir) {
String baseDir,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) {
this.blobTransferService = blobTransferService;
this.nettyClient = nettyClient;
this.peerFinder = peerFinder;
this.baseDir = baseDir;
this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats;
}

@Override
Expand Down Expand Up @@ -147,12 +154,11 @@ private void processPeersSequentially(
.toCompletableFuture()
.thenAccept(inputStream -> {
// Success case: Complete the future with the input stream
LOGGER.info(
FETCHED_BLOB_SUCCESS_MSG,
replicaId,
chosenHost,
Duration.between(startTime, Instant.now()).getSeconds());
long transferTime = Duration.between(startTime, Instant.now()).getSeconds();
LOGGER.info(FETCHED_BLOB_SUCCESS_MSG, replicaId, chosenHost, transferTime);
resultFuture.complete(inputStream);
// Updating the blob transfer stats with the transfer time and throughput
updateBlobTransferFileReceiveStats(transferTime, storeName, version, partition);
})
.exceptionally(ex -> {
handlePeerFetchException(ex, chosenHost, storeName, version, partition, replicaId);
Expand Down Expand Up @@ -201,4 +207,34 @@ public void close() throws Exception {
nettyClient.close();
peerFinder.close();
}

/**
 * Expose the aggregated per-version blob transfer stats used by this manager.
 * @return the blob transfer stats
 */
public AggVersionedBlobTransferStats getAggVersionedBlobTransferStats() {
  return this.aggVersionedBlobTransferStats;
}

/**
 * Based on the transfer time, store name, version, and partition, update the blob transfer file receive stats.
 * Records the raw transfer time and the derived per-partition throughput; any failure while
 * computing or recording the metrics is logged and swallowed so stats never fail a transfer.
 * @param transferTime the transfer time in seconds
 * @param storeName the name of the store
 * @param version the version of the store
 * @param partition the partition of the store
 */
private void updateBlobTransferFileReceiveStats(double transferTime, String storeName, int version, int partition) {
  try {
    // Throughput is derived from the on-disk size of the received partition directory.
    double throughput = getThroughputPerPartition(baseDir, storeName, version, partition, transferTime);
    aggVersionedBlobTransferStats.recordBlobTransferTimeInSec(storeName, version, transferTime);
    aggVersionedBlobTransferStats.recordBlobTransferFileReceiveThroughput(storeName, version, throughput);
  } catch (Exception e) {
    LOGGER.error(
        "Failed to update updateBlobTransferFileReceiveStats for store {} version {} partition {}",
        storeName,
        version,
        partition,
        e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ CompletionStage<Void> bootstrapFromBlobs(

String storeName = store.getName();
return blobTransferManager.get(storeName, versionNumber, partitionId).handle((inputStream, throwable) -> {
updateBlobTransferResponseStats(throwable == null, storeName, versionNumber);
if (throwable != null) {
LOGGER.error(
"Failed to bootstrap partition {} from blobs transfer for store {} with exception {}",
Expand Down Expand Up @@ -256,4 +257,28 @@ private void syncStoreVersionConfig(Store store, VeniceStoreVersionConfig storeC
storeConfig.setBlobTransferEnabled(true);
}
}

/**
 * Update the blob transfer response stats based on the blob transfer success.
 * Skips recording (with an error log) when the stats holder was never initialized;
 * recording failures are logged and swallowed so metrics never fail the bootstrap path.
 * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise.
 * @param storeName the name of the store being bootstrapped
 * @param version the store version being bootstrapped
 */
private void updateBlobTransferResponseStats(boolean isBlobTransferSuccess, String storeName, int version) {
  if (blobTransferManager.getAggVersionedBlobTransferStats() == null) {
    LOGGER.error(
        "Blob transfer stats is not initialized. Skip updating blob transfer response stats for store {} version {}",
        storeName,
        version);
    return;
  }

  try {
    // Record the blob transfer request count.
    blobTransferManager.getAggVersionedBlobTransferStats().recordBlobTransferResponsesCount(storeName, version);
    // Record the blob transfer response based on the blob transfer status.
    blobTransferManager.getAggVersionedBlobTransferStats()
        .recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, version, isBlobTransferSuccess);
  } catch (Exception e) {
    LOGGER.error("Failed to update blob transfer response stats for store {} version {}", storeName, version, e);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.linkedin.davinci.stats;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import io.tehuti.metrics.MetricsRepository;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
 * The store level stats for blob transfer.
 * Aggregates {@link BlobTransferStats} per store version and reports them via
 * {@link BlobTransferStatsReporter}.
 */
public class AggVersionedBlobTransferStats
    extends AbstractVeniceAggVersionedStats<BlobTransferStats, BlobTransferStatsReporter> {
  private static final Logger LOGGER = LogManager.getLogger(AggVersionedBlobTransferStats.class);

  public AggVersionedBlobTransferStats(
      MetricsRepository metricsRepository,
      ReadOnlyStoreRepository metadataRepository,
      VeniceServerConfig serverConfig) {
    super(
        metricsRepository,
        metadataRepository,
        BlobTransferStats::new,
        BlobTransferStatsReporter::new,
        serverConfig.isUnregisterMetricForDeletedStoreEnabled());
  }

  /**
   * Record the blob transfer responses count
   * @param storeName the store name
   * @param version the version of the store
   */
  public void recordBlobTransferResponsesCount(String storeName, int version) {
    recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferResponsesCount());
  }

  /**
   * Record the blob transfer responses count based on the bootstrap status
   * NOTE: the "Boostrap" typo in the method name is preserved for compatibility with existing callers.
   * @param storeName the store name
   * @param version the version of the store
   * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise
   */
  public void recordBlobTransferResponsesBasedOnBoostrapStatus(
      String storeName,
      int version,
      boolean isBlobTransferSuccess) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        stats -> stats.recordBlobTransferResponsesBasedOnBoostrapStatus(isBlobTransferSuccess));
  }

  /**
   * Record the blob transfer file receive throughput
   * @param storeName the store name
   * @param version the version of the store
   * @param throughput the throughput in MB/sec
   */
  public void recordBlobTransferFileReceiveThroughput(String storeName, int version, double throughput) {
    recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferFileReceiveThroughput(throughput));
  }

  /**
   * Record the blob transfer time
   * @param storeName the store name
   * @param version the version of the store
   * @param timeInSec the transfer time in seconds
   */
  public void recordBlobTransferTimeInSec(String storeName, int version, double timeInSec) {
    recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferTimeInSec(timeInSec));
  }
}
Loading
Loading