Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/BB 358/supportNewerNoncurrentVersion #2364

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ class LifecycleBucketProcessor {
this.credentialsManager = new CredentialsManager('lifecycle', this._log);
this.retryWrapper = new BackbeatTask();

// helper object to facilitate the tracking of the the latest x
// noncurrent versions of an object when the field
// `NewerNoncurrentVersions` is present.
//
// Structure:
// ...
// [bucket]: {
// ...
// [object]: {
// ...
// [rule-id]: Heap of <version objects>
// }
// }
// ...
//
// Bucket level entries are removed when the listing results are not
// truncated and after the listing results have been processed.
//
// Object level entries are removed when their bucket level entries are
// removed or when:
// * the NextMarker field in the results is different key from the entry's
// * all the object/version of each listing request have been processed
//
this.ncvHeap = new Map();

// The task scheduler for processing lifecycle tasks concurrently.
this._internalTaskScheduler = async.queue((ctx, cb) => {
const { task, rules, value, s3target, backbeatMetadataProxy } = ctx;
Expand Down Expand Up @@ -143,6 +168,7 @@ class LifecycleBucketProcessor {
bucketTasksTopic: this._lcConfig.bucketTasksTopic,
objectTasksTopic: this._lcConfig.objectTasksTopic,
kafkaBacklogMetrics: this._kafkaBacklogMetrics,
ncvHeap: this.ncvHeap,
log: this._log,
};
}
Expand Down
178 changes: 175 additions & 3 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
LifecycleDateTime,
LifecycleUtils,
} = require('arsenal').s3middleware.lifecycleHelpers;
const { CompareResult, MinHeap } = require('arsenal').algorithms.Heap;

const config = require('../../../conf/Config');
const { attachReqUids } = require('../../../lib/clients/utils');
Expand All @@ -30,6 +31,30 @@ function isLifecycleUser(canonicalID) {
return serviceName === 'lifecycle';
}

/**
* compare 2 version by their stale dates returning:
* - LT (-1) if v1 is less than v2
* - EQ (0) if v1 equals v2
* - GT (1) if v1 is greater than v2
* @param {object} v1 - object version
* @param {object} v2 - object version
* @returns {number} -
*/
function noncurrentVersionCompare(v1, v2) {
const v1Date = new Date(v1.staleDate);
const v2Date = new Date(v2.staleDate);

if (v1Date < v2Date) {
return CompareResult.LT;
}

if (v1Date > v2Date) {
return CompareResult.GT;
}

return CompareResult.EQ;
}

class LifecycleTask extends BackbeatTask {
/**
* Processes Kafka Bucket entries and determines if specific Lifecycle
Expand Down Expand Up @@ -176,6 +201,105 @@ class LifecycleTask extends BackbeatTask {
], done);
}

/**
* Adds object to the noncurrent version helper Heap object.
* If the heap cap is reached:
* - compare the smallest object and the current object
* - if the current object is smaller:
* - remove top object and add the current object into the heap
* - return top object to be expired
* - if the top of the heap is smaller:
* - return the current object to be expired
* If the heap cap has not been reached:
* - add the current object into the heap and return null
* @param {string} bucketName - bucket name
* @param {object} rule - rule object
* @param {object} version - object version
* @return {object | null} - null or the version to be expired
*/
_ncvHeapAdd(bucketName, rule, version) {
const ncve = 'NoncurrentVersionExpiration';
const nncv = 'NewerNoncurrentVersions';

if (!rule[ncve] || !rule[ncve][nncv]) {
return null;
}

if (!this.ncvHeap.has(bucketName)) {
this.ncvHeap.set(bucketName, new Map());
}

if (!this.ncvHeap.get(bucketName).has(version.Key)) {
this.ncvHeap.get(bucketName).set(version.Key, new Map());
}

const ncvHeapObject = this.ncvHeap.get(bucketName).get(version.Key);

const nncvSize = parseInt(rule[ncve][nncv], 10);
if (!ncvHeapObject.get(rule.Id)) {
ncvHeapObject.set(rule.Id, new MinHeap(nncvSize, noncurrentVersionCompare));
}

const heap = ncvHeapObject.get(rule.Id);

if (heap.size < nncvSize) {
heap.add(version);
return null;
}

const heapTop = heap.peek();
if (noncurrentVersionCompare(version, heapTop) === CompareResult.LT) {
return version;
}

const toExpire = heap.remove();
heap.add(version);
return toExpire;
}

/**
* clear objects level entries from helper Heap object.
* @param {string} bucketName - bucket name
* @param {Set} uniqueObjectKeys - Set of unique object keys
* @return {undefined} -
*/
_ncvHeapObjectsClear(bucketName, uniqueObjectKeys) {
if (!this.ncvHeap.has(bucketName)) {
return;
}

const ncvHeapBucket = this.ncvHeap.get(bucketName);
uniqueObjectKeys.forEach(key => {
if (!ncvHeapBucket.has(key)) {
return;
}

ncvHeapBucket.get(key).clear();
ncvHeapBucket.delete(key);
});
return;
}

/**
* clear bucket level entry from helper Heap object.
* @param {string} bucketName - bucket name
* @return {undefined} -
*/
_ncvHeapBucketClear(bucketName) {
if (!this.ncvHeap.has(bucketName)) {
return;
}

const ncvHeapBucket = this.ncvHeap.get(bucketName);
// remove references to Heap objects under each Key entries
ncvHeapBucket.forEach(objMap => objMap.clear());
// remove references to Key maps under each Bucket entries
ncvHeapBucket.clear();
// delete reference to bucket Map
this.ncvHeap.delete(bucketName);
return;
}

/**
* Handles versioned objects (both enabled and suspended)
* @param {object} bucketData - bucket data
Expand Down Expand Up @@ -209,6 +333,18 @@ class LifecycleTask extends BackbeatTask {
bucketData.details, allVersions
);

// create Set of unique keys not matching the next marker to
// indicate the object level entries to be cleared at the end
// of the processing step
const uniqueObjectKeysNotNextMarker = new Set();
if (data.NextKeyMarker) {
allVersions.forEach(v => {
if (v.Key !== data.NextKeyMarker) {
uniqueObjectKeysNotNextMarker.add(v.Key);
}
});
}

// sending bucket entry - only once - for checking next listing
if (data.IsTruncated && allVersions.length > 0 && nbRetries === 0) {
// Uses last version whether Version or DeleteMarker
Expand Down Expand Up @@ -241,7 +377,26 @@ class LifecycleTask extends BackbeatTask {
// NoncurrentVersionExpiration Days and send expiration if
// rules all apply
return this._compareRulesToList(bucketData, bucketLCRules,
allVersionsWithStaleDate, log, versioningStatus, done);
allVersionsWithStaleDate, log, versioningStatus,
err => {
if (err) {
return done(err);
}

if (!data.IsTruncated) {
// end of bucket listing
// clear bucket level entry and all object entries
this._ncvHeapBucketClear(bucketData.target.bucket);
} else {
// clear object level entries that have been processed
this._ncvHeapObjectsClear(
bucketData.target.bucket,
uniqueObjectKeysNotNextMarker
);
}

return done();
});
});
}

Expand Down Expand Up @@ -980,10 +1135,26 @@ class LifecycleTask extends BackbeatTask {
const daysSinceInitiated = this._lifecycleDateTime.findDaysSince(new Date(staleDate));
const ncve = 'NoncurrentVersionExpiration';
const ncd = 'NoncurrentDays';
const nncv = 'NewerNoncurrentVersions';
const doesNCVExpirationRuleApply = (rules[ncve]
&& rules[ncve][ncd] !== undefined
&& daysSinceInitiated >= rules[ncve][ncd]);

if (doesNCVExpirationRuleApply) {
let verToExpire = version;

if (rules[ncve][nncv]) {
log.debug('Rule contains NewerNoncurrentVersion. Checking heap for smallest', {
method: '_checkAndApplyNCVExpirationRule',
bucket: bucketData.target.bucket,
key: version.Key,
});
verToExpire = this._ncvHeapAdd(bucketData.target.bucket, rules, version);
if (verToExpire === null) {
return;
}
}

const entry = ActionQueueEntry.create('deleteObject')
.addContext({
origin: 'lifecycle',
Expand All @@ -993,8 +1164,8 @@ class LifecycleTask extends BackbeatTask {
.setAttribute('target.owner', bucketData.target.owner)
.setAttribute('target.bucket', bucketData.target.bucket)
.setAttribute('target.accountId', bucketData.target.accountId)
.setAttribute('target.key', version.Key)
.setAttribute('target.version', version.VersionId);
.setAttribute('target.key', verToExpire.Key)
.setAttribute('target.version', verToExpire.VersionId);
this._sendObjectAction(entry, err => {
if (!err) {
log.debug('sent object entry for consumption',
Expand All @@ -1003,6 +1174,7 @@ class LifecycleTask extends BackbeatTask {
}, entry.getLogInfo()));
}
});
return;
}
}

Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
"homepage": "https://github.com/scality/backbeat#readme",
"dependencies": {
"@hapi/joi": "^15.1.0",
"arsenal": "git+https://github.com/scality/Arsenal#7.10.28",
"arsenal": "git+https://github.com/scality/arsenal#7.10.44",
"async": "^2.3.0",
"aws-sdk": "^2.938.0",
"aws-sdk": "^2.1326.0",
"backo": "^1.1.0",
"bucketclient": "git+https://github.com/scality/bucketclient#7.10.4",
"commander": "^2.11.0",
Expand All @@ -53,7 +53,7 @@
"werelogs": "git+https://github.com/scality/werelogs#7.10.3.9"
},
"devDependencies": {
"@zenko/cloudserver": "scality/cloudserver#8.2.20",
"@zenko/cloudserver": "git+https://github.com/scality/cloudserver#8.6.8",
"eslint": "^2.4.0",
"eslint-config-airbnb": "^6.0.0",
"eslint-config-scality": "scality/Guidelines#7.10.2",
Expand Down
Loading