Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet][EPM] Don't roll back on saved objects conflict errors. #85131

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/errors/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
PackageNotFoundError,
AgentPolicyNameExistsError,
PackageUnsupportedMediaTypeError,
ConcurrentInstallOperationError,
} from './index';

type IngestErrorHandler = (
Expand Down Expand Up @@ -69,7 +70,9 @@ const getHTTPResponseCode = (error: IngestManagerError): number => {
if (error instanceof PackageUnsupportedMediaTypeError) {
return 415; // Unsupported Media Type
}

if (error instanceof ConcurrentInstallOperationError) {
return 409; // Conflict
}
return 400; // Bad Request
};

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/server/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ export class PackageInvalidArchiveError extends IngestManagerError {}
export class PackageCacheError extends IngestManagerError {}
export class PackageOperationNotSupportedError extends IngestManagerError {}
export class FleetAdminUserInvalidError extends IngestManagerError {}
export class ConcurrentInstallOperationError extends IngestManagerError {}
298 changes: 156 additions & 142 deletions x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { deleteKibanaSavedObjectsAssets } from './remove';
import { installTransform } from '../elasticsearch/transform/install';
import { createInstallation, saveKibanaAssetsRefs, updateVersion } from './install';
import { saveArchiveEntries } from '../archive/storage';
import { ConcurrentInstallOperationError } from '../../../errors';

// this is only exported for testing
// use a leading underscore to indicate it's not the supported path
Expand All @@ -53,163 +54,176 @@ export async function _installPackage({
installSource: InstallSource;
}): Promise<AssetReference[]> {
const { name: pkgName, version: pkgVersion } = packageInfo;
neptunian marked this conversation as resolved.
Show resolved Hide resolved
// if some installation already exists
if (installedPkg) {
// if the installation is currently running, don't try to install
// instead, only return already installed assets
if (
installedPkg.attributes.install_status === 'installing' &&
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL
) {
let assets: AssetReference[] = [];
assets = assets.concat(installedPkg.attributes.installed_es);
assets = assets.concat(installedPkg.attributes.installed_kibana);
return assets;
try {
// if some installation already exists
if (installedPkg) {
// if the installation is currently running, don't try to install
// instead, only return already installed assets
if (
installedPkg.attributes.install_status === 'installing' &&
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL
) {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting.`
);
} else {
// if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL
// (it might be stuck) update the saved object and proceed
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installing',
install_started_at: new Date().toISOString(),
install_source: installSource,
});
}
} else {
// if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL
// (it might be stuck) update the saved object and proceed
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installing',
install_started_at: new Date().toISOString(),
install_source: installSource,
await createInstallation({
savedObjectsClient,
packageInfo,
installSource,
});
}
} else {
await createInstallation({
savedObjectsClient,
packageInfo,
installSource,
});
}

// kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations
// we don't `await` here because we don't want to delay starting the many other `install*` functions
// however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection
// we define it many lines and potentially seconds of wall clock time later in
// `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);`
// if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems
// the program will log something like this _and exit/crash_
// Unhandled Promise rejection detected:
// RegistryResponseError or some other error
// Terminating process...
// server crashed with status code 1
//
// add a `.catch` to prevent the "unhandled rejection" case
// in that `.catch`, set something that indicates a failure
// check for that failure later and act accordingly (throw, ignore, return)
let installIndexPatternError;
const installIndexPatternPromise = installIndexPatterns(
savedObjectsClient,
pkgName,
pkgVersion,
installSource
).catch((reason) => (installIndexPatternError = reason));
const kibanaAssets = await getKibanaAssets(paths);
if (installedPkg)
await deleteKibanaSavedObjectsAssets(
// kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations
// we don't `await` here because we don't want to delay starting the many other `install*` functions
// however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection
// we define it many lines and potentially seconds of wall clock time later in
// `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);`
// if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems
// the program will log something like this _and exit/crash_
// Unhandled Promise rejection detected:
// RegistryResponseError or some other error
// Terminating process...
// server crashed with status code 1
//
// add a `.catch` to prevent the "unhandled rejection" case
// in that `.catch`, set something that indicates a failure
// check for that failure later and act accordingly (throw, ignore, return)
let installIndexPatternError;
const installIndexPatternPromise = installIndexPatterns(
savedObjectsClient,
installedPkg.attributes.installed_kibana
pkgName,
pkgVersion,
installSource
).catch((reason) => (installIndexPatternError = reason));
const kibanaAssets = await getKibanaAssets(paths);
if (installedPkg)
await deleteKibanaSavedObjectsAssets(
savedObjectsClient,
installedPkg.attributes.installed_kibana
);
// save new kibana refs before installing the assets
const installedKibanaAssetsRefs = await saveKibanaAssetsRefs(
savedObjectsClient,
pkgName,
kibanaAssets
);
// save new kibana refs before installing the assets
const installedKibanaAssetsRefs = await saveKibanaAssetsRefs(
savedObjectsClient,
pkgName,
kibanaAssets
);
let installKibanaAssetsError;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
kibanaAssets,
}).catch((reason) => (installKibanaAssetsError = reason));

// the rest of the installation must happen in sequential order
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(paths, callCluster);

// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
callCluster,
savedObjectsClient
);
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
callCluster,
paths,
savedObjectsClient
);

// update current backing indices of each data stream
await updateCurrentWriteIndices(callCluster, installedTemplates);
let installKibanaAssetsError;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
kibanaAssets,
}).catch((reason) => (installKibanaAssetsError = reason));

const installedTransforms = await installTransform(
packageInfo,
paths,
callCluster,
savedObjectsClient
);
// the rest of the installation must happen in sequential order
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(paths, callCluster);

// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
await deletePreviousPipelines(
// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.version
savedObjectsClient
);
}
// pipelines from a different version may have installed during a failed update
if (installType === 'rollback' && installedPkg) {
await deletePreviousPipelines(
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.install_version
paths,
savedObjectsClient
);
}
const installedTemplateRefs = installedTemplates.map((template) => ({
id: template.templateName,
type: ElasticsearchAssetType.indexTemplate,
}));

// make sure the assets are installed (or didn't error)
if (installIndexPatternError) throw installIndexPatternError;
if (installKibanaAssetsError) throw installKibanaAssetsError;
await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);
// update current backing indices of each data stream
await updateCurrentWriteIndices(callCluster, installedTemplates);

const packageAssetResults = await saveArchiveEntries({
savedObjectsClient,
paths,
packageInfo,
installSource,
});
const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map(
(result) => ({
id: result.id,
type: ASSETS_SAVED_OBJECT_TYPE,
})
);
const installedTransforms = await installTransform(
packageInfo,
paths,
callCluster,
savedObjectsClient
);

// update to newly installed version when all assets are successfully installed
if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion);
// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.version
);
}
// pipelines from a different version may have installed during a failed update
if (installType === 'rollback' && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.install_version
);
}
const installedTemplateRefs = installedTemplates.map((template) => ({
id: template.templateName,
type: ElasticsearchAssetType.indexTemplate,
}));

await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installed',
package_assets: packageAssetRefs,
});
// make sure the assets are installed (or didn't error)
if (installIndexPatternError) throw installIndexPatternError;
if (installKibanaAssetsError) throw installKibanaAssetsError;
await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);

return [
...installedKibanaAssetsRefs,
...installedPipelines,
...installedTemplateRefs,
...installedTransforms,
];
const packageAssetResults = await saveArchiveEntries({
savedObjectsClient,
paths,
packageInfo,
installSource,
});
const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map(
(result) => ({
id: result.id,
type: ASSETS_SAVED_OBJECT_TYPE,
})
);

// update to newly installed version when all assets are successfully installed
if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion);

await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installed',
package_assets: packageAssetRefs,
});

return [
...installedKibanaAssetsRefs,
...installedPipelines,
...installedTemplateRefs,
...installedTransforms,
];
} catch (err) {
if (savedObjectsClient.errors.isConflictError(err)) {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting. Original error: ${err.message}`
);
} else {
throw err;
}
}
}