forked from aws-samples/machine-learning-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js.template
100 lines (94 loc) · 3.12 KB
/
index.js.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// If this file contains double-curly-braces, that's because
// it is a template that has not been processed into JavaScript yet.
console.log('Loading event');
exports.handler = function(event, context) {{
var AWS = require('aws-sdk');
var sns = new AWS.SNS();
var ml = new AWS.MachineLearning();
var endpointUrl = '';
var mlModelId = '{mlModelId}';
var snsTopicArn = 'arn:aws:sns:{region}:{awsAccountId}:{snsTopic}';
var snsMessageSubject = 'Respond to tweet';
var snsMessagePrefix = 'ML model '+mlModelId+': Respond to this tweet: https://twitter.com/0/status/';
var numMessagesProcessed = 0;
var numMessagesToBeProcessed = event.Records.length;
console.log("numMessagesToBeProcessed:"+numMessagesToBeProcessed);
var updateSns = function(tweetData) {{
var params = {{}};
params['TopicArn'] = snsTopicArn;
params['Subject'] = snsMessageSubject;
params['Message'] = snsMessagePrefix+tweetData['sid'];
console.log('Calling Amazon SNS to publish.');
sns.publish(
params,
function(err, data) {{
if (err) {{
console.log(err, err.stack); // an error occurred
context.done(null, 'Failed when publishing to SNS');
}}
else {{
context.done(null, 'Published to SNS');
}}
}}
);
}}
var callPredict = function(tweetData){{
console.log('calling predict');
ml.predict(
{{
Record : tweetData,
PredictEndpoint : endpointUrl,
MLModelId: mlModelId
}},
function(err, data) {{
if (err) {{
console.log(err);
context.done(null, 'Call to predict service failed.');
}}
else {{
console.log('Predict call succeeded');
if(data.Prediction.predictedLabel === '1'){{
updateSns(tweetData);
}}
else{{
context.done(null, "Tweet doesn't require response from customer service");
}}
}}
}}
);
}}
var processRecords = function(){{
for(i = 0; i < numMessagesToBeProcessed; ++i) {{
encodedPayload = event.Records[i].kinesis.data;
// Amazon Kinesis data is base64 encoded so decode here
payload = new Buffer(encodedPayload, 'base64').toString('utf-8');
console.log("payload:"+payload);
try {{
parsedPayload = JSON.parse(payload);
callPredict(parsedPayload);
}}
catch (err) {{
console.log(err, err.stack);
context.done(null, "failed payload"+payload);
}}
}}
}}
var checkRealtimeEndpoint = function(err, data){{
if (err){{
console.log(err);
context.done(null, 'Failed to fetch endpoint status and url.');
}}
else {{
var endpointInfo = data.EndpointInfo;
if (endpointInfo.EndpointStatus === 'READY') {{
endpointUrl = endpointInfo.EndpointUrl;
console.log('Fetched endpoint url :'+endpointUrl);
processRecords();
}} else {{
console.log('Endpoint status : ' + endpointInfo.EndpointStatus);
context.done(null, 'End point is not Ready.');
}}
}}
}}
ml.getMLModel({{MLModelId:mlModelId}}, checkRealtimeEndpoint);
}};