Skip to content

Commit

Permalink
[DR] Re-check integrity of Azure blobs (linkedin#2797)
Browse files Browse the repository at this point in the history
If the verification-app encounters a CRC or blob-size mismatch, it is most likely due to differing versions of blob-properties on the server and cloud. So, re-verify the integrity of the Azure blob by deserializing it.
  • Loading branch information
snalli authored Jun 10, 2024
1 parent e344dca commit b08e5b9
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* Status code blob-state match operation
*/
public enum BlobMatchStatus {
BLOB_INTACT_IN_AZURE,
BLOB_CORRUPT_IN_AZURE,
BLOB_STATE_MATCH,
BLOB_ABSENT,
BLOB_ABSENT_IN_AZURE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,23 @@
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.replication.BackupCheckerThread;
import com.github.ambry.replication.RemoteReplicaInfo;
import com.github.ambry.replication.ReplicationManager;
import com.github.ambry.replication.ReplicationMetrics;
import com.github.ambry.store.BlobStore;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StorageManager;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import com.github.ambry.store.StoreFindToken;
import com.github.ambry.store.StoreGetOptions;
import com.github.ambry.store.StoreInfo;
import com.github.ambry.utils.Utils;
import java.io.File;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,29 @@
import com.github.ambry.clustermap.ReplicaSyncUpManager;
import com.github.ambry.commons.ResponseHandler;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.network.NetworkClient;
import com.github.ambry.notification.NotificationSystem;
import com.github.ambry.protocol.ReplicaMetadataResponse;
import com.github.ambry.protocol.ReplicaMetadataResponseInfo;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.store.BlobMatchStatus;
import com.github.ambry.store.BlobStore;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StoreGetOptions;
import com.github.ambry.store.StoreInfo;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyConverter;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.NettyByteBufDataInputStream;
import com.github.ambry.utils.Time;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -67,6 +76,7 @@ public class BackupCheckerThread extends ReplicaThread {
protected final BackupCheckerFileManager fileManager;
protected final ReplicationConfig replicationConfig;
private final ReplicationMetrics metrics;
private final StoreKeyFactory storeKeyFactory;
protected HashMap<String, MessageInfo> azureBlobMap;
public static final String DR_Verifier_Keyword = "dr";
public static final String BLOB_STATE_MISMATCHES_FILE = "blob_state_mismatches";
Expand All @@ -77,7 +87,7 @@ public class BackupCheckerThread extends ReplicaThread {
public BackupCheckerThread(String threadName, FindTokenHelper findTokenHelper, ClusterMap clusterMap,
AtomicInteger correlationIdGenerator, DataNodeId dataNodeId, NetworkClient networkClient,
ReplicationConfig replicationConfig, ReplicationMetrics replicationMetrics, NotificationSystem notification,
StoreKeyConverter storeKeyConverter, Transformer transformer, MetricRegistry metricRegistry,
StoreKeyConverter storeKeyConverter, StoreKeyFactory storeKeyFactory, Transformer transformer, MetricRegistry metricRegistry,
boolean replicatingOverSsl, String datacenterName, ResponseHandler responseHandler, Time time,
ReplicaSyncUpManager replicaSyncUpManager, Predicate<MessageInfo> skipPredicate,
ReplicationManager.LeaderBasedReplicationAdmin leaderBasedReplicationAdmin) {
Expand All @@ -86,6 +96,7 @@ public BackupCheckerThread(String threadName, FindTokenHelper findTokenHelper, C
datacenterName, responseHandler, time, replicaSyncUpManager, skipPredicate, leaderBasedReplicationAdmin);
fileManager = new BackupCheckerFileManager(replicationConfig, metricRegistry);
this.replicationConfig = replicationConfig;
this.storeKeyFactory = storeKeyFactory;
azureBlobMap = new HashMap<>();
metrics = new ReplicationMetrics(clusterMap.getMetricRegistry(), Collections.emptyList());
// Reset these counters if re-using the same thread object
Expand Down Expand Up @@ -166,6 +177,37 @@ protected MessageInfo mapBlob(MessageInfo blob) {
return new MessageInfo(keyConvert, blob);
}

/**
 * Re-checks a blob flagged as mismatched by fetching its local copy and fully deserializing it.
 * A CRC or blob-size mismatch between server and cloud metadata can be a false positive caused by
 * differing serialization versions; if the stored bytes still deserialize cleanly, the blob is
 * considered intact in Azure despite the metadata mismatch.
 * @param replica the replica whose local store holds the blob to re-verify
 * @param serverBlob blob metadata reported by the server (not read by this method)
 * @param azureBlob blob metadata from the Azure backup; its store key is used to fetch the local copy
 * @param status match statuses accumulated so far; {@code BLOB_CORRUPT_IN_AZURE} is added on failure
 * @return a singleton set containing only {@code BLOB_INTACT_IN_AZURE} if deserialization succeeds
 *         (prior statuses are intentionally dropped so the caller's filter skips this blob),
 *         otherwise the given status set with {@code BLOB_CORRUPT_IN_AZURE} added
 */
Set<BlobMatchStatus> recheck(RemoteReplicaInfo replica, MessageInfo serverBlob, MessageInfo azureBlob,
Set<BlobMatchStatus> status) {
BlobStore store = (BlobStore) replica.getLocalStore();
// Include deleted/expired blobs: this verifies integrity of the stored bytes, not liveness.
EnumSet<StoreGetOptions> storeGetOptions = EnumSet.of(StoreGetOptions.Store_Include_Deleted,
StoreGetOptions.Store_Include_Expired);
MessageReadSet rdset = null;
try {
StoreInfo stinfo = store.get(Collections.singletonList(azureBlob.getStoreKey()), storeGetOptions);
rdset = stinfo.getMessageReadSet();
// Index 0 throughout: exactly one key was requested, so the read set has one message.
rdset.doPrefetch(0, 0, rdset.sizeInBytes(0));
ByteBuf bytebuf = rdset.getPrefetchedData(0);
// Deserializing the entire message validates its format end-to-end; any corruption throws.
MessageFormatRecord.deserializeBlobAll(new NettyByteBufDataInputStream(bytebuf), storeKeyFactory);
return Collections.singleton(BLOB_INTACT_IN_AZURE);
} catch (Throwable e) {
// Broad catch is deliberate: this verification tool must record corruption and continue,
// whatever the failure mode of deserialization.
logger.error("Failed to deserialize blob due to ", e);
status.add(BLOB_CORRUPT_IN_AZURE);
} finally {
// Release the prefetched buffer to avoid a Netty ByteBuf leak.
// NOTE(review): assumes getPrefetchedData(0) returns the same retained buffer on each call,
// so a single release() balances the prefetch — TODO confirm against MessageReadSet's contract.
if (rdset != null && rdset.count() > 0 && rdset.getPrefetchedData(0) != null) {
rdset.getPrefetchedData(0).release();
}
}
return status;
}

/**
* Checks each blob from server with its counterpart in cloud backup.
* There are 4 cases.
Expand Down Expand Up @@ -199,8 +241,18 @@ List<ExchangeMetadataResponse> handleReplicaMetadataResponse(ReplicaMetadataResp
MessageInfo azureBlob = azureBlobMap.remove(serverBlob.getStoreKey().getID());
Set<BlobMatchStatus> status = azureBlob == null ? Collections.singleton(BLOB_ABSENT_IN_AZURE)
: serverBlob.isEqual(azureBlob);
// if size mismatch, then re-check.
// this is the only workaround due to different versions on serialization.
if (status.contains(BLOB_STATE_SIZE_MISMATCH)) {
status = recheck(replica, serverBlob, azureBlob, status);
}
return new ImmutableTriple(serverBlob, azureBlob, status);
})
.filter(tuple -> {
Set<BlobMatchStatus> status = (Set<BlobMatchStatus>) tuple.getRight();
// ignore blobs that are intact in Azure, despite a state mismatch between server and cloud
return !status.contains(BLOB_INTACT_IN_AZURE);
})
.filter(tuple -> {
Set<BlobMatchStatus> status = (Set<BlobMatchStatus>) tuple.getRight();
// ignore blobs that match between server and azure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public ReplicaId getLocalReplicaId() {
return localReplicaId;
}

Store getLocalStore() {
/**
 * Returns the local {@link Store} backing this remote replica's partition.
 *
 * @return the local store held by this replica info
 */
public Store getLocalStore() {
return this.localStore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public BackupCheckerThread getBackupCheckerThread(String threadName) {
Transformer transformer = Utils.getObj(transformerClassName, storeKeyFactory, storeKeyConverter);
return new BackupCheckerThread(threadName, tokenHelper, clusterMap, correlationIdGenerator, dataNodeId,
networkClientFactory.getNetworkClient(), replicationConfig, replicationMetrics, notification,
storeKeyConverter, transformer, metricRegistry, sslEnabledDatacenters.contains(dc), dc,
storeKeyConverter, storeKeyFactory, transformer, metricRegistry, sslEnabledDatacenters.contains(dc), dc,
new ResponseHandler(clusterMap), time, replicaSyncUpManager, skipPredicate, leaderBasedReplicationAdmin);
} catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException(e);
Expand All @@ -144,19 +144,10 @@ protected ReplicaThread getReplicaThread(String threadName, FindTokenHelper find
boolean replicatingOverSsl, String datacenterName, ResponseHandler responseHandler, Time time,
ReplicaSyncUpManager replicaSyncUpManager, Predicate<MessageInfo> skipPredicate,
ReplicationManager.LeaderBasedReplicationAdmin leaderBasedReplicationAdmin) {
switch (replicationConfig.replicationThreadType) {
case ReplicationConfig.BACKUP_CHECKER_THREAD:
return new BackupCheckerThread(threadName, tokenHelper, clusterMap, correlationIdGenerator, dataNodeId,
networkClient, replicationConfig, replicationMetrics, notification, storeKeyConverter, transformer,
metricRegistry, replicatingOverSsl, datacenterName, responseHandler, time, replicaSyncUpManager,
skipPredicate, leaderBasedReplicationAdmin);
case ReplicationConfig.DEFAULT_REPLICATION_THREAD:
default:
return new ReplicaThread(threadName, tokenHelper, clusterMap, correlationIdGenerator, dataNodeId, networkClient,
replicationConfig, replicationMetrics, notification, storeKeyConverter, transformer, metricRegistry,
replicatingOverSsl, datacenterName, responseHandler, time, replicaSyncUpManager, skipPredicate,
leaderBasedReplicationAdmin);
}
return new ReplicaThread(threadName, tokenHelper, clusterMap, correlationIdGenerator, dataNodeId, networkClient,
replicationConfig, replicationMetrics, notification, storeKeyConverter, transformer, metricRegistry,
replicatingOverSsl, datacenterName, responseHandler, time, replicaSyncUpManager, skipPredicate,
leaderBasedReplicationAdmin);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ protected Pair<Map<DataNodeId, List<RemoteReplicaInfo>>, ReplicaThread> getRemot
if (createBackupCheckerThread) {
replicaThread =
new BackupCheckerThread("threadtest", findTokenHelper, clusterMap, new AtomicInteger(0), localHost.dataNodeId,
networkClient, replicationConfig, replicationMetrics, null, storeKeyConverter, transformer,
networkClient, replicationConfig, replicationMetrics, null, storeKeyConverter, storeKeyFactory, transformer,
clusterMap.getMetricRegistry(), false, localHost.dataNodeId.getDatacenterName(),
new ResponseHandler(clusterMap), time, replicaSyncUpManager, null, null);
} else {
Expand Down

0 comments on commit b08e5b9

Please sign in to comment.