diff --git a/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts index 58385fb544a57..0117595bd984e 100644 --- a/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts @@ -17,7 +17,7 @@ import type { Logger, } from '@kbn/core/server'; import { createListStream } from '@kbn/utils'; -import { partition } from 'lodash'; +import { partition, chunk } from 'lodash'; import { getAssetFromAssetsMap, getPathParts } from '../../archive'; import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types'; @@ -36,6 +36,8 @@ import { withPackageSpan } from '../../packages/utils'; import { tagKibanaAssets } from './tag_assets'; import { getSpaceAwareSaveobjectsClients } from './saved_objects'; +const MAX_ASSETS_TO_INSTALL_IN_PARALLEL = 1000; + type SavedObjectsImporterContract = Pick; const formatImportErrorsForLog = (errors: SavedObjectsImportFailure[]) => JSON.stringify( @@ -144,11 +146,33 @@ export async function installKibanaAssets(options: { await makeManagedIndexPatternsGlobal(savedObjectsClient); - const installedAssets = await installKibanaSavedObjects({ - logger, - savedObjectsImporter, - kibanaAssets: assetsToInstall, - }); + let installedAssets: SavedObjectsImportSuccess[] = []; + + if ( + assetsToInstall.length > MAX_ASSETS_TO_INSTALL_IN_PARALLEL && + !hasReferences(assetsToInstall) + ) { + // If the package size is too large, we need to install in chunks to avoid + // memory issues as the SO import creates a lot of objects in memory + + // NOTE: if there are references, we can't chunk the install because + // referenced objects might end up in different chunks leading to import + // errors. + for (const assetChunk of chunk(assetsToInstall, MAX_ASSETS_TO_INSTALL_IN_PARALLEL)) { + const result = await installKibanaSavedObjects({ + logger, + savedObjectsImporter, + kibanaAssets: assetChunk, + }); + installedAssets = installedAssets.concat(result); + } + } else { + installedAssets = await installKibanaSavedObjects({ + logger, + savedObjectsImporter, + kibanaAssets: assetsToInstall, + }); + } return installedAssets; } @@ -395,95 +419,94 @@ export async function installKibanaSavedObjects({ savedObjectsImporter: SavedObjectsImporterContract; logger: Logger; }) { - const toBeSavedObjects = await Promise.all( - kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset)) - ); + if (!kibanaAssets.length) { + return []; + } + + const toBeSavedObjects = kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset)); let allSuccessResults: SavedObjectsImportSuccess[] = []; - if (toBeSavedObjects.length === 0) { - return []; - } else { - const { - successResults: importSuccessResults = [], - errors: importErrors = [], - success, - } = await retryImportOnConflictError(() => - savedObjectsImporter.import({ - overwrite: true, - readStream: createListStream(toBeSavedObjects), - createNewCopies: false, - refresh: false, - managed: true, - }) - ); + const { + successResults: importSuccessResults = [], + errors: importErrors = [], + success, + } = await retryImportOnConflictError(() => { + const readStream = createListStream(toBeSavedObjects); + return savedObjectsImporter.import({ + overwrite: true, + readStream, + createNewCopies: false, + refresh: false, + managed: true, + }); + }); - if (success) { - allSuccessResults = importSuccessResults; - } + if (success) { + allSuccessResults = importSuccessResults; + } - const [referenceErrors, otherErrors] = partition( - importErrors, - (e) => e?.error?.type === 'missing_references' - ); + const [referenceErrors, otherErrors] = partition( + importErrors, + (e) => e?.error?.type === 'missing_references' + ); - if (otherErrors?.length) { - throw new KibanaSOReferenceError( - `Encountered ${ - otherErrors.length - } errors creating saved objects: ${formatImportErrorsForLog(otherErrors)}` - ); - } + if (otherErrors?.length) { + throw new KibanaSOReferenceError( + `Encountered ${otherErrors.length} errors creating saved objects: ${formatImportErrorsForLog( + otherErrors + )}` + ); + } - /* + /* A reference error here means that a saved object reference in the references array cannot be found. This is an error in the package its-self but not a fatal one. For example a dashboard may still refer to the legacy `metricbeat-*` index pattern. We ignore reference errors here so that legacy version of a package can still be installed, but if a warning is logged it should be reported to the integrations team. */ - if (referenceErrors.length) { - logger.debug( - () => - `Resolving ${ - referenceErrors.length - } reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}` - ); - - const retries = toBeSavedObjects.map(({ id, type }) => { - if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) { - return { - id, - type, - ignoreMissingReferences: true, - replaceReferences: [], - overwrite: true, - }; - } - return { id, type, overwrite: true, replaceReferences: [] }; - }); + if (referenceErrors.length) { + logger.debug( + () => + `Resolving ${ + referenceErrors.length + } reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}` + ); - const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } = - await savedObjectsImporter.resolveImportErrors({ - readStream: createListStream(toBeSavedObjects), - createNewCopies: false, - managed: true, - retries, - }); - - if (resolveErrors?.length) { - throw new KibanaSOReferenceError( - `Encountered ${ - resolveErrors.length - } errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}` - ); + const retries = toBeSavedObjects.map(({ id, type }) => { + if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) { + return { + id, + type, + ignoreMissingReferences: true, + replaceReferences: [], + overwrite: true, + }; } + return { id, type, overwrite: true, replaceReferences: [] }; + }); + + const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } = + await savedObjectsImporter.resolveImportErrors({ + readStream: createListStream(toBeSavedObjects), + createNewCopies: false, + managed: true, + retries, + }); - allSuccessResults = allSuccessResults.concat(resolveSuccessResults); + if (resolveErrors?.length) { + throw new KibanaSOReferenceError( + `Encountered ${ + resolveErrors.length + } errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}` + ); } - return allSuccessResults; + allSuccessResults = allSuccessResults.concat(resolveSuccessResults); } + + return allSuccessResults; } // Filter out any reserved index patterns @@ -498,3 +521,7 @@ export function toAssetReference({ id, type }: SavedObject) { return reference; } + +function hasReferences(assetsToInstall: ArchiveAsset[]) { + return assetsToInstall.some((asset) => asset.references?.length); +}