Skip to content

Commit b46d915

Browse files
authored
Merge pull request #8 from elastic/improve-ndjson-mock
Improve ndjson mock
2 parents 9a6a228 + 82e48c0 commit b46d915

File tree

2 files changed

+125
-3
lines changed

2 files changed

+125
-3
lines changed

index.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ function normalizeParams (params, callback) {
155155
}
156156

157157
const compression = params.headers['Content-Encoding'] === 'gzip'
158+
const type = params.headers['Content-Type'] || ''
158159

159160
if (isStream(params.body)) {
160161
normalized.body = ''
@@ -163,7 +164,9 @@ function normalizeParams (params, callback) {
163164
stream.on('error', err => callback(err, null))
164165
stream.on('data', chunk => { normalized.body += chunk })
165166
stream.on('end', () => {
166-
normalized.body = JSON.parse(normalized.body)
167+
normalized.body = type.includes('x-ndjson')
168+
? normalized.body.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l))
169+
: JSON.parse(normalized.body)
167170
callback(null, normalized)
168171
})
169172
} else if (params.body) {
@@ -173,11 +176,16 @@ function normalizeParams (params, callback) {
173176
if (err) {
174177
return callback(err, null)
175178
}
176-
normalized.body = JSON.parse(buffer)
179+
buffer = buffer.toString()
180+
normalized.body = type.includes('x-ndjson')
181+
? buffer.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l))
182+
: JSON.parse(buffer)
177183
callback(null, normalized)
178184
})
179185
} else {
180-
normalized.body = JSON.parse(params.body)
186+
normalized.body = type.includes('x-ndjson')
187+
? params.body.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l))
188+
: JSON.parse(params.body)
181189
setImmediate(callback, null, normalized)
182190
}
183191
} else {

test.js

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,117 @@ test('Define multiple paths and method at once', async t => {
664664
t.deepEqual(response.body, { status: 'ok' })
665665
t.is(response.statusCode, 200)
666666
})
667+
668+
test('ndjson API support', async t => {
669+
const mock = new Mock()
670+
const client = new Client({
671+
node: 'http://localhost:9200',
672+
Connection: mock.getConnection()
673+
})
674+
675+
mock.add({
676+
method: 'POST',
677+
path: '/_bulk'
678+
}, params => {
679+
t.deepEqual(params.body, [
680+
{ foo: 'bar' },
681+
{ baz: 'fa\nz' }
682+
])
683+
return { status: 'ok' }
684+
})
685+
686+
const response = await client.bulk({
687+
body: [
688+
{ foo: 'bar' },
689+
{ baz: 'fa\nz' }
690+
]
691+
})
692+
t.deepEqual(response.body, { status: 'ok' })
693+
t.is(response.statusCode, 200)
694+
})
695+
696+
test('ndjson API support (with compression)', async t => {
697+
const mock = new Mock()
698+
const client = new Client({
699+
node: 'http://localhost:9200',
700+
Connection: mock.getConnection(),
701+
compression: 'gzip'
702+
})
703+
704+
mock.add({
705+
method: 'POST',
706+
path: '/_bulk'
707+
}, params => {
708+
t.deepEqual(params.body, [
709+
{ foo: 'bar' },
710+
{ baz: 'fa\nz' }
711+
])
712+
return { status: 'ok' }
713+
})
714+
715+
const response = await client.bulk({
716+
body: [
717+
{ foo: 'bar' },
718+
{ baz: 'fa\nz' }
719+
]
720+
})
721+
t.deepEqual(response.body, { status: 'ok' })
722+
t.is(response.statusCode, 200)
723+
})
724+
725+
test('ndjson API support (as stream)', async t => {
726+
const mock = new Mock()
727+
const client = new Client({
728+
node: 'http://localhost:9200',
729+
Connection: mock.getConnection()
730+
})
731+
732+
mock.add({
733+
method: 'POST',
734+
path: '/_bulk'
735+
}, params => {
736+
t.deepEqual(params.body, [
737+
{ foo: 'bar' },
738+
{ baz: 'fa\nz' }
739+
])
740+
return { status: 'ok' }
741+
})
742+
743+
const response = await client.bulk({
744+
body: intoStream(client.serializer.ndserialize([
745+
{ foo: 'bar' },
746+
{ baz: 'fa\nz' }
747+
]))
748+
})
749+
t.deepEqual(response.body, { status: 'ok' })
750+
t.is(response.statusCode, 200)
751+
})
752+
753+
test('ndjson API support (as stream with compression)', async t => {
754+
const mock = new Mock()
755+
const client = new Client({
756+
node: 'http://localhost:9200',
757+
Connection: mock.getConnection(),
758+
compression: 'gzip'
759+
})
760+
761+
mock.add({
762+
method: 'POST',
763+
path: '/_bulk'
764+
}, params => {
765+
t.deepEqual(params.body, [
766+
{ foo: 'bar' },
767+
{ baz: 'fa\nz' }
768+
])
769+
return { status: 'ok' }
770+
})
771+
772+
const response = await client.bulk({
773+
body: intoStream(client.serializer.ndserialize([
774+
{ foo: 'bar' },
775+
{ baz: 'fa\nz' }
776+
]))
777+
})
778+
t.deepEqual(response.body, { status: 'ok' })
779+
t.is(response.statusCode, 200)
780+
})

0 commit comments

Comments
 (0)