Skip to content

Commit

Permalink
Merge branch 'main' into nim/tenant-backup-mutation-log-fix
Browse files Browse the repository at this point in the history
* main: (23 commits)
  Handle EKP Tenant Not Found Errors (apple#9261)
  move feed cleanup check to after data is guaranteed to be available for granule (apple#9283)
  remove test timeout
  Reduce logging level for verbose events
  Added documentation for consistencyscan CLI command.
  Fix audit_storage issues (apple#9265)
  Update bindings/bindingtester/spec/tenantTester.md
  Update bindings/bindingtester/spec/tenantTester.md
  update bindingtester spec
  Fixing SkewedReadWrite to load its metadata in a transactionally consistent way (apple#9274)
  push string onto stack when active tenant is set
  Add comments on why custom encoding is needed
  patch to fix some existing bindingtester issues
  add arg and return type to the c_api for impl.py
  Fix includes
  Add from_7.0.0_until_7.2.0 for UpgradeAndBackupRestore tests
  Change UpgradeAndBackupRestore to from_7.2.4
  Add a new toml option to disable failure injection workload
  Change SubmitBackup to only reboot in Attrition
  add method to return idfuture
  ...
  • Loading branch information
sfc-gh-nwijetunga committed Feb 2, 2023
2 parents 19b2e74 + 86f3665 commit 4c0fa15
Show file tree
Hide file tree
Showing 49 changed files with 681 additions and 126 deletions.
10 changes: 9 additions & 1 deletion bindings/bindingtester/spec/tenantTester.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ The tenant API introduces some new operations:
#### TENANT_SET_ACTIVE

Pops the top item off of the stack as TENANT_NAME. Opens the tenant with
name TENANT_NAME and stores it as the active tenant.
name TENANT_NAME and stores it as the active tenant. Then, waits on a future
that initializes the tenant ID. When complete, pushes the string
"SET_ACTIVE_TENANT" onto the stack.

#### TENANT_CLEAR_ACTIVE

Expand All @@ -46,6 +48,12 @@ The tenant API introduces some new operations:
packed into a tuple as [t1,t2,t3,...,tn], and this single packed value
is pushed onto the stack.

#### TENANT_GET_ID

Attempts to resolve the active tenant's ID by waiting on the tenant's ID future.
Pushes the string "GOT_TENANT_ID" onto the stack if the ID was successfully read.
Pushes the string "NO_ACTIVE_TENANT" onto the stack if there is no active tenant.

Updates to Existing Instructions
--------------------------------

Expand Down
6 changes: 5 additions & 1 deletion bindings/bindingtester/tests/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def generate(self, args, thread_number):
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
txn_sizes = ['GET_APPROXIMATE_SIZE']
storage_metrics = ['GET_ESTIMATED_RANGE_SIZE', 'GET_RANGE_SPLIT_POINTS']
tenants = ['TENANT_CREATE', 'TENANT_DELETE', 'TENANT_SET_ACTIVE', 'TENANT_CLEAR_ACTIVE', 'TENANT_LIST']
tenants = ['TENANT_CREATE', 'TENANT_DELETE', 'TENANT_SET_ACTIVE', 'TENANT_CLEAR_ACTIVE', 'TENANT_LIST', 'TENANT_GET_ID']

op_choices += reads
op_choices += mutations
Expand Down Expand Up @@ -610,6 +610,7 @@ def generate(self, args, thread_number):
tenant_name = self.choose_tenant(0.8)
instructions.push_args(tenant_name)
instructions.append(op)
self.add_strings(1)
elif op == 'TENANT_CLEAR_ACTIVE':
instructions.append(op)
elif op == 'TENANT_LIST':
Expand All @@ -619,6 +620,9 @@ def generate(self, args, thread_number):
test_util.to_front(instructions, 2)
instructions.append(op)
self.add_strings(1)
elif op == "TENANT_GET_ID":
instructions.append(op)
self.add_strings(1)
else:
assert False, 'Unknown operation: ' + op

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,15 +505,27 @@ else if (op == StackOperation.TENANT_LIST) {
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_SET_ACTIVE) {
return inst.popParam().thenAcceptAsync(param -> {
return inst.popParam().thenComposeAsync(param -> {
byte[] tenantName = (byte[])param;
inst.context.setTenant(Optional.of(tenantName));
return inst.context.setTenant(Optional.of(tenantName)).thenAcceptAsync(id -> {
inst.push("SET_ACTIVE_TENANT".getBytes());
}, FDB.DEFAULT_EXECUTOR);
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_CLEAR_ACTIVE) {
inst.context.setTenant(Optional.empty());
return AsyncUtil.DONE;
}
else if (op == StackOperation.TENANT_GET_ID) {
if (inst.context.tenant.isPresent()) {
return inst.context.tenant.get().getId().thenAcceptAsync(id -> {
inst.push("GOT_TENANT_ID".getBytes());
}, FDB.DEFAULT_EXECUTOR);
} else {
inst.push("NO_ACTIVE_TENANT".getBytes());
return AsyncUtil.DONE;
}
}
else if (op == StackOperation.UNIT_TESTS) {
inst.context.db.options().setLocationCacheSize(100001);
return inst.context.db.runAsync(tr -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ public void run() {
}
}

public synchronized void setTenant(Optional<byte[]> tenantName) {
// Sets or clears the active tenant for this context.
// When a tenant name is present, opens (or reuses a cached handle for) that
// tenant, makes it the active tenant, and returns the future that resolves
// to the tenant's ID. When empty, clears the active tenant and returns a
// future already completed with -1L as a "no tenant" sentinel so callers can
// uniformly wait on the result.
public synchronized CompletableFuture<Long> setTenant(Optional<byte[]> tenantName) {
    if (tenantName.isPresent()) {
        // computeIfAbsent reuses an already-open tenant handle for this name.
        tenant = Optional.of(tenantMap.computeIfAbsent(tenantName.get(), tn -> db.openTenant(tenantName.get())));
        return tenant.get().getId();
    }
    else {
        tenant = Optional.empty();
        return CompletableFuture.completedFuture(-1L);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum StackOperation {
TENANT_LIST,
TENANT_SET_ACTIVE,
TENANT_CLEAR_ACTIVE,
TENANT_GET_ID,

LOG_STACK
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,20 @@ else if (op == StackOperation.TENANT_LIST) {
}
else if (op == StackOperation.TENANT_SET_ACTIVE) {
byte[] tenantName = (byte[])inst.popParam().join();
inst.context.setTenant(Optional.of(tenantName));
inst.context.setTenant(Optional.of(tenantName)).join();
inst.push("SET_ACTIVE_TENANT".getBytes());
}
else if (op == StackOperation.TENANT_CLEAR_ACTIVE) {
inst.context.setTenant(Optional.empty());
}
else if (op == StackOperation.TENANT_GET_ID) {
if (inst.context.tenant.isPresent()) {
inst.context.tenant.get().getId().join();
inst.push("GOT_TENANT_ID".getBytes());
} else {
inst.push("NO_ACTIVE_TENANT".getBytes());
}
}
else if (op == StackOperation.UNIT_TESTS) {
try {
inst.context.db.options().setLocationCacheSize(100001);
Expand Down
3 changes: 3 additions & 0 deletions bindings/python/fdb/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,9 @@ def init_c_api():
_capi.fdb_tenant_destroy.argtypes = [ctypes.c_void_p]
_capi.fdb_tenant_destroy.restype = None

_capi.fdb_tenant_get_id.argtypes = [ctypes.c_void_p]
_capi.fdb_tenant_get_id.restype = ctypes.c_void_p

_capi.fdb_tenant_create_transaction.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.c_void_p),
Expand Down
8 changes: 8 additions & 0 deletions bindings/python/tests/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ def run(self):
elif inst.op == six.u("TENANT_SET_ACTIVE"):
name = inst.pop()
self.tenant = self.db.open_tenant(name)
self.tenant.get_id().wait()
inst.push(b"SET_ACTIVE_TENANT")
elif inst.op == six.u("TENANT_CLEAR_ACTIVE"):
self.tenant = None
elif inst.op == six.u("TENANT_LIST"):
Expand All @@ -618,6 +620,12 @@ def run(self):
except (json.decoder.JSONDecodeError, KeyError):
assert False, "Invalid Tenant Metadata"
inst.push(fdb.tuple.pack(tuple(result)))
elif inst.op == six.u("TENANT_GET_ID"):
if self.tenant != None:
self.tenant.get_id().wait()
inst.push(b"GOT_TENANT_ID")
else:
inst.push(b"NO_ACTIVE_TENANT")
elif inst.op == six.u("UNIT_TESTS"):
try:
test_db_options(db)
Expand Down
27 changes: 25 additions & 2 deletions documentation/sphinx/source/command-line-interface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,35 @@ The default is ``disabled``, which means changing the storage engine will not be
``aggressive`` tries to replace as many storages as it can at once, and will recruit a new storage server on the same process as the old one. This will be faster, but can potentially hit degraded performance or OOM with two storages on the same process. The main benefit over ``gradual`` is that this doesn't need to take one storage out of rotation, so it works for small or development clusters that have the same number of storage processes as the replication factor. Note that ``aggressive`` is not exclusive to running the perpetual wiggle.
``disabled`` means that if the storage engine is changed, fdb will not move the cluster over to the new storage engine. This will disable the perpetual wiggle from rewriting storage files.

consistencyscan
----------------

This command controls a native data consistency scan role that is automatically recruited in the FDB cluster. The consistency scan reads all replicas of each shard to verify data consistency. It is useful for finding corrupt cold data by ensuring that all data is read periodically. Any errors found will be logged as TraceEvents with Severity = 40.

The syntax is

``consistencyscan [ off | on [maxRate <RATE>] [targetInterval <INTERVAL>] [restart <RESTART>] ]``

* ``off`` will disable the consistency scan

* ``on`` will enable the scan and can be accompanied by additional options shown above

* ``RATE`` - sets the maximum read speed of the scan in bytes/s.

* ``INTERVAL`` - sets the target completion time, in seconds, for each full pass over all data in the cluster. Scan speed will target this interval with a hard limit of RATE.

* ``RESTART`` - a 1 or 0 that controls whether the scan should restart from the beginning of the user key space on startup. This should normally be set to 0, which will resume progress from the last time the scan was running.

The consistency scan role publishes its configuration and metrics in Status JSON under the path ``.cluster.consistency_scan_info``.

consistencycheck
----------------

The ``consistencycheck`` command enables or disables consistency checking. Its syntax is ``consistencycheck [on|off]``. Calling it with ``on`` enables consistency checking, and ``off`` disables it. Calling it with no arguments displays whether consistency checking is currently enabled.
Note: This command exists for backward compatibility, it is suggested to use the ``consistencyscan`` command to control FDB's internal consistency scan role instead.

This command sets a database key that governs the behavior of any externally configured consistency check roles. You must be running an ``fdbserver`` process with the ``consistencycheck`` role to perform consistency checking.

You must be running an ``fdbserver`` process with the ``consistencycheck`` role to perform consistency checking.
The ``consistencycheck`` command enables or disables consistency checking. Its syntax is ``consistencycheck [on|off]``. Calling it with ``on`` enables consistency checking, and ``off`` disables it. Calling it with no arguments displays whether consistency checking is currently enabled.

coordinators
------------
Expand Down
7 changes: 2 additions & 5 deletions fdbcli/AuditStorageCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@ ACTOR Future<UID> auditStorageCommandActor(Reference<IClusterConnectionRecord> c
return UID();
}

Key begin, end;
if (tokens.size() == 2) {
begin = allKeys.begin;
end = allKeys.end;
} else if (tokens.size() == 3) {
Key begin = allKeys.begin, end = allKeys.end;
if (tokens.size() == 3) {
begin = tokens[2];
} else if (tokens.size() == 4) {
begin = tokens[2];
Expand Down
11 changes: 9 additions & 2 deletions fdbcli/GetAuditStatusCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace fdb_cli {

ACTOR Future<bool> getAuditStatusCommandActor(Database cx, std::vector<StringRef> tokens) {
if (tokens.size() != 4) {
if (tokens.size() < 3 || tokens.size() > 4) {
printUsage(tokens[0]);
return false;
}
Expand All @@ -45,11 +45,18 @@ ACTOR Future<bool> getAuditStatusCommandActor(Database cx, std::vector<StringRef
}

if (tokencmp(tokens[2], "id")) {
if (tokens.size() != 4) {
printUsage(tokens[0]);
return false;
}
const UID id = UID::fromString(tokens[3].toString());
AuditStorageState res = wait(getAuditState(cx, type, id));
printf("Audit result is:\n%s", res.toString().c_str());
} else if (tokencmp(tokens[2], "recent")) {
const int count = std::stoi(tokens[3].toString());
int count = CLIENT_KNOBS->TOO_MANY;
if (tokens.size() == 4) {
count = std::stoi(tokens[3].toString());
}
std::vector<AuditStorageState> res = wait(getLatestAuditStates(cx, type, count));
for (const auto& it : res) {
printf("Audit result is:\n%s\n", it.toString().c_str());
Expand Down
82 changes: 65 additions & 17 deletions fdbclient/BackupAgentBase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "fdbclient/BlobCipher.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
Expand All @@ -33,7 +34,6 @@
#include "fdbclient/TenantManagement.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // has to be last include

FDB_DEFINE_BOOLEAN_PARAM(LockDB);
Expand Down Expand Up @@ -253,6 +253,34 @@ Version getLogKeyVersion(Key key) {
return bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t)));
}

bool validTenantAccess(std::map<int64_t, TenantName>* tenantMap,
MutationRef m,
bool provisionalProxy,
Version version) {
if (isSystemKey(m.param1)) {
return true;
}
int64_t tenantId = TenantInfo::INVALID_TENANT;
if (m.isEncrypted()) {
tenantId = m.encryptionHeader()->cipherTextDetails.encryptDomainId;
} else {
tenantId = TenantAPI::extractTenantIdFromMutation(m);
}
ASSERT(tenantMap != nullptr);
if (m.isEncrypted() && isReservedEncryptDomain(tenantId)) {
// These are valid encrypt domains so don't check the tenant map
} else if (tenantMap->find(tenantId) == tenantMap->end()) {
// If a tenant is not found for a given mutation then exclude it from the batch
ASSERT(!provisionalProxy);
TraceEvent(SevWarnAlways, "MutationLogRestoreTenantNotFound")
.detail("Version", version)
.detail("TenantId", tenantId);
CODE_PROBE(true, "mutation log restore tenant not found");
return false;
}
return true;
}

// Given a key from one of the ranges returned by get_log_ranges,
// returns(version, part) where version is the database version number of
// the transaction log data in the value, and part is 0 for the first such
Expand Down Expand Up @@ -322,29 +350,49 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
offset += len2;
state Optional<MutationRef> encryptedLogValue = Optional<MutationRef>();

// Check for valid tenant in required tenant mode. If the tenant does not exist in our tenant map then
// we EXCLUDE the mutation (of that respective tenant) during the restore. NOTE: This simply allows a
// restore to make progress in the event of tenant deletion, but tenant deletion should be considered
// carefully so that we do not run into this case. We do this check here so if encrypted mutations are not
// found in the tenant map then we exit early without needing to reach out to the EKP.
if (config.tenantMode == TenantMode::REQUIRED &&
config.encryptionAtRestMode.mode != EncryptionAtRestMode::CLUSTER_AWARE &&
!validTenantAccess(tenantMap, logValue, provisionalProxy, version)) {
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
continue;
}

// Decrypt mutation ref if encrypted
if (logValue.isEncrypted()) {
encryptedLogValue = logValue;
state EncryptCipherDomainId domainId = logValue.encryptionHeader()->cipherTextDetails.encryptDomainId;
Reference<AsyncVar<ClientDBInfo> const> dbInfo = cx->clientInfo;
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::BACKUP));
logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
try {
TextAndHeaderCipherKeys cipherKeys =
wait(getEncryptCipherKeys(dbInfo, *logValue.encryptionHeader(), BlobCipherMetrics::RESTORE));
logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
} catch (Error& e) {
// It's possible a tenant was deleted and the encrypt key fetch failed
TraceEvent(SevWarnAlways, "MutationLogRestoreEncryptKeyFetchFailed")
.detail("Version", version)
.detail("TenantId", domainId);
if (e.code() == error_code_encrypt_keys_fetch_failed) {
CODE_PROBE(true, "mutation log restore encrypt keys not found");
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
continue;
} else {
throw;
}
}
}
ASSERT(!logValue.isEncrypted());

if (config.tenantMode == TenantMode::REQUIRED && !isSystemKey(logValue.param1)) {
// If a tenant is not found for a given mutation then exclude it from the batch
int64_t tenantId = TenantAPI::extractTenantIdFromMutation(logValue);
ASSERT(tenantMap != nullptr);
if (tenantMap->find(tenantId) == tenantMap->end()) {
ASSERT(!provisionalProxy);
TraceEvent(SevWarnAlways, "MutationLogRestoreTenantNotFound")
.detail("Version", version)
.detail("TenantId", tenantId);
CODE_PROBE(true, "mutation log restore tenant not found");
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
continue;
}
// If the mutation was encrypted using cluster aware encryption then check after decryption
if (config.tenantMode == TenantMode::REQUIRED &&
config.encryptionAtRestMode.mode == EncryptionAtRestMode::CLUSTER_AWARE &&
!validTenantAccess(tenantMap, logValue, provisionalProxy, version)) {
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
continue;
}

MutationRef originalLogValue = logValue;
Expand Down
17 changes: 0 additions & 17 deletions fdbclient/BackupContainerFileSystem.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -973,23 +973,6 @@ class BackupContainerFileSystemImpl {
continue;

restorable.snapshot = snapshots[i];
// TODO: Reenable the sanity check after TooManyFiles error is resolved
if (false && g_network->isSimulated()) {
// Sanity check key ranges
// TODO: If we want to re-enable this codepath, make sure that we are passing a valid DB object (instead
// of the DB object created on the line below)
ASSERT(false);
state Database cx;
state std::map<std::string, KeyRange>::iterator rit;
for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
auto it = std::find_if(restorable.ranges.begin(),
restorable.ranges.end(),
[file = rit->first](const RangeFile f) { return f.fileName == file; });
ASSERT(it != restorable.ranges.end());
KeyRange result = wait(bc->getSnapshotFileKeyRange(*it, cx));
ASSERT(rit->second.begin <= result.begin && rit->second.end >= result.end);
}
}

// No logs needed if there is a complete filtered key space snapshot at the target version.
if (minKeyRangeVersion == maxKeyRangeVersion && maxKeyRangeVersion == restorable.targetVersion) {
Expand Down
3 changes: 3 additions & 0 deletions fdbclient/BlobCipher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ BlobCipherMetrics::BlobCipherMetrics()
CounterSet(cc, "KVRedwood"),
CounterSet(cc, "BlobGranule"),
CounterSet(cc, "Backup"),
CounterSet(cc, "Restore"),
CounterSet(cc, "Test") }) {
specialCounter(cc, "CacheSize", []() { return BlobCipherKeyCache::getInstance()->getSize(); });
traceFuture = cc.traceCounters("BlobCipherMetrics", UID(), FLOW_KNOBS->ENCRYPT_KEY_CACHE_LOGGING_INTERVAL);
Expand All @@ -102,6 +103,8 @@ std::string toString(BlobCipherMetrics::UsageType type) {
return "BlobGranule";
case BlobCipherMetrics::UsageType::BACKUP:
return "Backup";
case BlobCipherMetrics::UsageType::RESTORE:
return "Restore";
case BlobCipherMetrics::UsageType::TEST:
return "Test";
default:
Expand Down
Loading

0 comments on commit 4c0fa15

Please sign in to comment.