Skip to content

Commit

Permalink
add support for on_failure parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
alisonelizabeth committed Apr 7, 2020
1 parent edaf515 commit 5d65c88
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ describe('pipeline_serialization', () => {
},
},
],
on_failure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
pipeline2: {
description: 'pipeline2 description',
Expand All @@ -39,6 +47,14 @@ describe('pipeline_serialization', () => {
},
},
],
onFailure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
},
{
name: 'pipeline2',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,25 @@ import { PipelinesByName, Pipeline } from '../types';
export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
const pipelineNames: string[] = Object.keys(pipelinesByName);

const deserializedTemplates = pipelineNames.map((name: string) => {
const { description, version, processors } = pipelinesByName[name];
const deserializedPipelines = pipelineNames.map((name: string) => {
const { description, version, processors, on_failure } = pipelinesByName[name];

return {
const pipeline = {
name,
description,
version,
processors,
onFailure: on_failure,
};

// Remove any undefined values
return Object.entries(pipeline).reduce((pipelineDefinition: any, [key, value]) => {
if (value !== undefined) {
pipelineDefinition[key] = value;
}
return pipelineDefinition;
}, {});
});

return deserializedTemplates;
return deserializedPipelines;
}
8 changes: 7 additions & 1 deletion x-pack/plugins/ingest_pipelines/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ export interface Pipeline {
description: string;
version?: number;
processors: Processor[];
onFailure?: Processor[];
}

export interface PipelinesByName {
[key: string]: Omit<Pipeline, 'name'>;
[key: string]: {
description: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
};
}
6 changes: 4 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/routes/api/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import { RouteDependencies } from '../../types';
const bodySchema = schema.object({
name: schema.string(),
description: schema.string(),
processors: schema.arrayOf(schema.any()), // todo fix
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

export const registerCreateRoute = ({
Expand All @@ -33,7 +34,7 @@ export const registerCreateRoute = ({
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
const pipeline = req.body as Pipeline;

const { name, description, processors, version } = pipeline;
const { name, description, processors, version, onFailure } = pipeline;

try {
// Check that a pipeline with the same name doesn't already exist
Expand Down Expand Up @@ -62,6 +63,7 @@ export const registerCreateRoute = ({
description,
processors,
version,
on_failure: onFailure,
},
});

Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/routes/api/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { RouteDependencies } from '../../types';

const bodySchema = schema.object({
description: schema.string(),
processors: schema.arrayOf(schema.any()), // todo fix
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
version: schema.maybe(schema.number()),
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
});

const paramsSchema = schema.object({
Expand All @@ -37,7 +38,7 @@ export const registerUpdateRoute = ({
const { name } = req.params as typeof paramsSchema.type;
const pipeline = req.body as Pipeline;

const { description, processors, version } = pipeline;
const { description, processors, version, onFailure } = pipeline;

try {
// Verify pipeline exists; ES will throw 404 if it doesn't
Expand All @@ -49,6 +50,7 @@ export const registerUpdateRoute = ({
description,
processors,
version,
on_failure: onFailure,
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ export default function({ getService }: FtrProviderContext) {
},
},
],
onFailure: [
{
set: {
field: 'error.message',
value: '{{ failure_message }}',
},
},
],
version: 1,
})
.expect(200);
Expand Down

0 comments on commit 5d65c88

Please sign in to comment.