diff --git a/x-pack/plugins/fleet/server/errors/handlers.ts b/x-pack/plugins/fleet/server/errors/handlers.ts index 222554e97eb91..fecfcf145ca99 100644 --- a/x-pack/plugins/fleet/server/errors/handlers.ts +++ b/x-pack/plugins/fleet/server/errors/handlers.ts @@ -20,6 +20,7 @@ import { PackageNotFoundError, AgentPolicyNameExistsError, PackageUnsupportedMediaTypeError, + ConcurrentInstallOperationError, } from './index'; type IngestErrorHandler = ( @@ -69,7 +70,9 @@ const getHTTPResponseCode = (error: IngestManagerError): number => { if (error instanceof PackageUnsupportedMediaTypeError) { return 415; // Unsupported Media Type } - + if (error instanceof ConcurrentInstallOperationError) { + return 409; // Conflict + } return 400; // Bad Request }; diff --git a/x-pack/plugins/fleet/server/errors/index.ts b/x-pack/plugins/fleet/server/errors/index.ts index fad4eef66215d..700750761def4 100644 --- a/x-pack/plugins/fleet/server/errors/index.ts +++ b/x-pack/plugins/fleet/server/errors/index.ts @@ -30,3 +30,4 @@ export class PackageInvalidArchiveError extends IngestManagerError {} export class PackageCacheError extends IngestManagerError {} export class PackageOperationNotSupportedError extends IngestManagerError {} export class FleetAdminUserInvalidError extends IngestManagerError {} +export class ConcurrentInstallOperationError extends IngestManagerError {} diff --git a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts index b0a76016e8eee..5e6ecad9b72f1 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts @@ -30,6 +30,7 @@ import { deleteKibanaSavedObjectsAssets } from './remove'; import { installTransform } from '../elasticsearch/transform/install'; import { createInstallation, saveKibanaAssetsRefs, updateVersion } from './install'; import { saveArchiveEntries } from '../archive/storage'; +import { ConcurrentInstallOperationError } from '../../../errors'; // this is only exported for testing // use a leading underscore to indicate it's not the supported path @@ -53,163 +54,176 @@ export async function _installPackage({ installSource: InstallSource; }): Promise { const { name: pkgName, version: pkgVersion } = packageInfo; - // if some installation already exists - if (installedPkg) { - // if the installation is currently running, don't try to install - // instead, only return already installed assets - if ( - installedPkg.attributes.install_status === 'installing' && - Date.now() - Date.parse(installedPkg.attributes.install_started_at) < - MAX_TIME_COMPLETE_INSTALL - ) { - let assets: AssetReference[] = []; - assets = assets.concat(installedPkg.attributes.installed_es); - assets = assets.concat(installedPkg.attributes.installed_kibana); - return assets; + try { + // if some installation already exists + if (installedPkg) { + // if the installation is currently running, don't try to install + // instead, only return already installed assets + if ( + installedPkg.attributes.install_status === 'installing' && + Date.now() - Date.parse(installedPkg.attributes.install_started_at) < + MAX_TIME_COMPLETE_INSTALL + ) { + throw new ConcurrentInstallOperationError( + `Concurrent installation or upgrade of ${pkgName || 'unknown'}-${ + pkgVersion || 'unknown' + } detected, aborting.` + ); + } else { + // if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL + // (it might be stuck) update the saved object and proceed + await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, { + install_version: pkgVersion, + install_status: 'installing', + install_started_at: new Date().toISOString(), + install_source: installSource, + }); + } } else { - // if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL - // (it might be stuck) update the saved object and proceed - await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, { - install_version: pkgVersion, - install_status: 'installing', - install_started_at: new Date().toISOString(), - install_source: installSource, + await createInstallation({ + savedObjectsClient, + packageInfo, + installSource, }); } - } else { - await createInstallation({ - savedObjectsClient, - packageInfo, - installSource, - }); - } - // kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations - // we don't `await` here because we don't want to delay starting the many other `install*` functions - // however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection - // we define it many lines and potentially seconds of wall clock time later in - // `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);` - // if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems - // the program will log something like this _and exit/crash_ - // Unhandled Promise rejection detected: - // RegistryResponseError or some other error - // Terminating process... - // server crashed with status code 1 - // - // add a `.catch` to prevent the "unhandled rejection" case - // in that `.catch`, set something that indicates a failure - // check for that failure later and act accordingly (throw, ignore, return) - let installIndexPatternError; - const installIndexPatternPromise = installIndexPatterns( - savedObjectsClient, - pkgName, - pkgVersion, - installSource - ).catch((reason) => (installIndexPatternError = reason)); - const kibanaAssets = await getKibanaAssets(paths); - if (installedPkg) - await deleteKibanaSavedObjectsAssets( + // kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations + // we don't `await` here because we don't want to delay starting the many other `install*` functions + // however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection + // we define it many lines and potentially seconds of wall clock time later in + // `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);` + // if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems + // the program will log something like this _and exit/crash_ + // Unhandled Promise rejection detected: + // RegistryResponseError or some other error + // Terminating process... + // server crashed with status code 1 + // + // add a `.catch` to prevent the "unhandled rejection" case + // in that `.catch`, set something that indicates a failure + // check for that failure later and act accordingly (throw, ignore, return) + let installIndexPatternError; + const installIndexPatternPromise = installIndexPatterns( savedObjectsClient, - installedPkg.attributes.installed_kibana + pkgName, + pkgVersion, + installSource + ).catch((reason) => (installIndexPatternError = reason)); + const kibanaAssets = await getKibanaAssets(paths); + if (installedPkg) + await deleteKibanaSavedObjectsAssets( + savedObjectsClient, + installedPkg.attributes.installed_kibana + ); + // save new kibana refs before installing the assets + const installedKibanaAssetsRefs = await saveKibanaAssetsRefs( + savedObjectsClient, + pkgName, + kibanaAssets ); - // save new kibana refs before installing the assets - const installedKibanaAssetsRefs = await saveKibanaAssetsRefs( - savedObjectsClient, - pkgName, - kibanaAssets - ); - let installKibanaAssetsError; - const installKibanaAssetsPromise = installKibanaAssets({ - savedObjectsClient, - pkgName, - kibanaAssets, - }).catch((reason) => (installKibanaAssetsError = reason)); - - // the rest of the installation must happen in sequential order - // currently only the base package has an ILM policy - // at some point ILM policies can be installed/modified - // per data stream and we should then save them - await installILMPolicy(paths, callCluster); - - // installs versionized pipelines without removing currently installed ones - const installedPipelines = await installPipelines( - packageInfo, - paths, - callCluster, - savedObjectsClient - ); - // install or update the templates referencing the newly installed pipelines - const installedTemplates = await installTemplates( - packageInfo, - callCluster, - paths, - savedObjectsClient - ); - - // update current backing indices of each data stream - await updateCurrentWriteIndices(callCluster, installedTemplates); + let installKibanaAssetsError; + const installKibanaAssetsPromise = installKibanaAssets({ + savedObjectsClient, + pkgName, + kibanaAssets, + }).catch((reason) => (installKibanaAssetsError = reason)); - const installedTransforms = await installTransform( - packageInfo, - paths, - callCluster, - savedObjectsClient - ); + // the rest of the installation must happen in sequential order + // currently only the base package has an ILM policy + // at some point ILM policies can be installed/modified + // per data stream and we should then save them + await installILMPolicy(paths, callCluster); - // if this is an update or retrying an update, delete the previous version's pipelines - if ((installType === 'update' || installType === 'reupdate') && installedPkg) { - await deletePreviousPipelines( + // installs versionized pipelines without removing currently installed ones + const installedPipelines = await installPipelines( + packageInfo, + paths, callCluster, - savedObjectsClient, - pkgName, - installedPkg.attributes.version + savedObjectsClient ); - } - // pipelines from a different version may have installed during a failed update - if (installType === 'rollback' && installedPkg) { - await deletePreviousPipelines( + // install or update the templates referencing the newly installed pipelines + const installedTemplates = await installTemplates( + packageInfo, callCluster, - savedObjectsClient, - pkgName, - installedPkg.attributes.install_version + paths, + savedObjectsClient ); - } - const installedTemplateRefs = installedTemplates.map((template) => ({ - id: template.templateName, - type: ElasticsearchAssetType.indexTemplate, - })); - // make sure the assets are installed (or didn't error) - if (installIndexPatternError) throw installIndexPatternError; - if (installKibanaAssetsError) throw installKibanaAssetsError; - await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]); + // update current backing indices of each data stream + await updateCurrentWriteIndices(callCluster, installedTemplates); - const packageAssetResults = await saveArchiveEntries({ - savedObjectsClient, - paths, - packageInfo, - installSource, - }); - const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map( - (result) => ({ - id: result.id, - type: ASSETS_SAVED_OBJECT_TYPE, - }) - ); + const installedTransforms = await installTransform( + packageInfo, + paths, + callCluster, + savedObjectsClient + ); - // update to newly installed version when all assets are successfully installed - if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion); + // if this is an update or retrying an update, delete the previous version's pipelines + if ((installType === 'update' || installType === 'reupdate') && installedPkg) { + await deletePreviousPipelines( + callCluster, + savedObjectsClient, + pkgName, + installedPkg.attributes.version + ); + } + // pipelines from a different version may have installed during a failed update + if (installType === 'rollback' && installedPkg) { + await deletePreviousPipelines( + callCluster, + savedObjectsClient, + pkgName, + installedPkg.attributes.install_version + ); + } + const installedTemplateRefs = installedTemplates.map((template) => ({ + id: template.templateName, + type: ElasticsearchAssetType.indexTemplate, + })); - await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, { - install_version: pkgVersion, - install_status: 'installed', - package_assets: packageAssetRefs, - }); + // make sure the assets are installed (or didn't error) + if (installIndexPatternError) throw installIndexPatternError; + if (installKibanaAssetsError) throw installKibanaAssetsError; + await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]); - return [ - ...installedKibanaAssetsRefs, - ...installedPipelines, - ...installedTemplateRefs, - ...installedTransforms, - ]; + const packageAssetResults = await saveArchiveEntries({ + savedObjectsClient, + paths, + packageInfo, + installSource, + }); + const packageAssetRefs: PackageAssetReference[] = packageAssetResults.saved_objects.map( + (result) => ({ + id: result.id, + type: ASSETS_SAVED_OBJECT_TYPE, + }) + ); + + // update to newly installed version when all assets are successfully installed + if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion); + + await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, { + install_version: pkgVersion, + install_status: 'installed', + package_assets: packageAssetRefs, + }); + + return [ + ...installedKibanaAssetsRefs, + ...installedPipelines, + ...installedTemplateRefs, + ...installedTransforms, + ]; + } catch (err) { + if (savedObjectsClient.errors.isConflictError(err)) { + throw new ConcurrentInstallOperationError( + `Concurrent installation or upgrade of ${pkgName || 'unknown'}-${ + pkgVersion || 'unknown' + } detected, aborting. Original error: ${err.message}` + ); + } else { + throw err; + } + } }