Skip to content

Commit

Permalink
[Ingest] Use agent template embeded in the pkg registry response
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet committed Apr 28, 2020
1 parent 9610dfb commit a9dc64e
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 54 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export interface RegistryStream {
description?: string;
enabled?: boolean;
vars?: RegistryVarsEntry[];
template?: string;
}

export type RequirementVersion = string;
Expand Down
28 changes: 15 additions & 13 deletions x-pack/plugins/ingest_manager/server/routes/datasource/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { TypeOf } from '@kbn/config-schema';
import Boom from 'boom';
import { RequestHandler } from 'src/core/server';
import { appContextService, datasourceService } from '../../services';
import { ensureInstalledPackage } from '../../services/epm/packages';
import { ensureInstalledPackage, getPackageInfo } from '../../services/epm/packages';
import {
GetDatasourcesRequestSchema,
GetOneDatasourceRequestSchema,
Expand Down Expand Up @@ -85,12 +85,13 @@ export const createDatasourceHandler: RequestHandler<
pkgName: request.body.package.name,
callCluster,
});

const pkgInfo = await getPackageInfo({
savedObjectsClient: soClient,
pkgName: request.body.package.name,
pkgVersion: request.body.package.version,
});
newData.inputs = (await datasourceService.assignPackageStream(
{
pkgName: request.body.package.name,
pkgVersion: request.body.package.version,
},
pkgInfo,
request.body.inputs
)) as TypeOf<typeof CreateDatasourceRequestSchema.body>['inputs'];
}
Expand Down Expand Up @@ -127,13 +128,14 @@ export const updateDatasourceHandler: RequestHandler<
const pkg = newData.package || datasource.package;
const inputs = newData.inputs || datasource.inputs;
if (pkg && (newData.inputs || newData.package)) {
newData.inputs = (await datasourceService.assignPackageStream(
{
pkgName: pkg.name,
pkgVersion: pkg.version,
},
inputs
)) as TypeOf<typeof CreateDatasourceRequestSchema.body>['inputs'];
const pkgInfo = await getPackageInfo({
savedObjectsClient: soClient,
pkgName: pkg.name,
pkgVersion: pkg.version,
});
newData.inputs = (await datasourceService.assignPackageStream(pkgInfo, inputs)) as TypeOf<
typeof CreateDatasourceRequestSchema.body
>['inputs'];
}

const updatedDatasource = await datasourceService.update(
Expand Down
64 changes: 38 additions & 26 deletions x-pack/plugins/ingest_manager/server/services/datasource.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,38 @@
*/

import { datasourceService } from './datasource';
import { PackageInfo } from '../types';

async function mockedGetAssetsData(_a: any, _b: any, dataset: string) {
if (dataset === 'dataset1') {
return [
{
buffer: Buffer.from(`
const TEMPLATE = `
type: log
metricset: ["dataset1"]
paths:
{{#each paths}}
- {{this}}
{{/each}}
`),
},
];
}
return [];
}

jest.mock('./epm/packages/assets', () => {
return {
getAssetsDataForPackageKey: mockedGetAssetsData,
};
});
`;

describe('Datasource service', () => {
describe('assignPackageStream', () => {
it('should work with cofig variables from the stream', async () => {
it('should work with config variables from the stream', async () => {
const inputs = await datasourceService.assignPackageStream(
{
pkgName: 'package',
pkgVersion: '1.0.0',
},
({
datasources: [
{
inputs: [
{
type: 'log',
streams: [
{
dataset: 'package.dataset1',
template: TEMPLATE,
},
],
},
],
},
],
} as unknown) as PackageInfo,
[
{
type: 'log',
Expand Down Expand Up @@ -85,10 +84,23 @@ describe('Datasource service', () => {

it('should work with config variables at the input level', async () => {
const inputs = await datasourceService.assignPackageStream(
{
pkgName: 'package',
pkgVersion: '1.0.0',
},
({
datasources: [
{
inputs: [
{
type: 'log',
streams: [
{
dataset: 'package.dataset1',
template: TEMPLATE,
},
],
},
],
},
],
} as unknown) as PackageInfo,
[
{
type: 'log',
Expand Down
38 changes: 24 additions & 14 deletions x-pack/plugins/ingest_manager/server/services/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import {
packageToConfigDatasource,
DatasourceInput,
DatasourceInputStream,
PackageInfo,
} from '../../common';
import { DATASOURCE_SAVED_OBJECT_TYPE } from '../constants';
import { NewDatasource, Datasource, ListWithKuery } from '../types';
import { agentConfigService } from './agent_config';
import { getPackageInfo, getInstallation } from './epm/packages';
import { outputService } from './output';
import { getAssetsDataForPackageKey } from './epm/packages/assets';
import { createStream } from './epm/agent/agent';

const SAVED_OBJECT_TYPE = DATASOURCE_SAVED_OBJECT_TYPE;
Expand Down Expand Up @@ -201,20 +201,16 @@ class DatasourceService {
}

public async assignPackageStream(
pkgInfo: { pkgName: string; pkgVersion: string },
pkgInfo: PackageInfo,
inputs: DatasourceInput[]
): Promise<DatasourceInput[]> {
const inputsPromises = inputs.map(input => _assignPackageStreamToInput(pkgInfo, input));

return Promise.all(inputsPromises);
}
}

const _isAgentStream = (p: string) => !!p.match(/agent\/stream\/stream\.yml/);

async function _assignPackageStreamToInput(
pkgInfo: { pkgName: string; pkgVersion: string },
input: DatasourceInput
) {
async function _assignPackageStreamToInput(pkgInfo: PackageInfo, input: DatasourceInput) {
const streamsPromises = input.streams.map(stream =>
_assignPackageStreamToStream(pkgInfo, input, stream)
);
Expand All @@ -224,21 +220,34 @@ async function _assignPackageStreamToInput(
}

async function _assignPackageStreamToStream(
pkgInfo: { pkgName: string; pkgVersion: string },
pkgInfo: PackageInfo,
input: DatasourceInput,
stream: DatasourceInputStream
) {
if (!stream.enabled) {
return { ...stream, agent_stream: undefined };
}
const dataset = getDataset(stream.dataset);
const assetsData = await getAssetsDataForPackageKey(pkgInfo, _isAgentStream, dataset);
const datasource = pkgInfo.datasources?.[0];
if (!datasource) {
throw new Error('Stream template not found, no datasource');
}

const [pkgStream] = assetsData;
if (!pkgStream || !pkgStream.buffer) {
throw new Error(`Stream template not found for dataset ${dataset}`);
const inputFromPkg = datasource.inputs.find(pkgInput => pkgInput.type === input.type);
if (!inputFromPkg) {
throw new Error(`Stream template not found, unable to found input ${input.type}`);
}

const streamFromPkg = inputFromPkg.streams.find(
pkgStream => pkgStream.dataset === stream.dataset
);
if (!streamFromPkg) {
throw new Error(`Stream template not found, unable to found stream ${stream.dataset}`);
}

if (!streamFromPkg.template) {
throw new Error(`Stream template not found for dataset ${dataset}`);
}
// Populate template variables from input config and stream config
const data: { [k: string]: string | string[] } = {};
if (input.vars) {
Expand All @@ -251,7 +260,8 @@ async function _assignPackageStreamToStream(
data[key] = stream.vars[key].value;
}
}
const yaml = safeLoad(createStream(data, pkgStream.buffer.toString()));
const yaml = safeLoad(createStream(data, streamFromPkg.template));

stream.agent_stream = yaml;
return { ...stream };
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/ingest_manager/server/services/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async function addPackageToConfig(
config.namespace
);
newDatasource.inputs = await datasourceService.assignPackageStream(
{ pkgName: packageToInstall.name, pkgVersion: packageToInstall.version },
packageInfo,
newDatasource.inputs
);

Expand Down

0 comments on commit a9dc64e

Please sign in to comment.