Skip to content

Commit

Permalink
Merge branch 'improvement/BB-537' into q/8.7
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Oct 11, 2024
2 parents b8bd575 + 711b074 commit 93d499f
Show file tree
Hide file tree
Showing 3 changed files with 515 additions and 20 deletions.
6 changes: 6 additions & 0 deletions extensions/lifecycle/LifecycleConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const joiSchema = joi.object({
backlogControl: joi.object({
enabled: joi.boolean().default(true),
}).default({ enabled: true }),
filter: joi.object({
deny: joi.object({
buckets: joi.array().items(joi.string()),
accounts: joi.array().items(joi.string()),
}),
}),
probeServer: probeServerJoi.default(),
vaultAdmin: hostPortJoi,
circuitBreaker: joi.object().optional(),
Expand Down
106 changes: 86 additions & 20 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
const BUCKET_CHECKPOINT_PUSH_NUMBER = 50;
const BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD = 50;
const ACCOUNT_SPLITTER = ':';

const LIFEYCLE_CONDUCTOR_CLIENT_ID = 'lifecycle-conductor';

Expand Down Expand Up @@ -116,6 +117,12 @@ class LifecycleConductor {
// - some entries are expired for each listing
this._accountIdCache = new AccountIdCache(this._concurrency);

const blacklist = (this.lcConfig.conductor.filter && this.lcConfig.conductor.filter.deny) || {};
this.bucketsBlacklisted = new Set(blacklist.buckets);
const accountCanonicalIds = this._getAccountCanonicalIds(blacklist.accounts);
this.accountsBlacklisted = new Set(accountCanonicalIds);
this.onlyBlacklistAccounts = this.bucketsBlacklisted.size === 0 && this.accountsBlacklisted.size > 0;

this.logger = new Logger('Backbeat:Lifecycle:Conductor');
this.vaultClientWrapper = new VaultClientWrapper(
LIFEYCLE_CONDUCTOR_CLIENT_ID,
Expand Down Expand Up @@ -152,6 +159,30 @@ class LifecycleConductor {
}
}

/**
* Extract account canonical ids from configuration filter accounts
*
* @param {array} accounts from filter config -
* format: [account1:eb288756448dc58f61482903131e7ae533553d20b52b0e2ef80235599a1b9143]
* @return {array} account canonical ids
*/
_getAccountCanonicalIds(accounts) {
if (!accounts) {
return [];
}
return accounts.reduce((store, account) => {
const split = account.split(ACCOUNT_SPLITTER);
if (split.length === 2) {
store.push(split[1]);
}
return store;
}, []);
}

_isBlacklisted(canonicalId, bucketName) {
return this.bucketsBlacklisted.has(bucketName) || this.accountsBlacklisted.has(canonicalId);
}

getBucketsZkPath() {
return `${this.lcConfig.zookeeperPath}/data/buckets`;
}
Expand Down Expand Up @@ -467,18 +498,30 @@ class LifecycleConductor {
return cb(err);
}

const batch = buckets.map(bucket => {
const [canonicalId, bucketUID, bucketName] =
bucket.split(':');
if (!canonicalId || !bucketUID || !bucketName) {
log.error(
'malformed zookeeper bucket entry, skipping',
{ zkPath: zkBucketsPath, bucket });
return null;
}
const batch = buckets
.filter(bucket => {
const [canonicalId, bucketUID, bucketName] =
bucket.split(':');
if (!canonicalId || !bucketUID || !bucketName) {
log.error(
'malformed zookeeper bucket entry, skipping',
{ zkPath: zkBucketsPath, bucket });
return false;
}

return { canonicalId, bucketName };
});
if (this._isBlacklisted(canonicalId, bucketName)) {
return false;
}

return true;
})
.map(bucket => {
const split = bucket.split(':');
const canonicalId = split[0];
const bucketName = split[2];

return { canonicalId, bucketName };
});

queue.push(batch);
return process.nextTick(cb, null, batch.length);
Expand Down Expand Up @@ -578,10 +621,19 @@ class LifecycleConductor {
result.Contents.forEach(o => {
marker = o.key;
const [canonicalId, bucketName] = marker.split(constants.splitter);
nEnqueued += 1;
queue.push({ canonicalId, bucketName });

this.lastSentId = o.key;
if (!this._isBlacklisted(canonicalId, bucketName)) {
nEnqueued += 1;
queue.push({ canonicalId, bucketName });
this.lastSentId = o.key;
// Optimization:
// If we only blacklist by accounts, and the last bucket is blacklisted
// we can skip listing buckets until the next account.
// To start the next listing after the blacklisted account, we construct
// a marker by appending the blacklisted account with a semicolon character.
// 'canonicalid1;' > 'canonicalid1..|..bucketname1'
} else if (this.onlyBlacklistAccounts) {
marker = `${canonicalId};`;
}
if (nEnqueued % BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD === 0) {
needCheckpoint = true;
}
Expand Down Expand Up @@ -653,11 +705,25 @@ class LifecycleConductor {
lastSentVersion = stat.version;
}

const filter = {};
if (entry) {
filter._id = { $gt: entry };
}

if (this.accountsBlacklisted.size > 0) {
filter['value.owner'] = { $nin: Array.from(this.accountsBlacklisted) };
}

if (this.bucketsBlacklisted.size > 0) {
filter._id = {
...filter._id,
$nin: Array.from(this.bucketsBlacklisted),
};
}

const cursor = this._mongodbClient
.getCollection('__metastore')
.find(
entry ? { _id: { $gt: entry } } : {}
)
.find(filter)
.project({ '_id': 1, 'value.owner': 1, 'value.lifecycleConfiguration': 1 });

return done(null, cursor);
Expand All @@ -666,8 +732,8 @@ class LifecycleConductor {
async.during(
test => process.nextTick(
() => cursor.hasNext()
.then(res => test(null, res))
.catch(test)),
.then(res => test(null, res))
.catch(test)),
next => {
const breakerState = this._circuitBreaker.state;
const queueInfo = {
Expand Down
Loading

0 comments on commit 93d499f

Please sign in to comment.