Skip to content

Commit

Permalink
Install prebuilt rules package using stream-based approach
Browse files Browse the repository at this point in the history
  • Loading branch information
xcrzx committed Nov 4, 2024
1 parent ac04952 commit e374721
Show file tree
Hide file tree
Showing 38 changed files with 680 additions and 81 deletions.
15 changes: 15 additions & 0 deletions x-pack/plugins/fleet/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,25 @@ export type InstallablePackage = RegistryPackage | ArchivePackage;

export type AssetsMap = Map<string, Buffer | undefined>;

/**
* A single entry of a package archive, as handed to archive traversal
* callbacks and filters.
*/
export interface ArchiveEntry {
/** Path of the entry inside the archive. */
path: string;
/**
* Contents of the entry. Optional: it is absent when only the path is known,
* for example when an entry is passed to a filter before its contents are read.
*/
buffer?: Buffer;
}

/**
* Uniform interface for walking the entries of a package archive one at a
* time, regardless of whether the entries come from an archive buffer or from
* an in-memory assets map.
*/
export interface ArchiveIterator {
/**
* Invokes `onEntry` for every entry of the archive. The returned promise
* settles once the traversal has finished.
*/
traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise<void>) => Promise<void>;
/** Resolves with the paths of the entries in the archive. */
getPaths: () => Promise<string[]>;
}

/**
* Data describing the package being installed that is passed along to the
* installation steps.
*/
export interface PackageInstallContext {
/** Package information of the package being installed. */
packageInfo: InstallablePackage;
/**
* @deprecated Use `archiveIterator` to access the package archive entries
* without loading them all into memory at once.
*/
assetsMap: AssetsMap;
/** Paths of the entries contained in the package archive. */
paths: string[];
/** Iterator used to traverse the package archive entries one at a time. */
archiveIterator: ArchiveIterator;
}

export type ArchivePackage = PackageSpecManifest &
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_p
import { getFile, getInstallation } from '../../services/epm/packages/get';
import type { FleetRequestHandlerContext } from '../..';
import { appContextService } from '../../services';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';
import { getAsset } from '../../services/epm/archive/storage';

import { getFileHandler } from './file_handler';
Expand All @@ -29,7 +29,7 @@ jest.mock('../../services/epm/packages/get');
const mockedGetBundledPackageByPkgKey = jest.mocked(getBundledPackageByPkgKey);
const mockedGetInstallation = jest.mocked(getInstallation);
const mockedGetFile = jest.mocked(getFile);
const mockedUnpackBufferEntries = jest.mocked(unpackBufferEntries);
const mockedUnpackBufferEntries = jest.mocked(unpackArchiveEntriesIntoMemory);
const mockedGetAsset = jest.mocked(getAsset);

function mockContext() {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/fleet/server/routes/epm/file_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { defaultFleetErrorHandler } from '../../errors';
import { getAsset } from '../../services/epm/archive/storage';
import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_packages';
import { pkgToPkgKey } from '../../services/epm/registry';
import { unpackBufferEntries } from '../../services/epm/archive';
import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive';

const CACHE_CONTROL_10_MINUTES_HEADER: HttpResponseOptions['headers'] = {
'cache-control': 'max-age=600',
Expand Down Expand Up @@ -69,7 +69,7 @@ export const getFileHandler: FleetRequestHandler<
pkgToPkgKey({ name: pkgName, version: pkgVersion })
);
if (bundledPackage) {
const bufferEntries = await unpackBufferEntries(
const bufferEntries = await unpackArchiveEntriesIntoMemory(
await bundledPackage.getBuffer(),
'application/zip'
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
FleetRequestHandler,
InstallKibanaAssetsRequestSchema,
} from '../../types';
import { createArchiveIteratorFromMap } from '../../services/epm/archive/archive_iterator';

export const installPackageKibanaAssetsHandler: FleetRequestHandler<
TypeOf<typeof InstallKibanaAssetsRequestSchema.params>,
Expand Down Expand Up @@ -69,6 +70,7 @@ export const installPackageKibanaAssetsHandler: FleetRequestHandler<
packageInfo,
paths: installedPkgWithAssets.paths,
assetsMap: installedPkgWithAssets.assetsMap,
archiveIterator: createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap),
},
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { AssetsMap, ArchiveIterator, ArchiveEntry } from '../../../../common/types';

import { traverseArchiveEntries } from '.';

/**
 * Creates an iterator for traversing and extracting paths from an archive
 * buffer. This iterator is intended to be used for memory efficient traversal
 * of archive contents without extracting the entire archive into memory.
 *
 * The list of paths is computed lazily on the first `getPaths()` call and then
 * reused. Concurrent callers share a single traversal, and a failed traversal
 * is not cached, so a later call starts over instead of returning a partial
 * list.
 *
 * @param archiveBuffer - The buffer containing the archive data.
 * @param contentType - The content type of the archive (e.g.,
 * 'application/zip').
 * @returns ArchiveIterator instance.
 *
 */
export const createArchiveIterator = (
  archiveBuffer: Buffer,
  contentType: string
): ArchiveIterator => {
  // Memoized result of the path collection. Caching the promise (rather than a
  // partially filled array) guarantees callers only ever observe a complete list.
  let pathsPromise: Promise<string[]> | undefined;

  const traverseEntries = async (
    onEntry: (entry: ArchiveEntry) => Promise<void>
  ): Promise<void> => {
    await traverseArchiveEntries(archiveBuffer, contentType, async (entry) => {
      await onEntry(entry);
    });
  };

  // Walks the archive once and gathers every entry path into a fresh array.
  const collectPaths = async (): Promise<string[]> => {
    const collected: string[] = [];

    await traverseEntries(async (entry) => {
      collected.push(entry.path);
    });

    return collected;
  };

  const getPaths = (): Promise<string[]> => {
    if (!pathsPromise) {
      pathsPromise = collectPaths().catch((error: unknown) => {
        // Drop the failed attempt so that the next call retries the traversal.
        pathsPromise = undefined;
        throw error;
      });
    }

    return pathsPromise;
  };

  return {
    traverseEntries,
    getPaths,
  };
};

/**
 * Adapts an in-memory assetsMap to the ArchiveIterator interface.
 *
 * This is a transitional helper that lets callers traverse assets through the
 * same interface while assetsMap is still around. Every asset is already held
 * in memory, so it should not be used for large archives.
 *
 * @param assetsMap - Map from asset path to the asset buffer.
 * @returns ArchiveIterator instance backed by the given map.
 *
 */
export const createArchiveIteratorFromMap = (assetsMap: AssetsMap): ArchiveIterator => ({
  async traverseEntries(onEntry: (entry: ArchiveEntry) => Promise<void>): Promise<void> {
    // Step through the map's own iterator so entries are visited in insertion
    // order, one at a time, awaiting the callback before moving on.
    const cursor = assetsMap.entries();
    for (let step = cursor.next(); !step.done; step = cursor.next()) {
      const [path, buffer] = step.value;
      await onEntry({ path, buffer });
    }
  },

  async getPaths(): Promise<string[]> {
    return Array.from(assetsMap.keys());
  },
});
9 changes: 4 additions & 5 deletions x-pack/plugins/fleet/server/services/epm/archive/extract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import * as tar from 'tar';
import yauzl from 'yauzl';

import { bufferToStream, streamToBuffer } from '../streams';

import type { ArchiveEntry } from '.';
import type { ArchiveEntry } from '../../../../common/types';

export async function untarBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
) {
const deflatedStream = bufferToStream(buffer);
// use tar.list vs .extract to avoid writing to disk
Expand All @@ -37,7 +36,7 @@ export async function untarBuffer(
export async function unzipBuffer(
buffer: Buffer,
filter = (entry: ArchiveEntry): boolean => true,
onEntry = (entry: ArchiveEntry): void => {}
onEntry = async (entry: ArchiveEntry): Promise<void> => {}
): Promise<unknown> {
const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
zipfile.readEntry();
Expand All @@ -46,7 +45,7 @@ export async function unzipBuffer(
if (!filter({ path })) return zipfile.readEntry();

const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
onEntry({ buffer: entryBuffer, path });
await onEntry({ buffer: entryBuffer, path });
zipfile.readEntry();
});
return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
Expand Down
100 changes: 63 additions & 37 deletions x-pack/plugins/fleet/server/services/epm/archive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
* 2.0.
*/

import type { AssetParts, AssetsMap } from '../../../../common/types';
import type {
ArchiveEntry,
ArchiveIterator,
AssetParts,
AssetsMap,
} from '../../../../common/types';
import {
PackageInvalidArchiveError,
PackageUnsupportedMediaTypeError,
PackageNotFoundError,
} from '../../../errors';

import { createArchiveIterator } from './archive_iterator';

import { deletePackageInfo } from './cache';
import type { SharedKey } from './cache';
import { getBufferExtractor } from './extract';
Expand All @@ -20,66 +27,85 @@ export * from './cache';
export { getBufferExtractor, untarBuffer, unzipBuffer } from './extract';
export { generatePackageInfoFromArchiveBuffer } from './parse';

export interface ArchiveEntry {
path: string;
buffer?: Buffer;
}

export async function unpackBufferToAssetsMap({
name,
version,
contentType,
archiveBuffer,
useStreaming,
}: {
name: string;
version: string;
contentType: string;
archiveBuffer: Buffer;
}): Promise<{ paths: string[]; assetsMap: AssetsMap }> {
const assetsMap = new Map<string, Buffer | undefined>();
const paths: string[] = [];
const entries = await unpackBufferEntries(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});

return { assetsMap, paths };
useStreaming: boolean | undefined;
}): Promise<{ paths: string[]; assetsMap: AssetsMap; archiveIterator: ArchiveIterator }> {
const archiveIterator = createArchiveIterator(archiveBuffer, contentType);
let paths: string[] = [];
let assetsMap: AssetsMap = new Map();
if (useStreaming) {
paths = await archiveIterator.getPaths();
// We keep the assetsMap empty as we don't want to load all the assets in memory
assetsMap = new Map();
} else {
const entries = await unpackArchiveEntriesIntoMemory(archiveBuffer, contentType);

entries.forEach((entry) => {
const { path, buffer } = entry;
if (buffer) {
assetsMap.set(path, buffer);
paths.push(path);
}
});
}

return { paths, assetsMap, archiveIterator };
}

export async function unpackBufferEntries(
/**
* This function extracts all archive entries into memory.
*
* NOTE: This is potentially dangerous for large archives and can cause OOM
* errors. Use 'traverseArchiveEntries' instead to iterate over the entries
* without storing them all in memory at once.
*
* @param archiveBuffer
* @param contentType
* @returns All the entries in the archive buffer
*/
export async function unpackArchiveEntriesIntoMemory(
archiveBuffer: Buffer,
contentType: string
): Promise<ArchiveEntry[]> {
const entries: ArchiveEntry[] = [];
const addToEntries = async (entry: ArchiveEntry) => void entries.push(entry);
await traverseArchiveEntries(archiveBuffer, contentType, addToEntries);

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown
// error, unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export async function traverseArchiveEntries(
archiveBuffer: Buffer,
contentType: string,
onEntry: (entry: ArchiveEntry) => Promise<void>
) {
const bufferExtractor = getBufferExtractor({ contentType });
if (!bufferExtractor) {
throw new PackageUnsupportedMediaTypeError(
`Unsupported media type ${contentType}. Please use 'application/gzip' or 'application/zip'`
);
}
const entries: ArchiveEntry[] = [];
try {
const onlyFiles = ({ path }: ArchiveEntry): boolean => !path.endsWith('/');
const addToEntries = (entry: ArchiveEntry) => entries.push(entry);
await bufferExtractor(archiveBuffer, onlyFiles, addToEntries);
await bufferExtractor(archiveBuffer, onlyFiles, onEntry);
} catch (error) {
throw new PackageInvalidArchiveError(
`Error during extraction of package: ${error}. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}

// While unpacking a tar.gz file with unzipBuffer() will result in a thrown error in the try-catch above,
// unpacking a zip file with untarBuffer() just results in nothing.
if (entries.length === 0) {
throw new PackageInvalidArchiveError(
`Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.`
);
}
return entries;
}

export const deletePackageCache = ({ name, version }: SharedKey) => {
Expand Down
5 changes: 2 additions & 3 deletions x-pack/plugins/fleet/server/services/epm/archive/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
import { PackageInvalidArchiveError } from '../../../errors';
import { pkgToPkgKey } from '../registry';

import { unpackBufferEntries } from '.';
import { traverseArchiveEntries } from '.';

const readFileAsync = promisify(readFile);
export const MANIFEST_NAME = 'manifest.yml';
Expand Down Expand Up @@ -160,9 +160,8 @@ export async function generatePackageInfoFromArchiveBuffer(
contentType: string
): Promise<{ paths: string[]; packageInfo: ArchivePackage }> {
const assetsMap: AssetsBufferMap = {};
const entries = await unpackBufferEntries(archiveBuffer, contentType);
const paths: string[] = [];
entries.forEach(({ path: bufferPath, buffer }) => {
await traverseArchiveEntries(archiveBuffer, contentType, async ({ path: bufferPath, buffer }) => {
paths.push(bufferPath);
if (buffer && filterAssetPathForParseAndVerifyArchive(bufferPath)) {
assetsMap[bufferPath] = buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';

import { ASSETS_SAVED_OBJECT_TYPE } from '../../../../common';
import type {
ArchiveEntry,
InstallablePackage,
InstallSource,
PackageAssetReference,
Expand All @@ -24,7 +25,6 @@ import { PackageInvalidArchiveError, PackageNotFoundError } from '../../../error
import { appContextService } from '../../app_context';

import { setPackageInfo } from '.';
import type { ArchiveEntry } from '.';
import { filterAssetPathForParseAndVerifyArchive, parseAndVerifyArchive } from './parse';

const ONE_BYTE = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ import type {
PackageInfo,
} from '../../../../types';
import { getAssetFromAssetsMap, getPathParts } from '../../archive';
import type { ArchiveEntry } from '../../archive';
import {
FLEET_FINAL_PIPELINE_CONTENT,
FLEET_FINAL_PIPELINE_ID,
FLEET_FINAL_PIPELINE_VERSION,
} from '../../../../constants';
import { getPipelineNameForDatastream } from '../../../../../common/services';
import type { PackageInstallContext } from '../../../../../common/types';
import type { ArchiveEntry, PackageInstallContext } from '../../../../../common/types';

import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
Expand Down
Loading

0 comments on commit e374721

Please sign in to comment.