[Fleet] [Security Solution] Install prebuilt rules package using stream-based approach (elastic#195888)

**Resolves: elastic#192350**

## Summary

Implemented stream-based installation of the detection rules package.

**Background**: The installation of the detection rules package was
causing OOM (Out of Memory) errors in Serverless environments where the
available memory is limited to 1GB. The root cause of the errors was
that during installation, the package was being read and unzipped
entirely into memory. Given the large package size, this led to OOMs. To
address these memory issues, the following changes were made:

1. Added branching logic to the `installPackageFromRegistry` and
`installPackageByUpload` methods that decides, based on the package name,
whether to use streaming. Currently, only the `security_detection_engine`
package is hardcoded to use streaming (a rough sketch of this decision is
shown after this list).
2. Defined a separate set of steps in the state machine for stream-based
package installation. At this stage it covers only Kibana asset
installation.
3. Added a new `stepInstallKibanaAssetsWithStreaming` step to handle asset
installation. While this step still reads the package archive into memory
(unzipping directly from a readable stream is [not possible due to the
design of the .zip
format](https://github.com/thejoshwolfe/yauzl?tab=readme-ov-file#no-streaming-unzip-api)),
the archive is unzipped using streams after being read into a buffer. This
allows only a small portion of the archive (100 saved objects at a time) to
be unpacked into memory, reducing memory usage.
4. The new step also includes several optimizations, such as removing
previously installed assets only if they are missing from the new package,
and using `savedObjectClient.bulkCreate` instead of the less efficient
`savedObjectClient.import` (see the batched-installation sketch below).
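
For illustration, here is a minimal sketch of the branching described in step 1. The constant and function names are invented for this example and do not necessarily match the actual Fleet implementation:

```ts
// Illustrative sketch only — not the actual Fleet install service code.
// Only the large prebuilt detection rules package is streamed for now.
const PACKAGES_TO_INSTALL_WITH_STREAMING = ['security_detection_engine'];

function shouldUseStreamingInstall(pkgName: string): boolean {
  return PACKAGES_TO_INSTALL_WITH_STREAMING.includes(pkgName);
}

// Hypothetical entry point showing where the branch would live.
async function installPackage(pkgName: string, archiveBuffer: Buffer): Promise<void> {
  if (shouldUseStreamingInstall(pkgName)) {
    // Run the reduced, stream-based state machine (Kibana assets only).
    console.log(`Installing ${pkgName} (${archiveBuffer.length} bytes) with streaming`);
  } else {
    // Run the regular state machine that unpacks the whole archive into memory.
    console.log(`Installing ${pkgName} with the in-memory path`);
  }
}
```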

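And a rough sketch of what the batched, stream-based Kibana asset installation could look like. It assumes the `ArchiveIterator` interface added in this PR and the batch size of 100 mentioned above; the function name, asset filtering, and saved objects wiring are simplified assumptions rather than the actual `stepInstallKibanaAssetsWithStreaming` implementation:

```ts
import type { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from '@kbn/core/server';

// Minimal local copies of the types this PR adds to fleet/common/types.
interface ArchiveEntry {
  path: string;
  buffer?: Buffer;
}
interface ArchiveIterator {
  traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise<void>) => Promise<void>;
  getPaths: () => Promise<string[]>;
}

const MAX_SO_BATCH_SIZE = 100; // "100 saved objects at a time" per the description above

// Illustrative only: the real step also handles asset references, tags, and
// cleanup of assets that are missing from the new package version.
async function installKibanaAssetsWithStreaming(
  soClient: SavedObjectsClientContract,
  archiveIterator: ArchiveIterator
): Promise<void> {
  let batch: SavedObjectsBulkCreateObject[] = [];

  const flushBatch = async () => {
    if (batch.length === 0) return;
    // bulkCreate avoids the heavier savedObjectClient.import() machinery.
    await soClient.bulkCreate(batch, { overwrite: true });
    batch = [];
  };

  await archiveIterator.traverseEntries(async (entry) => {
    // Only Kibana saved-object assets (JSON files) are installed by this step.
    if (!entry.buffer || !entry.path.endsWith('.json')) return;

    const so = JSON.parse(entry.buffer.toString('utf8'));
    batch.push({ type: so.type, id: so.id, attributes: so.attributes, references: so.references });

    if (batch.length >= MAX_SO_BATCH_SIZE) {
      await flushBatch();
    }
  });

  await flushBatch(); // install any remaining partial batch
}
```
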
### Test environment

1. Prebuilt detection rules package with ~20k saved objects; 118MB zipped.
2. Local package registry.
3. Production build of Kibana running locally with a 700MB max old space
limit, pointed to that registry.

Setting up a test environment is not completely straightforward. Here's
a rough outline of the steps:
<details>
<summary>
How to test this PR
</summary>

1. Create a package containing a large number of prebuilt rules.
    1. I used the `package-storage` repository to find one of the previously
released prebuilt rules packages.
    2. Multiplied the number of assets in the package to 20k historical
versions.
    3. Built the package using `elastic-package build`.
2. Start a local package registry serving the built package using
`elastic-package stack up --services package-registry`.
3. Create a production build of Kibana. To speed up the process,
unnecessary artifacts can be skipped:
    ```
    node scripts/build --skip-cdn-assets --skip-docker-ubi --skip-docker-ubuntu --skip-docker-wolfi --skip-docker-fips
    ```
4. Provide the built Kibana with a config pointing to the local registry.
The config is located in
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/config/kibana.yml`.
You can use the following config:
    ```
    csp.strict: false
    xpack.security.encryptionKey: 've4Vohnu oa0Fu9ae Eethee8c oDieg4do Nohrah1u ao9Hu2oh Aeb4Ieyi Aew1aegi'
    xpack.encryptedSavedObjects.encryptionKey: 'Shah7nai Eew6izai Eir7OoW0 Gewi2ief eiSh8woo shoogh7E Quae6hal ce6Oumah'

    xpack.fleet.internal.registry.kibanaVersionCheckEnabled: false
    xpack.fleet.registryUrl: https://localhost:8080

    elasticsearch:
      username: 'kibana_system'
      password: 'changeme'
      hosts: 'http://localhost:9200'
    ```
5. Override the Node options Kibana starts with to allow it to connect to
the local registry and set the memory limit. For this, you need to edit the
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64/bin/kibana` file:
    ```
    NODE_OPTIONS="--no-warnings --max-http-header-size=65536 --unhandled-rejections=warn --dns-result-order=ipv4first --openssl-legacy-provider --max_old_space_size=700 --inspect"
    NODE_ENV=production
    NODE_EXTRA_CA_CERTS=~/.elastic-package/profiles/default/certs/ca-cert.pem
    exec "${NODE}" "${DIR}/src/cli/dist" "${@}"
    ```
6. Navigate to the build folder:
`build/default/kibana-9.0.0-SNAPSHOT-darwin-aarch64`.
7. Start Kibana using `./bin/kibana`.
8. Kibana is now running in debug mode, with the debugger listening on
port 9229. You can connect to it using VS Code's debug config or Chrome's
DevTools.
9. Now you can install prebuilt detection rules by calling the `POST
/internal/detection_engine/prebuilt_rules/_bootstrap` endpoint, which uses
the new streaming installation under the hood.
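    For example (assuming default local credentials; the exact headers required by this internal endpoint may vary between Kibana versions):
    ```
    curl -X POST 'http://localhost:5601/internal/detection_engine/prebuilt_rules/_bootstrap' \
      -u elastic:changeme \
      -H 'kbn-xsrf: true' \
      -H 'elastic-api-version: 1' \
      -H 'x-elastic-internal-origin: kibana'
    ```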

</details>

### Test results locally

**Without the streaming approach**

Guaranteed OOM. Even smaller packages, with up to 10k rules, caused sporadic
OOM errors, so for comparison the package installation was tested without
memory limits.

![Screenshot 2024-10-14 at 14 15 26](https://github.com/user-attachments/assets/131cb877-2404-4638-b619-b1370a53659f)

1. Heap memory usage spikes up to 2.5GB.
2. External memory consumption reaches up to 450MB, which is four times the
archive size.
3. RSS (Resident Set Size) exceeds 4.5GB.

**With the streaming approach**

No OOM errors observed. The memory consumption chart looks like the
following:

![Screenshot 2024-10-14 at 11 15 21](https://github.com/user-attachments/assets/b47ba8c9-2ba7-42de-b921-c33104d4481e)

1. Heap memory remains stable, around 450MB, without any spikes.
2. External memory jumps to around 250MB at the beginning of the
installation, then drops to around 120MB, which is roughly equal to the
package archive size. I couldn't determine why the external memory
consumption exceeds the package size by 2x when the installation starts.
I checked the code for places where the package might be loaded into
memory twice but found nothing suspicious. This might be worth
investigating further.
3. RSS remains stable, peaking slightly above 1GB. I believe this is the
upper limit for a package that can be handled without errors in a
Serverless environment, where the memory limit is dictated by pod-level
settings rather than Node settings and is set to 1GB. I'll verify this
on a real Serverless instance to confirm.

### Test results on Serverless

![Screenshot 2024-10-31 at 12 31 34](https://github.com/user-attachments/assets/d20d2860-fa96-4e56-be2b-7b3c0b5c7b77)
xcrzx authored Nov 5, 2024
1 parent fdc9aae commit 67cdb93
Showing 41 changed files with 768 additions and 83 deletions.
15 changes: 15 additions & 0 deletions x-pack/plugins/fleet/common/types/models/epm.ts
@@ -124,10 +124,25 @@ export type InstallablePackage = RegistryPackage | ArchivePackage;

export type AssetsMap = Map<string, Buffer | undefined>;

export interface ArchiveEntry {
  path: string;
  buffer?: Buffer;
}

export interface ArchiveIterator {
  traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise<void>) => Promise<void>;
  getPaths: () => Promise<string[]>;
}

export interface PackageInstallContext {
  packageInfo: InstallablePackage;
  /**
   * @deprecated Use `archiveIterator` to access the package archive entries
   * without loading them all into memory at once.
   */
  assetsMap: AssetsMap;
  paths: string[];
  archiveIterator: ArchiveIterator;
}

export type ArchivePackage = PackageSpecManifest &
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts
@@ -15,7 +15,7 @@ import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_p
import { getFile, getInstallation } from '../../services/epm/packages/get';
import type { FleetRequestHandlerContext } from '../..';
import { appContextService } from '../../services';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';
import { getAsset } from '../../services/epm/archive/storage';

import { getFileHandler } from './file_handler';
@@ -29,7 +29,7 @@ jest.mock('../../services/epm/packages/get');
const mockedGetBundledPackageByPkgKey = jest.mocked(getBundledPackageByPkgKey);
const mockedGetInstallation = jest.mocked(getInstallation);
const mockedGetFile = jest.mocked(getFile);
const mockedUnpackBufferEntries = jest.mocked(unpackBufferEntries);
const mockedUnpackBufferEntries = jest.mocked(unpackArchiveEntriesIntoMemory);
const mockedGetAsset = jest.mocked(getAsset);

function mockContext() {
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.ts
@@ -17,7 +17,7 @@ import { defaultFleetErrorHandler } from '../../errors';
import { getAsset } from '../../services/epm/archive/storage';
import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_packages';
import { pkgToPkgKey } from '../../services/epm/registry';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';

const CACHE_CONTROL_10_MINUTES_HEADER: HttpResponseOptions['headers'] = {
'cache-control': 'max-age=600',
@@ -69,7 +69,7 @@ export const getFileHandler: FleetRequestHandler<
pkgToPkgKey({ name: pkgName, version: pkgVersion })
);
if (bundledPackage) {
const bufferEntries = await unpackBufferEntries(
const bufferEntries = await unpackArchiveEntriesIntoMemory(
await bundledPackage.getBuffer(),
'application/zip'
);
@@ -22,6 +22,7 @@ import type {
FleetRequestHandler,
InstallKibanaAssetsRequestSchema,
} from '../../types';
import { createArchiveIteratorFromMap } from '../../services/epm/archive/archive_iterator';

export const installPackageKibanaAssetsHandler: FleetRequestHandler<
TypeOf<typeof InstallKibanaAssetsRequestSchema.params>,
@@ -69,6 +70,7 @@ export const installPackageKibanaAssetsHandler: FleetRequestHandler<
packageInfo,
paths: installedPkgWithAssets.paths,
assetsMap: installedPkgWithAssets.assetsMap,
archiveIterator: createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap),
},
});

@@ -0,0 +1,83 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { AssetsMap, ArchiveIterator, ArchiveEntry } from '../../../../common/types';

import { traverseArchiveEntries } from '.';

/**
 * Creates an iterator for traversing and extracting paths from an archive
 * buffer. This iterator is intended to be used for memory efficient traversal
 * of archive contents without extracting the entire archive into memory.
 *
 * @param archiveBuffer - The buffer containing the archive data.
 * @param contentType - The content type of the archive (e.g.,
 * 'application/zip').
 * @returns ArchiveIterator instance.
 *
 */
export const createArchiveIterator = (
  archiveBuffer: Buffer,
  contentType: string
): ArchiveIterator => {
  const paths: string[] = [];

  const traverseEntries = async (
    onEntry: (entry: ArchiveEntry) => Promise<void>
  ): Promise<void> => {
    await traverseArchiveEntries(archiveBuffer, contentType, async (entry) => {
      await onEntry(entry);
    });
  };

  const getPaths = async (): Promise<string[]> => {
    if (paths.length) {
      return paths;
    }

    await traverseEntries(async (entry) => {
      paths.push(entry.path);
    });

    return paths;
  };

  return {
    traverseEntries,
    getPaths,
  };
};

/**
 * Creates an archive iterator from the assetsMap. This is a stop-gap solution
 * to provide a uniform interface for traversing assets while assetsMap is still
 * in use. It works with a map of assets loaded into memory and is not intended
 * for use with large archives.
 *
 * @param assetsMap - A map where the keys are asset paths and the values are
 * asset buffers.
 * @returns ArchiveIterator instance.
 *
 */
export const createArchiveIteratorFromMap = (assetsMap: AssetsMap): ArchiveIterator => {
  const traverseEntries = async (
    onEntry: (entry: ArchiveEntry) => Promise<void>
  ): Promise<void> => {
    for (const [path, buffer] of assetsMap) {
      await onEntry({ path, buffer });
    }
  };

  const getPaths = async (): Promise<string[]> => {
    return [...assetsMap.keys()];
  };

  return {
    traverseEntries,
    getPaths,
  };
};
16 changes: 9 additions & 7 deletions x-pack/plugins/fleet/server/services/epm/archive/extract.ts
@@ -11,13 +11,12 @@ import * as tar from 'tar';
import yauzl from 'yauzl';

import { bufferToStream, streamToBuffer } from '../streams';

import type { ArchiveEntry } from '.';
import type { ArchiveEntry } from '../../../../common/types';

export async function untarBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
) {
const deflatedStream = bufferToStream(buffer);
// use tar.list vs .extract to avoid writing to disk
@@ -37,17 +36,20 @@ export async function untarBuffer(
export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
): Promise<unknown> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
zipfile.on('entry', async (entry: yauzl.Entry) => {
const path = entry.fileName;
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
try {
const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
await onEntry({ buffer: entryBuffer, path });
} finally {
zipfile.readEntry();
}
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
}
100 changes: 63 additions & 37 deletions x-pack/plugins/fleet/server/services/epm/archive/index.ts
@@ -5,13 +5,20 @@
* 2.0.
*/

import type { AssetParts, AssetsMap } from '../../../../common/types';
import type {
ArchiveEntry,
ArchiveIterator,
AssetParts,
AssetsMap,
} from '../../../../common/types';
import {
PackageInvalidArchiveError,
PackageUnsupportedMediaTypeError,
PackageNotFoundError,
} from '../../../errors';

import { createArchiveIterator } from './archive_iterator';

import { deletePackageInfo } from './cache';
import type { SharedKey } from './cache';
import { getBufferExtractor } from './extract';
@@ -20,66 +27,85 @@ export * from './cache';
export { getBufferExtractor, untarBuffer, unzipBuffer } from './extract';
export { generatePackageInfoFromArchiveBuffer } from './parse';

export interface ArchiveEntry {
path: string;
buffer?: Buffer;
}

export async function unpackBufferToAssetsMap({
name,
version,
contentType,
archiveBuffer,
useStreaming,
}: {
name: string;
version: string;
contentType: string;
archiveBuffer: Buffer;
}): Promise<{ paths: string[]; assetsMap: AssetsMap }> {
const assetsMap = new Map<string, Buffer | undefined>();
const paths: string[] = [];
const entries = await unpackBufferEntries(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});

return { assetsMap, paths };
useStreaming: boolean | undefined;
}): Promise<{ paths: string[]; assetsMap: AssetsMap; archiveIterator: ArchiveIterator }> {
const archiveIterator = createArchiveIterator(archiveBuffer, contentType);
let paths: string[] = [];
let assetsMap: AssetsMap = new Map();
if (useStreaming) {
paths = await archiveIterator.getPaths();
// We keep the assetsMap empty as we don't want to load all the assets in memory
assetsMap = new Map();
} else {
const entries = await unpackArchiveEntriesIntoMemory(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});
}

return { paths, assetsMap, archiveIterator };
}

export async function unpackBufferEntries(
/**
* This function extracts all archive entries into memory.
*
* NOTE: This is potentially dangerous for large archives and can cause OOM
* errors. Use 'traverseArchiveEntries' instead to iterate over the entries
* without storing them all in memory at once.
*
* @param archiveBuffer
* @param contentType
* @returns All the entries in the archive buffer
*/
export async function unpackArchiveEntriesIntoMemory(
archiveBuffer: Buffer,
contentType: string
): Promise<ArchiveEntry[]> {
const entries: ArchiveEntry[] = [];
const addToEntries = async (entry: ArchiveEntry) => void entries.push(entry);
await traverseArchiveEntries(archiveBuffer, contentType, addToEntries);

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown
// error, unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export async function traverseArchiveEntries(
archiveBuffer: Buffer,
contentType: string,
onEntry: (entry: ArchiveEntry) => Promise<void>
) {
const bufferExtractor = getBufferExtractor({ contentType });
if (!bufferExtractor) {
throw new PackageUnsupportedMediaTypeError(
`Unsupported media type ${contentType}. Please use 'application/gzip' or 'application/zip'`
);
}
const entries: ArchiveEntry[] = [];
try {
const onlyFiles = ({ path }: ArchiveEntry): boolean => !path.endsWith('/');
const addToEntries = (entry: ArchiveEntry) => entries.push(entry);
await bufferExtractor(archiveBuffer, onlyFiles, addToEntries);
await bufferExtractor(archiveBuffer, onlyFiles, onEntry);
} catch (error) {
throw new PackageInvalidArchiveError(
`Error during extraction of package: ${error}. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown error in the try-catch above,
// unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export const deletePackageCache = ({ name, version }: SharedKey) => {
5 changes: 2 additions & 3 deletions x-pack/plugins/fleet/server/services/epm/archive/parse.ts
@@ -40,7 +40,7 @@ import {
import { PackageInvalidArchiveError } from '../../../errors';
import { pkgToPkgKey } from '../registry';

import { unpackBufferEntries } from '.';
import { traverseArchiveEntries } from '.';

const readFileAsync = promisify(readFile);
export const MANIFEST_NAME = 'manifest.yml';
@@ -160,9 +160,8 @@ export async function generatePackageInfoFromArchiveBuffer(
contentType: string
): Promise<{ paths: string[]; packageInfo: ArchivePackage }> {
const assetsMap: AssetsBufferMap = {};
const entries = await unpackBufferEntries(archiveBuffer, contentType);
const paths: string[] = [];
entries.forEach(({ path: bufferPath, buffer }) => {
await traverseArchiveEntries(archiveBuffer, contentType, async ({ path: bufferPath, buffer }) => {
paths.push(bufferPath);
if (buffer && filterAssetPathForParseAndVerifyArchive(bufferPath)) {
assetsMap[bufferPath] = buffer;
@@ -15,6 +15,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';

import { ASSETS_SAVED_OBJECT_TYPE } from '../../../../common';
import type {
ArchiveEntry,
InstallablePackage,
InstallSource,
PackageAssetReference,
@@ -24,7 +25,6 @@ import { PackageInvalidArchiveError, PackageNotFoundError } from '../../../error
import { appContextService } from '../../app_context';

import { setPackageInfo } from '.';
import type { ArchiveEntry } from '.';
import { filterAssetPathForParseAndVerifyArchive, parseAndVerifyArchive } from './parse';

const ONE_BYTE = 1024 * 1024;
@@ -16,14 +16,13 @@ import type {
PackageInfo,
} from '../../../../types';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import type { ArchiveEntry } from '../../archive';
import {
FLEET_FINAL_PIPELINE_CONTENT,
FLEET_FINAL_PIPELINE_ID,
FLEET_FINAL_PIPELINE_VERSION,
} from '../../../../constants';
import { getPipelineNameForDatastream } from '../../../../../common/services';
import type { PackageInstallContext } from '../../../../../common/types';
import type { ArchiveEntry, PackageInstallContext } from '../../../../../common/types';

import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
