-
Notifications
You must be signed in to change notification settings - Fork 21
/
tail-sqs.js
102 lines (85 loc) · 2.46 KB
/
tail-sqs.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
const _ = require("lodash");
const { getAWSSDK } = require("../lib/aws");
const { getQueueUrl } = require("../lib/sqs");
const { Command, flags } = require("@oclif/command");
const { checkVersion } = require("../lib/version-check");
const { track } = require("../lib/analytics");
require("colors");
// MessageIds that pollSqs has already printed, kept at module level so that a
// message which is received again is not printed twice. It is reset when the
// user stops tailing.
let seenMessageIds = new Set();
// Upper bound on how many MessageIds are remembered for de-duplication.
const MAX_SEEN_MESSAGE_IDS = 100000;

/**
 * CLI command that looks up an SQS queue by name and prints the messages it
 * receives from it until the user presses a key.
 */
class TailSqsCommand extends Command {
  /**
   * Command entry point: parses the flags, resolves the queue URL, polls the
   * queue until the user asks to stop, then exits with status 0.
   *
   * @returns {Promise<void>}
   */
  async run() {
    const { flags } = this.parse(TailSqsCommand);
    const { queueName, region, profile } = flags;

    // Published as globals before any AWS helper is called.
    // NOTE(review): presumably read by ../lib/aws when building the SDK client - confirm.
    global.region = region;
    global.profile = profile;

    checkVersion();
    track("tail-sqs", { region });

    this.log(`finding the queue [${queueName}] in [${region}]`);
    const queueUrl = await getQueueUrl(queueName);

    this.log(`polling SQS queue [${queueUrl}]...`);
    this.log("press <any key> to stop");

    await this.pollSqs(queueUrl);

    this.exit(0);
  }

  /**
   * Long-polls the queue and logs every message that has not been printed
   * before. The loop ends after the first keypress on stdin; a receive call
   * that is already in flight is allowed to finish and its messages are
   * still printed.
   *
   * @param {string} queueUrl - URL of the SQS queue to poll.
   * @returns {Promise<void>} Resolves once polling has stopped. Rejects with
   *   whatever error `receiveMessage` rejects with.
   */
  async pollSqs(queueUrl) {
    const AWS = getAWSSDK();
    const SQS = new AWS.SQS();
    const readline = require("readline");

    const stdin = process.stdin;
    // setRawMode only exists when stdin is a TTY; calling it on a pipe throws.
    const canUseRawMode =
      Boolean(stdin.isTTY) && typeof stdin.setRawMode === "function";

    let polling = true;
    const onKeypress = () => {
      polling = false;
      this.log("stopping...");
    };

    readline.emitKeypressEvents(stdin);
    if (canUseRawMode) {
      // Raw mode delivers single keystrokes without waiting for <enter>.
      stdin.setRawMode(true);
    }
    stdin.once("keypress", onKeypress);
    // Start the stream flowing so keypress events are actually emitted.
    stdin.resume();

    try {
      while (polling) {
        const resp = await SQS.receiveMessage({
          QueueUrl: queueUrl,
          MaxNumberOfMessages: 10,
          WaitTimeSeconds: 5,
          MessageAttributeNames: ["All"]
        }).promise();

        if (_.isEmpty(resp.Messages)) {
          continue;
        }

        for (const msg of resp.Messages) {
          this.logIfUnseen(msg);
        }
      }
    } finally {
      // Always hand the terminal back in its normal state, even when the
      // receive call fails, and forget the IDs collected during this run.
      stdin.removeListener("keypress", onKeypress);
      if (canUseRawMode) {
        stdin.setRawMode(false);
      }
      stdin.pause();
      seenMessageIds.clear();
    }

    this.log("stopped");
  }

  /**
   * Logs a received message (body and attributes) unless its MessageId has
   * already been printed, and records the MessageId.
   *
   * @param {{MessageId: string, Body: string, MessageAttributes: Object}} msg
   *   A message as returned in `resp.Messages`.
   * @returns {void}
   */
  logIfUnseen(msg) {
    if (seenMessageIds.has(msg.MessageId)) {
      return;
    }

    const timestamp = new Date().toJSON().grey.bold.bgWhite;
    const message = {
      Body: msg.Body,
      MessageAttributes: msg.MessageAttributes
    };
    this.log(timestamp, "\n", JSON.stringify(message, undefined, 2));

    seenMessageIds.add(msg.MessageId);

    // Keep memory bounded: a Set iterates in insertion order, so the first
    // value is the oldest remembered ID and is the one to drop.
    if (seenMessageIds.size > MAX_SEEN_MESSAGE_IDS) {
      const oldestId = seenMessageIds.values().next().value;
      seenMessageIds.delete(oldestId);
    }
  }
}
// Static metadata attached to the command class: the help text and the
// definitions of the flags that run() reads after parsing.
Object.assign(TailSqsCommand, {
  description: "Tails the messages going into a SQS queue",
  flags: {
    // -n: which queue to tail (mandatory).
    queueName: flags.string({
      char: "n",
      description: "name of the SQS queue, e.g. task-queue-dev",
      required: true
    }),
    // -r: region the queue lives in (mandatory).
    region: flags.string({
      char: "r",
      description: "AWS region, e.g. us-east-1",
      required: true
    }),
    // -p: optional named AWS CLI profile.
    profile: flags.string({
      char: "p",
      description: "AWS CLI profile name",
      required: false
    })
  }
});

module.exports = TailSqsCommand;