Skip to content

Commit

Permalink
[Fleet] Allow to preconfigure multiple ES outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet committed Sep 2, 2021
1 parent 95eab7c commit e0bfc4f
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import type { PackagePolicy, FullAgentPolicyInput, FullAgentPolicyInputStream }
import { DEFAULT_OUTPUT } from '../constants';

export const storedPackagePoliciesToAgentInputs = (
packagePolicies: PackagePolicy[]
packagePolicies: PackagePolicy[],
outputId: string = DEFAULT_OUTPUT.name
): FullAgentPolicyInput[] => {
const fullInputs: FullAgentPolicyInput[] = [];

Expand All @@ -32,7 +33,7 @@ export const storedPackagePoliciesToAgentInputs = (
data_stream: {
namespace: packagePolicy.namespace || 'default',
},
use_output: DEFAULT_OUTPUT.name,
use_output: outputId,
...(input.compiled_input || {}),
...(input.streams.length
? {
Expand Down
7 changes: 6 additions & 1 deletion x-pack/plugins/fleet/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
export * from './models';
export * from './rest_spec';

import type { PreconfiguredAgentPolicy, PreconfiguredPackage } from './models/preconfiguration';
import type {
PreconfiguredAgentPolicy,
PreconfiguredPackage,
PreconfiguredOutput,
} from './models/preconfiguration';

export interface FleetConfigType {
enabled: boolean;
Expand All @@ -26,6 +30,7 @@ export interface FleetConfigType {
};
agentPolicies?: PreconfiguredAgentPolicy[];
packages?: PreconfiguredPackage[];
outputs?: PreconfiguredOutput[];
agentIdVerificationEnabled?: boolean;
}

Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/fleet/common/types/models/agent_policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export interface NewAgentPolicy {
monitoring_enabled?: MonitoringType;
unenroll_timeout?: number;
is_preconfigured?: boolean;
data_output_id?: string;
monitoring_output_id?: string;
}

export interface AgentPolicy extends NewAgentPolicy {
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/fleet/common/types/models/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export interface NewOutput {
hosts?: string[];
ca_sha256?: string;
api_key?: string;
config?: Record<string, any>;
config_yaml?: string;
}

Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/fleet/common/types/models/preconfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
NewPackagePolicyInput,
} from './package_policy';
import type { NewAgentPolicy } from './agent_policy';
import type { Output } from './output';

export type InputsOverride = Partial<NewPackagePolicyInput> & {
vars?: Array<NewPackagePolicyInput['vars'] & { name: string }>;
Expand All @@ -29,3 +30,7 @@ export interface PreconfiguredAgentPolicy extends Omit<NewAgentPolicy, 'namespac
}

export type PreconfiguredPackage = Omit<PackagePolicyPackage, 'title'>;

export interface PreconfiguredOutput extends Omit<Output, 'config_yaml'> {
config?: any;
}
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/server/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { isESClientError } from './utils';

export { defaultIngestErrorHandler, ingestErrorToResponseOptions } from './handlers';

export { isESClientError } from './utils';
export { isESClientError, isSavedObjectNotFoundError } from './utils';

export class IngestManagerError extends Error {
constructor(message?: string, public readonly meta?: unknown) {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/plugins/fleet/server/errors/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
*/

import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { isBoom } from '@hapi/boom';

export function isESClientError(error: unknown): error is ResponseError {
return error instanceof ResponseError;
}

export const isElasticsearchVersionConflictError = (error: Error): boolean => {
export function isElasticsearchVersionConflictError(error: Error): boolean {
return isESClientError(error) && error.meta.statusCode === 409;
};
}

export function isSavedObjectNotFoundError(error: Error): boolean {
return isBoom(error) && error.output.statusCode === 404;
}
7 changes: 6 additions & 1 deletion x-pack/plugins/fleet/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { schema } from '@kbn/config-schema';
import type { TypeOf } from '@kbn/config-schema';
import type { PluginConfigDescriptor, PluginInitializerContext } from 'src/core/server';

import { PreconfiguredPackagesSchema, PreconfiguredAgentPoliciesSchema } from './types';
import {
PreconfiguredPackagesSchema,
PreconfiguredAgentPoliciesSchema,
PreconfiguredOutputsSchema,
} from './types';

import { FleetPlugin } from './plugin';

Expand Down Expand Up @@ -84,6 +88,7 @@ export const config: PluginConfigDescriptor = {
}),
packages: PreconfiguredPackagesSchema,
agentPolicies: PreconfiguredAgentPoliciesSchema,
outputs: PreconfiguredOutputsSchema,
agentIdVerificationEnabled: schema.boolean({ defaultValue: true }),
}),
};
Expand Down
8 changes: 6 additions & 2 deletions x-pack/plugins/fleet/server/saved_objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ const getSavedObjectTypes = (
revision: { type: 'integer' },
monitoring_enabled: { type: 'keyword', index: false },
is_preconfigured: { type: 'keyword' },
data_output_id: { type: 'keyword' },
monitoring_output_id: { type: 'keyword' },
},
},
migrations: {
Expand Down Expand Up @@ -201,8 +203,10 @@ const getSavedObjectTypes = (
is_default: { type: 'boolean' },
hosts: { type: 'keyword' },
ca_sha256: { type: 'keyword', index: false },
config: { type: 'flattened' },
config_yaml: { type: 'text' },
config: { type: 'flattened', index: false },
config_yaml: { type: 'text', index: false },
is_preconfigured: { type: 'boolean' },
fleet_server_service_token: { type: 'text', index: false },
},
},
migrations: {
Expand Down
82 changes: 55 additions & 27 deletions x-pack/plugins/fleet/server/services/agent_policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -741,43 +741,60 @@ class AgentPolicyService {
if (!defaultOutputId) {
throw new Error('Default output is not setup');
}
const defaultOutput = await outputService.get(soClient, defaultOutputId);

const dataOutputId = agentPolicy.data_output_id || defaultOutputId;
const monitoringOutputId = agentPolicy.monitoring_output_id || defaultOutputId;

const outputs = await Promise.all(
Array.from(new Set([dataOutputId, monitoringOutputId])).map((outputId) =>
outputService.get(soClient, outputId)
)
);

const dataOutput = outputs.find((output) => output.id === dataOutputId);
if (!dataOutput) {
throw new Error(`Data output not found ${dataOutputId}`);
}
const monitoringOutput = outputs.find((output) => output.id === monitoringOutputId);
if (!monitoringOutput) {
throw new Error(`Monitoring output not found ${monitoringOutputId}`);
}

const fullAgentPolicy: FullAgentPolicy = {
id: agentPolicy.id,
outputs: {
// TEMPORARY as we only support a default output
...[defaultOutput].reduce<FullAgentPolicy['outputs']>(
...outputs.reduce<FullAgentPolicy['outputs']>((acc, output) => {
// eslint-disable-next-line @typescript-eslint/naming-convention
(outputs, { config_yaml, name, type, hosts, ca_sha256, api_key }) => {
const configJs = config_yaml ? safeLoad(config_yaml) : {};
outputs[name] = {
type,
hosts,
ca_sha256,
api_key,
...configJs,
};

if (options?.standalone) {
delete outputs[name].api_key;
outputs[name].username = 'ES_USERNAME';
outputs[name].password = 'ES_PASSWORD';
}

return outputs;
},
{}
),
const { config_yaml, name, type, hosts, ca_sha256, api_key } = output;
const configJs = config_yaml ? safeLoad(config_yaml) : {};
acc[getOutputIdForAgentPolicy(output)] = {
type,
hosts,
ca_sha256,
api_key,
...configJs,
};

if (options?.standalone) {
delete acc[name].api_key;
acc[name].username = 'ES_USERNAME';
acc[name].password = 'ES_PASSWORD';
}

return acc;
}, {}),
},
inputs: storedPackagePoliciesToAgentInputs(agentPolicy.package_policies as PackagePolicy[]),
inputs: storedPackagePoliciesToAgentInputs(
agentPolicy.package_policies as PackagePolicy[],
getOutputIdForAgentPolicy(dataOutput)
),
revision: agentPolicy.revision,
...(agentPolicy.monitoring_enabled && agentPolicy.monitoring_enabled.length > 0
? {
agent: {
monitoring: {
namespace: agentPolicy.namespace,
use_output: defaultOutput.name,
use_output: getOutputIdForAgentPolicy(monitoringOutput),
enabled: true,
logs: agentPolicy.monitoring_enabled.includes(dataTypes.Logs),
metrics: agentPolicy.monitoring_enabled.includes(dataTypes.Metrics),
Expand All @@ -801,13 +818,12 @@ class AgentPolicyService {
};

// TODO: fetch this from the elastic agent package
const monitoringOutput = fullAgentPolicy.agent?.monitoring.use_output;
const monitoringNamespace = fullAgentPolicy.agent?.monitoring.namespace;
if (
fullAgentPolicy.agent?.monitoring.enabled &&
monitoringNamespace &&
monitoringOutput &&
fullAgentPolicy.outputs[monitoringOutput]?.type === 'elasticsearch'
monitoringOutput.type === 'elasticsearch'
) {
let names: string[] = [];
if (fullAgentPolicy.agent.monitoring.logs) {
Expand Down Expand Up @@ -858,6 +874,18 @@ class AgentPolicyService {
}
}

/**
* Get id used in full agent policy (sent to the agents)
* we use "default" for the default policy to avoid breaking changes
*/
function getOutputIdForAgentPolicy(output: Output) {
if (output.is_default) {
return 'default';
}

return output.id;
}

export const agentPolicyService = new AgentPolicyService();

export async function addPackageToAgentPolicy(
Expand Down
12 changes: 12 additions & 0 deletions x-pack/plugins/fleet/server/services/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class OutputService {
});
}

public async getOutputById(soClient: SavedObjectsClientContract, id: string) {
return await soClient.get<OutputSOAttributes>(OUTPUT_SAVED_OBJECT_TYPE, id);
}

public async ensureDefaultOutput(soClient: SavedObjectsClientContract) {
const outputs = await this.getDefaultOutput(soClient);

Expand Down Expand Up @@ -76,6 +80,14 @@ class OutputService {
): Promise<Output> {
const data = { ...output };

// ensure only default output exists
if (data.is_default) {
const defaultOuput = await this.getDefaultOutputId(soClient);
if (defaultOuput) {
throw new Error(`A default output already exists (${defaultOuput})`);
}
}

if (data.hosts) {
data.hosts = data.hosts.map(normalizeHostsForAgents);
}
Expand Down
38 changes: 36 additions & 2 deletions x-pack/plugins/fleet/server/services/preconfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import type { ElasticsearchClient, SavedObjectsClientContract } from 'src/core/server';
import { i18n } from '@kbn/i18n';
import { groupBy, omit, pick, isEqual } from 'lodash';
import { safeDump } from 'js-yaml';

import type {
NewPackagePolicy,
Expand All @@ -17,16 +18,16 @@ import type {
PreconfiguredAgentPolicy,
PreconfiguredPackage,
PreconfigurationError,
PreconfiguredOutput,
} from '../../common';
import { AGENT_POLICY_SAVED_OBJECT_TYPE } from '../../common';

import {
PRECONFIGURATION_DELETION_RECORD_SAVED_OBJECT_TYPE,
PRECONFIGURATION_LATEST_KEYWORD,
} from '../constants';
import { isSavedObjectNotFoundError } from '../errors';

import { escapeSearchQueryPhrase } from './saved_object';

import { pkgToPkgKey } from './epm/registry';
import { getInstallation, getPackageInfo } from './epm/packages';
import { ensurePackagesCompletedInstall } from './epm/packages/install';
Expand All @@ -35,13 +36,46 @@ import { agentPolicyService, addPackageToAgentPolicy } from './agent_policy';
import type { InputsOverride } from './package_policy';
import { overridePackageInputs } from './package_policy';
import { appContextService } from './app_context';
import { outputService } from './output';

interface PreconfigurationResult {
policies: Array<{ id: string; updated_at: string }>;
packages: string[];
nonFatalErrors: PreconfigurationError[];
}

export async function ensurePreconfiguredOutputs(
soClient: SavedObjectsClientContract,
outputs: PreconfiguredOutput[]
) {
await Promise.all(
outputs.map(async (output) => {
const existingOutput = await outputService.getOutputById(soClient, output.id).catch((err) => {
if (isSavedObjectNotFoundError(err)) {
return undefined;
}

throw err;
});

const { id, config, ...outputData } = output;

const configYaml = config ? safeDump(config) : undefined;

const data = {
...outputData,
config_yaml: configYaml,
};

if (!existingOutput) {
return outputService.create(soClient, data, { id });
} else {
return outputService.update(soClient, id, data);
}
})
);
}

export async function ensurePreconfiguredPackagesAndPolicies(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
Expand Down
Loading

0 comments on commit e0bfc4f

Please sign in to comment.