[VCR] Impl upload blob to Azure
snalli committed Aug 18, 2023
1 parent dbb41b0 commit 1e1c1a5
Showing 5 changed files with 154 additions and 4 deletions.
41 changes: 41 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/MessageInfo.java
@@ -18,6 +18,8 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.github.ambry.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


@@ -182,6 +184,45 @@ public MessageInfo(StoreKey key, long size, boolean deleted, boolean ttlUpdated,
this.lifeVersion = lifeVersion;
}

public static final String ACCOUNT_ID = "accountId";
public static final String CONTAINER_ID = "containerId";
public static final String CRC = "crc";
public static final String EXPIRATION_TIME = "expirationTimeInMs";
public static final String LIFE_VERSION = "lifeVersion";
public static final String OPERATION_TIME = "operationTimeMs";
public static final String SIZE = "size";
public static final String DELETE_TIME = "deleteTimeMs";
public static final String UPDATE_TTL_TIME = "updateTTLTimeMs";
public static final String UNDELETE_TIME = "undeleteTimeMs";

/**
 * Serializes the metadata fields of this message into key-value pairs suitable for use as blob metadata.
 * @return a {@link Map} of metadata key-value pairs.
 */
public Map<String, String> toMap() {
// This path is only exercised when putting a blob to cloud storage.
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put(ACCOUNT_ID, String.valueOf(accountId));
hashMap.put(CONTAINER_ID, String.valueOf(containerId));
hashMap.put(CRC, String.valueOf(crc));
hashMap.put(EXPIRATION_TIME, String.valueOf(expirationTimeInMs));
hashMap.put(LIFE_VERSION, String.valueOf(lifeVersion));
hashMap.put(OPERATION_TIME, String.valueOf(operationTimeMs));
hashMap.put(SIZE, String.valueOf(size));
return hashMap;
}

/**
 * Reconstructs a {@link MessageInfo} from a metadata map produced by {@link #toMap()}.
 * @param storeKey the {@link StoreKey} the metadata belongs to.
 * @param map the metadata key-value pairs.
 * @return the reconstructed {@link MessageInfo}.
 */
public static MessageInfo fromMap(StoreKey storeKey, Map<String, String> map) {
// TODO: Handle a missing or null CRC in the map.
// Encoding delete, undelete and ttl-update as booleans is fragile, but the constructor requires it.
return new MessageInfo(storeKey, Long.parseLong(map.get(SIZE)), map.containsKey(DELETE_TIME),
map.containsKey(UPDATE_TTL_TIME), map.containsKey(UNDELETE_TIME), Long.parseLong(map.get(EXPIRATION_TIME)),
Long.parseLong(map.get(CRC)), Short.parseShort(map.get(ACCOUNT_ID)), Short.parseShort(map.get(CONTAINER_ID)),
Long.parseLong(map.get(OPERATION_TIME)), Short.parseShort(map.get(LIFE_VERSION)));
}
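
Taken together, toMap() and fromMap() define the serialization contract between a MessageInfo and the blob metadata stored in Azure. A minimal usage sketch (illustrative values, not part of this commit; assumes a StoreKey in scope):

// Hypothetical round-trip sketch: the metadata map preserves the serialized fields.
// The StoreKey itself travels separately, as the blob name.
static void roundTripExample(StoreKey storeKey) {
  MessageInfo original = new MessageInfo(storeKey, 1024L, false, false, false, Utils.Infinite_Time, 12345L,
      (short) 101, (short) 5, System.currentTimeMillis(), (short) 0);
  Map<String, String> metadata = original.toMap();   // {"size"="1024", "crc"="12345", ...}
  MessageInfo restored = MessageInfo.fromMap(storeKey, metadata);
  // restored matches original for size, crc, account, container, times and lifeVersion;
  // the delete/ttl-update/undelete flags are false because their keys are absent from the map.
}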

public StoreKey getStoreKey() {
return key;
}
CloudBlobStoreV2.java
@@ -14,14 +14,24 @@

package com.github.ambry.cloud;

import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.BlockBlobItem;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.ambry.cloud.azure.AzureCloudConfig;
import com.github.ambry.cloud.azure.AzureMetrics;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.MessageFormatWriteSet;
import com.github.ambry.replication.FindToken;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
@@ -32,6 +42,9 @@
import com.github.ambry.store.StoreInfo;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreStats;
import com.github.ambry.utils.ByteBufferInputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -63,10 +76,78 @@ public CloudBlobStoreV2(VerifiableProperties properties, MetricRegistry metricRe
this.blobContainerClient = blobContainerClient;
}

/**
 * Uploads Ambry blobs to Azure blob storage.
 * If a blob already exists, the BLOB_ALREADY_EXISTS error is caught and the upload proceeds to the next blob.
 * Any other exception fails the entire write set.
 * @param messageSetToWrite The message set to write to the store.
 *                          Only the StoreKey, OperationTime, ExpirationTime and LifeVersion should be used
 *                          in this method.
 * @throws StoreException
 */
@Override
public void put(MessageWriteSet messageSetToWrite) throws StoreException {
MessageFormatWriteSet messageFormatWriteSet = (MessageFormatWriteSet) messageSetToWrite;
Timer.Context storageTimer = null;
// The try-catch sits inside the for-each loop so that a BLOB_ALREADY_EXISTS error
// skips just that blob instead of aborting the whole write set.
for (MessageInfo messageInfo : messageFormatWriteSet.getMessageSetInfo()) {
try {
// Read the blob out of the input stream. The stream cannot be handed to the Azure SDK
// directly: the SDK expects the stream to contain exactly the number of bytes to upload
// and fails, assuming corruption, if there are more. The stream here is a concatenation
// of many blobs, so read exactly messageInfo.getSize() bytes for this blob.
ByteBuffer messageBuf = ByteBuffer.allocate((int) messageInfo.getSize());
int offset = 0;
while (offset < messageInfo.getSize()) {
  // InputStream.read may return fewer bytes than requested; loop until this blob is fully read.
  int bytesRead = messageFormatWriteSet.getStreamToWrite()
      .read(messageBuf.array(), offset, (int) messageInfo.getSize() - offset);
  if (bytesRead == -1) {
    throw new RuntimeException(
        String.format("Failed to read blob %s of %s bytes as end of stream has been reached",
            messageInfo.getStoreKey().getID(), messageInfo.getSize()));
  }
  offset += bytesRead;
}

// Prepare the upload. There is no parallelism in this upload, but the SDK still
// requires a BlobParallelUploadOptions object to carry request conditions and metadata.
BlobParallelUploadOptions blobParallelUploadOptions =
new BlobParallelUploadOptions(new ByteBufferInputStream(messageBuf));
// To avoid overwriting, pass "*" to setIfNoneMatch(String ifNoneMatch)
// https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.blobclient?view=azure-java-stable
blobParallelUploadOptions.setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*"));
blobParallelUploadOptions.setMetadata(messageInfo.toMap());
BlobHttpHeaders headers = new BlobHttpHeaders();
// Without a content type, every blob download logs a missing-content-type warning, flooding the log.
headers.setContentType("application/octet-stream");
blobParallelUploadOptions.setHeaders(headers);

////////////////////////////////// Upload blob to Azure blob storage ////////////////////////////////////////
storageTimer = azureMetrics.blobUploadTime.time();
Response<BlockBlobItem> blockBlobItemResponse =
blobContainerClient.getBlobClient(messageInfo.getStoreKey().getID())
.uploadWithResponse(blobParallelUploadOptions, Duration.ofMillis(cloudConfig.cloudRequestTimeout),
Context.NONE);
////////////////////////////////// Upload blob to Azure blob storage ////////////////////////////////////////

// Metrics and logging
// A Meter yields an effective success rate; a plain Counter only increases monotonically.
azureMetrics.blobUploadSuccessRate.mark();
// Metering the ingestion byte rate helps in sizing the VCR fleet.
azureMetrics.backupSuccessByteRate.mark(messageInfo.getSize());
logger.trace("Successful upload of blob {} to Azure blob storage with statusCode = {}",
messageInfo.getStoreKey().getID(), blockBlobItemResponse.getStatusCode());
} catch (Exception e) {
logger.error("Failed to upload blob {} to Azure blob storage because {}", messageInfo.getStoreKey().getID(),
e.getMessage());
if (e instanceof BlobStorageException
    && ((BlobStorageException) e).getErrorCode() == BlobErrorCode.BLOB_ALREADY_EXISTS) {
  azureMetrics.blobUploadConflictCount.inc();
  // This should never happen: if put() is invoked, the blob must be absent from the Store.
  // Skip to the next blob rather than aborting the entire write set.
  continue;
}
azureMetrics.blobUploadErrorCount.inc();
throw new RuntimeException(e);
} finally {
if (storageTimer != null) {
storageTimer.stop();
}
} // try-catch
} // for-each
} // put()
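
The If-None-Match: "*" precondition is what turns an accidental double-put into a clean HTTP 409 rather than a silent overwrite. A standalone sketch of that behavior (illustrative names, not part of this commit; assumes the same Azure SDK imports as above plus java.io.ByteArrayInputStream):

// Sketch: the second conditional upload of the same blob name violates
// If-None-Match: "*" and surfaces as BLOB_ALREADY_EXISTS.
void conditionalUploadExample(BlobContainerClient containerClient, byte[] payload, String blobName) {
  containerClient.getBlobClient(blobName)
      .uploadWithResponse(new BlobParallelUploadOptions(new ByteArrayInputStream(payload), payload.length)
          .setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*")), null, Context.NONE); // creates blob
  try {
    containerClient.getBlobClient(blobName)
        .uploadWithResponse(new BlobParallelUploadOptions(new ByteArrayInputStream(payload), payload.length)
            .setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*")), null, Context.NONE);
  } catch (BlobStorageException e) {
    // e.getStatusCode() == 409 and e.getErrorCode() == BlobErrorCode.BLOB_ALREADY_EXISTS,
    // which put() above counts as a conflict (blobUploadConflictCount) instead of an error.
  }
}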

@Override
public void delete(List<MessageInfo> messageInfos) throws StoreException {
@@ -148,7 +229,8 @@ public void forceDelete(List<MessageInfo> infosToDelete) {
}

@Override
public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries, String hostname,
String remoteReplicaPath) throws StoreException {
return null;
}

AzureMetrics.java
@@ -25,6 +25,8 @@ public class AzureMetrics {
// Metric name constants
public static final String BLOB_UPLOAD_REQUEST_COUNT = "BlobUploadRequestCount";
public static final String BLOB_UPLOAD_SUCCESS_COUNT = "BlobUploadSuccessCount";
public static final String BLOB_UPLOAD_SUCCESS_RATE = "BlobUploadSuccessRate";
public static final String BLOB_UPLOAD_ERROR_COUNT = "BlobUploadErrorCount";
public static final String BLOB_DOWNLOAD_REQUEST_COUNT = "BlobDownloadRequestCount";
public static final String BLOB_DOWNLOAD_SUCCESS_COUNT = "BlobDownloadSuccessCount";
public static final String BLOB_DOWNLOAD_ERROR_COUNT = "BlobDownloadErrorCount";
@@ -81,6 +83,9 @@ public class AzureMetrics {
// Metrics
public final Counter blobUploadRequestCount;
public final Counter blobUploadSuccessCount;
public final Meter blobUploadSuccessRate;
public final Counter blobUploadErrorCount;
public final Counter blobDownloadRequestCount;
public final Counter blobDownloadSuccessCount;
public final Counter blobDownloadErrorCount;
@@ -142,6 +147,8 @@ public AzureMetrics(MetricRegistry registry) {
registry.counter(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_REQUEST_COUNT));
blobUploadSuccessCount =
registry.counter(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_SUCCESS_COUNT));
blobUploadSuccessRate = registry.meter(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_SUCCESS_RATE));
blobUploadErrorCount = registry.counter(MetricRegistry.name(AzureCloudDestination.class, BLOB_UPLOAD_ERROR_COUNT));
blobDownloadRequestCount =
registry.counter(MetricRegistry.name(AzureCloudDestination.class, BLOB_DOWNLOAD_REQUEST_COUNT));
blobDownloadSuccessCount =
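
The reason for adding a Meter alongside the Counters: a Codahale Counter is a monotonically increasing value that dashboards must differentiate to obtain a rate, whereas a Meter maintains moving rates directly. A standalone sketch (illustrative, not part of this commit):

// Meter vs Counter in dropwizard-metrics: the Meter tracks event rates, the Counter a running total.
static void meterVsCounterExample() {
  MetricRegistry registry = new MetricRegistry();
  Meter successRate = registry.meter("BlobUploadSuccessRate");
  Counter errorCount = registry.counter("BlobUploadErrorCount");
  successRate.mark();         // one successful upload; updates 1/5/15-minute moving rates
  successRate.mark(4096);     // a Meter can also mark bytes, as backupSuccessByteRate does
  errorCount.inc();           // only ever increases; a rate must be derived externally
  double uploadsPerSec = successRate.getOneMinuteRate();
  long totalErrors = errorCount.getCount();
}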
@@ -56,6 +56,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.slf4j.Logger;
@@ -419,7 +420,18 @@ protected BlobServiceClient createBlobStorageSyncClient() {
new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
}
HttpClient client = new NettyAsyncHttpClientBuilder().proxy(proxyOptions).build();
/*
 * RequestRetryOptions defaults:
 *   retryPolicyType – EXPONENTIAL
 *   maxTries – 4
 *   tryTimeoutInSeconds – Integer.MAX_VALUE seconds, therefore bring it down
 *   retryDelayInMs – 4ms when retryPolicyType is EXPONENTIAL
 *   maxRetryDelayInMs – 120ms
 */
RequestRetryOptions retryOptions =
new RequestRetryOptions(null, null, (int) TimeUnit.MILLISECONDS.toSeconds(cloudConfig.cloudRequestTimeout),
null, null, null);
return buildBlobServiceSyncClient(client, new Configuration(), retryOptions, azureCloudConfig);
} catch (MalformedURLException | InterruptedException | ExecutionException ex) {
logger.error("Error building ABS blob service client: {}", ex.getMessage());
throw new IllegalStateException(ex);
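
For readability, the same positional constructor with each argument named — each null falls back to the SDK default listed above, and only the per-try timeout is overridden (a sketch mirroring the call in this commit; the method name is illustrative):

// Annotated version of the RequestRetryOptions call above (sketch, not part of this commit).
RequestRetryOptions exampleRetryOptions(long cloudRequestTimeoutMs) {
  return new RequestRetryOptions(
      null,                                                         // retryPolicyType: EXPONENTIAL
      null,                                                         // maxTries: 4
      (int) TimeUnit.MILLISECONDS.toSeconds(cloudRequestTimeoutMs), // tryTimeoutInSeconds: lowered
      null,                                                         // retryDelayInMs: SDK default
      null,                                                         // maxRetryDelayInMs: SDK default
      null);                                                        // secondaryHost: none
}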
MessageFormatWriteSet.java
@@ -62,4 +62,12 @@ public long writeTo(Write writeChannel) throws StoreException {
public List<MessageInfo> getMessageSetInfo() {
return streamInfo;
}

/**
 * Returns the input stream containing the messages to be written.
 * @return the {@link InputStream} to write.
 */
public InputStream getStreamToWrite() {
return streamToWrite;
}
}
