From 45186a3af1c132564df4f623c5efdc813a420954 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 20 Nov 2024 11:58:10 -0800 Subject: [PATCH 01/10] [server][dvc] Add Blob Transfer related metrics --- .../com/linkedin/davinci/DaVinciBackend.java | 9 +- .../blobtransfer/BlobTransferManager.java | 7 + .../blobtransfer/BlobTransferUtil.java | 35 ++- .../blobtransfer/BlobTransferUtils.java | 65 +++++ .../NettyP2PBlobTransferManager.java | 48 +++- .../BlobTransferNettyChannelInitializer.java | 12 +- .../server/P2PBlobTransferService.java | 10 +- .../server/P2PFileTransferServerHandler.java | 70 +++++- .../ingestion/DefaultIngestionBackend.java | 17 ++ .../stats/AggVersionedBlobTransferStats.java | 59 +++++ .../davinci/stats/BlobTransferStats.java | 226 ++++++++++++++++++ .../stats/BlobTransferStatsReporter.java | 88 +++++++ .../TestNettyP2PBlobTransferManager.java | 13 +- .../TestP2PFileTransferServerHandler.java | 10 +- .../DefaultIngestionBackendTest.java | 9 +- .../linkedin/venice/server/VeniceServer.java | 7 +- 16 files changed, 657 insertions(+), 28 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 38c4b879a86..2f9db67f1dc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -23,6 +23,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import 
com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.MetadataUpdateStats; import com.linkedin.davinci.stats.RocksDBMemoryStats; @@ -111,6 +112,7 @@ public class DaVinciBackend implements Closeable { private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats; private final boolean useDaVinciSpecificExecutionStatusForError; private BlobTransferManager blobTransferManager; + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; private final boolean writeBatchingPushStatus; public DaVinciBackend( @@ -294,6 +296,9 @@ public DaVinciBackend( throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer."); } + aggVersionedBlobTransferStats = + new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig()); + blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart( configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(), configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(), @@ -304,8 +309,10 @@ public DaVinciBackend( storageService.getStorageEngineRepository(), backendConfig.getMaxConcurrentSnapshotUser(), backendConfig.getSnapshotRetentionTimeInMin(), - backendConfig.getBlobTransferMaxTimeoutInMin()); + backendConfig.getBlobTransferMaxTimeoutInMin(), + aggVersionedBlobTransferStats); } else { + aggVersionedBlobTransferStats = null; blobTransferManager = null; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java index 1d3b7e6ac92..d1810e6d87d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManager.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.blobtransfer; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.annotation.Experimental; import com.linkedin.venice.exceptions.VenicePeersNotFoundException; import java.io.InputStream; @@ -46,4 +47,10 @@ CompletionStage get(String storeName, int version, int pa * Close the blob transfer manager and related resources */ void close() throws Exception; + + /** + * Get the blob transfer stats + * @return the blob transfer stats + */ + AggVersionedBlobTransferStats getAggVersionedBlobTransferStats(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java index eaba7bb9177..03d4b8dfba3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java @@ -4,6 +4,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.blobtransfer.DaVinciBlobFinder; @@ -39,7 +40,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { return getP2PBlobTransferManagerForDVCAndStart( p2pTransferPort, p2pTransferPort, @@ -50,7 +52,8 @@ public static 
BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( storageEngineRepository, maxConcurrentSnapshotUser, snapshotRetentionTimeInMin, - blobTransferMaxTimeoutInMin); + blobTransferMaxTimeoutInMin, + aggVersionedBlobTransferStats); } public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( @@ -63,7 +66,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { try { BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( readOnlyStoreRepository, @@ -73,11 +77,18 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( snapshotRetentionTimeInMin); AbstractAvroStoreClient storeClient = new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig); + P2PBlobTransferService p2pBlobTransferService = new P2PBlobTransferService( + p2pTransferServerPort, + baseDir, + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + aggVersionedBlobTransferStats); BlobTransferManager manager = new NettyP2PBlobTransferManager( - new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), + p2pBlobTransferService, new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new DaVinciBlobFinder(storeClient), - baseDir); + baseDir, + aggVersionedBlobTransferStats); manager.start(); return manager; } catch (Exception e) { @@ -105,7 +116,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta StorageEngineRepository storageEngineRepository, int maxConcurrentSnapshotUser, int snapshotRetentionTimeInMin, - int blobTransferMaxTimeoutInMin) { + int blobTransferMaxTimeoutInMin, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { try { 
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager( readOnlyStoreRepository, @@ -113,11 +125,18 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta storageMetadataService, maxConcurrentSnapshotUser, snapshotRetentionTimeInMin); + P2PBlobTransferService p2pBlobTransferService = new P2PBlobTransferService( + p2pTransferServerPort, + baseDir, + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + aggVersionedBlobTransferStats); BlobTransferManager manager = new NettyP2PBlobTransferManager( - new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), + p2pBlobTransferService, new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new ServerBlobFinder(customizedViewFuture), - baseDir); + baseDir, + aggVersionedBlobTransferStats); manager.start(); return manager; } catch (Exception e) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java index 1e79354e5b1..9c413a89625 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java @@ -1,12 +1,16 @@ package com.linkedin.davinci.blobtransfer; +import static com.linkedin.venice.store.rocksdb.RocksDBUtils.composePartitionDbDir; import static org.apache.commons.codec.digest.DigestUtils.md5Hex; +import com.linkedin.venice.meta.Version; import io.netty.handler.codec.http.HttpResponse; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; public class BlobTransferUtils { @@ -18,6 +22,16 @@ public enum BlobTransferType { FILE, METADATA } + /** + * Enum for the status of a blob transfer + * SUCCESS: the blob 
transfer is successful + * REJECTED: the blob transfer request is rejected and no file is sent + * FAILED: the blob transfer failed during the transfer + */ + public enum BlobTransferStatus { + SUCCESS, REJECTED, FAILED + } + /** * Check if the HttpResponse message is for metadata. * @param msg the HttpResponse message @@ -46,4 +60,55 @@ public static String generateFileChecksum(Path filePath) throws IOException { } return md5Digest; } + + /** + * Calculate throughput in MB/sec for a given partition directory + */ + private static double calculateThroughputInMBPerSec(File partitionDir, double transferTimeInSec) throws IOException { + if (!partitionDir.exists() || !partitionDir.isDirectory()) { + throw new IllegalArgumentException( + "Partition directory does not exist or is not a directory: " + partitionDir.getAbsolutePath()); + } + // Calculate total size of all files in the directory + long totalSizeInBytes = getTotalSizeOfFiles(partitionDir); + // Convert bytes to MB + double totalSizeInMB = totalSizeInBytes / (1024.0 * 1024.0); + // Calculate throughput in MB/sec + double throughput = totalSizeInMB / transferTimeInSec; + return throughput; + } + + /** + * Get total size of all files in a directory + */ + private static long getTotalSizeOfFiles(File dir) throws IOException { + return Files.walk(dir.toPath()).filter(Files::isRegularFile).mapToLong(path -> path.toFile().length()).sum(); + } + + /** + * Calculate throughput per partition in MB/sec + * @param baseDir the base directory of the underlying storage + * @param storeName the store name + * @param version the version of the store + * @param partition the partition number + * @param transferTimeInSec the transfer time in seconds + * @return the throughput in MB/sec + */ + static double getThroughputPerPartition( + String baseDir, + String storeName, + int version, + int partition, + double transferTimeInSec) { + String topicName = Version.composeKafkaTopic(storeName, version); + String partitionDir =
composePartitionDbDir(baseDir, topicName, partition); + Path path = null; + try { + path = Paths.get(partitionDir); + File partitionFile = path.toFile(); + return calculateThroughputInMBPerSec(partitionFile, transferTimeInSec); + } catch (Exception e) { + return 0; + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java index 8c7fb413ae9..47eb49f36de 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java @@ -1,7 +1,10 @@ package com.linkedin.davinci.blobtransfer; +import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.getThroughputPerPartition; + import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.blobtransfer.BlobFinder; import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException; @@ -44,6 +47,8 @@ public class NettyP2PBlobTransferManager implements P2PBlobTransferManager private final P2PBlobTransferService blobTransferService; // netty client is responsible to make requests against other peers for blob fetching protected final NettyFileTransferClient nettyClient; + // blob transfer stats to record all blob transfer related stats + protected final AggVersionedBlobTransferStats aggVersionedBlobTransferStats; // peer finder is responsible to find the peers that have the requested blob protected final BlobFinder peerFinder; private final String baseDir; @@ -52,11 +57,13 @@ public NettyP2PBlobTransferManager( P2PBlobTransferService 
blobTransferService, NettyFileTransferClient nettyClient, BlobFinder peerFinder, - String baseDir) { + String baseDir, + AggVersionedBlobTransferStats aggVersionedBlobTransferStats) { this.blobTransferService = blobTransferService; this.nettyClient = nettyClient; this.peerFinder = peerFinder; this.baseDir = baseDir; + this.aggVersionedBlobTransferStats = aggVersionedBlobTransferStats; } @Override @@ -147,12 +154,11 @@ private void processPeersSequentially( .toCompletableFuture() .thenAccept(inputStream -> { // Success case: Complete the future with the input stream - LOGGER.info( - FETCHED_BLOB_SUCCESS_MSG, - replicaId, - chosenHost, - Duration.between(startTime, Instant.now()).getSeconds()); + long transferTime = Duration.between(startTime, Instant.now()).getSeconds(); + LOGGER.info(FETCHED_BLOB_SUCCESS_MSG, replicaId, chosenHost, transferTime); resultFuture.complete(inputStream); + // Updating the blob transfer stats with the transfer time and throughput + updateBlobTransferFileReceiveStats(transferTime, storeName, version, partition); }) .exceptionally(ex -> { handlePeerFetchException(ex, chosenHost, storeName, version, partition, replicaId); @@ -201,4 +207,34 @@ public void close() throws Exception { nettyClient.close(); peerFinder.close(); } + + /** + * Get the blob transfer stats + * @return the blob transfer stats + */ + public AggVersionedBlobTransferStats getAggVersionedBlobTransferStats() { + return aggVersionedBlobTransferStats; + } + + /** + * Based on the transfer time, store name, version, and partition, update the blob transfer file receive stats + * @param transferTime the transfer time in seconds + * @param storeName the name of the store + * @param version the version of the store + * @param partition the partition of the store + */ + private void updateBlobTransferFileReceiveStats(double transferTime, String storeName, int version, int partition) { + try { + double throughput = getThroughputPerPartition(baseDir, storeName, version, partition,
transferTime); + aggVersionedBlobTransferStats.recordBlobTransferTimeInSec(storeName, version, transferTime); + aggVersionedBlobTransferStats.recordBlobTransferFileReceiveThroughput(storeName, version, throughput); + } catch (Exception e) { + LOGGER.error( + "Failed to update updateBlobTransferFileReceiveStats for store {} version {} partition {}", + storeName, + version, + partition, + e); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 44fd34e6f33..019c2ac18b7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.blobtransfer.server; import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -14,14 +15,17 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer { if (future.isSuccess()) { + updateBlobTransferStatsForSuccessTransfer(finalStoreName, finalVersion); LOGGER.debug("All files sent successfully for {}", fullResourceName); } else { LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause()); @@ -289,4 +309,52 @@ private BlobTransferPayload parseBlobTransferPayload(URI uri) throws IllegalArgu throw new IllegalArgumentException("Invalid request for fetching blob at " + uri.getPath()); } } + + /** + * Update blob transfer stats based on the error type + * @param httpResponseStatus + */ + private void updateBlobTransferStatsBasedOnErrorType( + String 
storeName, + int version, + HttpResponseStatus httpResponseStatus) { + try { + if (httpResponseStatus.equals(HttpResponseStatus.BAD_REQUEST) + || httpResponseStatus.equals(HttpResponseStatus.NOT_FOUND) + || httpResponseStatus.equals(HttpResponseStatus.FORBIDDEN) + || httpResponseStatus.equals(HttpResponseStatus.METHOD_NOT_ALLOWED)) { + aggVersionedBlobTransferStats + .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.REJECTED); + } else if (httpResponseStatus.equals(HttpResponseStatus.INTERNAL_SERVER_ERROR) + || httpResponseStatus.equals(HttpResponseStatus.REQUEST_TIMEOUT)) { + aggVersionedBlobTransferStats + .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.FAILED); + } else { + aggVersionedBlobTransferStats + .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.FAILED); + } + } catch (Exception e) { + LOGGER.error( + "Failed to update blob transfer request stats based on error type for store {} version {}", + storeName, + version, + e); + } + } + + /** + * Update blob transfer stats when having success transfer + */ + private void updateBlobTransferStatsForSuccessTransfer(String storeName, int version) { + try { + aggVersionedBlobTransferStats + .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.SUCCESS); + } catch (Exception e) { + LOGGER.error( + "Failed to update blob transfer request stats based on success transfer for store {} version {}", + storeName, + version, + e); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 42878ee8904..6c34e1ea44a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -116,6 +116,7 @@ CompletionStage bootstrapFromBlobs( String storeName = store.getName(); return blobTransferManager.get(storeName, versionNumber, partitionId).handle((inputStream, throwable) -> { + updateBlobTransferResponseStats(throwable == null, storeName, versionNumber); if (throwable != null) { LOGGER.error( "Failed to bootstrap partition {} from blobs transfer for store {} with exception {}", @@ -256,4 +257,20 @@ private void syncStoreVersionConfig(Store store, VeniceStoreVersionConfig storeC storeConfig.setBlobTransferEnabled(true); } } + + /** + * Update the blob transfer response stats based on the blob transfer success. + * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise. + */ + private void updateBlobTransferResponseStats(boolean isBlobTransferSuccess, String storeName, int version) { + try { + // Record the blob transfer response count. + blobTransferManager.getAggVersionedBlobTransferStats().recordBlobTransferResponsesCount(storeName, version); + // Record the blob transfer response based on the blob transfer status.
+ blobTransferManager.getAggVersionedBlobTransferStats() + .recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, version, isBlobTransferSuccess); + } catch (Exception e) { + LOGGER.error("Failed to update blob transfer response stats for store {} version {}", storeName, version, e); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java new file mode 100644 index 00000000000..4173fc38c77 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java @@ -0,0 +1,59 @@ +package com.linkedin.davinci.stats; + +import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import io.tehuti.metrics.MetricsRepository; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * The store level stats for blob transfer + */ +public class AggVersionedBlobTransferStats + extends AbstractVeniceAggVersionedStats { + private static final Logger LOGGER = LogManager.getLogger(AggVersionedBlobTransferStats.class); + + public AggVersionedBlobTransferStats( + MetricsRepository metricsRepository, + ReadOnlyStoreRepository metadataRepository, + VeniceServerConfig serverConfig) { + super( + metricsRepository, + metadataRepository, + () -> new BlobTransferStats(), + BlobTransferStatsReporter::new, + serverConfig.isUnregisterMetricForDeletedStoreEnabled()); + } + + public void recordBlobTransferRequestsCount(String storeName, int version) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferRequestsCount()); + } + + public void recordBlobTransferRequestsStatus(String storeName, int version, BlobTransferStatus status) { + recordVersionedAndTotalStat(storeName, 
version, stats -> stats.recordBlobTransferRequestsStatus(status)); + } + + public void recordBlobTransferResponsesCount(String storeName, int version) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferResponsesCount()); + } + + public void recordBlobTransferResponsesBasedOnBoostrapStatus( + String storeName, + int version, + boolean isBlobTransferSuccess) { + recordVersionedAndTotalStat( + storeName, + version, + stats -> stats.recordBlobTransferResponsesBasedOnBoostrapStatus(isBlobTransferSuccess)); + } + + public void recordBlobTransferFileReceiveThroughput(String storeName, int version, double throughput) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferFileReceiveThroughput(throughput)); + } + + public void recordBlobTransferTimeInSec(String storeName, int version, double timeInSec) { + recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferTimeInSec(timeInSec)); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java new file mode 100644 index 00000000000..32eb096d690 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java @@ -0,0 +1,226 @@ +package com.linkedin.davinci.stats; + +import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; +import io.tehuti.metrics.MetricConfig; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import io.tehuti.metrics.stats.Count; +import io.tehuti.metrics.stats.Gauge; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * Class that exposes stats related to blob transfers + */ +public class BlobTransferStats { + private static final Logger LOGGER = LogManager.getLogger(BlobTransferStats.class); + + // Per-instance metrics + // As a receiver, track 
the number of requests received from remote hosts, + // along with counts for successful, failed, and rejected requests handled by this host. + protected static final String BLOB_TRANSFER_TOTAL_NUM_REQUESTS = "blob_transfer_total_num_requests"; + protected static final String BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS = "blob_transfer_successful_num_requests"; + protected static final String BLOB_TRANSFER_FAILED_NUM_REQUESTS = "blob_transfer_failed_num_requests"; + protected static final String BLOB_TRANSFER_REJECTED_NUM_REQUESTS = "blob_transfer_rejected_num_requests"; + + // As a sender, track the number of requests sent for bootstrap, + // including counts for successful and failed responses from the remote receiver. + // This can also represent the number of partitions successfully or unsuccessfully bootstrapped via blob transfer. + protected static final String BLOB_TRANSFER_TOTAL_NUM_RESPONSES = "blob_transfer_total_num_responses"; + protected static final String BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES = "blob_transfer_successful_num_responses"; + protected static final String BLOB_TRANSFER_FAILED_NUM_RESPONSES = "blob_transfer_failed_num_responses"; + + // The blob file receiving throughput (in MB/sec) and time (in sec) + protected static final String BLOB_TRANSFER_THROUGHPUT = "blob_transfer_file_receive_throughput"; + protected static final String BLOB_TRANSFER_TIME = "blob_transfer_time"; + + private static final MetricConfig METRIC_CONFIG = new MetricConfig(); + private final MetricsRepository localMetricRepository; + private Count blobTransferTotalNumRequestsCount = new Count(); + private Sensor blobTransferTotalNumRequestsSensor; + private Count blobTransferSuccessNumRequestsCount = new Count(); + private Sensor blobTransferSuccessNumRequestsSensor; + private Count blobTransferFailedNumRequestsCount = new Count(); + private Sensor blobTransferFailedNumRequestsSensor; + private Count blobTransferRejectedNumRequestsCount = new Count(); + private Sensor 
blobTransferRejectedNumRequestsSensor; + private Count blobTransferTotalNumResponsesCount = new Count(); + private Sensor blobTransferTotalNumResponsesSensor; + private Count blobTransferSuccessNumResponsesCount = new Count(); + private Sensor blobTransferSuccessNumResponsesSensor; + private Count blobTransferFailedNumResponsesCount = new Count(); + private Sensor blobTransferFailedNumResponsesSensor; + private Gauge blobTransferFileReceiveThroughputGauge = new Gauge(); + private Sensor blobTransferFileReceiveThroughputSensor; + private Gauge blobTransferTimeGauge = new Gauge(); + private Sensor blobTransferTimeSensor; + + public BlobTransferStats() { + localMetricRepository = new MetricsRepository(METRIC_CONFIG); + + blobTransferTotalNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_REQUESTS); + blobTransferTotalNumRequestsSensor.add(BLOB_TRANSFER_TOTAL_NUM_REQUESTS, blobTransferTotalNumRequestsCount); + + blobTransferSuccessNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS); + blobTransferSuccessNumRequestsSensor + .add(BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS, blobTransferSuccessNumRequestsCount); + + blobTransferFailedNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_FAILED_NUM_REQUESTS); + blobTransferFailedNumRequestsSensor.add(BLOB_TRANSFER_FAILED_NUM_REQUESTS, blobTransferFailedNumRequestsCount); + + blobTransferRejectedNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_REJECTED_NUM_REQUESTS); + blobTransferRejectedNumRequestsSensor + .add(BLOB_TRANSFER_REJECTED_NUM_REQUESTS, blobTransferRejectedNumRequestsCount); + + blobTransferTotalNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_RESPONSES); + blobTransferTotalNumResponsesSensor.add(BLOB_TRANSFER_TOTAL_NUM_RESPONSES, blobTransferTotalNumResponsesCount); + + blobTransferSuccessNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES); + 
blobTransferSuccessNumResponsesSensor + .add(BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES, blobTransferSuccessNumResponsesCount); + + blobTransferFailedNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_FAILED_NUM_RESPONSES); + blobTransferFailedNumResponsesSensor.add(BLOB_TRANSFER_FAILED_NUM_RESPONSES, blobTransferFailedNumResponsesCount); + + blobTransferFileReceiveThroughputSensor = localMetricRepository.sensor(BLOB_TRANSFER_THROUGHPUT); + blobTransferFileReceiveThroughputSensor.add(BLOB_TRANSFER_THROUGHPUT, blobTransferFileReceiveThroughputGauge); + + blobTransferTimeSensor = localMetricRepository.sensor(BLOB_TRANSFER_TIME); + blobTransferTimeSensor.add(BLOB_TRANSFER_TIME, blobTransferTimeGauge); + } + + /** + * When receiving a blob transfer request, bump the total number of requests this host has received. + */ + public void recordBlobTransferRequestsCount() { + blobTransferTotalNumRequestsSensor.record(); + } + + /** + * After handling a blob transfer request, bump the successful, failed or rejected request count + * that matches the given status; the total request count is not touched by this method. + * @param status the status of the blob transfer request + */ + public void recordBlobTransferRequestsStatus(BlobTransferStatus status) { + if (status.equals(BlobTransferStatus.SUCCESS)) { + blobTransferSuccessNumRequestsSensor.record(); + } else if (status.equals(BlobTransferStatus.FAILED)) { + blobTransferFailedNumRequestsSensor.record(); + } else if (status.equals(BlobTransferStatus.REJECTED)) { + blobTransferRejectedNumRequestsSensor.record(); + } + } + + /** + * Update the blob transfer response stats regardless of the response status. + */ + public void recordBlobTransferResponsesCount() { + blobTransferTotalNumResponsesSensor.record(); + } + + /** + * When receiving a blob transfer response from other remote host, + * based on the blob transfer bootstrap status, bump the successful or failed responses amount.
+ * @param isblobTransferSuccess the status of the blob transfer response, true for success, false for failure + */ + public void recordBlobTransferResponsesBasedOnBoostrapStatus(boolean isblobTransferSuccess) { + if (isblobTransferSuccess) { + blobTransferSuccessNumResponsesSensor.record(); + } else { + blobTransferFailedNumResponsesSensor.record(); + } + } + + /** + * Record the blob file receive throughput. + * @param throughput the throughput in MB/sec + */ + public void recordBlobTransferFileReceiveThroughput(double throughput) { + blobTransferFileReceiveThroughputSensor.record(throughput, System.currentTimeMillis()); + } + + /** + * Record the blob transfer time. + * @param time the time in seconds + */ + public void recordBlobTransferTimeInSec(double time) { + blobTransferTimeSensor.record(time, System.currentTimeMillis()); + } + + /** + * All get methods to get the sensor value + * @return the sensor value + */ + public double getBlobTransferTotalNumRequests() { + if (blobTransferTotalNumRequestsCount == null) { + return 0; + } else { + return blobTransferTotalNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferSuccessNumRequests() { + if (blobTransferSuccessNumRequestsCount == null) { + return 0; + } else { + return blobTransferSuccessNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferFailedNumRequests() { + if (blobTransferFailedNumRequestsCount == null) { + return 0; + } else { + return blobTransferFailedNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferRejectedNumRequests() { + if (blobTransferRejectedNumRequestsCount == null) { + return 0; + } else { + return blobTransferRejectedNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferTotalNumResponses() { + if (blobTransferTotalNumResponsesCount == null) { + return 0; + } else { + return
blobTransferTotalNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferSuccessNumResponses() { + if (blobTransferSuccessNumResponsesCount == null) { + return 0; + } else { + return blobTransferSuccessNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferFailedNumResponses() { + if (blobTransferFailedNumResponsesCount == null) { + return 0; + } else { + return blobTransferFailedNumResponsesCount.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferFileReceiveThroughput() { + if (blobTransferFileReceiveThroughputGauge == null) { + return 0; + } else { + return blobTransferFileReceiveThroughputGauge.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } + + public double getBlobTransferTime() { + if (blobTransferTimeGauge == null) { + return 0; + } else { + return blobTransferTimeGauge.measure(METRIC_CONFIG, System.currentTimeMillis()); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java new file mode 100644 index 00000000000..b986fd0efb9 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java @@ -0,0 +1,88 @@ +package com.linkedin.davinci.stats; + +import static com.linkedin.venice.stats.StatsErrorCode.NULL_INGESTION_STATS; + +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.stats.AsyncGauge; +import java.util.function.DoubleSupplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class is the reporting class for stats class {@link BlobTransferStats} + * Metrics reporting logics are registered into {@link MetricsRepository} here and send out to external metrics + * collection/visualization system. 
+ */ +public class BlobTransferStatsReporter extends AbstractVeniceStatsReporter { + private static final Logger LOGGER = LogManager.getLogger(IngestionStatsReporter.class); + + public BlobTransferStatsReporter(MetricsRepository metricsRepository, String storeName) { + super(metricsRepository, storeName); + } + + @Override + protected void registerStats() { + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferTotalNumRequests(), + BlobTransferStats.BLOB_TRANSFER_TOTAL_NUM_REQUESTS)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferSuccessNumRequests(), + BlobTransferStats.BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferFailedNumRequests(), + BlobTransferStats.BLOB_TRANSFER_FAILED_NUM_REQUESTS)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferRejectedNumRequests(), + BlobTransferStats.BLOB_TRANSFER_REJECTED_NUM_REQUESTS)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferTotalNumResponses(), + BlobTransferStats.BLOB_TRANSFER_TOTAL_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferSuccessNumResponses(), + BlobTransferStats.BLOB_TRANSFER_SUCCESSFUL_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferFailedNumResponses(), + BlobTransferStats.BLOB_TRANSFER_FAILED_NUM_RESPONSES)); + registerSensor( + new IngestionStatsGauge( + this, + () -> getStats().getBlobTransferFileReceiveThroughput(), + BlobTransferStats.BLOB_TRANSFER_THROUGHPUT)); + registerSensor( + new IngestionStatsGauge(this, () -> getStats().getBlobTransferTime(), BlobTransferStats.BLOB_TRANSFER_TIME)); + } + + protected static class IngestionStatsGauge extends AsyncGauge { + IngestionStatsGauge(AbstractVeniceStatsReporter reporter, DoubleSupplier supplier, String metricName) { 
+ this(reporter, supplier, NULL_INGESTION_STATS.code, metricName); + } + + IngestionStatsGauge( + AbstractVeniceStatsReporter reporter, + DoubleSupplier supplier, + int defaultValue, + String metricName) { + /** + * If a version doesn't exist, the corresponding reporter stat doesn't exist after the host restarts, + * which is not an error. The users of the stats should decide whether it's reasonable to emit an error + * code simply because the version is not created yet. + */ + super((ignored, ignored2) -> reporter.getStats() == null ? defaultValue : supplier.getAsDouble(), metricName); + } + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java index e73c748d66d..bc19592266c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java @@ -7,6 +7,7 @@ import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient; import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.blobtransfer.BlobFinder; @@ -49,6 +50,7 @@ public class TestNettyP2PBlobTransferManager { NettyP2PBlobTransferManager manager; StorageMetadataService storageMetadataService; BlobSnapshotManager blobSnapshotManager; + AggVersionedBlobTransferStats blobTransferStats; Path tmpSnapshotDir; Path tmpPartitionDir; String TEST_STORE = "test_store"; @@ -72,17 +74,22 @@ public void setUp() throws Exception { // intentionally use different directories for snapshot and partition so that we can verify the file transfer 
storageMetadataService = mock(StorageMetadataService.class); + blobTransferStats = mock(AggVersionedBlobTransferStats.class); ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class); StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class); blobSnapshotManager = Mockito.spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); - server = - new P2PBlobTransferService(port, tmpSnapshotDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager); + server = new P2PBlobTransferService( + port, + tmpSnapshotDir.toString(), + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + blobTransferStats); client = Mockito.spy(new NettyFileTransferClient(port, tmpPartitionDir.toString(), storageMetadataService)); finder = mock(BlobFinder.class); - manager = new NettyP2PBlobTransferManager(server, client, finder, tmpPartitionDir.toString()); + manager = new NettyP2PBlobTransferManager(server, client, finder, tmpPartitionDir.toString(), blobTransferStats); manager.start(); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 6197269ba8b..53620207de7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -54,6 
+55,7 @@ public class TestP2PFileTransferServerHandler { BlobSnapshotManager blobSnapshotManager; ReadOnlyStoreRepository readOnlyStoreRepository; StorageEngineRepository storageEngineRepository; + AggVersionedBlobTransferStats blobTransferStats; @BeforeMethod public void setUp() throws IOException { @@ -62,11 +64,15 @@ public void setUp() throws IOException { storageMetadataService = Mockito.mock(StorageMetadataService.class); readOnlyStoreRepository = Mockito.mock(ReadOnlyStoreRepository.class); storageEngineRepository = Mockito.mock(StorageEngineRepository.class); + blobTransferStats = Mockito.mock(AggVersionedBlobTransferStats.class); blobSnapshotManager = new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService); - serverHandler = - new P2PFileTransferServerHandler(baseDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager); + serverHandler = new P2PFileTransferServerHandler( + baseDir.toString(), + blobTransferMaxTimeoutInMin, + blobSnapshotManager, + blobTransferStats); ch = new EmbeddedChannel(serverHandler); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java index 4dfc5aaddf2..70f9746b17a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -17,6 +17,7 @@ import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; +import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.storage.StorageService; import com.linkedin.davinci.store.AbstractStorageEngine; 
@@ -46,6 +47,8 @@ public class DefaultIngestionBackendTest { @Mock private BlobTransferManager blobTransferManager; @Mock + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; + @Mock private DefaultIngestionBackend ingestionBackend; @Mock private VeniceStoreVersionConfig storeConfig; @@ -89,6 +92,8 @@ public void setUp() { when(storageMetadataService.getLastOffset(Version.composeKafkaTopic(STORE_NAME, VERSION_NUMBER), PARTITION)) .thenReturn(offsetRecord); + when(blobTransferManager.getAggVersionedBlobTransferStats()).thenReturn(aggVersionedBlobTransferStats); + // Create the DefaultIngestionBackend instance with mocked dependencies ingestionBackend = new DefaultIngestionBackend( storageMetadataService, @@ -98,11 +103,11 @@ public void setUp() { veniceServerConfig); } - // verify that blobTransferManager was called given it is a hybrid & blob enabled + // verify that blobTransferManager was called given it is blob enabled @Test public void testStartConsumptionWithBlobTransfer() { when(store.isBlobTransferEnabled()).thenReturn(true); - when(store.isHybrid()).thenReturn(false); + when(store.isHybrid()).thenReturn(true); when(blobTransferManager.get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION))) .thenReturn(CompletableFuture.completedFuture(null)); when(veniceServerConfig.getRocksDBPath()).thenReturn(BASE_DIR); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 3175f1d75f3..ff78168f1fd 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -11,6 +11,7 @@ import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.kafka.consumer.RemoteIngestionRepairService; import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder; +import 
com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.stats.AggVersionedStorageEngineStats; import com.linkedin.davinci.stats.RocksDBMemoryStats; import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; @@ -118,6 +119,7 @@ public class VeniceServer { private HeartbeatMonitoringService heartbeatMonitoringService; private ServerReadMetadataRepository serverReadMetadataRepository; private BlobTransferManager blobTransferManager; + private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; /** * @deprecated Use {@link VeniceServer#VeniceServer(VeniceServerContext)} instead. @@ -454,6 +456,7 @@ private List createServices() { * Initialize Blob transfer manager for Service */ if (serverConfig.isBlobTransferManagerEnabled()) { + aggVersionedBlobTransferStats = new AggVersionedBlobTransferStats(metricsRepository, metadataRepo, serverConfig); blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForServerAndStart( serverConfig.getDvcP2pBlobTransferServerPort(), serverConfig.getDvcP2pBlobTransferClientPort(), @@ -464,8 +467,10 @@ private List createServices() { storageService.getStorageEngineRepository(), serverConfig.getMaxConcurrentSnapshotUser(), serverConfig.getSnapshotRetentionTimeInMin(), - serverConfig.getBlobTransferMaxTimeoutInMin()); + serverConfig.getBlobTransferMaxTimeoutInMin(), + aggVersionedBlobTransferStats); } else { + aggVersionedBlobTransferStats = null; blobTransferManager = null; } From 36100a7da87f48ff7acb5c2e23fa51bc636f9033 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 27 Nov 2024 11:39:46 -0800 Subject: [PATCH 02/10] fix compileJava error --- .../com/linkedin/davinci/stats/BlobTransferStatsReporter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java index 
b986fd0efb9..7e19389fc40 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java @@ -17,7 +17,7 @@ public class BlobTransferStatsReporter extends AbstractVeniceStatsReporter { private static final Logger LOGGER = LogManager.getLogger(IngestionStatsReporter.class); - public BlobTransferStatsReporter(MetricsRepository metricsRepository, String storeName) { + public BlobTransferStatsReporter(MetricsRepository metricsRepository, String storeName, String clusterName) { super(metricsRepository, storeName); } From 2af41a7c5f8f9a8b3bd24208891391d036ebf08b Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 27 Nov 2024 11:59:30 -0800 Subject: [PATCH 03/10] fix StaticAnalysis spotbugsMain error --- .../blobtransfer/server/P2PFileTransferServerHandler.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index a8d6bf1ca25..78c04d94b04 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -325,10 +325,6 @@ private void updateBlobTransferStatsBasedOnErrorType( || httpResponseStatus.equals(HttpResponseStatus.METHOD_NOT_ALLOWED)) { aggVersionedBlobTransferStats .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.REJECTED); - } else if (httpResponseStatus.equals(HttpResponseStatus.INTERNAL_SERVER_ERROR) - || httpResponseStatus.equals(HttpResponseStatus.REQUEST_TIMEOUT)) { - aggVersionedBlobTransferStats - .recordBlobTransferRequestsStatus(storeName, version, 
BlobTransferUtils.BlobTransferStatus.FAILED); } else { aggVersionedBlobTransferStats .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.FAILED); From e18d36a68e4a45cb687d17dca9a11675efc154e7 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 27 Nov 2024 16:16:45 -0800 Subject: [PATCH 04/10] add more unit tests --- .../server/P2PFileTransferServerHandler.java | 2 +- .../TestP2PFileTransferServerHandler.java | 19 +++++++++++++++++++ .../DefaultIngestionBackendTest.java | 13 +++++++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java index 78c04d94b04..076d012ae3b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java @@ -119,9 +119,9 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque version = Version.parseVersionFromKafkaTopicName(blobTransferRequest.getTopicName()); snapshotDir = new File(blobTransferRequest.getSnapshotDir()); try { - transferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferRequest); // Record request in metrics aggVersionedBlobTransferStats.recordBlobTransferRequestsCount(storeName, version); + transferPartitionMetadata = blobSnapshotManager.getTransferMetadata(blobTransferRequest); } catch (Exception e) { updateBlobTransferStatsBasedOnErrorType(storeName, version, HttpResponseStatus.NOT_FOUND); setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java 
b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 53620207de7..9779a42b53e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -4,6 +4,8 @@ import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; @@ -94,6 +96,7 @@ public void testRejectNonGETMethod() { ch.writeInbound(request); FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 405); + Mockito.verify(blobTransferStats, Mockito.never()).recordBlobTransferRequestsCount(anyString(), anyInt()); } @Test @@ -102,6 +105,7 @@ public void testRejectInvalidPath() { ch.writeInbound(request); FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 400); + Mockito.verify(blobTransferStats, Mockito.never()).recordBlobTransferRequestsCount(anyString(), anyInt()); } @Test @@ -120,6 +124,9 @@ public void testRejectNonExistPath() { FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 404); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); + Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); + Mockito.verify(blobTransferStats, Mockito.times(1)) + .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.REJECTED); } 
@Test @@ -142,6 +149,9 @@ public void testFailOnAccessPath() throws IOException { FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 500); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); + Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); + Mockito.verify(blobTransferStats, Mockito.times(1)) + .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.FAILED); } @Test @@ -203,6 +213,9 @@ public void testTransferSingleFileAndSingleMetadataForBatchStore() throws IOExce Assert.assertTrue(response instanceof DefaultHttpResponse); DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response; Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); + Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); + Mockito.verify(blobTransferStats, Mockito.times(1)) + .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.SUCCESS); // end of STATUS response Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); @@ -293,6 +306,9 @@ public void testTransferMultipleFiles() throws IOException { Assert.assertTrue(response instanceof DefaultHttpResponse); DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response; Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); + Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); + Mockito.verify(blobTransferStats, Mockito.times(1)) + .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.SUCCESS); // end of STATUS response Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); @@ -319,5 +335,8 @@ public void testWhenMetadataCreateError() throws IOException { 
Assert.assertEquals(((DefaultHttpResponse) response).status(), HttpResponseStatus.NOT_FOUND); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); + Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); + Mockito.verify(blobTransferStats, Mockito.times(1)) + .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.REJECTED); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java index 70f9746b17a..fe6910cac66 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/ingestion/DefaultIngestionBackendTest.java @@ -45,10 +45,10 @@ public class DefaultIngestionBackendTest { @Mock private StorageService storageService; @Mock - private BlobTransferManager blobTransferManager; - @Mock private AggVersionedBlobTransferStats aggVersionedBlobTransferStats; @Mock + private BlobTransferManager blobTransferManager; + @Mock private DefaultIngestionBackend ingestionBackend; @Mock private VeniceStoreVersionConfig storeConfig; @@ -114,6 +114,9 @@ public void testStartConsumptionWithBlobTransfer() { ingestionBackend.startConsumption(storeConfig, PARTITION); verify(blobTransferManager).get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION)); + verify(aggVersionedBlobTransferStats).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(true)); } @Test @@ -127,6 +130,9 @@ public void testStartConsumptionWithBlobTransferWhenNoPeerFound() { CompletableFuture future = ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION, 
100L).toCompletableFuture(); assertTrue(future.isDone()); + verify(aggVersionedBlobTransferStats).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(false)); } @Test @@ -146,6 +152,9 @@ public void testNotStartBootstrapFromBlobTransferWhenNotLagging() { ingestionBackend.bootstrapFromBlobs(store, VERSION_NUMBER, PARTITION, laggingThreshold).toCompletableFuture(); assertTrue(result.isDone()); verify(blobTransferManager, never()).get(eq(STORE_NAME), eq(VERSION_NUMBER), eq(PARTITION)); + verify(aggVersionedBlobTransferStats, never()).recordBlobTransferResponsesCount(eq(STORE_NAME), eq(VERSION_NUMBER)); + verify(aggVersionedBlobTransferStats, never()) + .recordBlobTransferResponsesBasedOnBoostrapStatus(eq(STORE_NAME), eq(VERSION_NUMBER), eq(false)); } @Test From cd4d52fb0699997cc907b515ff55a427929f1dd5 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 3 Dec 2024 14:20:32 -0800 Subject: [PATCH 05/10] code review: 1. add unit test for blob stats. 2. 
change the calculation --- .../blobtransfer/BlobTransferUtils.java | 2 +- .../AggVersionedBlobTransferStatsTest.java | 144 ++++++++++++++++++ 2 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java index 9c413a89625..4eec114b785 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java @@ -72,7 +72,7 @@ private static double calculateThroughputInMBPerSec(File partitionDir, double tr // Calculate total size of all files in the directory long totalSizeInBytes = getTotalSizeOfFiles(partitionDir); // Convert bytes to MB - double totalSizeInMB = totalSizeInBytes / (1024.0 * 1024.0); + double totalSizeInMB = totalSizeInBytes / (1000.0 * 1000.0); // Calculate throughput in MB/sec double throughput = totalSizeInMB / transferTimeInSec; return throughput; diff --git a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java new file mode 100644 index 00000000000..596ffbd2f61 --- /dev/null +++ b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java @@ -0,0 +1,144 @@ +package com.linkedin.venice.stats; + +import static java.lang.Double.NaN; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; +import com.linkedin.davinci.config.VeniceServerConfig; +import 
com.linkedin.davinci.stats.AggVersionedBlobTransferStats; +import com.linkedin.venice.meta.OfflinePushStrategy; +import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.ReadOnlyStoreRepository; +import com.linkedin.venice.meta.ReadStrategy; +import com.linkedin.venice.meta.RoutingStrategy; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.ZKStore; +import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; +import io.tehuti.metrics.MetricsRepository; +import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; +import java.util.ArrayList; +import java.util.List; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class AggVersionedBlobTransferStatsTest { + @Test + public void testRecordBlobTransferMetrics() { + MetricsRepository metricsRepo = MetricsRepositoryUtils.createSingleThreadedMetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + VeniceServerConfig mockVeniceServerConfig = Mockito.mock(VeniceServerConfig.class); + + String storeName = Utils.getUniqueString("store_foo"); + + metricsRepo.addReporter(reporter); + ReadOnlyStoreRepository mockMetaRepository = mock(ReadOnlyStoreRepository.class); + doReturn(Int2ObjectMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterIdToAliasMap(); + doReturn(true).when(mockVeniceServerConfig).isUnregisterMetricForDeletedStoreEnabled(); + + AggVersionedBlobTransferStats stats = + new AggVersionedBlobTransferStats(metricsRepo, mockMetaRepository, mockVeniceServerConfig); + + Store mockStore = createStore(storeName); + List storeList = new ArrayList<>(); + storeList.add(mockStore); + + doReturn(mockStore).when(mockMetaRepository).getStoreOrThrow(any()); + doReturn(storeList).when(mockMetaRepository).getAllStores(); + + stats.loadAllStats(); + storeName = mockStore.getName(); + // initial stats + // Gauge 
default value is NaN + Assert + .assertEquals(reporter.query("." + storeName + "_total--blob_transfer_time.IngestionStatsGauge").value(), NaN); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + NaN); + // Count default value is 0.0 + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_failed_num_requests.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_rejected_num_requests.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_requests.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_requests.IngestionStatsGauge").value(), + 0.0); + + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 0.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 0.0); + + // Record request count + stats.recordBlobTransferRequestsCount(storeName, 1); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_requests.IngestionStatsGauge").value(), + 1.0); + // Record request status + stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.SUCCESS); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_requests.IngestionStatsGauge").value(), + 1.0); + stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.FAILED); + Assert.assertEquals( + reporter.query("." 
+ storeName + "_total--blob_transfer_failed_num_requests.IngestionStatsGauge").value(), + 1.0); + stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.REJECTED); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_rejected_num_requests.IngestionStatsGauge").value(), + 1.0); + + // Record response count + stats.recordBlobTransferResponsesCount(storeName, 1); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 1.0); + // Record response status + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, 1, true); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 1.0); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, 1, false); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 1.0); + + // Record file receive throughput + stats.recordBlobTransferFileReceiveThroughput(storeName, 1, 1000.0); + Assert.assertEquals( + reporter.query("." + storeName + "_total--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + 1000.0); + + // Record blob transfer time + stats.recordBlobTransferTimeInSec(storeName, 1, 20.0); + Assert + .assertEquals(reporter.query("." 
+ storeName + "_total--blob_transfer_time.IngestionStatsGauge").value(), 20.0); + } + + private Store createStore(String storeName) { + return new ZKStore( + storeName, + "", + 10, + PersistenceType.ROCKS_DB, + RoutingStrategy.CONSISTENT_HASH, + ReadStrategy.ANY_OF_ONLINE, + OfflinePushStrategy.WAIT_ALL_REPLICAS, + 1); + } +} From 60a5f0dc20106c2e215fb941f5615980172d6353 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 3 Dec 2024 16:25:05 -0800 Subject: [PATCH 06/10] Remove the request related metrics and tests --- .../blobtransfer/BlobTransferUtil.java | 16 +--- .../blobtransfer/BlobTransferUtils.java | 10 --- .../BlobTransferNettyChannelInitializer.java | 12 +-- .../server/P2PBlobTransferService.java | 10 +-- .../server/P2PFileTransferServerHandler.java | 66 +------------- .../ingestion/DefaultIngestionBackend.java | 8 ++ .../stats/AggVersionedBlobTransferStats.java | 32 +++++-- .../davinci/stats/BlobTransferStats.java | 85 ------------------- .../stats/BlobTransferStatsReporter.java | 20 ----- .../TestNettyP2PBlobTransferManager.java | 8 +- .../TestP2PFileTransferServerHandler.java | 26 +----- .../AggVersionedBlobTransferStatsTest.java | 21 +---- 12 files changed, 43 insertions(+), 271 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java index 03d4b8dfba3..f857de5ec34 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtil.java @@ -77,14 +77,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForDVCAndStart( snapshotRetentionTimeInMin); AbstractAvroStoreClient storeClient = new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig); - P2PBlobTransferService p2pBlobTransferService = new P2PBlobTransferService( - 
p2pTransferServerPort, - baseDir, - blobTransferMaxTimeoutInMin, - blobSnapshotManager, - aggVersionedBlobTransferStats); BlobTransferManager manager = new NettyP2PBlobTransferManager( - p2pBlobTransferService, + new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new DaVinciBlobFinder(storeClient), baseDir, @@ -125,14 +119,8 @@ public static BlobTransferManager getP2PBlobTransferManagerForServerAndSta storageMetadataService, maxConcurrentSnapshotUser, snapshotRetentionTimeInMin); - P2PBlobTransferService p2pBlobTransferService = new P2PBlobTransferService( - p2pTransferServerPort, - baseDir, - blobTransferMaxTimeoutInMin, - blobSnapshotManager, - aggVersionedBlobTransferStats); BlobTransferManager manager = new NettyP2PBlobTransferManager( - p2pBlobTransferService, + new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager), new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService), new ServerBlobFinder(customizedViewFuture), baseDir, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java index 4eec114b785..5da87d39eae 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java @@ -22,16 +22,6 @@ public enum BlobTransferType { FILE, METADATA } - /** - * Enum for the status of a blob transfer - * SUCCESS: the blob transfer is successfully - * REJECTED: the blob transfer request is rejected, no any file is sent - * FAILED: the blob transfer is failed during the transfer - */ - public enum BlobTransferStatus { - SUCCESS, REJECTED, FAILED - } - /** * Check if 
the HttpResponse message is for metadata. * @param msg the HttpResponse message diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java index 019c2ac18b7..44fd34e6f33 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java @@ -1,7 +1,6 @@ package com.linkedin.davinci.blobtransfer.server; import com.linkedin.davinci.blobtransfer.BlobSnapshotManager; -import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -15,17 +14,14 @@ public class BlobTransferNettyChannelInitializer extends ChannelInitializer { if (future.isSuccess()) { - updateBlobTransferStatsForSuccessTransfer(finalStoreName, finalVersion); LOGGER.debug("All files sent successfully for {}", fullResourceName); } else { LOGGER.error("Failed to send all files for {}", fullResourceName, future.cause()); @@ -309,48 +289,4 @@ private BlobTransferPayload parseBlobTransferPayload(URI uri) throws IllegalArgu throw new IllegalArgumentException("Invalid request for fetching blob at " + uri.getPath()); } } - - /** - * Update blob transfer stats based on the error type - * @param httpResponseStatus - */ - private void updateBlobTransferStatsBasedOnErrorType( - String storeName, - int version, - HttpResponseStatus httpResponseStatus) { - try { - if (httpResponseStatus.equals(HttpResponseStatus.BAD_REQUEST) - || httpResponseStatus.equals(HttpResponseStatus.NOT_FOUND) - || httpResponseStatus.equals(HttpResponseStatus.FORBIDDEN) - || 
httpResponseStatus.equals(HttpResponseStatus.METHOD_NOT_ALLOWED)) { - aggVersionedBlobTransferStats - .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.REJECTED); - } else { - aggVersionedBlobTransferStats - .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.FAILED); - } - } catch (Exception e) { - LOGGER.error( - "Failed to update blob transfer request stats based on error type for store {} version {}", - storeName, - version, - e); - } - } - - /** - * Update blob transfer stats when having success transfer - */ - private void updateBlobTransferStatsForSuccessTransfer(String storeName, int version) { - try { - aggVersionedBlobTransferStats - .recordBlobTransferRequestsStatus(storeName, version, BlobTransferUtils.BlobTransferStatus.SUCCESS); - } catch (Exception e) { - LOGGER.error( - "Failed to update blob transfer request stats based on success transfer for store {} version {}", - storeName, - version, - e); - } - } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java index 6c34e1ea44a..344263cb634 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/DefaultIngestionBackend.java @@ -263,6 +263,14 @@ private void syncStoreVersionConfig(Store store, VeniceStoreVersionConfig storeC * @param isBlobTransferSuccess true if the blob transfer is successful, false otherwise. */ private void updateBlobTransferResponseStats(boolean isBlobTransferSuccess, String storeName, int version) { + if (blobTransferManager.getAggVersionedBlobTransferStats() == null) { + LOGGER.error( + "Blob transfer stats is not initialized. 
Skip updating blob transfer response stats for store {} version {}", + storeName, + version); + return; + } + try { // Record the blob transfer request count. blobTransferManager.getAggVersionedBlobTransferStats().recordBlobTransferResponsesCount(storeName, version); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java index 4173fc38c77..25dbabfd7eb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedBlobTransferStats.java @@ -1,6 +1,5 @@ package com.linkedin.davinci.stats; -import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import io.tehuti.metrics.MetricsRepository; @@ -27,18 +26,21 @@ public AggVersionedBlobTransferStats( serverConfig.isUnregisterMetricForDeletedStoreEnabled()); } - public void recordBlobTransferRequestsCount(String storeName, int version) { - recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferRequestsCount()); - } - - public void recordBlobTransferRequestsStatus(String storeName, int version, BlobTransferStatus status) { - recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferRequestsStatus(status)); - } - + /** + * Record the blob transfer response count + * @param storeName the store name + * @param version the version of the store + */ public void recordBlobTransferResponsesCount(String storeName, int version) { recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferResponsesCount()); } + /** + * Record the blob transfer response count based on the bootstrap status + * @param storeName the store name + * @param version the version of the store + * @param
isBlobTransferSuccess true if the blob transfer is successful, false otherwise + */ public void recordBlobTransferResponsesBasedOnBoostrapStatus( String storeName, int version, @@ -49,10 +51,22 @@ public void recordBlobTransferResponsesBasedOnBoostrapStatus( stats -> stats.recordBlobTransferResponsesBasedOnBoostrapStatus(isBlobTransferSuccess)); } + /** + * Record the blob transfer file receive throughput + * @param storeName the store name + * @param version the version of the store + * @param throughput the throughput in MB/sec + */ public void recordBlobTransferFileReceiveThroughput(String storeName, int version, double throughput) { recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferFileReceiveThroughput(throughput)); } + /** + * Record the blob transfer time + * @param storeName the store name + * @param version the version of the store + * @param timeInSec the time in seconds + */ public void recordBlobTransferTimeInSec(String storeName, int version, double timeInSec) { recordVersionedAndTotalStat(storeName, version, stats -> stats.recordBlobTransferTimeInSec(timeInSec)); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java index 32eb096d690..20dfcf9e1ef 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStats.java @@ -1,6 +1,5 @@ package com.linkedin.davinci.stats; -import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -16,14 +15,6 @@ public class BlobTransferStats { private static final Logger LOGGER = LogManager.getLogger(BlobTransferStats.class); - // Per-instance metrics - // As a receiver, track the
number of requests received from remote hosts, - // along with counts for successful, failed, and rejected requests handled by this host. - protected static final String BLOB_TRANSFER_TOTAL_NUM_REQUESTS = "blob_transfer_total_num_requests"; - protected static final String BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS = "blob_transfer_successful_num_requests"; - protected static final String BLOB_TRANSFER_FAILED_NUM_REQUESTS = "blob_transfer_failed_num_requests"; - protected static final String BLOB_TRANSFER_REJECTED_NUM_REQUESTS = "blob_transfer_rejected_num_requests"; - // As a sender, track the number of requests sent for bootstrap, // including counts for successful and failed responses from the remote receiver. // This can also represent the number of partitions successfully or unsuccessfully bootstrapped via blob transfer. @@ -37,14 +28,6 @@ public class BlobTransferStats { private static final MetricConfig METRIC_CONFIG = new MetricConfig(); private final MetricsRepository localMetricRepository; - private Count blobTransferTotalNumRequestsCount = new Count(); - private Sensor blobTransferTotalNumRequestsSensor; - private Count blobTransferSuccessNumRequestsCount = new Count(); - private Sensor blobTransferSuccessNumRequestsSensor; - private Count blobTransferFailedNumRequestsCount = new Count(); - private Sensor blobTransferFailedNumRequestsSensor; - private Count blobTransferRejectedNumRequestsCount = new Count(); - private Sensor blobTransferRejectedNumRequestsSensor; private Count blobTransferTotalNumResponsesCount = new Count(); private Sensor blobTransferTotalNumResponsesSensor; private Count blobTransferSuccessNumResponsesCount = new Count(); @@ -59,20 +42,6 @@ public class BlobTransferStats { public BlobTransferStats() { localMetricRepository = new MetricsRepository(METRIC_CONFIG); - blobTransferTotalNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_REQUESTS); - blobTransferTotalNumRequestsSensor.add(BLOB_TRANSFER_TOTAL_NUM_REQUESTS, 
blobTransferTotalNumRequestsCount); - - blobTransferSuccessNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS); - blobTransferSuccessNumRequestsSensor - .add(BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS, blobTransferSuccessNumRequestsCount); - - blobTransferFailedNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_FAILED_NUM_REQUESTS); - blobTransferFailedNumRequestsSensor.add(BLOB_TRANSFER_FAILED_NUM_REQUESTS, blobTransferFailedNumRequestsCount); - - blobTransferRejectedNumRequestsSensor = localMetricRepository.sensor(BLOB_TRANSFER_REJECTED_NUM_REQUESTS); - blobTransferRejectedNumRequestsSensor - .add(BLOB_TRANSFER_REJECTED_NUM_REQUESTS, blobTransferRejectedNumRequestsCount); - blobTransferTotalNumResponsesSensor = localMetricRepository.sensor(BLOB_TRANSFER_TOTAL_NUM_RESPONSES); blobTransferTotalNumResponsesSensor.add(BLOB_TRANSFER_TOTAL_NUM_RESPONSES, blobTransferTotalNumResponsesCount); @@ -90,28 +59,6 @@ public BlobTransferStats() { blobTransferTimeSensor.add(BLOB_TRANSFER_TIME, blobTransferTimeGauge); } - /** - * When receiving a blob transfer request, bump the total requests amount this host receive. - */ - public void recordBlobTransferRequestsCount() { - blobTransferTotalNumRequestsSensor.record(); - } - - /** - * When receiving a blob transfer request, bump the total requests amount this host receive, - * based on the host status, bump the successful, failed or rejected requests amount. - * @param status the status of the blob transfer request - */ - public void recordBlobTransferRequestsStatus(BlobTransferStatus status) { - if (status.equals(BlobTransferStatus.SUCCESS)) { - blobTransferSuccessNumRequestsSensor.record(); - } else if (status.equals(BlobTransferStatus.FAILED)) { - blobTransferFailedNumRequestsSensor.record(); - } else if (status.equals(BlobTransferStatus.REJECTED)) { - blobTransferRejectedNumRequestsSensor.record(); - } - } - /** * Update the blob transfer response stats regardless the response status. 
*/ @@ -152,38 +99,6 @@ public void recordBlobTransferTimeInSec(double time) { * All get methods to get the sensor value * @return the sensor value */ - public double getBlobTransferTotalNumRequests() { - if (blobTransferTotalNumRequestsCount == null) { - return 0; - } else { - return blobTransferTotalNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - } - - public double getBlobTransferSuccessNumRequests() { - if (blobTransferSuccessNumRequestsCount == null) { - return 0; - } else { - return blobTransferSuccessNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - } - - public double getBlobTransferFailedNumRequests() { - if (blobTransferFailedNumRequestsCount == null) { - return 0; - } else { - return blobTransferFailedNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - } - - public double getBlobTransferRejectedNumRequests() { - if (blobTransferRejectedNumRequestsCount == null) { - return 0; - } else { - return blobTransferRejectedNumRequestsCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - } - public double getBlobTransferTotalNumResponses() { if (blobTransferTotalNumResponsesCount == null) { return 0; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java index 7e19389fc40..10123adde44 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/BlobTransferStatsReporter.java @@ -23,26 +23,6 @@ public BlobTransferStatsReporter(MetricsRepository metricsRepository, String sto @Override protected void registerStats() { - registerSensor( - new IngestionStatsGauge( - this, - () -> getStats().getBlobTransferTotalNumRequests(), - BlobTransferStats.BLOB_TRANSFER_TOTAL_NUM_REQUESTS)); - registerSensor( - new 
IngestionStatsGauge( - this, - () -> getStats().getBlobTransferSuccessNumRequests(), - BlobTransferStats.BLOB_TRANSFER_SUCCESSFUL_NUM_REQUESTS)); - registerSensor( - new IngestionStatsGauge( - this, - () -> getStats().getBlobTransferFailedNumRequests(), - BlobTransferStats.BLOB_TRANSFER_FAILED_NUM_REQUESTS)); - registerSensor( - new IngestionStatsGauge( - this, - () -> getStats().getBlobTransferRejectedNumRequests(), - BlobTransferStats.BLOB_TRANSFER_REJECTED_NUM_REQUESTS)); registerSensor( new IngestionStatsGauge( this, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java index bc19592266c..03288d90316 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java @@ -80,12 +80,8 @@ public void setUp() throws Exception { blobSnapshotManager = Mockito.spy(new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService)); - server = new P2PBlobTransferService( - port, - tmpSnapshotDir.toString(), - blobTransferMaxTimeoutInMin, - blobSnapshotManager, - blobTransferStats); + server = + new P2PBlobTransferService(port, tmpSnapshotDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager); client = Mockito.spy(new NettyFileTransferClient(port, tmpPartitionDir.toString(), storageMetadataService)); finder = mock(BlobFinder.class); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 9779a42b53e..948f0dede2a 100644 --- 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -4,8 +4,6 @@ import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_TYPE; import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferType; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; @@ -70,11 +68,8 @@ public void setUp() throws IOException { blobSnapshotManager = new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService); - serverHandler = new P2PFileTransferServerHandler( - baseDir.toString(), - blobTransferMaxTimeoutInMin, - blobSnapshotManager, - blobTransferStats); + serverHandler = + new P2PFileTransferServerHandler(baseDir.toString(), blobTransferMaxTimeoutInMin, blobSnapshotManager); ch = new EmbeddedChannel(serverHandler); } @@ -96,7 +91,6 @@ public void testRejectNonGETMethod() { ch.writeInbound(request); FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 405); - Mockito.verify(blobTransferStats, Mockito.never()).recordBlobTransferRequestsCount(anyString(), anyInt()); } @Test @@ -105,7 +99,6 @@ public void testRejectInvalidPath() { ch.writeInbound(request); FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 400); - Mockito.verify(blobTransferStats, Mockito.never()).recordBlobTransferRequestsCount(anyString(), anyInt()); } @Test @@ -124,9 +117,6 @@ public void testRejectNonExistPath() { FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 404); 
Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); - Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); - Mockito.verify(blobTransferStats, Mockito.times(1)) - .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.REJECTED); } @Test @@ -149,9 +139,6 @@ public void testFailOnAccessPath() throws IOException { FullHttpResponse response = ch.readOutbound(); Assert.assertEquals(response.status().code(), 500); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); - Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); - Mockito.verify(blobTransferStats, Mockito.times(1)) - .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.FAILED); } @Test @@ -213,9 +200,6 @@ public void testTransferSingleFileAndSingleMetadataForBatchStore() throws IOExce Assert.assertTrue(response instanceof DefaultHttpResponse); DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response; Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); - Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); - Mockito.verify(blobTransferStats, Mockito.times(1)) - .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.SUCCESS); // end of STATUS response Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); @@ -306,9 +290,6 @@ public void testTransferMultipleFiles() throws IOException { Assert.assertTrue(response instanceof DefaultHttpResponse); DefaultHttpResponse endOfTransfer = (DefaultHttpResponse) response; Assert.assertEquals(endOfTransfer.headers().get(BLOB_TRANSFER_STATUS), BLOB_TRANSFER_COMPLETED); - Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); - 
Mockito.verify(blobTransferStats, Mockito.times(1)) - .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.SUCCESS); // end of STATUS response Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); @@ -335,8 +316,5 @@ public void testWhenMetadataCreateError() throws IOException { Assert.assertEquals(((DefaultHttpResponse) response).status(), HttpResponseStatus.NOT_FOUND); Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers("myStore_v1", 10), 0); - Mockito.verify(blobTransferStats, Mockito.times(1)).recordBlobTransferRequestsCount("myStore", 1); - Mockito.verify(blobTransferStats, Mockito.times(1)) - .recordBlobTransferRequestsStatus("myStore", 1, BlobTransferUtils.BlobTransferStatus.REJECTED); } } diff --git a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java index 596ffbd2f61..0baab965866 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java @@ -5,7 +5,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferStatus; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.venice.meta.OfflinePushStrategy; @@ -84,25 +83,6 @@ public void testRecordBlobTransferMetrics() { reporter.query("." + storeName + "_total--blob_transfer_total_num_responses.IngestionStatsGauge").value(), 0.0); - // Record request count - stats.recordBlobTransferRequestsCount(storeName, 1); - Assert.assertEquals( - reporter.query("." 
+ storeName + "_total--blob_transfer_total_num_requests.IngestionStatsGauge").value(), - 1.0); - // Record request status - stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.SUCCESS); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_successful_num_requests.IngestionStatsGauge").value(), - 1.0); - stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.FAILED); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_failed_num_requests.IngestionStatsGauge").value(), - 1.0); - stats.recordBlobTransferRequestsStatus(storeName, 1, BlobTransferStatus.REJECTED); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_rejected_num_requests.IngestionStatsGauge").value(), - 1.0); - // Record response count stats.recordBlobTransferResponsesCount(storeName, 1); Assert.assertEquals( @@ -113,6 +93,7 @@ public void testRecordBlobTransferMetrics() { Assert.assertEquals( reporter.query("." + storeName + "_total--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), 1.0); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(storeName, 1, false); Assert.assertEquals( reporter.query("." 
+ storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), From 1a1a0a3d9fdb186093cc468d28e31b120bdcbbd4 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Tue, 3 Dec 2024 18:22:07 -0800 Subject: [PATCH 07/10] add more unit test to fix the coverage issue --- .../davinci/stats/BlobTransferStatsTest.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java new file mode 100644 index 00000000000..120dbbcf76f --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java @@ -0,0 +1,106 @@ +package com.linkedin.davinci.stats; + +import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.tehuti.metrics.MetricsRepository; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class BlobTransferStatsTest { + @Test + public void testRecordBlobTransferResponsesCount() { + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesCount(); + Assert.assertEquals(1.0, stats.getBlobTransferTotalNumResponses()); + } + + @Test + public void testRecordBlobTransferResponsesBasedOnBootstrapStatus() { + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(true); + Assert.assertEquals(1.0, stats.getBlobTransferSuccessNumResponses()); + + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(false); + Assert.assertEquals(1.0, stats.getBlobTransferFailedNumResponses()); + } + + @Test + public void testRecordBlobTransferFileReceiveThroughput() { + double throughput = 5.0; + BlobTransferStats stats = new BlobTransferStats(); + 
stats.recordBlobTransferFileReceiveThroughput(throughput); + Assert.assertEquals(throughput, stats.getBlobTransferFileReceiveThroughput()); + } + + @Test + public void testRecordBlobTransferTimeInSec() { + double timeInSec = 10.0; + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferTimeInSec(timeInSec); + Assert.assertEquals(timeInSec, stats.getBlobTransferTime()); + } + + @Test + public void BlobTransferStatsReporterCanReportForGauge() { + MetricsRepository metricsRepository = new MetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + metricsRepository.addReporter(reporter); + String storeName = Utils.getUniqueString("store"); + + BlobTransferStatsReporter blobTransferStatsReporter = + new BlobTransferStatsReporter(metricsRepository, storeName, null); + + Assert.assertEquals(reporter.query("." + storeName + "--blob_transfer_time.IngestionStatsGauge").value(), -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + -20.0); + + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferFileReceiveThroughput(5.0); + stats.recordBlobTransferTimeInSec(10.0); + blobTransferStatsReporter.setStats(stats); + + Assert.assertEquals(reporter.query("." + storeName + "--blob_transfer_time.IngestionStatsGauge").value(), 10.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), + 5.0); + } + + @Test + public void BlobTransferStatsReporterCanReportForCount() { + MetricsRepository metricsRepository = new MetricsRepository(); + MockTehutiReporter reporter = new MockTehutiReporter(); + metricsRepository.addReporter(reporter); + String storeName = Utils.getUniqueString("store"); + + BlobTransferStatsReporter blobTransferStatsReporter = + new BlobTransferStatsReporter(metricsRepository, storeName, null); + + Assert.assertEquals( + reporter.query("." 
+ storeName + "--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + -20.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + -20.0); + + BlobTransferStats stats = new BlobTransferStats(); + stats.recordBlobTransferResponsesCount(); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(true); + stats.recordBlobTransferResponsesBasedOnBoostrapStatus(false); + + blobTransferStatsReporter.setStats(stats); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_total_num_responses.IngestionStatsGauge").value(), + 1.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_successful_num_responses.IngestionStatsGauge").value(), + 1.0); + Assert.assertEquals( + reporter.query("." + storeName + "--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), + 1.0); + } +} From 8533d7e4a9f5953ce39ffde555693960668141cf Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 4 Dec 2024 10:26:23 -0800 Subject: [PATCH 08/10] remove request metrics tests --- .../stats/AggVersionedBlobTransferStatsTest.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java index 0baab965866..d49b2f10b5c 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/stats/AggVersionedBlobTransferStatsTest.java @@ -60,19 +60,6 @@ public void testRecordBlobTransferMetrics() { reporter.query("." 
+ storeName + "_total--blob_transfer_file_receive_throughput.IngestionStatsGauge").value(), NaN); // Count default value is 0.0 - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_failed_num_requests.IngestionStatsGauge").value(), - 0.0); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_rejected_num_requests.IngestionStatsGauge").value(), - 0.0); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_successful_num_requests.IngestionStatsGauge").value(), - 0.0); - Assert.assertEquals( - reporter.query("." + storeName + "_total--blob_transfer_total_num_requests.IngestionStatsGauge").value(), - 0.0); - Assert.assertEquals( reporter.query("." + storeName + "_total--blob_transfer_failed_num_responses.IngestionStatsGauge").value(), 0.0); From cd70a17e118ef372661e535a972f981cf5ed2c4b Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 4 Dec 2024 10:49:28 -0800 Subject: [PATCH 09/10] fix spotbugsTest error --- .../davinci/blobtransfer/TestP2PFileTransferServerHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java index 948f0dede2a..6197269ba8b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.blobtransfer.server.P2PFileTransferServerHandler; -import com.linkedin.davinci.stats.AggVersionedBlobTransferStats; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; import 
com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -55,7 +54,6 @@ public class TestP2PFileTransferServerHandler { BlobSnapshotManager blobSnapshotManager; ReadOnlyStoreRepository readOnlyStoreRepository; StorageEngineRepository storageEngineRepository; - AggVersionedBlobTransferStats blobTransferStats; @BeforeMethod public void setUp() throws IOException { @@ -64,7 +62,6 @@ public void setUp() throws IOException { storageMetadataService = Mockito.mock(StorageMetadataService.class); readOnlyStoreRepository = Mockito.mock(ReadOnlyStoreRepository.class); storageEngineRepository = Mockito.mock(StorageEngineRepository.class); - blobTransferStats = Mockito.mock(AggVersionedBlobTransferStats.class); blobSnapshotManager = new BlobSnapshotManager(readOnlyStoreRepository, storageEngineRepository, storageMetadataService); From d8d6c24f051398072c8614322a93aeff607c7b24 Mon Sep 17 00:00:00 2001 From: jingy-li Date: Wed, 4 Dec 2024 11:03:12 -0800 Subject: [PATCH 10/10] fix spotbugsTest error 2 --- .../com/linkedin/davinci/stats/BlobTransferStatsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java index 120dbbcf76f..81bf64271c2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/BlobTransferStatsTest.java @@ -42,7 +42,7 @@ public void testRecordBlobTransferTimeInSec() { } @Test - public void BlobTransferStatsReporterCanReportForGauge() { + public void blobTransferStatsReporterCanReportForGauge() { MetricsRepository metricsRepository = new MetricsRepository(); MockTehutiReporter reporter = new MockTehutiReporter(); metricsRepository.addReporter(reporter); @@ -68,7 +68,7 @@ public void BlobTransferStatsReporterCanReportForGauge() { } @Test - 
public void BlobTransferStatsReporterCanReportForCount() { + public void blobTransferStatsReporterCanReportForCount() { MetricsRepository metricsRepository = new MetricsRepository(); MockTehutiReporter reporter = new MockTehutiReporter(); metricsRepository.addReporter(reporter);