From 75f344c38886de1e1dea1d056fbe5b9c02969dc4 Mon Sep 17 00:00:00 2001 From: Larry Gregory Date: Tue, 25 Jun 2019 16:27:52 -0400 Subject: [PATCH 1/3] transform ndjson within route handlers for SO import/export APIs --- .../import/collect_saved_objects.test.ts | 45 +++------- .../import/collect_saved_objects.ts | 8 -- .../import/import_saved_objects.test.ts | 72 +++++++-------- .../import/resolve_import_errors.test.ts | 77 ++++++++-------- ...e_saved_objects_stream_from_ndjson.test.ts | 87 +++++++++++++++++++ ...create_saved_objects_stream_from_ndjson.ts | 34 ++++++++ src/legacy/server/saved_objects/lib/index.ts | 20 +++++ .../saved_objects/routes/import.test.ts | 5 +- .../server/saved_objects/routes/import.ts | 4 +- .../routes/resolve_import_errors.ts | 3 +- 10 files changed, 239 insertions(+), 116 deletions(-) create mode 100644 src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts create mode 100644 src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts create mode 100644 src/legacy/server/saved_objects/lib/index.ts diff --git a/src/core/server/saved_objects/import/collect_saved_objects.test.ts b/src/core/server/saved_objects/import/collect_saved_objects.test.ts index 1433cc25046d6..9cccc3942f655 100644 --- a/src/core/server/saved_objects/import/collect_saved_objects.test.ts +++ b/src/core/server/saved_objects/import/collect_saved_objects.test.ts @@ -23,6 +23,7 @@ import { collectSavedObjects } from './collect_saved_objects'; describe('collectSavedObjects()', () => { test('collects nothing when stream is empty', async () => { const readStream = new Readable({ + objectMode: true, read() { this.push(null); }, @@ -38,34 +39,9 @@ Object { test('collects objects from stream', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push('{"foo":true,"type":"a"}'); - this.push(null); - }, - }); - const result = await collectSavedObjects({ - readStream, - objectLimit: 1, - supportedTypes: ['a'], - }); - expect(result).toMatchInlineSnapshot(` -Object { - "collectedObjects": Array [ - Object { - "foo": true, - "migrationVersion": Object {}, - "type": "a", - }, - ], - "errors": Array [], -} -`); - }); - - test('filters out empty lines', async () => { - const readStream = new Readable({ - read() { - this.push('{"foo":true,"type":"a"}\n\n'); + this.push({ foo: true, type: 'a' }); this.push(null); }, }); @@ -90,9 +66,10 @@ Object { test('throws error when object limit is reached', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push('{"foo":true,"type":"a"}\n'); - this.push('{"bar":true,"type":"a"}\n'); + this.push({ foo: true, type: 'a' }); + this.push({ bar: true, type: 'a' }); this.push(null); }, }); @@ -107,9 +84,10 @@ Object { test('unsupported types return as import errors', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push('{"id":"1","type":"a","attributes":{"title":"my title"}}\n'); - this.push('{"id":"2","type":"b","attributes":{"title":"my title 2"}}\n'); + this.push({ id: '1', type: 'a', attributes: { title: 'my title' } }); + this.push({ id: '2', type: 'b', attributes: { title: 'my title 2' } }); this.push(null); }, }); @@ -141,9 +119,10 @@ Object { test('unsupported types still count towards object limit', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push('{"foo":true,"type":"a"}\n'); - this.push('{"bar":true,"type":"b"}\n'); + this.push({ foo: true, type: 'a' }); + this.push({ bar: true, type: 'b' }); this.push(null); }, }); diff --git a/src/core/server/saved_objects/import/collect_saved_objects.ts b/src/core/server/saved_objects/import/collect_saved_objects.ts index 3445fe9b42406..11add36e54fb6 100644 --- a/src/core/server/saved_objects/import/collect_saved_objects.ts +++ b/src/core/server/saved_objects/import/collect_saved_objects.ts @@ -23,7 +23,6 @@ import { createFilterStream, createMapStream, createPromiseFromStreams, - createSplitStream, } from '../../../../legacy/utils/streams'; import { SavedObject } from '../service'; import { createLimitStream } from './create_limit_stream'; @@ -45,13 +44,6 @@ export async function collectSavedObjects({ const errors: ImportError[] = []; const collectedObjects: SavedObject[] = await createPromiseFromStreams([ readStream, - createSplitStream('\n'), - createMapStream((str: string) => { - if (str && str !== '') { - return JSON.parse(str); - } - }), - createFilterStream(obj => !!obj), createLimitStream(objectLimit), createFilterStream(obj => { if (supportedTypes.includes(obj.type)) { diff --git a/src/core/server/saved_objects/import/import_saved_objects.test.ts b/src/core/server/saved_objects/import/import_saved_objects.test.ts index 77010b294471d..80e5cc9a306f0 100644 --- a/src/core/server/saved_objects/import/import_saved_objects.test.ts +++ b/src/core/server/saved_objects/import/import_saved_objects.test.ts @@ -73,6 +73,7 @@ describe('importSavedObjects()', () => { test('returns early when no objects exist', async () => { const readStream = new Readable({ + objectMode: true, read() { this.push(null); }, @@ -94,8 +95,9 @@ Object { test('calls bulkCreate without overwrite', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -175,8 +177,9 @@ Object { test('calls bulkCreate with overwrite', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -256,8 +259,9 @@ Object { test('extracts errors for conflicts', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -323,39 +327,36 @@ Object { test('validates references', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push( - JSON.stringify({ - id: '1', - type: 'search', - attributes: { - title: 'My Search', + this.push({ + id: '1', + type: 'search', + attributes: { + title: 'My Search', + }, + references: [ + { + name: 'ref_0', + type: 'index-pattern', + id: '2', }, - references: [ - { - name: 'ref_0', - type: 'index-pattern', - id: '2', - }, - ], - }) + '\n' - ); - this.push( - JSON.stringify({ - id: '3', - type: 'visualization', - attributes: { - title: 'My Visualization', + ], + }); + this.push({ + id: '3', + type: 'visualization', + attributes: { + title: 'My Visualization', + }, + references: [ + { + name: 'ref_0', + type: 'search', + id: '1', }, - references: [ - { - name: 'ref_0', - type: 'search', - id: '1', - }, - ], - }) + '\n' - ); + ], + }); this.push(null); }, }); @@ -433,9 +434,10 @@ Object { test('validates supported types', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); - this.push('{"id":"1","type":"wigwags","attributes":{"title":"my title"},"references":[]}'); + savedObjects.forEach(obj => this.push(obj)); + this.push({ id: '1', type: 'wigwags', attributes: { title: 'my title' }, references: [] }); this.push(null); }, }); diff --git a/src/core/server/saved_objects/import/resolve_import_errors.test.ts b/src/core/server/saved_objects/import/resolve_import_errors.test.ts index 3d5ed43315f95..d3f36852fd796 100644 --- a/src/core/server/saved_objects/import/resolve_import_errors.test.ts +++ b/src/core/server/saved_objects/import/resolve_import_errors.test.ts @@ -79,8 +79,9 @@ describe('resolveImportErrors()', () => { test('works with empty parameters', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -105,8 +106,9 @@ Object { test('works with retries', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -162,8 +164,9 @@ Object { test('works with overwrites', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -222,8 +225,9 @@ Object { test('works wtih replaceReferences', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -291,8 +295,9 @@ Object { test('extracts errors for conflicts', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); + savedObjects.forEach(obj => this.push(obj)); this.push(null); }, }); @@ -362,39 +367,36 @@ Object { test('validates references', async () => { const readStream = new Readable({ + objectMode: true, read() { - this.push( - JSON.stringify({ - id: '1', - type: 'search', - attributes: { - title: 'My Search', + this.push({ + id: '1', + type: 'search', + attributes: { + title: 'My Search', + }, + references: [ + { + name: 'ref_0', + type: 'index-pattern', + id: '2', }, - references: [ - { - name: 'ref_0', - type: 'index-pattern', - id: '2', - }, - ], - }) + '\n' - ); - this.push( - JSON.stringify({ - id: '3', - type: 'visualization', - attributes: { - title: 'My Visualization', + ], + }); + this.push({ + id: '3', + type: 'visualization', + attributes: { + title: 'My Visualization', + }, + references: [ + { + name: 'ref_0', + type: 'search', + id: '1', }, - references: [ - { - name: 'ref_0', - type: 'search', - id: '1', - }, - ], - }) + '\n' - ); + ], + }); this.push(null); }, }); @@ -485,9 +487,10 @@ Object { test('validates object types', async () => { const readStream = new Readable({ + objectMode: true, read() { - savedObjects.forEach(obj => this.push(JSON.stringify(obj) + '\n')); - this.push('{"id":"1","type":"wigwags","attributes":{"title":"my title"},"references":[]}'); + savedObjects.forEach(obj => this.push(obj)); + this.push({ id: '1', type: 'wigwags', attributes: { title: 'my title' }, references: [] }); this.push(null); }, }); diff --git a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts new file mode 100644 index 0000000000000..bea8dc1c4a169 --- /dev/null +++ b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { createSavedObjectsStreamFromNdJson } from './create_saved_objects_stream_from_ndjson'; +import { Readable } from 'stream'; + +async function readStream(stream: Readable) { + return new Promise((resolve, reject) => { + const data: unknown[] = []; + + stream.on('data', chunk => { + data.push(chunk); + }); + stream.on('end', () => resolve(data)); + stream.on('error', error => reject(error)); + }); +} + +describe('createSavedObjectsStreamFromNdJson', () => { + it('transforms an ndjson stream into a stream of saved objects', async () => { + const savedObjectsStream = createSavedObjectsStreamFromNdJson( + new Readable({ + read() { + this.push('{"id": "foo", "type": "foo-type"}\n'); + this.push('{"id": "bar", "type": "bar-type"}\n'); + this.push(null); + }, + }) + ); + + const result = await readStream(savedObjectsStream); + + expect(result).toEqual([ + { + id: 'foo', + type: 'foo-type', + }, + { + id: 'bar', + type: 'bar-type', + }, + ]); + }); + + it('skips empty lines', async () => { + const savedObjectsStream = createSavedObjectsStreamFromNdJson( + new Readable({ + read() { + this.push('{"id": "foo", "type": "foo-type"}\n'); + this.push('\n'); + this.push(''); + this.push('{"id": "bar", "type": "bar-type"}\n'); + this.push(null); + }, + }) + ); + + const result = await readStream(savedObjectsStream); + + expect(result).toEqual([ + { + id: 'foo', + type: 'foo-type', + }, + { + id: 'bar', + type: 'bar-type', + }, + ]); + }); +}); diff --git a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts new file mode 100644 index 0000000000000..e119d0c880e29 --- /dev/null +++ b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Readable, pipeline } from 'stream'; +import { SavedObject } from 'kibana/server'; +import { createSplitStream, createMapStream, createFilterStream } from '../../../utils/streams'; + +export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) { + return pipeline( + ndJsonStream, + createSplitStream('\n'), + createMapStream((str: string) => { + if (str && str.trim() !== '') { + return JSON.parse(str); + } + }), + createFilterStream(obj => !!obj) + ); +} diff --git a/src/legacy/server/saved_objects/lib/index.ts b/src/legacy/server/saved_objects/lib/index.ts new file mode 100644 index 0000000000000..1255ef67a03c2 --- /dev/null +++ b/src/legacy/server/saved_objects/lib/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export { createSavedObjectsStreamFromNdJson } from './create_saved_objects_stream_from_ndjson'; diff --git a/src/legacy/server/saved_objects/routes/import.test.ts b/src/legacy/server/saved_objects/routes/import.test.ts index 459b7a2b737d9..5530170472f1c 100644 --- a/src/legacy/server/saved_objects/routes/import.test.ts +++ b/src/legacy/server/saved_objects/routes/import.test.ts @@ -120,7 +120,7 @@ describe('POST /api/saved_objects/_import', () => { expect(firstBulkCreateCallArray[0].migrationVersion).toEqual({}); }); - test('imports an index pattern and dashboard', async () => { + test('imports an index pattern and dashboard, ignoring empty lines in the file', async () => { // NOTE: changes to this scenario should be reflected in the docs const request = { method: 'POST', @@ -131,6 +131,9 @@ describe('POST /api/saved_objects/_import', () => { 'Content-Type: application/ndjson', '', '{"type":"index-pattern","id":"my-pattern","attributes":{"title":"my-pattern-*"}}', + '', + '', + '', '{"type":"dashboard","id":"my-dashboard","attributes":{"title":"Look at my dashboard"}}', '--EXAMPLE--', ].join('\r\n'), diff --git a/src/legacy/server/saved_objects/routes/import.ts b/src/legacy/server/saved_objects/routes/import.ts index ea83328231718..53fa3ea7d5d00 100644 --- a/src/legacy/server/saved_objects/routes/import.ts +++ b/src/legacy/server/saved_objects/routes/import.ts @@ -27,6 +27,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { importSavedObjects } from '../../../../core/server/saved_objects/import'; import { Prerequisites, WithoutQueryAndParams } from './types'; +import { createSavedObjectsStreamFromNdJson } from '../lib'; interface HapiReadableStream extends Readable { hapi: { @@ -78,10 +79,11 @@ export const createImportRoute = ( if (fileExtension !== '.ndjson') { return Boom.badRequest(`Invalid file extension ${fileExtension}`); } + return await importSavedObjects({ supportedTypes, savedObjectsClient, - readStream: request.payload.file, + readStream: createSavedObjectsStreamFromNdJson(request.payload.file), objectLimit: request.server.config().get('savedObjects.maxImportExportSize'), overwrite: request.query.overwrite, }); diff --git a/src/legacy/server/saved_objects/routes/resolve_import_errors.ts b/src/legacy/server/saved_objects/routes/resolve_import_errors.ts index 24390df9b6ecb..15d02f525c2cf 100644 --- a/src/legacy/server/saved_objects/routes/resolve_import_errors.ts +++ b/src/legacy/server/saved_objects/routes/resolve_import_errors.ts @@ -27,6 +27,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { resolveImportErrors } from '../../../../core/server/saved_objects/import'; import { Prerequisites } from './types'; +import { createSavedObjectsStreamFromNdJson } from '../lib'; interface HapiReadableStream extends Readable { hapi: { @@ -103,7 +104,7 @@ export const createResolveImportErrorsRoute = ( return await resolveImportErrors({ supportedTypes, savedObjectsClient, - readStream: request.payload.file, + readStream: createSavedObjectsStreamFromNdJson(request.payload.file), retries: request.payload.retries, objectLimit: request.server.config().get('savedObjects.maxImportExportSize'), }); From 6bb07c97020b364b20e20682b0228614d93b45f8 Mon Sep 17 00:00:00 2001 From: Larry Gregory Date: Wed, 26 Jun 2019 09:10:53 -0400 Subject: [PATCH 2/3] convert export saved objects to return a stream --- .../get_sorted_objects_for_export.test.ts | 17 ++++++-- .../export/get_sorted_objects_for_export.ts | 6 ++- .../saved_objects/export/sort_objects.ts | 6 +-- ...e_saved_objects_stream_from_ndjson.test.ts | 17 +++----- .../saved_objects/routes/export.test.ts | 43 ++++++++++--------- .../server/saved_objects/routes/export.ts | 18 +++++++- 6 files changed, 66 insertions(+), 41 deletions(-) diff --git a/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts b/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts index 7ac3ebd412c0a..385de5b14565d 100644 --- a/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts +++ b/src/core/server/saved_objects/export/get_sorted_objects_for_export.test.ts @@ -19,6 +19,12 @@ import { getSortedObjectsForExport } from './get_sorted_objects_for_export'; import { SavedObjectsClientMock } from '../service/saved_objects_client.mock'; +import { Readable } from 'stream'; +import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams'; + +async function readStreamToCompletion(stream: Readable) { + return createPromiseFromStreams([stream, createConcatStream([])]); +} describe('getSortedObjectsForExport()', () => { const savedObjectsClient = SavedObjectsClientMock.create(); @@ -59,11 +65,14 @@ describe('getSortedObjectsForExport()', () => { per_page: 1, page: 0, }); - const response = await getSortedObjectsForExport({ + const exportStream = await getSortedObjectsForExport({ savedObjectsClient, exportSizeLimit: 500, types: ['index-pattern', 'search'], }); + + const response = await readStreamToCompletion(exportStream); + expect(response).toMatchInlineSnapshot(` Array [ Object { @@ -169,7 +178,7 @@ Array [ }, ], }); - const response = await getSortedObjectsForExport({ + const exportStream = await getSortedObjectsForExport({ exportSizeLimit: 10000, savedObjectsClient, types: ['index-pattern', 'search'], @@ -184,6 +193,7 @@ Array [ }, ], }); + const response = await readStreamToCompletion(exportStream); expect(response).toMatchInlineSnapshot(` Array [ Object { @@ -259,7 +269,7 @@ Array [ }, ], }); - const response = await getSortedObjectsForExport({ + const exportStream = await getSortedObjectsForExport({ exportSizeLimit: 10000, savedObjectsClient, types: ['index-pattern', 'search'], @@ -271,6 +281,7 @@ Array [ ], includeReferencesDeep: true, }); + const response = await readStreamToCompletion(exportStream); expect(response).toMatchInlineSnapshot(` Array [ Object { diff --git a/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts b/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts index 9782de4b553f8..e09780574a25c 100644 --- a/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts +++ b/src/core/server/saved_objects/export/get_sorted_objects_for_export.ts @@ -18,6 +18,7 @@ */ import Boom from 'boom'; +import { createListStream } from '../../../../legacy/utils/streams'; import { SavedObjectsClientContract } from '../'; import { injectNestedDependencies } from './inject_nested_depdendencies'; import { sortObjects } from './sort_objects'; @@ -86,9 +87,12 @@ export async function getSortedObjectsForExport({ savedObjectsClient, exportSizeLimit, }); - return sortObjects( + + const exportedObjects = sortObjects( includeReferencesDeep ? await injectNestedDependencies(objectsToExport, savedObjectsClient) : objectsToExport ); + + return createListStream(exportedObjects); } diff --git a/src/core/server/saved_objects/export/sort_objects.ts b/src/core/server/saved_objects/export/sort_objects.ts index 17be0fd18dbe1..84640db3635e9 100644 --- a/src/core/server/saved_objects/export/sort_objects.ts +++ b/src/core/server/saved_objects/export/sort_objects.ts @@ -20,9 +20,9 @@ import Boom from 'boom'; import { SavedObject } from '../service/saved_objects_client'; -export function sortObjects(savedObjects: SavedObject[]) { - const path = new Set(); - const sorted = new Set(); +export function sortObjects(savedObjects: SavedObject[]): SavedObject[] { + const path = new Set(); + const sorted = new Set(); const objectsByTypeId = new Map( savedObjects.map(object => [`${object.type}:${object.id}`, object] as [string, SavedObject]) ); diff --git a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts index bea8dc1c4a169..2eab73137fff0 100644 --- a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts +++ b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.test.ts @@ -19,17 +19,10 @@ import { createSavedObjectsStreamFromNdJson } from './create_saved_objects_stream_from_ndjson'; import { Readable } from 'stream'; +import { createPromiseFromStreams, createConcatStream } from '../../../utils/streams'; -async function readStream(stream: Readable) { - return new Promise((resolve, reject) => { - const data: unknown[] = []; - - stream.on('data', chunk => { - data.push(chunk); - }); - stream.on('end', () => resolve(data)); - stream.on('error', error => reject(error)); - }); +async function readStreamToCompletion(stream: Readable) { + return createPromiseFromStreams([stream, createConcatStream([])]); } describe('createSavedObjectsStreamFromNdJson', () => { @@ -44,7 +37,7 @@ describe('createSavedObjectsStreamFromNdJson', () => { }) ); - const result = await readStream(savedObjectsStream); + const result = await readStreamToCompletion(savedObjectsStream); expect(result).toEqual([ { @@ -71,7 +64,7 @@ describe('createSavedObjectsStreamFromNdJson', () => { }) ); - const result = await readStream(savedObjectsStream); + const result = await readStreamToCompletion(savedObjectsStream); expect(result).toEqual([ { diff --git a/src/legacy/server/saved_objects/routes/export.test.ts b/src/legacy/server/saved_objects/routes/export.test.ts index c74548ab1bbd3..6b6e6ac90a48c 100644 --- a/src/legacy/server/saved_objects/routes/export.test.ts +++ b/src/legacy/server/saved_objects/routes/export.test.ts @@ -27,6 +27,7 @@ import Hapi from 'hapi'; import * as exportMock from '../../../../core/server/saved_objects/export'; import { createMockServer } from './_mock_server'; import { createExportRoute } from './export'; +import { createListStream } from '../../../utils/streams'; const getSortedObjectsForExport = exportMock.getSortedObjectsForExport as jest.Mock; @@ -70,26 +71,28 @@ describe('POST /api/saved_objects/_export', () => { includeReferencesDeep: true, }, }; - getSortedObjectsForExport.mockResolvedValueOnce([ - { - id: '1', - type: 'index-pattern', - attributes: {}, - references: [], - }, - { - id: '2', - type: 'search', - attributes: {}, - references: [ - { - name: 'ref_0', - type: 'index-pattern', - id: '1', - }, - ], - }, - ]); + getSortedObjectsForExport.mockResolvedValueOnce( + createListStream([ + { + id: '1', + type: 'index-pattern', + attributes: {}, + references: [], + }, + { + id: '2', + type: 'search', + attributes: {}, + references: [ + { + name: 'ref_0', + type: 'index-pattern', + id: '1', + }, + ], + }, + ]) + ); const { payload, statusCode, headers } = await server.inject(request); const objects = payload.split('\n').map(row => JSON.parse(row)); diff --git a/src/legacy/server/saved_objects/routes/export.ts b/src/legacy/server/saved_objects/routes/export.ts index 4bc5d04b5585a..304f39c0b735c 100644 --- a/src/legacy/server/saved_objects/routes/export.ts +++ b/src/legacy/server/saved_objects/routes/export.ts @@ -21,6 +21,11 @@ import Hapi from 'hapi'; import Joi from 'joi'; import stringify from 'json-stable-stringify'; import { SavedObjectsClientContract } from 'src/core/server'; +import { + createPromiseFromStreams, + createMapStream, + createConcatStream, +} from '../../../utils/streams'; // Disable lint errors for imports from src/core/server/saved_objects until SavedObjects migration is complete // eslint-disable-next-line @kbn/eslint/no-restricted-paths import { getSortedObjectsForExport } from '../../../../core/server/saved_objects/export'; @@ -72,15 +77,24 @@ export const createExportRoute = ( }, async handler(request: ExportRequest, h: Hapi.ResponseToolkit) { const { savedObjectsClient } = request.pre; - const docsToExport = await getSortedObjectsForExport({ + const exportStream = await getSortedObjectsForExport({ savedObjectsClient, types: request.payload.type, objects: request.payload.objects, exportSizeLimit: server.config().get('savedObjects.maxImportExportSize'), includeReferencesDeep: request.payload.includeReferencesDeep, }); + + const docsToExport: string[] = await createPromiseFromStreams([ + exportStream, + createMapStream((obj: unknown) => { + return stringify(obj); + }), + createConcatStream([]), + ]); + return h - .response(docsToExport.map(doc => stringify(doc)).join('\n')) + .response(docsToExport.join('\n')) .header('Content-Disposition', `attachment; filename="export.ndjson"`) .header('Content-Type', 'application/ndjson'); }, From ee99983d389911f838a04d475edda3a106026911 Mon Sep 17 00:00:00 2001 From: Larry Gregory Date: Wed, 26 Jun 2019 12:27:40 -0400 Subject: [PATCH 3/3] fix stream creation --- ...create_saved_objects_stream_from_ndjson.ts | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts index e119d0c880e29..fa82e54e9fb0a 100644 --- a/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts +++ b/src/legacy/server/saved_objects/lib/create_saved_objects_stream_from_ndjson.ts @@ -16,19 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -import { Readable, pipeline } from 'stream'; +import { Readable } from 'stream'; import { SavedObject } from 'kibana/server'; import { createSplitStream, createMapStream, createFilterStream } from '../../../utils/streams'; export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) { - return pipeline( - ndJsonStream, - createSplitStream('\n'), - createMapStream((str: string) => { - if (str && str.trim() !== '') { - return JSON.parse(str); - } - }), - createFilterStream(obj => !!obj) - ); + return ndJsonStream + .pipe(createSplitStream('\n')) + .pipe( + createMapStream((str: string) => { + if (str && str.trim() !== '') { + return JSON.parse(str); + } + }) + ) + .pipe(createFilterStream(obj => !!obj)); }