Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

Commit

Permalink
Ignore records from SQS which where seen in S3
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyBarabash committed Apr 26, 2019
1 parent d06afa5 commit 9e55174
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions lib/s3Helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ module.exports.parseS3Key = function (key) {
return {apiVersion, userId, categoryId, timestamp, recordCrc, recordPartIndex, recordPartString}
}

function extractTimeStampt (a) {
const timeStampt = a.Key.substring(47).split('/')[1]
return timeStampt
}

function compareS3Key (a, b) {
const recordPartIndexA = a.Key.substring(47).split('/')[1]
const recordPartIndexB = b.Key.substring(47).split('/')[1]
const recordPartIndexA = extractTimeStampt(a)
const recordPartIndexB = extractTimeStampt(b)
if (recordPartIndexA < recordPartIndexB) {
return -1
} else if (recordPartIndexA > recordPartIndexB) {
Expand All @@ -93,6 +98,8 @@ function compareS3Key (a, b) {
}
}

var alreadySeenFromS3 = new Set()

/**
* list SQS notifications.
* @param {AwsSdk.SQS} SQS
Expand Down Expand Up @@ -166,7 +173,10 @@ function listNotificationsRecursively (SQS, options, category, prefix, currentCo
let contentMessage = {
Key: key
}
currentContent.push(contentMessage)
var timestampt = extractTimeStampt(contentMessage)
if (!alreadySeenFromS3.has(timestampt)) {
currentContent.push(contentMessage)
}
}
let entryToDelete = {
Id: `${currentEntriesToDelete.length}`,
Expand Down Expand Up @@ -219,6 +229,12 @@ module.exports.listObjects = function (s3, options, limitResponse) {
if (error) {
reject(error)
} else {
if (data.Contents) {
for (let content of data.Contents) {
var timestampt = extractTimeStampt(content)
alreadySeenFromS3.add(timestampt)
}
}
resolve({
contents: data.Contents,
isTruncated: data.IsTruncated,
Expand All @@ -228,6 +244,12 @@ module.exports.listObjects = function (s3, options, limitResponse) {
})
} else {
listObjectsV2Recursively(s3, options, (error, data) => {
if (data.Contents) {
for (let content of data) {
var timestampt = extractTimeStampt(content)
alreadySeenFromS3.add(timestampt)
}
}
if (error) { reject(error) }
resolve({
contents: data,
Expand Down

0 comments on commit 9e55174

Please sign in to comment.