Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] Chunk asset installation during package install #189045

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 103 additions & 76 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type {
Logger,
} from '@kbn/core/server';
import { createListStream } from '@kbn/utils';
import { partition } from 'lodash';
import { partition, chunk } from 'lodash';

import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import { KibanaAssetType, KibanaSavedObjectType } from '../../../../types';
Expand All @@ -36,6 +36,8 @@ import { withPackageSpan } from '../../packages/utils';
import { tagKibanaAssets } from './tag_assets';
import { getSpaceAwareSaveobjectsClients } from './saved_objects';

const MAX_ASSETS_TO_INSTALL_IN_PARALLEL = 1000;

type SavedObjectsImporterContract = Pick<ISavedObjectsImporter, 'import' | 'resolveImportErrors'>;
const formatImportErrorsForLog = (errors: SavedObjectsImportFailure[]) =>
JSON.stringify(
Expand Down Expand Up @@ -144,11 +146,33 @@ export async function installKibanaAssets(options: {

await makeManagedIndexPatternsGlobal(savedObjectsClient);

const installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
let installedAssets: SavedObjectsImportSuccess[] = [];

if (
assetsToInstall.length > MAX_ASSETS_TO_INSTALL_IN_PARALLEL &&
!hasReferences(assetsToInstall)
) {
// If the package size is too large, we need to install in chunks to avoid
// memory issues as the SO import creates a lot of objects in memory

// NOTE: if there are references, we can't chunk the install because
// referenced objects might end up in different chunks leading to import
// errors.
for (const assetChunk of chunk(assetsToInstall, MAX_ASSETS_TO_INSTALL_IN_PARALLEL)) {
const result = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetChunk,
});
installedAssets = installedAssets.concat(result);
}
} else {
installedAssets = await installKibanaSavedObjects({
logger,
savedObjectsImporter,
kibanaAssets: assetsToInstall,
});
}

return installedAssets;
}
Expand Down Expand Up @@ -395,95 +419,94 @@ export async function installKibanaSavedObjects({
savedObjectsImporter: SavedObjectsImporterContract;
logger: Logger;
}) {
const toBeSavedObjects = await Promise.all(
kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset))
);
if (!kibanaAssets.length) {
return [];
}

const toBeSavedObjects = kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset));

let allSuccessResults: SavedObjectsImportSuccess[] = [];

if (toBeSavedObjects.length === 0) {
return [];
} else {
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() =>
savedObjectsImporter.import({
overwrite: true,
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
refresh: false,
managed: true,
})
);
const {
successResults: importSuccessResults = [],
errors: importErrors = [],
success,
} = await retryImportOnConflictError(() => {
const readStream = createListStream(toBeSavedObjects);
return savedObjectsImporter.import({
overwrite: true,
readStream,
createNewCopies: false,
refresh: false,
managed: true,
});
});

if (success) {
allSuccessResults = importSuccessResults;
}
if (success) {
allSuccessResults = importSuccessResults;
}

const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);
const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);

if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
otherErrors.length
} errors creating saved objects: ${formatImportErrorsForLog(otherErrors)}`
);
}
if (otherErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${otherErrors.length} errors creating saved objects: ${formatImportErrorsForLog(
otherErrors
)}`
);
}

/*
/*
A reference error here means that a saved object reference in the references
array cannot be found. This is an error in the package its-self but not a fatal
one. For example a dashboard may still refer to the legacy `metricbeat-*` index
pattern. We ignore reference errors here so that legacy version of a package
can still be installed, but if a warning is logged it should be reported to
the integrations team. */
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);

const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
});
if (referenceErrors.length) {
logger.debug(
() =>
`Resolving ${
referenceErrors.length
} reference errors creating saved objects: ${formatImportErrorsForLog(referenceErrors)}`
);

const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});

if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
const retries = toBeSavedObjects.map(({ id, type }) => {
if (referenceErrors.find(({ id: idToSearch }) => idToSearch === id)) {
return {
id,
type,
ignoreMissingReferences: true,
replaceReferences: [],
overwrite: true,
};
}
return { id, type, overwrite: true, replaceReferences: [] };
});

const { successResults: resolveSuccessResults = [], errors: resolveErrors = [] } =
await savedObjectsImporter.resolveImportErrors({
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
managed: true,
retries,
});

allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
if (resolveErrors?.length) {
throw new KibanaSOReferenceError(
`Encountered ${
resolveErrors.length
} errors resolving reference errors: ${formatImportErrorsForLog(resolveErrors)}`
);
}

return allSuccessResults;
allSuccessResults = allSuccessResults.concat(resolveSuccessResults);
}

return allSuccessResults;
}

// Filter out any reserved index patterns
Expand All @@ -498,3 +521,7 @@ export function toAssetReference({ id, type }: SavedObject) {

return reference;
}

function hasReferences(assetsToInstall: ArchiveAsset[]) {
return assetsToInstall.some((asset) => asset.references?.length);
}
Loading