diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 38c4b879a8..2f9db67f1d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -23,6 +23,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.MetadataUpdateStats; import com.linkedin.davinci.stats.RocksDBMemoryStats; @@ -111,6 +112,7 @@ public class DaVinciBackend implements Closeable { private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats; private final boolean useDaVinciSpecificExecutionStatusForError; private BlobTransferManager blobTransferManager; + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; private final boolean writeBatchingPushStatus; public DaVinciBackend( @@ -294,6 +296,9 @@ public DaVinciBackend( throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer."); } + aggVersionedBlobTransferStats = + new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig()); + blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart( configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(), configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(), @@ -304,8 +309,10 @@ public DaVinciBackend( storageService.getStorageEngineRepository(), backendConfig.getMaxConcurrentSnapshotUser(), backendConfig.getSnapshotRetentionTimeInMin(), - backendConfig.getBlobTransferMaxTimeoutInMin()); + 
backendConfig.getBlobTransferMaxTimeoutInMin(), + aggVersionedBlobTransferStats); } else { + aggVersionedBlobTransferStats = null; blobTransferManager = null; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java index 1d3b7e6ac9..d1810e6d87 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.blobtransfer; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.annotation.Experimental; import com.linkedin.venice.exceptions.VenicePeersNotFoundException; import java.io.InputStream; @@ -46,4 +47,10 @@ CompletionStage get(String storeName, int version, int pa * Close the blob transfer manager and related resources */ void close() throws Exception; + + /** + * Get the blob transfer stats + * @return the blob transfer stats + */ + AggVersionedBlobTransferStats getAggVersionedBlobTransferStats(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java index eaba7bb917..f857de5ec3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java @@ -4,6 +4,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import 
com.linkedin.venice.blobtransfer.DaVinciBlobFinder; @@ -39,7 +40,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { return getP2PBlobTransferManagerForDVCAndStart( p2pTransferPort, p2pTransferPort, @@ -50,7 +52,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( storageEngineRepository, maxConcurrentSnapshotUser, snapshotRetentionTimeInMin, - blobTransferMaxTimeoutInMin); + blobTransferMaxTimeoutInMin, + aggVersionedBlobTransferStats); } public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( @@ -63,7 +66,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { try { BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( readOnlyStoreRepository, @@ -77,7 +81,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new DaVinciBlobFinder(storeClient), - baseDir); + baseDir, + aggVersionedBlobTransferStats); manager.start(); return manager; } catch (Exception e) { @@ -105,7 +110,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + 
AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { try { BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( readOnlyStoreRepository, @@ -117,7 +123,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new ServerBlobFinder(customizedViewFuture), - baseDir); + baseDir, + aggVersionedBlobTransferStats); manager.start(); return manager; } catch (Exception e) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java index 1e79354e5b..5da87d39ea 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java @@ -1,12 +1,16 @@ package com.linkedin.davinci.blobtransfer; +import static com.linkedin.venice.store.rocksdb.RocksDBUtils.composePartitionDbDir; import static org.apache.commons.codec.digest.DigestUtils.md5Hex; +import com.linkedin.venice.meta.Version; import io.netty.handler.codec.http.HttpResponse; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; public class BlobTransferUtils { @@ -46,4 +50,55 @@ public static String generateFileChecksum(Path filePath) throws IOException { } return md5Digest; } + + /** + * Calculate throughput in MB/sec for a given partition directory + */ + private static double calculateThroughputInMBPerSec(File partitionDir, double transferTimeInSec) throws IOException { + if (!partitionDir.exists() || !partitionDir.isDirectory()) { + throw new IllegalArgumentException( + "Partition directory does 
not exist or is not a directory: " + partitionDir.getAbsolutePath()); + } + // Calculate total size of all files in the directory + long totalSizeInBytes = getTotalSizeOfFiles(partitionDir); + // Convert bytes to MB + double totalSizeInMB = totalSizeInBytes / (1000.0 * 1000.0); + // Calculate throughput in MB/sec; a non-positive transfer time (e.g. a sub-second transfer truncated to 0 sec) yields 0 rather than Infinity/NaN + return transferTimeInSec > 0 ? totalSizeInMB / transferTimeInSec : 0; + } + + /** + * Get total size of all regular files under a directory; the directory stream opened by the walk is closed before returning + */ + private static long getTotalSizeOfFiles(File dir) throws IOException { + try (java.util.stream.Stream<Path> files = Files.walk(dir.toPath())) { + return files.filter(Files::isRegularFile).mapToLong(path -> path.toFile().length()).sum(); + } + } + + /** + * Calculate throughput per partition in MB/sec + * @param baseDir the base directory of the underlying storage + * @param storeName the store name + * @param version the version of the store + * @param partition the partition number + * @param transferTimeInSec the transfer time in seconds + * @return the throughput in MB/sec, or 0 if it cannot be computed + */ + static double getThroughputPerPartition( + String baseDir, + String storeName, + int version, + int partition, + double transferTimeInSec) { + String topicName = Version.composeKafkaTopic(storeName, version); + String partitionDir = composePartitionDbDir(baseDir, topicName, partition); + try { + File partitionFile = Paths.get(partitionDir).toFile(); + return calculateThroughputInMBPerSec(partitionFile, transferTimeInSec); + } catch (Exception e) { + // Best-effort metric: any failure to measure the partition directory is reported as 0 throughput + return 0; + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java index 8c7fb413ae..47eb49f36d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java @@ -1,7 +1,10 @@ 
package com.linkedin.davinci.blobtransfer; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.getThroughputPerPartition; + import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.blobtransfer.BlobFinder; import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException; @@ -44,6 +47,8 @@ public class NettyP2PBlobTransferManager implements P2PBlobTransferManager private final P2PBlobTransferService blobTransferService; // netty client is responsible to make requests against other peers for blob fetching protected final NettyFileTransferClient nettyClient; + // blob transfer stats to record all blob transfer related stats + protected final AggVersionedBlobTransferStats aggVersionedBlobTransferStats; // peer finder is responsible to find the peers that have the requested blob protected final BlobFinder peerFinder; private final String baseDir; @@ -52,11 +57,13 @@ public NettyP2PBlobTransferManager( P2PBlobTransferService blobTransferService, NettyFileTransferClient nettyClient, BlobFinder peerFinder, - String baseDir) { + String baseDir, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { this.blobTransferService = blobTransferService; this.nettyClient = nettyClient; this.peerFinder = peerFinder; this.baseDir = baseDir; + this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats; } @Override @@ -147,12 +154,11 @@ private void processPeersSequentially( .toCompletableFuture() .thenAccept(inputStream -> { // Success case: Complete the future with the input stream - LOGGER.info( - FETCHED_BLOB_SUCCESS_MSG, - replicaId, - chosenHost, - Duration.between(startTime, Instant.now()).getSeconds()); + long transferTime = Duration.between(startTime, Instant.now()).getSeconds(); + 
LOGGER.info(FETCHED_BLOB_SUCCESS_MSG, replicaId, chosenHost, transferTime); resultFuture.complete(inputStream); + // Updating the blob transfer stats with the transfer time and throughput + updateBlobTransferFileReceiveStats(transferTime, storeName, version, partition); }) .exceptionally(ex -> { handlePeerFetchException(ex, chosenHost, storeName, version, partition, replicaId); @@ -201,4 +207,34 @@ public void close() throws Exception { nettyClient.close(); peerFinder.close(); } + + /** + * Get the blob transfer stats + * @return the blob transfer stats + */ + public AggVersionedBlobTransferStats getAggVersionedBlobTransferStats() { + return aggVersionedBlobTransferStats; + } + + /** + * Based on the transfer time, store name, version, and partition, update the blob transfer file receive stats + * @param transferTime the transfer time in seconds + * @param storeName the name of the store + * @param version the version of the store + * @param partition the partition of the store + */ + private void updateBlobTransferFileReceiveStats(double transferTime, String storeName, int version, int partition) { + try { + double throughput = getThroughputPerPartition(baseDir, storeName, version, partition, transferTime); + aggVersionedBlobTransferStats.recordBlobTransferTimeInSec(storeName, version, transferTime); + aggVersionedBlobTransferStats.recordBlobTransferFileReceiveThroughput(storeName, version, throughput); + } catch (Exception e) { + LOGGER.error( + "Failed to update updateBlobTransferFileReceiveStats for store {} version {} partition {}", + storeName, + version, + partition, + e); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 42878ee890..344263cb63 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -116,6 +116,7 @@ CompletionStage bootstrapFromBlobs( String storeName = store.getName(); return blobTransferManager.get(storeName, versionNumber, partitionId).handle((inputStream, throwable) -> { + updateBlobTransferResponseStats(throwable == null, storeName, versionNumber); if (throwable != null) { LOGGER.error( "Failed to bootstrap partition {} from blobs transfer for store {} with exception {}", @@ -256,4 +257,28 @@ private void syncStoreVersionConfig(Store store, VeniceStoreVersionConfig storeC storeConfig.setBlobTransferEnabled(true); } } + + /** + * Update the blob transfer response stats based on the blob transfer success. + * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise. + */ + private void updateBlobTransferResponseStats(boolean isBlobTransferSuccess, String storeName, int version) { + if (blobTransferManager.getAggVersionedBlobTransferStats() == null) { + LOGGER.error( + "Blob transfer stats is not initialized. Skip updating blob transfer response stats for store {} version {}", + storeName, + version); + return; + } + + try { + // Record the blob transfer request count. + blobTransferManager.getAggVersionedBlobTransferStats().recordBlobTransferResponsesCount(storeName, version); + // Record the blob transfer response based on the blob transfer status. 
+ blobTransferManager.getAggVersionedBlobTransferStats() + .recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, version, isBlobTransferSuccess); + } catch (Exception e) { + LOGGER.error("Failed to update blob transfer response stats for store {} version {}", storeName, version, e); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java new file mode 100644 index 0000000000..25dbabfd7e --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java @@ -0,0 +1,73 @@ +package com.linkedin.davinci.stats; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import io.tehuti.metrics.MetricsRepository; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * The store level stats for blob transfer + */ +public class AggVersionedBlobTransferStats + extends AbstractVeniceAggVersionedStats { + private static final Logger LOGGER = LogManager.getLogger(AggVersionedBlobTransferStats.class); + + public AggVersionedBlobTransferStats( + MetricsRepository metricsRepository, + ReadOnlyStoreRepository metadataRepository, + VeniceServerConfig serverConfig) { + super( + metricsRepository, + metadataRepository, + () -> new BlobTransferStats(), + BlobTransferStatsReporter::new, + serverConfig.isUnregisterMetricForDeletedStoreEnabled()); + } + + /** + * Record the blob transfer request count + * @param storeName + * @param version + */ + public void recordBlobTransferResponsesCount(String storeName, int version) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferResponsesCount()); + } + + /** + * Record the blob transfer request count based on the bootstrap status + * @param storeName the store name + * @param version the 
version of the store + * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise + */ + public void recordBlobTransferResponsesBasedOnBoostrapStatus( + String storeName, + int version, + boolean isBlobTransferSuccess) { + recordVersionedAndTotalStat( + storeName, + version, + stats -> stats.recordBlobTransferResponsesBasedOnBoostrapStatus(isBlobTransferSuccess)); + } + + /** + * Record the blob transfer file receive throughput + * @param storeName the store name + * @param version the version of the store + * @param throughput the throughput in MB/sec + */ + public void recordBlobTransferFileReceiveThroughput(String storeName, int version, double throughput) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferFileReceiveThroughput(throughput)); + } + + /** + * Record the blob transfer time + * @param storeName the store name + * @param version the version of the store + * @param timeInSec the time in seconds + */ + public void recordBlobTransferTimeInSec(String storeName, int version, double timeInSec) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferTimeInSec(timeInSec)); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java new file mode 100644 index 0000000000..20dfcf9e1e --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java @@ -0,0 +1,141 @@ +package com.linkedin.davinci.stats; + +import io.tehuti.metrics.MetricConfig; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import io.tehuti.metrics.stats.Count; +import io.tehuti.metrics.stats.Gauge; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * Class that exposes stats related to blob transfers + */ +public class 
BlobTransferStats { + private static final Logger LOGGER = LogManager.getLogger(BlobTransferStats.class); + + // As a sender, track the number of requests sent for bootstrap, + // including counts for successful and failed responses from the remote receiver. + // This can also represent the number of partitions successfully or unsuccessfully bootstrapped via blob transfer. + protected static final String BLOB_TRANSFER_TOTAL_NUM_RESPONSES = "blob_transfer_total_num_responses"; + protected static final String BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES = "blob_transfer_successful_num_responses"; + protected static final String BLOB_TRANSFER_FAILED_NUM_RESPONSES = "blob_transfer_failed_num_responses"; + + // The blob file receiving throughput (in MB/sec) and time (in sec) + protected static final String BLOB_TRANSFER_THROUGHPUT = "blob_transfer_file_receive_throughput"; + protected static final String BLOB_TRANSFER_TIME = "blob_transfer_time"; + + private static final MetricConfig METRIC_CONFIG = new MetricConfig(); + private final MetricsRepository localMetricRepository; + private Count blobTransferTotalNumResponsesCount = new Count(); + private Sensor blobTransferTotalNumResponsesSensor; + private Count blobTransferSuccessNumResponsesCount = new Count(); + private Sensor blobTransferSuccessNumResponsesSensor; + private Count blobTransferFailedNumResponsesCount = new Count(); + private Sensor blobTransferFailedNumResponsesSensor; + private Gauge blobTransferFileReceiveThroughputGauge = new Gauge(); + private Sensor blobTransferFileReceiveThroughputSensor; + private Gauge blobTransferTimeGauge = new Gauge(); + private Sensor blobTransferTimeSensor; + + public BlobTransferStats() { + localMetricRepository = new MetricsRepository(METRIC_CONFIG); + + blobTransferTotalNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_RESPONSES); + blobTransferTotalNumResponsesSensor.add(BLOB_TRANSFER_TOTAL_NUM_RESPONSES, blobTransferTotalNumResponsesCount); + + 
blobTransferSuccessNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES); + blobTransferSuccessNumResponsesSensor + .add(BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES, blobTransferSuccessNumResponsesCount); + + blobTransferFailedNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_FAILED_NUM_RESPONSES); + blobTransferFailedNumResponsesSensor.add(BLOB_TRANSFER_FAILED_NUM_RESPONSES, blobTransferFailedNumResponsesCount); + + blobTransferFileReceiveThroughputSensor = localMetricRepository.sensor(BLOB_TRANSFER_THROUGHPUT); + blobTransferFileReceiveThroughputSensor.add(BLOB_TRANSFER_THROUGHPUT, blobTransferFileReceiveThroughputGauge); + + blobTransferTimeSensor = localMetricRepository.sensor(BLOB_TRANSFER_TIME); + blobTransferTimeSensor.add(BLOB_TRANSFER_TIME, blobTransferTimeGauge); + } + + /** + * Update the blob transfer response stats regardless of the response status. + */ + public void recordBlobTransferResponsesCount() { + blobTransferTotalNumResponsesSensor.record(); + } + + /** + * When receiving a blob transfer response from other remote host, + * based on the blob transfer bootstrap status, bump the successful or failed responses amount. + * @param isblobTransferSuccess the status of the blob transfer response, true for success, false for failure + */ + public void recordBlobTransferResponsesBasedOnBoostrapStatus(boolean isblobTransferSuccess) { + if (isblobTransferSuccess) { + blobTransferSuccessNumResponsesSensor.record(); + } else { + blobTransferFailedNumResponsesSensor.record(); + } + } + + /** + * Record the blob transfer file receive throughput. + * @param throughput in MB/sec + */ + public void recordBlobTransferFileReceiveThroughput(double throughput) { + blobTransferFileReceiveThroughputSensor.record(throughput, System.currentTimeMillis()); + } + + /** + * Record the blob transfer time. 
+ * @param time the time in second + */ + public void recordBlobTransferTimeInSec(double time) { + blobTransferTimeSensor.record(time, System.currentTimeMillis()); + } + + /** + * All get methods to get the sensor value + * @return the sensor value + */ + public double getBlobTransferTotalNumResponses() { + if (blobTransferTotalNumResponsesCount == null) { + return 0; + } else { + return blobTransferTotalNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferSuccessNumResponses() { + if (blobTransferSuccessNumResponsesCount == null) { + return 0; + } else { + return blobTransferSuccessNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferFailedNumResponses() { + if (blobTransferFailedNumResponsesCount == null) { + return 0; + } else { + return blobTransferFailedNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferFileReceiveThroughput() { + if (blobTransferFileReceiveThroughputGauge == null) { + return 0; + } else { + return blobTransferFileReceiveThroughputGauge.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferTime() { + if (blobTransferTimeGauge == null) { + return 0; + } else { + return blobTransferTimeGauge.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java new file mode 100644 index 0000000000..10123adde4 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java @@ -0,0 +1,68 @@ +package com.linkedin.davinci.stats; + +import static com.linkedin.venice.stats.StatsErrorCode.NULL_INGESTION_STATS; + +import io.tehuti.metrics.MetricsRepository; +import 
io.tehuti.metrics.stats.AsyncGauge; +import java.util.function.DoubleSupplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class is the reporting class for stats class {@link BlobTransferStats} + * Metric reporting logic is registered into {@link MetricsRepository} here and sent out to external metrics + * collection/visualization system. + */ +public class BlobTransferStatsReporter extends AbstractVeniceStatsReporter { + private static final Logger LOGGER = LogManager.getLogger(BlobTransferStatsReporter.class); + + public BlobTransferStatsReporter(MetricsRepository metricsRepository, String storeName, String clusterName) { + super(metricsRepository, storeName); + } + + @Override + protected void registerStats() { + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferTotalNumResponses(), + BlobTransferStats.BLOB_TRANSFER_TOTAL_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferSuccessNumResponses(), + BlobTransferStats.BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferFailedNumResponses(), + BlobTransferStats.BLOB_TRANSFER_FAILED_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferFileReceiveThroughput(), + BlobTransferStats.BLOB_TRANSFER_THROUGHPUT)); + registerSensor( + new IngestionStatsGauge(this, () -> getStats().getBlobTransferTime(), BlobTransferStats.BLOB_TRANSFER_TIME)); + } + + protected static class IngestionStatsGauge extends AsyncGauge { + IngestionStatsGauge(AbstractVeniceStatsReporter reporter, DoubleSupplier supplier, String metricName) { + this(reporter, supplier, NULL_INGESTION_STATS.code, metricName); + } + + IngestionStatsGauge( + AbstractVeniceStatsReporter reporter, + DoubleSupplier supplier, + int defaultValue, + String metricName) { + /** + * If a version doesn't exist, 
the corresponding reporter stat doesn't exist after the host restarts, + * which is not an error. The users of the stats should decide whether it's reasonable to emit an error + * code simply because the version is not created yet. + */ + super((ignored, ignored2) -> reporter.getStats() == null ? defaultValue : supplier.getAsDouble(), metricName); + } + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java index e73c748d66..03288d9031 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java @@ -7,6 +7,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.blobtransfer.BlobFinder; @@ -49,6 +50,7 @@ public class TestNettyP2PBlobTransferManager { NettyP2PBlobTransferManager manager; StorageMetadataService storageMetadataService; BlobSnapshotManager blobSnapshotManager; + AggVersionedBlobTransferStats blobTransferStats; Path tmpSnapshotDir; Path tmpPartitionDir; String TEST_STORE = "test_store"; @@ -72,6 +74,7 @@ public void setUp() throws Exception { // intentionally use different directories for snapshot and partition so that we can verify the file transfer storageMetadataService = mock(StorageMetadataService.class); + blobTransferStats = mock(AggVersionedBlobTransferStats.class); ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); StorageEngineRepository 
storageEngineRepository = mock(StorageEngineRepository.class); blobSnapshotManager = @@ -82,7 +85,7 @@ public void setUp() throws Exception { client = Mockito.spy(new NettyFileTransferClient(port, tmpPartitionDir.toString(), storageMetadataService)); finder = mock(BlobFinder.class); - manager = new NettyP2PBlobTransferManager(server, client, finder, tmpPartitionDir.toString()); + manager = new NettyP2PBlobTransferManager(server, client, finder, tmpPartitionDir.toString(), blobTransferStats); manager.start(); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java index 4dfc5aaddf..fe6910cac6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -17,6 +17,7 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; @@ -44,6 +45,8 @@ public class DefaultIngestionBackendTest { @Mock private StorageService storageService; @Mock + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + @Mock private BlobTransferManager blobTransferManager; @Mock private DefaultIngestionBackend ingestionBackend; @@ -89,6 +92,8 @@ public void setUp() { when(storageMetadataService.getLastOffset(Version.composeKafkaTopic(STORE_NAME, VERSION_NUMBER), PARTITION)) .thenReturn(offsetRecord); + 
when(blobTransferManager.getAggVersionedBlobTransferStats()).thenReturn(aggVersionedBlobTransferStats); + // Create the DefaultIngestionBackend instance with mocked dependencies ingestionBackend = new DefaultIngestionBackend( storageMetadataService, @@ -98,17 +103,20 @@ public void setUp() { veniceServerConfig); } - // verify that blobTransferManager was called given it is a hybrid & blob enabled + // verify that blobTransferManager was called given it is blob enabled @Test public void testStartConsumptionWithBlobTransfer() { when(store.isBlobTransferEnabled()).thenReturn(true); - when(store.isHybrid()).thenReturn(false); + when(store.isHybrid()).thenReturn(true); when(blobTransferManager.get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION))) .thenReturn(CompletableFuture.completedFuture(null)); when(veniceServerConfig.getRocksDBPath()).thenReturn(BASE_DIR); ingestionBackend.startConsumption(storeConfig, PARTITION); verify(blobTransferManager).get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION)); + verify(aggVersionedBlobTransferStats).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(true)); } @Test @@ -122,6 +130,9 @@ public void testStartConsumptionWithBlobTransferWhenNoPeerFound() { CompletableFuture future = ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION, 100L).toCompletableFuture(); assertTrue(future.isDone()); + verify(aggVersionedBlobTransferStats).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(false)); } @Test @@ -141,6 +152,9 @@ public void testNotStartBootstrapFromBlobTransferWhenNotLagging() { ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION, laggingThreshold).toCompletableFuture(); 
assertTrue(result.isDone()); verify(blobTransferManager, never()).get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION)); + verify(aggVersionedBlobTransferStats, never()).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats, never()) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(false)); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java new file mode 100644 index 0000000000..81bf64271c --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java @@ -0,0 +1,106 @@ +package com.linkedin.davinci.stats; + +import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.tehuti.metrics.MetricsRepository; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class BlobTransferStatsTest { + @Test + public void testRecordBlobTransferResponsesCount() { + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesCount(); + Assert.assertEquals(1.0, stats.getBlobTransferTotalNumResponses()); + } + + @Test + public void testRecordBlobTransferResponsesBasedOnBootstrapStatus() { + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(true); + Assert.assertEquals(1.0, stats.getBlobTransferSuccessNumResponses()); + + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(false); + Assert.assertEquals(1.0, stats.getBlobTransferFailedNumResponses()); + } + + @Test + public void testRecordBlobTransferFileReceiveThroughput() { + double throughput = 5.0; + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferFileReceiveThroughput(throughput); + Assert.assertEquals(throughput, 
stats.getBlobTransferFileReceiveThroughput()); + } + + @Test + public void testRecordBlobTransferTimeInSec() { + double timeInSec = 10.0; + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferTimeInSec(timeInSec); + Assert.assertEquals(timeInSec, stats.getBlobTransferTime()); + } + + @Test + public void blobTransferStatsReporterCanReportForGauge() { + MetricsRepository metricsRepository = new MetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + metricsRepository.addReporter(reporter); + String storeName = Utils.getUniqueString("store"); + + BlobTransferStatsReporter blobTransferStatsReporter = + new BlobTransferStatsReporter(metricsRepository, storeName, null); + + Assert.assertEquals(reporter.query("." + storeName + "--blob_transfer_time.IngestionStatsGauge").value(), -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + -20.0); + + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferFileReceiveThroughput(5.0); + stats.recordBlobTransferTimeInSec(10.0); + blobTransferStatsReporter.setStats(stats); + + Assert.assertEquals(reporter.query("." + storeName + "--blob_transfer_time.IngestionStatsGauge").value(), 10.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + 5.0); + } + + @Test + public void blobTransferStatsReporterCanReportForCount() { + MetricsRepository metricsRepository = new MetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + metricsRepository.addReporter(reporter); + String storeName = Utils.getUniqueString("store"); + + BlobTransferStatsReporter blobTransferStatsReporter = + new BlobTransferStatsReporter(metricsRepository, storeName, null); + + Assert.assertEquals( + reporter.query("." 
+ storeName + "--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + -20.0); + + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesCount(); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(true); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(false); + + blobTransferStatsReporter.setStats(stats); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 1.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 1.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 1.0); + } +} diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index d60cce4bdb..c05e842802 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.kafka.consumer.RemoteIngestionRepairService; import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.RocksDBMemoryStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; @@ -118,6 +119,7 @@ public class VeniceServer 
{ private HeartbeatMonitoringService heartbeatMonitoringService; private ServerReadMetadataRepository serverReadMetadataRepository; private BlobTransferManager blobTransferManager; + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; /** * @deprecated Use {@link VeniceServer#VeniceServer(VeniceServerContext)} instead. @@ -455,6 +457,7 @@ private List createServices() { * Initialize Blob transfer manager for Service */ if (serverConfig.isBlobTransferManagerEnabled()) { + aggVersionedBlobTransferStats = new AggVersionedBlobTransferStats(metricsRepository, metadataRepo, serverConfig); blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForServerAndStart( serverConfig.getDvcP2pBlobTransferServerPort(), serverConfig.getDvcP2pBlobTransferClientPort(), @@ -465,8 +468,10 @@ private List createServices() { storageService.getStorageEngineRepository(), serverConfig.getMaxConcurrentSnapshotUser(), serverConfig.getSnapshotRetentionTimeInMin(), - serverConfig.getBlobTransferMaxTimeoutInMin()); + serverConfig.getBlobTransferMaxTimeoutInMin(), + aggVersionedBlobTransferStats); } else { + aggVersionedBlobTransferStats = null; blobTransferManager = null; } diff --git a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java new file mode 100644 index 0000000000..d49b2f10b5 --- /dev/null +++ b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java @@ -0,0 +1,112 @@ +package com.linkedin.venice.stats; + +import static java.lang.Double.NaN; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; +import com.linkedin.venice.meta.OfflinePushStrategy; +import 
com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.ReadStrategy; +import com.linkedin.venice.meta.RoutingStrategy; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; +import io.tehuti.metrics.MetricsRepository; +import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import java.util.ArrayList; +import java.util.List; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class AggVersionedBlobTransferStatsTest { + @Test + public void testRecordBlobTransferMetrics() { + MetricsRepository metricsRepo = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + + String storeName = Utils.getUniqueString("store_foo"); + + metricsRepo.addReporter(reporter); + ReadOnlyStoreRepository mockMetaRepository = mock(ReadOnlyStoreRepository.class); + doReturn(Int2ObjectMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterIdToAliasMap(); + doReturn(true).when(mockVeniceServerConfig).isUnregisterMetricForDeletedStoreEnabled(); + + AggVersionedBlobTransferStats stats = + new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig); + + Store mockStore = createStore(storeName); + List storeList = new ArrayList<>(); + storeList.add(mockStore); + + doReturn(mockStore).when(mockMetaRepository).getStoreOrThrow(any()); + doReturn(storeList).when(mockMetaRepository).getAllStores(); + + stats.loadAllStats(); + storeName = mockStore.getName(); + // initial stats + // Gauge default value is NaN + Assert + .assertEquals(reporter.query("." 
+ storeName + "_total--blob_transfer_time.IngestionStatsGauge").value(), NaN); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + NaN); + // Count default value is 0.0 + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 0.0); + + // Record response count + stats.recordBlobTransferResponsesCount(storeName, 1); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 1.0); + // Record response status + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, 1, true); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 1.0); + + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, 1, false); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 1.0); + + // Record file receive throughput + stats.recordBlobTransferFileReceiveThroughput(storeName, 1, 1000.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + 1000.0); + + // Record blob transfer time + stats.recordBlobTransferTimeInSec(storeName, 1, 20.0); + Assert + .assertEquals(reporter.query("." 
+ storeName + "_total--blob_transfer_time.IngestionStatsGauge").value(), 20.0); + } + + private Store createStore(String storeName) { + return new ZKStore( + storeName, + "", + 10, + PersistenceType.ROCKS_DB, + RoutingStrategy.CONSISTENT_HASH, + ReadStrategy.ANY_OF_ONLINE, + OfflinePushStrategy.WAIT_ALL_REPLICAS, + 1); + } +}