Skip to content

Commit

Permalink
Server-side create/update ingest pipelines (#62744)
Browse files Browse the repository at this point in the history
  • Loading branch information
alisonelizabeth authored Apr 8, 2020
1 parent a841441 commit 7de73c8
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ describe('pipeline_serialization', () => {
},
},
],
on_failure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
pipeline2: {
description: 'pipeline2 description',
Expand All @@ -39,6 +47,14 @@ describe('pipeline_serialization', () => {
},
},
],
onFailure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
{
name: 'pipeline2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import { PipelinesByName, Pipeline } from '../types';
export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
const pipelineNames: string[] = Object.keys(pipelinesByName);

const deserializedTemplates = pipelineNames.map((name: string) => {
const { description, version, processors } = pipelinesByName[name];
const deserializedPipelines = pipelineNames.map((name: string) => {
const { description, version, processors, on_failure } = pipelinesByName[name];

return {
const pipeline = {
name,
description,
version,
processors,
onFailure: on_failure,
};

return pipeline;
});

return deserializedTemplates;
return deserializedPipelines;
}
8 changes: 7 additions & 1 deletion x-pack/plugins/ingest_pipelines/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ export interface Pipeline {
description: string;
version?: number;
processors: Processor[];
onFailure?: Processor[];
}

export interface PipelinesByName {
[key: string]: Omit<Pipeline, 'name'>;
[key: string]: {
description: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
};
}
83 changes: 83 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';

import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';

const bodySchema = schema.object({
name: schema.string(),
description: schema.string(),
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

export const registerCreateRoute = ({
router,
license,
lib: { isEsError },
}: RouteDependencies): void => {
router.put(
{
path: API_BASE_PATH,
validate: {
body: bodySchema,
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
const pipeline = req.body as Pipeline;

const { name, description, processors, version, onFailure } = pipeline;

try {
// Check that a pipeline with the same name doesn't already exist
const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name });

if (pipelineByName[name]) {
return res.conflict({
body: new Error(
i18n.translate('xpack.ingestPipelines.createRoute.duplicatePipelineIdErrorMessage', {
defaultMessage: "There is already a pipeline with name '{name}'.",
values: {
name,
},
})
),
});
}
} catch (e) {
// Silently swallow error
}

try {
const response = await callAsCurrentUser('ingest.putPipeline', {
id: name,
body: {
description,
processors,
version,
on_failure: onFailure,
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

return res.internalError({ body: error });
}
})
);
};
4 changes: 4 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
*/

export { registerGetRoutes } from './get';

export { registerCreateRoute } from './create';

export { registerUpdateRoute } from './update';
70 changes: 70 additions & 0 deletions x-pack/plugins/ingest_pipelines/server/routes/api/update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';

import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';

const bodySchema = schema.object({
description: schema.string(),
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

const paramsSchema = schema.object({
name: schema.string(),
});

export const registerUpdateRoute = ({
router,
license,
lib: { isEsError },
}: RouteDependencies): void => {
router.put(
{
path: `${API_BASE_PATH}/{name}`,
validate: {
body: bodySchema,
params: paramsSchema,
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
const { name } = req.params;
const pipeline = req.body as Pipeline;

const { description, processors, version, onFailure } = pipeline;

try {
// Verify pipeline exists; ES will throw 404 if it doesn't
await callAsCurrentUser('ingest.getPipeline', { id: name });

const response = await callAsCurrentUser('ingest.putPipeline', {
id: name,
body: {
description,
processors,
version,
on_failure: onFailure,
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

return res.internalError({ body: error });
}
})
);
};
4 changes: 3 additions & 1 deletion x-pack/plugins/ingest_pipelines/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import { RouteDependencies } from '../types';

import { registerGetRoutes } from './api';
import { registerGetRoutes, registerCreateRoute, registerUpdateRoute } from './api';

export class ApiRoutes {
setup(dependencies: RouteDependencies) {
registerGetRoutes(dependencies);
registerCreateRoute(dependencies);
registerUpdateRoute(dependencies);
}
}
4 changes: 2 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/services/license.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ export class License {
});
}

guardApiRoute(handler: RequestHandler) {
guardApiRoute<P, Q, B>(handler: RequestHandler<P, Q, B>) {
const license = this;

return function licenseCheck(
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) {
const licenseStatus = license.getStatus();
Expand Down
1 change: 1 addition & 0 deletions x-pack/test/api_integration/apis/management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export default function({ loadTestFile }) {
loadTestFile(require.resolve('./rollup'));
loadTestFile(require.resolve('./index_management'));
loadTestFile(require.resolve('./index_lifecycle_management'));
loadTestFile(require.resolve('./ingest_pipelines'));
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FtrProviderContext } from '../../../ftr_provider_context';

export default function({ loadTestFile }: FtrProviderContext) {
describe('Ingest Node Pipelines', () => {
loadTestFile(require.resolve('./ingest_pipelines'));
});
}
Loading

0 comments on commit 7de73c8

Please sign in to comment.