-
Notifications
You must be signed in to change notification settings - Fork 107
/
batchGetItem.js
95 lines (82 loc) · 3.47 KB
/
batchGetItem.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
var async = require('async'),
getItem = require('./getItem'),
db = require('../db')
// Implements the BatchGetItem operation against the local store:
// validates each requested key, fans out one getItem call per key,
// randomizes per-table result order, enforces a response size cap by
// diverting overflow keys into UnprocessedKeys, and optionally reports
// per-table ConsumedCapacity.
//
// store - backing store; exposes getTable() and options.maxItemSize
// data  - parsed request: { RequestItems, ReturnConsumedCapacity, ... }
// cb    - node-style callback(err, response)
module.exports = function batchGetItem (store, data, cb) {
// Populated by addTableRequests below: tableName -> bound async.map task
// that fetches every key for that table.
var requests = {}
// Phase 1 builds `requests` (and validates keys); phase 2 runs all the
// per-table tasks in parallel. responses[1] is then a map of
// tableName -> array of getItem results, in original key order.
async.series([
async.each.bind(async, Object.keys(data.RequestItems), addTableRequests),
async.parallel.bind(async, requests),
], function (err, responses) {
if (err) return cb(err)
// totalSize accumulates the byte size of items already accepted into the
// response; capacities tallies ConsumedCapacity units per table.
var res = { Responses: {}, UnprocessedKeys: {} }, table, tableResponses = responses[1], totalSize = 0, capacities = {}
for (table in tableResponses) {
// Order is pretty random
// Assign keys before we shuffle
// Each result is tagged with the key that produced it (results arrive in
// the same order the keys were submitted), so UnprocessedKeys can report
// the original key even after shuffling.
tableResponses[table].forEach(function (tableRes, ix) { tableRes._key = data.RequestItems[table].Keys[ix] }) // eslint-disable-line no-loop-func
shuffle(tableResponses[table])
res.Responses[table] = tableResponses[table].map(function (tableRes) { // eslint-disable-line no-loop-func
if (tableRes.Item) {
// TODO: This is totally inefficient - should fix this
var newSize = totalSize + db.itemSize(tableRes.Item)
// Appears to model DynamoDB's ~1MB response cap with slack of one max-size
// item; the -3 offset is unexplained — NOTE(review): confirm against real
// DynamoDB behavior.
if (newSize > (1024 * 1024 + store.options.maxItemSize - 3)) {
// First overflow for this table: start its UnprocessedKeys entry,
// carrying over the request's projection/consistency options so the
// client can resubmit it verbatim.
if (!res.UnprocessedKeys[table]) {
res.UnprocessedKeys[table] = { Keys: [] }
if (data.RequestItems[table].AttributesToGet)
res.UnprocessedKeys[table].AttributesToGet = data.RequestItems[table].AttributesToGet
if (data.RequestItems[table].ConsistentRead)
res.UnprocessedKeys[table].ConsistentRead = data.RequestItems[table].ConsistentRead
}
// NOTE(review): an unprocessed key still adds 1 capacity unit here —
// presumably mirrors DynamoDB billing for the read attempt; verify.
if (!capacities[table]) capacities[table] = 0
capacities[table] += 1
res.UnprocessedKeys[table].Keys.push(tableRes._key)
// Returning null marks this slot for removal by the filter below.
return null
}
totalSize = newSize
}
// Accumulate capacity reported by the underlying getItem (present when
// the request asked for ReturnConsumedCapacity).
if (tableRes.ConsumedCapacity) {
if (!capacities[table]) capacities[table] = 0
capacities[table] += tableRes.ConsumedCapacity.CapacityUnits
}
return tableRes.Item
}).filter(Boolean)
}
// Only emit ConsumedCapacity for TOTAL/INDEXES; INDEXES additionally
// nests the table-level units under a Table object.
if (~[ 'TOTAL', 'INDEXES' ].indexOf(data.ReturnConsumedCapacity)) {
res.ConsumedCapacity = Object.keys(tableResponses).map(function (table) {
return {
CapacityUnits: capacities[table],
TableName: table,
Table: data.ReturnConsumedCapacity == 'INDEXES' ? { CapacityUnits: capacities[table] } : undefined,
}
})
}
cb(null, res)
})
// Resolves one table's request entry: verifies the table exists, validates
// every key against its schema, and stores a bound async.map task in
// `requests` that will fetch all of this table's keys.
function addTableRequests (tableName, cb) {
store.getTable(tableName, function (err, table) {
if (err) return cb(err)
var req = data.RequestItems[tableName], i, key, options, gets = []
for (i = 0; i < req.Keys.length; i++) {
key = req.Keys[i]
// validateKey returns an error-like value for a malformed key; the
// whole batch fails fast on the first invalid key.
let invalid = db.validateKey(key, table)
if (invalid != null) return cb(invalid)
// Build a per-key getItem request, propagating the table-level
// projection/attribute/consistency options and the capacity flag.
options = { TableName: tableName, Key: key }
if (req._projection) options._projection = req._projection
if (req.AttributesToGet) options.AttributesToGet = req.AttributesToGet
if (req.ConsistentRead) options.ConsistentRead = req.ConsistentRead
if (data.ReturnConsumedCapacity) options.ReturnConsumedCapacity = data.ReturnConsumedCapacity
gets.push(options)
}
requests[tableName] = async.map.bind(async, gets, function (data, cb) { return getItem(store, data, cb) })
cb()
})
}
}
// In-place Fisher–Yates shuffle: leaves `arr` holding the same elements
// in uniformly random order. Mutates its argument; returns nothing.
function shuffle (arr) {
  for (let end = arr.length - 1; end > 0; end--) {
    // Pick a random slot from the not-yet-fixed prefix [0, end] and swap
    // it into position `end`.
    let pick = Math.floor(Math.random() * (end + 1));
    [arr[end], arr[pick]] = [arr[pick], arr[end]]
  }
}