Skip to content

Commit

Permalink
Don't rollback on saved objects conflict errors. (#85131)
Browse files Browse the repository at this point in the history
  • Loading branch information
skh authored Dec 14, 2020
1 parent 8858749 commit 1b3a1bb
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 143 deletions.
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/errors/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
PackageNotFoundError,
AgentPolicyNameExistsError,
PackageUnsupportedMediaTypeError,
ConcurrentInstallOperationError,
} from './index';

type IngestErrorHandler = (
Expand Down Expand Up @@ -69,7 +70,9 @@ const getHTTPResponseCode = (error: IngestManagerError): number => {
if (error instanceof PackageUnsupportedMediaTypeError) {
return 415; // Unsupported Media Type
}

if (error instanceof ConcurrentInstallOperationError) {
return 409; // Conflict
}
return 400; // Bad Request
};

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/server/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ export class PackageInvalidArchiveError extends IngestManagerError {}
export class PackageCacheError extends IngestManagerError {}
export class PackageOperationNotSupportedError extends IngestManagerError {}
export class FleetAdminUserInvalidError extends IngestManagerError {}
export class ConcurrentInstallOperationError extends IngestManagerError {}
298 changes: 156 additions & 142 deletions x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { deleteKibanaSavedObjectsAssets } from './remove';
import { installTransform } from '../elasticsearch/transform/install';
import { createInstallation, saveKibanaAssetsRefs, updateVersion } from './install';
import { saveArchiveEntries } from '../archive/storage';
import { ConcurrentInstallOperationError } from '../../../errors';

// this is only exported for testing
// use a leading underscore to indicate it's not the supported path
Expand All @@ -53,163 +54,176 @@ export async function _installPackage({
installSource: InstallSource;
}): Promise<AssetReference[]> {
const { name: pkgName, version: pkgVersion } = packageInfo;
// if some installation already exists
if (installedPkg) {
// if the installation is currently running, don't try to install
// instead, only return already installed assets
if (
installedPkg.attributes.install_status === 'installing' &&
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL
) {
let assets: AssetReference[] = [];
assets = assets.concat(installedPkg.attributes.installed_es);
assets = assets.concat(installedPkg.attributes.installed_kibana);
return assets;
try {
// if some installation already exists
if (installedPkg) {
// if the installation is currently running, don't try to install
// instead, only return already installed assets
if (
installedPkg.attributes.install_status === 'installing' &&
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL
) {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting.`
);
} else {
// if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL
// (it might be stuck) update the saved object and proceed
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installing',
install_started_at: new Date().toISOString(),
install_source: installSource,
});
}
} else {
// if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL
// (it might be stuck) update the saved object and proceed
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installing',
install_started_at: new Date().toISOString(),
install_source: installSource,
await createInstallation({
savedObjectsClient,
packageInfo,
installSource,
});
}
} else {
await createInstallation({
savedObjectsClient,
packageInfo,
installSource,
});
}

// kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations
// we don't `await` here because we don't want to delay starting the many other `install*` functions
// however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection
// we define it many lines and potentially seconds of wall clock time later in
// `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);`
// if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems
// the program will log something like this _and exit/crash_
// Unhandled Promise rejection detected:
// RegistryResponseError or some other error
// Terminating process...
// server crashed with status code 1
//
// add a `.catch` to prevent the "unhandled rejection" case
// in that `.catch`, set something that indicates a failure
// check for that failure later and act accordingly (throw, ignore, return)
let installIndexPatternError;
const installIndexPatternPromise = installIndexPatterns(
savedObjectsClient,
pkgName,
pkgVersion,
installSource
).catch((reason) => (installIndexPatternError = reason));
const kibanaAssets = await getKibanaAssets(paths);
if (installedPkg)
await deleteKibanaSavedObjectsAssets(
// kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations
// we don't `await` here because we don't want to delay starting the many other `install*` functions
// however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection
// we define it many lines and potentially seconds of wall clock time later in
// `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);`
// if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems
// the program will log something like this _and exit/crash_
// Unhandled Promise rejection detected:
// RegistryResponseError or some other error
// Terminating process...
// server crashed with status code 1
//
// add a `.catch` to prevent the "unhandled rejection" case
// in that `.catch`, set something that indicates a failure
// check for that failure later and act accordingly (throw, ignore, return)
let installIndexPatternError;
const installIndexPatternPromise = installIndexPatterns(
savedObjectsClient,
installedPkg.attributes.installed_kibana
pkgName,
pkgVersion,
installSource
).catch((reason) => (installIndexPatternError = reason));
const kibanaAssets = await getKibanaAssets(paths);
if (installedPkg)
await deleteKibanaSavedObjectsAssets(
savedObjectsClient,
installedPkg.attributes.installed_kibana
);
// save new kibana refs before installing the assets
const installedKibanaAssetsRefs = await saveKibanaAssetsRefs(
savedObjectsClient,
pkgName,
kibanaAssets
);
// save new kibana refs before installing the assets
const installedKibanaAssetsRefs = await saveKibanaAssetsRefs(
savedObjectsClient,
pkgName,
kibanaAssets
);
let installKibanaAssetsError;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
kibanaAssets,
}).catch((reason) => (installKibanaAssetsError = reason));

// the rest of the installation must happen in sequential order
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(paths, callCluster);

// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
callCluster,
savedObjectsClient
);
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
callCluster,
paths,
savedObjectsClient
);

// update current backing indices of each data stream
await updateCurrentWriteIndices(callCluster, installedTemplates);
let installKibanaAssetsError;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
kibanaAssets,
}).catch((reason) => (installKibanaAssetsError = reason));

const installedTransforms = await installTransform(
packageInfo,
paths,
callCluster,
savedObjectsClient
);
// the rest of the installation must happen in sequential order
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(paths, callCluster);

// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
await deletePreviousPipelines(
// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.version
savedObjectsClient
);
}
// pipelines from a different version may have installed during a failed update
if (installType === 'rollback' && installedPkg) {
await deletePreviousPipelines(
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.install_version
paths,
savedObjectsClient
);
}
const installedTemplateRefs = installedTemplates.map((template) => ({
id: template.templateName,
type: ElasticsearchAssetType.indexTemplate,
}));

// make sure the assets are installed (or didn't error)
if (installIndexPatternError) throw installIndexPatternError;
if (installKibanaAssetsError) throw installKibanaAssetsError;
await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);
// update current backing indices of each data stream
await updateCurrentWriteIndices(callCluster, installedTemplates);

const packageAssetResults = await saveArchiveEntries({
savedObjectsClient,
paths,
packageInfo,
installSource,
});
const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map(
(result) => ({
id: result.id,
type: ASSETS_SAVED_OBJECT_TYPE,
})
);
const installedTransforms = await installTransform(
packageInfo,
paths,
callCluster,
savedObjectsClient
);

// update to newly installed version when all assets are successfully installed
if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion);
// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.version
);
}
// pipelines from a different version may have installed during a failed update
if (installType === 'rollback' && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.install_version
);
}
const installedTemplateRefs = installedTemplates.map((template) => ({
id: template.templateName,
type: ElasticsearchAssetType.indexTemplate,
}));

await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installed',
package_assets: packageAssetRefs,
});
// make sure the assets are installed (or didn't error)
if (installIndexPatternError) throw installIndexPatternError;
if (installKibanaAssetsError) throw installKibanaAssetsError;
await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);

return [
...installedKibanaAssetsRefs,
...installedPipelines,
...installedTemplateRefs,
...installedTransforms,
];
const packageAssetResults = await saveArchiveEntries({
savedObjectsClient,
paths,
packageInfo,
installSource,
});
const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map(
(result) => ({
id: result.id,
type: ASSETS_SAVED_OBJECT_TYPE,
})
);

// update to newly installed version when all assets are successfully installed
if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion);

await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installed',
package_assets: packageAssetRefs,
});

return [
...installedKibanaAssetsRefs,
...installedPipelines,
...installedTemplateRefs,
...installedTransforms,
];
} catch (err) {
if (savedObjectsClient.errors.isConflictError(err)) {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting. Original error: ${err.message}`
);
} else {
throw err;
}
}
}

0 comments on commit 1b3a1bb

Please sign in to comment.