forked from mhart/kinesalite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
262 lines (216 loc) · 9.43 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
var https = require('https'),
http = require('http'),
fs = require('fs'),
path = require('path'),
url = require('url'),
crypto = require('crypto'),
cbor = require('cbor'),
uuid = require('uuid'),
validations = require('./validations'),
db = require('./db')
var MAX_REQUEST_BYTES = 7 * 1024 * 1024
var AMZ_JSON = 'application/x-amz-json-1.1'
var AMZ_CBOR = 'application/x-amz-cbor-1.1'
var validApis = ['Kinesis_20131202'],
validOperations = ['AddTagsToStream', 'CreateStream', 'DeleteStream', 'DescribeStream', 'DescribeStreamSummary',
'GetRecords', 'GetShardIterator', 'ListShards', 'ListStreams', 'ListTagsForStream', 'MergeShards', 'PutRecord',
'PutRecords', 'RemoveTagsFromStream', 'SplitShard', 'IncreaseStreamRetentionPeriod', 'DecreaseStreamRetentionPeriod'],
actions = {},
actionValidations = {}
module.exports = kinesalite
function kinesalite(options) {
options = options || {}
var server, store = db.create(options), requestHandler = httpHandler.bind(null, store)
if (options.ssl) {
options.key = options.key || fs.readFileSync(path.join(__dirname, 'ssl', 'server-key.pem'))
options.cert = options.cert || fs.readFileSync(path.join(__dirname, 'ssl', 'server-crt.pem'))
options.ca = options.ca || fs.readFileSync(path.join(__dirname, 'ssl', 'ca-crt.pem'))
server = https.createServer(options, requestHandler)
} else {
server = http.createServer(requestHandler)
}
// Ensure we close DB when we're closing the server too
var httpServerClose = server.close, httpServerListen = server.listen
server.close = function(cb) {
store.db.close(function(err) {
if (err) return cb(err)
// Recreate the store if the user wants to listen again
server.listen = function() {
store.recreate()
httpServerListen.apply(server, arguments)
}
httpServerClose.call(server, cb)
})
}
return server
}
validOperations.forEach(function(action) {
action = validations.toLowerFirst(action)
actions[action] = require('./actions/' + action)
actionValidations[action] = require('./validations/' + action)
})
function sendRaw(req, res, body, statusCode) {
req.removeAllListeners()
res.statusCode = statusCode || 200
if (body != null) {
res.setHeader('Content-Length', Buffer.isBuffer(body) ? body.length : Buffer.byteLength(body, 'utf8'))
}
// AWS doesn't send a 'Connection' header but seems to use keep-alive behaviour
// res.setHeader('Connection', '')
// res.shouldKeepAlive = false
res.end(body)
}
function sendJson(req, res, data, statusCode) {
var body = data != null ? JSON.stringify(data) : ''
res.setHeader('Content-Type', res.contentType)
sendRaw(req, res, body, statusCode)
}
function sendCbor(req, res, data, statusCode) {
var body = data != null ? cbor.Encoder.encodeOne(data) : ''
res.setHeader('Content-Type', res.contentType)
sendRaw(req, res, body, statusCode)
}
function sendResponse(req, res, data, statusCode) {
return res.contentType == AMZ_CBOR ? sendCbor(req, res, data, statusCode) :
sendJson(req, res, data, statusCode)
}
function sendError(req, res, contentValid, type, msg) {
return contentValid ? sendResponse(req, res, {__type: type, message: msg}, 400) :
typeof msg == 'number' ? sendRaw(req, res, '<' + type + '/>\n', msg) :
sendRaw(req, res, '<' + type + '>\n <Message>' + msg + '</Message>\n</' + type + '>\n', 403)
}
function httpHandler(store, req, res) {
var body
req.on('error', function(err) { throw err })
req.on('data', function(data) {
var newLength = data.length + (body ? body.length : 0)
if (newLength > MAX_REQUEST_BYTES) {
res.setHeader('Transfer-Encoding', 'chunked')
return sendRaw(req, res, null, 413)
}
body = body ? Buffer.concat([body, data], newLength) : data
})
req.on('end', function() {
// All responses after this point have a RequestId
res.setHeader('x-amzn-RequestId', uuid.v1())
if (req.method != 'OPTIONS' || !req.headers.origin)
res.setHeader('x-amz-id-2', crypto.randomBytes(72).toString('base64'))
// FIRST check if we've got an origin header:
if (req.headers.origin) {
res.setHeader('Access-Control-Allow-Origin', '*')
// If it's a valid OPTIONS call, return here
if (req.method == 'OPTIONS') {
if (req.headers['access-control-request-headers'])
res.setHeader('Access-Control-Allow-Headers', req.headers['access-control-request-headers'])
if (req.headers['access-control-request-method'])
res.setHeader('Access-Control-Allow-Methods', req.headers['access-control-request-method'])
res.setHeader('Access-Control-Max-Age', 172800)
return sendRaw(req, res, '')
}
res.setHeader('Access-Control-Expose-Headers', 'x-amzn-RequestId,x-amzn-ErrorType,x-amz-request-id,x-amz-id-2,x-amzn-ErrorMessage,Date')
}
if (req.method != 'POST') {
return sendError(req, res, false, 'AccessDeniedException',
'Unable to determine service/operation name to be authorized')
}
var contentType = (req.headers['content-type'] || '').split(';')[0].trim()
var contentValid = ~[AMZ_JSON, AMZ_CBOR, 'application/json'].indexOf(contentType)
var target = (req.headers['x-amz-target'] || '').split('.')
var service = target[0]
var operation = target[1]
var serviceValid = service && ~validApis.indexOf(service)
var operationValid = operation && ~validOperations.indexOf(operation)
// AWS doesn't seem to care about the HTTP path, so no checking needed for that
// THEN if the method and content-type are ok, see if the JSON parses:
if (!body) {
res.contentType = contentType == AMZ_JSON ? AMZ_JSON : AMZ_CBOR
return sendResponse(req, res, {__type: serviceValid && operationValid ? 'SerializationException' : 'UnknownOperationException'}, 400)
}
if (!contentValid) {
if (!service || !operation) {
return sendError(req, res, false, 'AccessDeniedException',
'Unable to determine service/operation name to be authorized')
}
return sendError(req, res, false, 'UnknownOperationException', 404)
}
res.contentType = contentType
var data
if (contentType == AMZ_CBOR) {
try { data = cbor.Decoder.decodeFirstSync(body) } catch (e) { }
} else {
try { data = JSON.parse(body.toString()) } catch (e) { }
}
if (typeof data != 'object' || data == null) {
if (contentType == 'application/json') {
return sendJson(req, res, {
Output: {__type: 'com.amazon.coral.service#SerializationException'},
Version: '1.0',
}, 400)
}
return sendResponse(req, res, {__type: 'SerializationException'}, 400)
}
// After this point, application/json doesn't seem to progress any further
if (contentType == 'application/json') {
return sendJson(req, res, {
Output: {__type: 'com.amazon.coral.service#UnknownOperationException'},
Version: '1.0',
}, 404)
}
if (!serviceValid || !operationValid) {
return sendResponse(req, res, {__type: 'UnknownOperationException'}, 400)
}
// THEN check auth:
var authHeader = req.headers.authorization
var query = ~req.url.indexOf('?') ? url.parse(req.url, true).query : {}
var authQuery = 'X-Amz-Algorithm' in query
var msg = '', params
if (authHeader && authQuery) {
return sendError(req, res, contentValid, 'InvalidSignatureException',
'Found both \'X-Amz-Algorithm\' as a query-string param and \'Authorization\' as HTTP header.')
}
if (!authHeader && !authQuery) {
return sendError(req, res, contentValid, 'MissingAuthenticationTokenException', 'Missing Authentication Token')
}
if (authHeader) {
params = ['Credential', 'Signature', 'SignedHeaders']
var authParams = authHeader.split(/,| /).slice(1).filter(Boolean).reduce(function(obj, x) {
var keyVal = x.trim().split('=')
obj[keyVal[0]] = keyVal[1]
return obj
}, {})
params.forEach(function(param) {
if (!authParams[param])
msg += 'Authorization header requires \'' + param + '\' parameter. '
})
if (!req.headers['x-amz-date'] && !req.headers.date)
msg += 'Authorization header requires existence of either a \'X-Amz-Date\' or a \'Date\' header. '
if (msg) msg += 'Authorization=' + authHeader
} else {
params = ['X-Amz-Algorithm', 'X-Amz-Credential', 'X-Amz-Signature', 'X-Amz-SignedHeaders', 'X-Amz-Date']
params.forEach(function(param) {
if (!query[param])
msg += 'AWS query-string parameters must include \'' + param + '\'. '
})
if (msg) msg += 'Re-examine the query-string parameters.'
}
if (msg) {
return sendError(req, res, contentValid, 'IncompleteSignatureException', msg)
}
// If we've reached here, we're good to go:
var action = validations.toLowerFirst(operation)
var actionValidation = actionValidations[action]
try {
data = validations.checkTypes(data, actionValidation.types)
validations.checkValidations(data, actionValidation.types, actionValidation.custom, operation)
} catch (e) {
if (e.statusCode) return sendResponse(req, res, e.body, e.statusCode)
throw e
}
actions[action](store, data, function(err, data) {
if (err && err.statusCode) return sendResponse(req, res, err.body, err.statusCode)
if (err) throw err
sendResponse(req, res, data)
})
})
}
if (require.main === module) kinesalite().listen(4567)