diff --git a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp
index 9639cbc08ad..5c7ffda624d 100644
--- a/fdbclient/BackupAgentBase.actor.cpp
+++ b/fdbclient/BackupAgentBase.actor.cpp
@@ -24,6 +24,7 @@
 #include "fdbclient/BackupAgent.actor.h"
 #include "fdbclient/BlobCipher.h"
 #include "fdbclient/CommitTransaction.h"
+#include "fdbclient/FDBTypes.h"
 #include "fdbclient/GetEncryptCipherKeys.actor.h"
 #include "fdbclient/DatabaseContext.h"
 #include "fdbclient/ManagementAPI.actor.h"
@@ -32,7 +33,6 @@
 #include "fdbclient/TenantManagement.actor.h"
 #include "fdbrpc/simulator.h"
 #include "flow/ActorCollection.h"
-#include "flow/Trace.h"
 #include "flow/actorcompiler.h" // has to be last include
 
 FDB_DEFINE_BOOLEAN_PARAM(LockDB);
@@ -252,6 +252,34 @@ Version getLogKeyVersion(Key key) {
 	return bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t)));
 }
 
+bool validTenantAccess(std::map<int64_t, TenantName>* tenantMap,
+                       MutationRef m,
+                       bool provisionalProxy,
+                       Version version) {
+	if (isSystemKey(m.param1)) {
+		return true;
+	}
+	int64_t tenantId = TenantInfo::INVALID_TENANT;
+	if (m.isEncrypted()) {
+		tenantId = m.encryptionHeader()->cipherTextDetails.encryptDomainId;
+	} else {
+		tenantId = TenantAPI::extractTenantIdFromMutation(m);
+	}
+	ASSERT(tenantMap != nullptr);
+	if (m.isEncrypted() && isReservedEncryptDomain(tenantId)) {
+		// These are valid encrypt domains so don't check the tenant map
+	} else if (tenantMap->find(tenantId) == tenantMap->end()) {
+		// If a tenant is not found for a given mutation then exclude it from the batch
+		ASSERT(!provisionalProxy);
+		TraceEvent(SevWarnAlways, "MutationLogRestoreTenantNotFound")
+		    .detail("Version", version)
+		    .detail("TenantId", tenantId);
+		CODE_PROBE(true, "mutation log restore tenant not found");
+		return false;
+	}
+	return true;
+}
+
 // Given a key from one of the ranges returned by get_log_ranges,
 // returns(version, part) where version is the database version number of
 // the transaction log data in the value, and part is 0 for the first such
@@ -320,29 +348,49 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
 			offset += len2;
 			state Optional<MutationRef> encryptedLogValue = Optional<MutationRef>();
 
+			// Check for valid tenant in required tenant mode. If the tenant does not exist in our tenant map then
+			// we EXCLUDE the mutation (of that respective tenant) during the restore. NOTE: This simply allows a
+			// restore to make progress in the event of tenant deletion, but tenant deletion should be considered
+			// carefully so that we do not run into this case. We do this check here so if encrypted mutations are not
+			// found in the tenant map then we exit early without needing to reach out to the EKP.
+			if (config.tenantMode == TenantMode::REQUIRED &&
+			    config.encryptionAtRestMode.mode != EncryptionAtRestMode::CLUSTER_AWARE &&
+			    !validTenantAccess(tenantMap, logValue, provisionalProxy, version)) {
+				consumed += BackupAgentBase::logHeaderSize + len1 + len2;
+				continue;
+			}
+
 			// Decrypt mutation ref if encrypted
 			if (logValue.isEncrypted()) {
 				encryptedLogValue = logValue;
+				state EncryptCipherDomainId domainId = logValue.encryptionHeader()->cipherTextDetails.encryptDomainId;
 				Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
-				TextAndHeaderCipherKeys cipherKeys =
-				    wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::BACKUP));
-				logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
+				try {
+					TextAndHeaderCipherKeys cipherKeys =
+					    wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::RESTORE));
+					logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
+				} catch (Error& e) {
+					// It's possible a tenant was deleted and the encrypt key fetch failed
+					TraceEvent(SevWarnAlways, "MutationLogRestoreEncryptKeyFetchFailed")
+					    .detail("Version", version)
+					    .detail("TenantId", domainId);
+					if (e.code() == error_code_encrypt_keys_fetch_failed) {
+						CODE_PROBE(true, "mutation log restore encrypt keys not found");
+						consumed += BackupAgentBase::logHeaderSize + len1 + len2;
+						continue;
+					} else {
+						throw;
+					}
+				}
 			}
 			ASSERT(!logValue.isEncrypted());
 
-			if (config.tenantMode == TenantMode::REQUIRED && !isSystemKey(logValue.param1)) {
-				// If a tenant is not found for a given mutation then exclude it from the batch
-				int64_t tenantId = TenantAPI::extractTenantIdFromMutation(logValue);
-				ASSERT(tenantMap != nullptr);
-				if (tenantMap->find(tenantId) == tenantMap->end()) {
-					ASSERT(!provisionalProxy);
-					TraceEvent(SevWarnAlways, "MutationLogRestoreTenantNotFound")
-					    .detail("Version", version)
-					    .detail("TenantId", tenantId);
-					CODE_PROBE(true, "mutation log restore tenant not found");
-					consumed += BackupAgentBase::logHeaderSize + len1 + len2;
-					continue;
-				}
+			// If the mutation was encrypted using cluster aware encryption then check after decryption
+			if (config.tenantMode == TenantMode::REQUIRED &&
+			    config.encryptionAtRestMode.mode == EncryptionAtRestMode::CLUSTER_AWARE &&
+			    !validTenantAccess(tenantMap, logValue, provisionalProxy, version)) {
+				consumed += BackupAgentBase::logHeaderSize + len1 + len2;
+				continue;
 			}
 
 			MutationRef originalLogValue = logValue;
diff --git a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp
index 7cd267b12c0..197f31bf217 100644
--- a/fdbclient/BackupContainerFileSystem.actor.cpp
+++ b/fdbclient/BackupContainerFileSystem.actor.cpp
@@ -973,23 +973,6 @@ class BackupContainerFileSystemImpl {
 				continue;
 
 			restorable.snapshot = snapshots[i];
-			// TODO: Reenable the sanity check after TooManyFiles error is resolved
-			if (false && g_network->isSimulated()) {
-				// Sanity check key ranges
-				// TODO: If we want to re-enable this codepath, make sure that we are passing a valid DB object (instead
-				// of the DB object created on the line below)
-				ASSERT(false);
-				state Database cx;
-				state std::map<std::string, KeyRange>::iterator rit;
-				for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
-					auto it = std::find_if(restorable.ranges.begin(),
-					                       restorable.ranges.end(),
-					                       [file = rit->first](const RangeFile f) { return f.fileName == file; });
-					ASSERT(it != restorable.ranges.end());
-					KeyRange result = wait(bc->getSnapshotFileKeyRange(*it, cx));
-					ASSERT(rit->second.begin <= result.begin && rit->second.end >= result.end);
-				}
-			}
 
 			// No logs needed if there is a complete filtered key space snapshot at the target version.
 			if (minKeyRangeVersion == maxKeyRangeVersion && maxKeyRangeVersion == restorable.targetVersion) {
diff --git a/fdbclient/BlobCipher.cpp b/fdbclient/BlobCipher.cpp
index 1eb338f13c6..25ed22564e3 100644
--- a/fdbclient/BlobCipher.cpp
+++ b/fdbclient/BlobCipher.cpp
@@ -85,6 +85,7 @@ BlobCipherMetrics::BlobCipherMetrics()
                   CounterSet(cc, "KVRedwood"),
                   CounterSet(cc, "BlobGranule"),
                   CounterSet(cc, "Backup"),
+                  CounterSet(cc, "Restore"),
                   CounterSet(cc, "Test") }) {
 	specialCounter(cc, "CacheSize", []() { return BlobCipherKeyCache::getInstance()->getSize(); });
 	traceFuture = cc.traceCounters("BlobCipherMetrics", UID(), FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL);
@@ -102,6 +103,8 @@ std::string toString(BlobCipherMetrics::UsageType type) {
 		return "BlobGranule";
 	case BlobCipherMetrics::UsageType::BACKUP:
 		return "Backup";
+	case BlobCipherMetrics::UsageType::RESTORE:
+		return "Restore";
 	case BlobCipherMetrics::UsageType::TEST:
 		return "Test";
 	default:
diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp
index 62554b793c6..b4ff5005ad3 100644
--- a/fdbclient/ClientKnobs.cpp
+++ b/fdbclient/ClientKnobs.cpp
@@ -301,6 +301,7 @@ void ClientKnobs::initialize(Randomize randomize) {
 	init( CLIENT_ENABLE_USING_CLUSTER_ID_KEY,     false );
 
 	init( ENABLE_ENCRYPTION_CPU_TIME_LOGGING,     false );
+	init( SIMULATION_EKP_TENANT_IDS_TO_DROP,        "-1" );
 	// clang-format on
 }
 
diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp
index baa667d5d64..49e23c205bc 100644
--- a/fdbclient/FileBackupAgent.actor.cpp
+++ b/fdbclient/FileBackupAgent.actor.cpp
@@ -21,6 +21,7 @@
 #include "fdbclient/DatabaseConfiguration.h"
 #include "fdbclient/TenantEntryCache.actor.h"
 #include "fdbclient/TenantManagement.actor.h"
+#include "fdbrpc/TenantInfo.h"
 #include "fdbrpc/simulator.h"
 #include "flow/FastRef.h"
 #include "fmt/format.h"
@@ -610,7 +611,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
 	                                           int64_t dataLen,
 	                                           Arena* arena) {
 		Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
-		TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(dbInfo, header, BlobCipherMetrics::BACKUP));
+		TextAndHeaderCipherKeys cipherKeys = wait(getEncryptCipherKeys(dbInfo, header, BlobCipherMetrics::RESTORE));
 		ASSERT(cipherKeys.cipherHeaderKey.isValid() && cipherKeys.cipherTextKey.isValid());
 		validateEncryptionHeader(cipherKeys.cipherHeaderKey, cipherKeys.cipherTextKey, header);
 		DecryptBlobCipherAes256Ctr decryptor(
@@ -1131,6 +1132,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
 		wait(tenantCache.get()->init());
 	}
 	state EncryptionAtRestMode encryptMode = config.encryptionAtRestMode;
+	state int64_t blockTenantId = TenantInfo::INVALID_TENANT;
 
 	try {
 		// Read header, currently only decoding BACKUP_AGENT_SNAPSHOT_FILE_VERSION or
@@ -1142,7 +1144,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
 		} else if (file_version == BACKUP_AGENT_ENCRYPTED_SNAPSHOT_FILE_VERSION) {
 			CODE_PROBE(true, "decoding encrypted block");
 			// decode options struct
-			uint32_t optionsLen = reader.consumeNetworkUInt32();
+			state uint32_t optionsLen = reader.consumeNetworkUInt32();
 			const uint8_t* o = reader.consume(optionsLen);
 			StringRef optionsStringRef = StringRef(o, optionsLen);
 			EncryptedRangeFileWriter::Options options =
@@ -1150,9 +1152,17 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
 			ASSERT(!options.compressionEnabled);
 
 			// read encryption header
-			const uint8_t* headerStart = reader.consume(BlobCipherEncryptHeader::headerSize);
+			state const uint8_t* headerStart = reader.consume(BlobCipherEncryptHeader::headerSize);
 			StringRef headerS = StringRef(headerStart, BlobCipherEncryptHeader::headerSize);
 			state BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(headerS);
+			blockTenantId = header.cipherTextDetails.encryptDomainId;
+			if (config.tenantMode == TenantMode::REQUIRED && !isReservedEncryptDomain(blockTenantId)) {
+				ASSERT(tenantCache.present());
+				Optional<TenantEntryCachePayload<Void>> payload = wait(tenantCache.get()->getById(blockTenantId));
+				if (!payload.present()) {
+					throw tenant_not_found();
+				}
+			}
 			const uint8_t* dataPayloadStart = headerStart + BlobCipherEncryptHeader::headerSize;
 			// calculate the total bytes read up to (and including) the header
 			int64_t bytesRead = sizeof(int32_t) + sizeof(uint32_t) + optionsLen + BlobCipherEncryptHeader::headerSize;
@@ -1167,6 +1177,13 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
 		}
 		return results;
 	} catch (Error& e) {
+		if (e.code() == error_code_encrypt_keys_fetch_failed) {
+			TraceEvent(SevWarnAlways, "SnapshotRestoreEncryptKeyFetchFailed").detail("TenantId", blockTenantId);
+			CODE_PROBE(true, "Snapshot restore encrypt keys not found");
+		} else if (e.code() == error_code_tenant_not_found) {
+			TraceEvent(SevWarnAlways, "EncryptedSnapshotRestoreTenantNotFound").detail("TenantId", blockTenantId);
+			CODE_PROBE(true, "Encrypted Snapshot restore tenant not found");
+		}
 		TraceEvent(SevWarn, "FileRestoreDecodeRangeFileBlockFailed")
 		    .error(e)
 		    .detail("Filename", file->getFilename())
@@ -3552,9 +3569,6 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
 		}
 		state int64_t tenantId = TenantAPI::extractTenantIdFromKeyRef(key);
 		Optional<TenantEntryCachePayload<Void>> payload = wait(tenantCache->getById(tenantId));
-		if (!payload.present()) {
-			TraceEvent(SevError, "SnapshotRestoreInvalidTenantAccess").detail("Tenant", tenantId);
-		}
 		ASSERT(payload.present());
 		return Void();
 	}
@@ -3607,8 +3621,17 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase {
 		}
 
 		state Reference<IAsyncFile> inFile = wait(bc.get()->readFile(rangeFile.fileName));
-		state Standalone<VectorRef<KeyValueRef>> blockData =
-		    wait(decodeRangeFileBlock(inFile, readOffset, readLen, cx));
+		state Standalone<VectorRef<KeyValueRef>> blockData;
+		try {
+			Standalone<VectorRef<KeyValueRef>> data = wait(decodeRangeFileBlock(inFile, readOffset, readLen, cx));
+			blockData = data;
+		} catch (Error& e) {
+			// It's possible a tenant was deleted and the encrypt key fetch failed
+			if (e.code() == error_code_encrypt_keys_fetch_failed || e.code() == error_code_tenant_not_found) {
+				return Void();
+			}
+			throw;
+		}
 		state Optional<Reference<TenantEntryCache<Void>>> tenantCache;
 		state std::vector<Future<Void>> validTenantCheckFutures;
 		state Arena arena;
diff --git a/fdbclient/include/fdbclient/BlobCipher.h b/fdbclient/include/fdbclient/BlobCipher.h
index f0855d8f8df..1088e9c7e6b 100644
--- a/fdbclient/include/fdbclient/BlobCipher.h
+++ b/fdbclient/include/fdbclient/BlobCipher.h
@@ -70,6 +70,7 @@ class BlobCipherMetrics : public NonCopyable {
 		KV_REDWOOD,
 		BLOB_GRANULE,
 		BACKUP,
+		RESTORE,
 		TEST,
 		MAX,
 	};
diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h
index a4d60e6d85d..205436e695e 100644
--- a/fdbclient/include/fdbclient/ClientKnobs.h
+++ b/fdbclient/include/fdbclient/ClientKnobs.h
@@ -297,6 +297,10 @@ class ClientKnobs : public KnobsImpl<ClientKnobs> {
 
 	// Encryption-at-rest
 	bool ENABLE_ENCRYPTION_CPU_TIME_LOGGING;
+	// This Knob will be a comma-delimited string (i.e 0,1,2,3) that specifies which tenants the the EKP should throw
+	// key_not_found errors for. If TenantInfo::INVALID_TENANT is contained within the list then no tenants will be
+	// dropped. This Knob should ONLY be used in simulation for testing purposes
+	std::string SIMULATION_EKP_TENANT_IDS_TO_DROP;
 
 	ClientKnobs(Randomize randomize);
 	void initialize(Randomize randomize);
diff --git a/fdbclient/include/fdbclient/GetEncryptCipherKeys.actor.h b/fdbclient/include/fdbclient/GetEncryptCipherKeys.actor.h
index c1eeadd34d6..84f4e2432d9 100644
--- a/fdbclient/include/fdbclient/GetEncryptCipherKeys.actor.h
+++ b/fdbclient/include/fdbclient/GetEncryptCipherKeys.actor.h
@@ -19,6 +19,7 @@
  */
 #pragma once
 #include "flow/EncryptUtils.h"
+#include "flow/genericactors.actor.h"
 #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_GETCIPHERKEYS_ACTOR_G_H)
 #define FDBCLIENT_GETCIPHERKEYS_ACTOR_G_H
 #include "fdbclient/GetEncryptCipherKeys.actor.g.h"
@@ -27,7 +28,9 @@
 
 #include "fdbclient/BlobCipher.h"
 #include "fdbclient/EncryptKeyProxyInterface.h"
+#include "fdbclient/Knobs.h"
 #include "fdbrpc/Stats.h"
+#include "fdbrpc/TenantInfo.h"
 #include "flow/Knobs.h"
 #include "flow/IRandom.h"
 
@@ -182,6 +185,18 @@ Future<EKPGetBaseCipherKeysByIdsReply> getUncachedEncryptCipherKeys(Reference<As
 			TraceEvent(SevWarn, "GetEncryptCipherKeys_RequestFailed").error(reply.error.get());
 			throw encrypt_keys_fetch_failed();
 		}
+		if (g_network && g_network->isSimulated() && usageType == BlobCipherMetrics::RESTORE) {
+			std::unordered_set<int64_t> tenantIdsToDrop =
+			    parseStringToUnorderedSet<int64_t>(CLIENT_KNOBS->SIMULATION_EKP_TENANT_IDS_TO_DROP, ',');
+			if (!tenantIdsToDrop.count(TenantInfo::INVALID_TENANT)) {
+				for (auto& baseCipherInfo : request.baseCipherInfos) {
+					if (tenantIdsToDrop.count(baseCipherInfo.domainId)) {
+						TraceEvent("GetEncryptCipherKeys_SimulatedError").detail("DomainId", baseCipherInfo.domainId);
+						throw encrypt_keys_fetch_failed();
+					}
+				}
+			}
+		}
 		return reply;
 	} catch (Error& e) {
 		TraceEvent("GetEncryptCipherKeys_CaughtError").error(e);
diff --git a/fdbclient/include/fdbclient/TenantEntryCache.actor.h b/fdbclient/include/fdbclient/TenantEntryCache.actor.h
index 199bdb04fa7..4c74a5fbcc2 100644
--- a/fdbclient/include/fdbclient/TenantEntryCache.actor.h
+++ b/fdbclient/include/fdbclient/TenantEntryCache.actor.h
@@ -220,7 +220,7 @@ class TenantEntryCache : public ReferenceCounted<TenantEntryCache<T>>, NonCopyab
 			if (!cache->lastTenantId.present()) {
 				return false;
 			}
-			return cache->lastTenantId.get() > 0;
+			return cache->lastTenantId.get() >= 0;
 		}
 		return true;
 	}
diff --git a/fdbserver/workloads/BulkLoadWithTenants.actor.cpp b/fdbserver/workloads/BulkLoadWithTenants.actor.cpp
index 34b0930cf46..3ff04c3f99f 100644
--- a/fdbserver/workloads/BulkLoadWithTenants.actor.cpp
+++ b/fdbserver/workloads/BulkLoadWithTenants.actor.cpp
@@ -18,9 +18,11 @@
  * limitations under the License.
  */
 
+#include "fdbclient/ClientKnobs.h"
 #include "fdbclient/TenantEntryCache.actor.h"
 #include "fdbclient/TenantManagement.actor.h"
 #include "fdbrpc/ContinuousSample.h"
+#include "fdbrpc/TenantInfo.h"
 #include "fdbserver/Knobs.h"
 #include "fdbserver/TesterInterface.actor.h"
 #include "fdbserver/workloads/workloads.actor.h"
@@ -39,6 +41,9 @@ struct BulkSetupWorkload : TestWorkload {
 	std::vector<Reference<Tenant>> tenants;
 	bool deleteTenants;
 	double testDuration;
+	std::unordered_map<int64_t, std::vector<KeyValueRef>> numKVPairsPerTenant;
+	bool enableEKPKeyFetchFailure;
+	Arena arena;
 
 	BulkSetupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
 		transactionsPerSecond = getOption(options, "transactionsPerSecond"_sr, 5000.0) / clientCount;
@@ -50,6 +55,7 @@ struct BulkSetupWorkload : TestWorkload {
 		deleteTenants = getOption(options, "deleteTenants"_sr, false);
 		ASSERT(minNumTenants <= maxNumTenants);
 		testDuration = getOption(options, "testDuration"_sr, -1);
+		enableEKPKeyFetchFailure = getOption(options, "enableEKPKeyFetchFailure"_sr, false);
 	}
 
 	void getMetrics(std::vector<PerfMetric>& m) override {}
@@ -60,6 +66,30 @@ struct BulkSetupWorkload : TestWorkload {
 
 	Standalone<KeyValueRef> operator()(int n) { return KeyValueRef(key(n), value((n + 1) % nodeCount)); }
 
+	ACTOR static Future<std::vector<KeyValueRef>> getKVPairsForTenant(BulkSetupWorkload* workload,
+	                                                                  Reference<Tenant> tenant,
+	                                                                  Database cx) {
+		state KeySelector begin = firstGreaterOrEqual(normalKeys.begin);
+		state KeySelector end = firstGreaterOrEqual(normalKeys.end);
+		state std::vector<KeyValueRef> kvPairs;
+		state ReadYourWritesTransaction tr = ReadYourWritesTransaction(cx, tenant);
+		loop {
+			try {
+				RangeResult kvRange = wait(tr.getRange(begin, end, 1000));
+				if (!kvRange.more && kvRange.size() == 0) {
+					break;
+				}
+				for (int i = 0; i < kvRange.size(); i++) {
+					kvPairs.push_back(KeyValueRef(workload->arena, KeyValueRef(kvRange[i].key, kvRange[i].value)));
+				}
+				begin = firstGreaterThan(kvRange.end()[-1].key);
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+		return kvPairs;
+	}
+
 	ACTOR static Future<Void> _setup(BulkSetupWorkload* workload, Database cx) {
 		// create a bunch of tenants (between min and max tenants)
 		state int numTenantsToCreate =
@@ -70,13 +100,13 @@ struct BulkSetupWorkload : TestWorkload {
 			state std::vector<Future<Optional<TenantMapEntry>>> tenantFutures;
 			for (int i = 0; i < numTenantsToCreate; i++) {
 				TenantName tenantName = TenantNameRef(format("BulkSetupTenant_%04d", i));
-				TraceEvent("CreatingTenant").detail("Tenant", tenantName);
 				tenantFutures.push_back(TenantAPI::createTenant(cx.getReference(), tenantName));
 			}
 			wait(waitForAll(tenantFutures));
 			for (auto& f : tenantFutures) {
 				ASSERT(f.get().present());
 				workload->tenants.push_back(makeReference<Tenant>(f.get().get().id, f.get().get().tenantName));
+				TraceEvent("BulkSetupCreatedTenant").detail("Tenant", workload->tenants.back());
 			}
 		}
 		wait(bulkSetup(cx,
@@ -94,14 +124,82 @@ struct BulkSetupWorkload : TestWorkload {
 		               0,
 		               workload->tenants));
 
+		state int i;
+		state bool added = false;
+		for (i = 0; i < workload->tenants.size(); i++) {
+			std::vector<KeyValueRef> keysForCurTenant = wait(getKVPairsForTenant(workload, workload->tenants[i], cx));
+			if (workload->enableEKPKeyFetchFailure && keysForCurTenant.size() > 0 && !added) {
+				IKnobCollection::getMutableGlobalKnobCollection().setKnob(
+				    "simulation_ekp_tenant_ids_to_drop",
+				    KnobValueRef::create(std::to_string(workload->tenants[i]->id())));
+				TraceEvent("BulkSetupTenantForEKPToDrop")
+				    .detail("Tenant", CLIENT_KNOBS->SIMULATION_EKP_TENANT_IDS_TO_DROP);
+				added = true;
+			}
+			workload->numKVPairsPerTenant[workload->tenants[i]->id()] = keysForCurTenant;
+		}
+		return Void();
+	}
+
+	ACTOR static Future<bool> _check(BulkSetupWorkload* workload, Database cx) {
+		state int i;
+		state std::unordered_set<int64_t> tenantIdsToDrop =
+		    parseStringToUnorderedSet<int64_t>(CLIENT_KNOBS->SIMULATION_EKP_TENANT_IDS_TO_DROP, ',');
+		for (i = 0; i < workload->tenants.size(); i++) {
+			state Reference<Tenant> tenant = workload->tenants[i];
+			std::vector<KeyValueRef> keysForCurTenant = wait(getKVPairsForTenant(workload, tenant, cx));
+			if (tenantIdsToDrop.count(tenant->id())) {
+				// Don't check the tenants that the EKP would throw errors for
+				continue;
+			}
+			std::vector<KeyValueRef> expectedKeysForCurTenant = workload->numKVPairsPerTenant[tenant->id()];
+			if (keysForCurTenant.size() != expectedKeysForCurTenant.size()) {
+				TraceEvent(SevError, "BulkSetupNumKeysMismatch")
+				    .detail("TenantName", tenant)
+				    .detail("ActualCount", keysForCurTenant.size())
+				    .detail("ExpectedCount", expectedKeysForCurTenant.size());
+				return false;
+			} else {
+				TraceEvent("BulkSetupNumKeys")
+				    .detail("TenantName", tenant)
+				    .detail("ActualCount", keysForCurTenant.size());
+			}
+
+			for (int j = 0; j < expectedKeysForCurTenant.size(); j++) {
+				if (expectedKeysForCurTenant[j].key != keysForCurTenant[j].key) {
+					TraceEvent(SevError, "BulkSetupNumKeyMismatch")
+					    .detail("TenantName", tenant)
+					    .detail("ActualKey", keysForCurTenant[j].key)
+					    .detail("ExpectedKey", expectedKeysForCurTenant[j].key);
+					return false;
+				}
+				if (expectedKeysForCurTenant[j].value != keysForCurTenant[j].value) {
+					TraceEvent(SevError, "BulkSetupNumValueMismatch")
+					    .detail("TenantName", tenant)
+					    .detail("ActualValue", keysForCurTenant[j].value)
+					    .detail("ExpectedValue", expectedKeysForCurTenant[j].value);
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
+	ACTOR static Future<Void> _start(BulkSetupWorkload* workload, Database cx) {
 		// We want to ensure that tenant deletion happens before the restore phase starts
-		if (workload->deleteTenants) {
-			state int numTenantsToDelete = deterministicRandom()->randomInt(0, workload->tenants.size() + 1);
+		// If there is only one tenant don't delete that tenant
+		if (workload->deleteTenants && workload->tenants.size() > 1) {
+			state Reference<TenantEntryCache<Void>> tenantCache =
+			    makeReference<TenantEntryCache<Void>>(cx, TenantEntryCacheRefreshMode::WATCH);
+			wait(tenantCache->init());
+			state int numTenantsToDelete = deterministicRandom()->randomInt(0, workload->tenants.size());
+			TraceEvent("BulkSetupTenantDeletion").detail("NumTenants", numTenantsToDelete);
 			if (numTenantsToDelete > 0) {
 				state int i;
 				for (i = 0; i < numTenantsToDelete; i++) {
 					state int tenantIndex = deterministicRandom()->randomInt(0, workload->tenants.size());
 					state Reference<Tenant> tenant = workload->tenants[tenantIndex];
+					workload->tenants.erase(workload->tenants.begin() + tenantIndex);
 					TraceEvent("BulkSetupTenantDeletionClearing")
 					    .detail("Tenant", tenant)
 					    .detail("TotalNumTenants", workload->tenants.size());
@@ -118,31 +216,35 @@ struct BulkSetupWorkload : TestWorkload {
 					}
 					// delete the tenant
 					wait(success(TenantAPI::deleteTenant(cx.getReference(), tenant->name.get(), tenant->id())));
-					workload->tenants.erase(workload->tenants.begin() + tenantIndex);
-
-					TraceEvent("BulkSetupTenantDeletionDone")
-					    .detail("Tenant", tenant)
-					    .detail("TotalNumTenants", workload->tenants.size());
 				}
 			}
 		}
 		return Void();
 	}
 
-	Future<Void> setup(Database const& cx) override { return Void(); }
+	Future<Void> setup(Database const& cx) override {
+		if (clientId == 0) {
+			return _setup(this, cx);
+		}
+		return Void();
+	}
 
 	Future<Void> start(Database const& cx) override {
 		if (clientId == 0) {
 			if (testDuration > 0) {
-				return timeout(_setup(this, cx), testDuration, Void());
-			} else {
-				return _setup(this, cx);
+				return timeout(_start(this, cx), testDuration, Void());
 			}
+			return _start(this, cx);
 		}
 		return Void();
 	}
 
-	Future<bool> check(Database const& cx) override { return true; }
+	Future<bool> check(Database const& cx) override {
+		if (clientId == 0) {
+			return _check(this, cx);
+		}
+		return true;
+	}
 };
 
 WorkloadFactory<BulkSetupWorkload> BulkSetupWorkloadFactory;
diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp
index 6d2e37b0030..b7a21513878 100644
--- a/fdbserver/workloads/ReadWrite.actor.cpp
+++ b/fdbserver/workloads/ReadWrite.actor.cpp
@@ -471,7 +471,7 @@ struct ReadWriteWorkload : ReadWriteCommon {
 		}
 	}
 
-	Future<Void> start(Database const& cx) override { return _start(cx, this); }
+	Future<Void> start(Database const& cx) override { return timeout(_start(cx, this), testDuration, Void()); }
 
 	ACTOR template <class Trans>
 	static Future<Void> readOp(Trans* tr, std::vector<int64_t> keys, ReadWriteWorkload* self, bool shouldRecord) {
diff --git a/fdbserver/workloads/RestoreBackup.actor.cpp b/fdbserver/workloads/RestoreBackup.actor.cpp
index 4e4a37de974..0b882de2150 100644
--- a/fdbserver/workloads/RestoreBackup.actor.cpp
+++ b/fdbserver/workloads/RestoreBackup.actor.cpp
@@ -100,6 +100,7 @@ struct RestoreBackupWorkload : TestWorkload {
 		state Transaction tr(cx);
 		loop {
 			try {
+				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
 				tr.clear(normalKeys);
 				for (auto& r : getSystemBackupRanges()) {
 					tr.clear(r);
@@ -120,20 +121,19 @@ struct RestoreBackupWorkload : TestWorkload {
 
 		if (config.tenantMode == TenantMode::REQUIRED) {
 			// restore system keys
-			VectorRef<KeyRangeRef> systemBackupRanges = getSystemBackupRanges();
-			state std::vector<Future<Version>> restores;
-			for (int i = 0; i < systemBackupRanges.size(); i++) {
-				restores.push_back((self->backupAgent.restore(cx,
-				                                              cx,
-				                                              "system_restore"_sr,
-				                                              Key(self->backupContainer->getURL()),
-				                                              self->backupContainer->getProxy(),
-				                                              WaitForComplete::True,
-				                                              ::invalidVersion,
-				                                              Verbose::True,
-				                                              systemBackupRanges[i])));
+			state VectorRef<KeyRangeRef> systemBackupRanges = getSystemBackupRanges();
+			state int i;
+			for (i = 0; i < systemBackupRanges.size(); i++) {
+				wait(success(self->backupAgent.restore(cx,
+				                                       cx,
+				                                       "system_restore"_sr,
+				                                       Key(self->backupContainer->getURL()),
+				                                       self->backupContainer->getProxy(),
+				                                       WaitForComplete::True,
+				                                       ::invalidVersion,
+				                                       Verbose::True,
+				                                       systemBackupRanges[i])));
 			}
-			waitForAll(restores);
 			// restore non-system keys
 			wait(success(self->backupAgent.restore(cx,
 			                                       cx,
diff --git a/fdbserver/workloads/SaveAndKill.actor.cpp b/fdbserver/workloads/SaveAndKill.actor.cpp
index 0c21bdaf187..6173bfaeb86 100644
--- a/fdbserver/workloads/SaveAndKill.actor.cpp
+++ b/fdbserver/workloads/SaveAndKill.actor.cpp
@@ -18,6 +18,8 @@
  * limitations under the License.
  */
 
+#include "fdbclient/DatabaseConfiguration.h"
+#include "fdbclient/ManagementAPI.actor.h"
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbserver/Knobs.h"
 #include "fdbserver/TesterInterface.actor.h"
@@ -58,6 +60,7 @@ struct SaveAndKillWorkload : TestWorkload {
 	ACTOR Future<Void> _start(SaveAndKillWorkload* self, Database cx) {
 		state int i;
 		wait(delay(deterministicRandom()->random01() * self->testDuration));
+		DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
 
 		CSimpleIni ini;
 		ini.SetUnicode();
@@ -71,7 +74,7 @@ struct SaveAndKillWorkload : TestWorkload {
 		ini.SetValue("META", "testerCount", format("%d", g_simulator->testerCount).c_str());
 		ini.SetValue("META", "tssMode", format("%d", g_simulator->tssMode).c_str());
 		ini.SetValue("META", "mockDNS", INetworkConnections::net()->convertMockDNSToString().c_str());
-		ini.SetValue("META", "tenantMode", cx->clientInfo->get().tenantMode.toString().c_str());
+		ini.SetValue("META", "tenantMode", config.tenantMode.toString().c_str());
 		if (cx->defaultTenant.present()) {
 			ini.SetValue("META", "defaultTenant", cx->defaultTenant.get().toString().c_str());
 		}
diff --git a/flow/EncryptUtils.cpp b/flow/EncryptUtils.cpp
index 4d59ceceb83..0395ff43216 100644
--- a/flow/EncryptUtils.cpp
+++ b/flow/EncryptUtils.cpp
@@ -128,4 +128,9 @@ EncryptAuthTokenAlgo getRandomAuthTokenAlgo() {
 	                                : EncryptAuthTokenAlgo::ENCRYPT_HEADER_AUTH_TOKEN_ALGO_HMAC_SHA;
 
 	return algo;
+}
+
+bool isReservedEncryptDomain(EncryptCipherDomainId domainId) {
+	return domainId == SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID || domainId == ENCRYPT_HEADER_DOMAIN_ID ||
+	       domainId == FDB_DEFAULT_ENCRYPT_DOMAIN_ID;
 }
\ No newline at end of file
diff --git a/flow/include/flow/EncryptUtils.h b/flow/include/flow/EncryptUtils.h
index e126a462ff3..528d3fbb50a 100644
--- a/flow/include/flow/EncryptUtils.h
+++ b/flow/include/flow/EncryptUtils.h
@@ -108,4 +108,6 @@ std::string getEncryptDbgTraceKeyWithTS(std::string_view prefix,
 
 int getEncryptHeaderAuthTokenSize(int algo);
 
+bool isReservedEncryptDomain(EncryptCipherDomainId domainId);
+
 #endif
diff --git a/flow/include/flow/genericactors.actor.h b/flow/include/flow/genericactors.actor.h
index 7b1d5420c29..36c34e1b7f1 100644
--- a/flow/include/flow/genericactors.actor.h
+++ b/flow/include/flow/genericactors.actor.h
@@ -26,6 +26,7 @@
 #include "flow/network.h"
 #include <utility>
 #include <functional>
+#include <unordered_set>
 #if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
 #define FLOW_GENERICACTORS_ACTOR_G_H
 #include "flow/genericactors.actor.g.h"
@@ -115,6 +116,21 @@ std::vector<T> parseStringToVector(std::string str, char delim) {
 	return result;
 }
 
+template <class T>
+std::unordered_set<T> parseStringToUnorderedSet(std::string str, char delim) {
+	std::unordered_set<T> result;
+	std::stringstream stream(str);
+	std::string token;
+	while (stream.good()) {
+		getline(stream, token, delim);
+		std::istringstream tokenStream(token);
+		T item;
+		tokenStream >> item;
+		result.emplace(item);
+	}
+	return result;
+}
+
 template <class T>
 ErrorOr<T> errorOr(T t) {
 	return ErrorOr<T>(t);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 805fce49820..9815b4cd8a6 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -131,6 +131,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES fast/BackupAzureBlobCorrectness.toml IGNORE)
   add_fdb_test(TEST_FILES fast/BackupS3BlobCorrectness.toml IGNORE)
   add_fdb_test(TEST_FILES fast/BackupCorrectness.toml)
+  add_fdb_test(TEST_FILES fast/BackupCorrectnessWithEKPKeyFetchFailures.toml)
   add_fdb_test(TEST_FILES fast/BackupCorrectnessWithTenantDeletion.toml)
   add_fdb_test(TEST_FILES fast/EncryptedBackupCorrectness.toml)
   add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.toml)
@@ -163,6 +164,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
   add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
   add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)
+  add_fdb_test(TEST_FILES fast/IncrementalBackupWithEKPKeyFetchFailures.toml)
   add_fdb_test(TEST_FILES fast/IncrementalBackupWithTenantDeletion.toml)
   add_fdb_test(TEST_FILES fast/IncrementTest.toml)
   add_fdb_test(TEST_FILES fast/InventoryTestAlmostReadOnly.toml)
diff --git a/tests/fast/BackupCorrectnessWithEKPKeyFetchFailures.toml b/tests/fast/BackupCorrectnessWithEKPKeyFetchFailures.toml
new file mode 100644
index 00000000000..da8c6457eb0
--- /dev/null
+++ b/tests/fast/BackupCorrectnessWithEKPKeyFetchFailures.toml
@@ -0,0 +1,28 @@
+[configuration]
+allowDefaultTenant = false
+tenantModes = ['required']
+allowCreatingTenants = false
+encryptModes = ['domain_aware']
+
+[[knobs]]
+enable_encryption = true
+
+[[test]]
+testTitle = 'BackupAndRestoreWithEKPKeyFetchFailures'
+clearAfterTest = false
+simBackupAgents = 'BackupToFile'
+
+    [[test.workload]]
+    testName = 'BulkLoadWithTenants'
+    maxNumTenants = 100
+    minNumTenants = 1
+    enableEKPKeyFetchFailure = true
+    transactionsPerSecond = 2500.0
+    testDuration = 60.0
+
+    [[test.workload]]
+    testName = 'BackupAndRestoreCorrectness'
+    defaultBackup = true
+    backupAfter = 10.0
+    restoreAfter = 100.0
+    backupRangesCount = -1
diff --git a/tests/fast/EncryptedBackupCorrectness.toml b/tests/fast/EncryptedBackupCorrectness.toml
index 8825bc4dba1..daa69ff699f 100644
--- a/tests/fast/EncryptedBackupCorrectness.toml
+++ b/tests/fast/EncryptedBackupCorrectness.toml
@@ -1,4 +1,6 @@
 [configuration]
+allowDefaultTenant = false
+allowCreatingTenants = false
 tenantModes = ['required']
 encryptModes = ['domain_aware']
 
diff --git a/tests/fast/IncrementalBackupWithEKPKeyFetchFailures.toml b/tests/fast/IncrementalBackupWithEKPKeyFetchFailures.toml
new file mode 100644
index 00000000000..b3055c1f4b1
--- /dev/null
+++ b/tests/fast/IncrementalBackupWithEKPKeyFetchFailures.toml
@@ -0,0 +1,48 @@
+[configuration]
+allowDefaultTenant = false
+tenantModes = ['required']
+allowCreatingTenants = false
+encryptModes = ['domain_aware']
+
+[[knobs]]
+enable_encryption = true
+
+[[test]] 
+testTitle = 'SubmitBackup'
+simBackupAgents = 'BackupToFile'
+runConsistencyCheck = false
+
+    [[test.workload]]
+    testName = 'IncrementalBackup'
+    tag = 'default'
+    submitOnly = true
+    waitForBackup = true
+
+[[test]]
+testTitle = 'BulkLoad'
+clearAfterTest = true
+simBackupAgents = 'BackupToFile'
+
+    [[test.workload]]
+    testName = 'BulkLoadWithTenants'
+    maxNumTenants = 100
+    minNumTenants = 1
+    transactionsPerSecond = 3000.0
+    enableEKPKeyFetchFailure = true
+
+    [[test.workload]]
+    testName = 'IncrementalBackup'
+    tag = 'default'
+    waitForBackup = true
+    stopBackup = true
+
+[[test]]
+testTitle = 'SubmitRestore'
+clearAfterTest = false
+simBackupAgents = 'BackupToFile'
+
+    [[test.workload]]
+    testName = 'IncrementalBackup'
+    tag = 'default'
+    restoreOnly = true
+