Skip to content

Commit

Permalink
Run telemetry task immediately if versions are incompatible; add serv…
Browse files Browse the repository at this point in the history
…ice environments telemetry
  • Loading branch information
dgieselaar committed Aug 27, 2020
1 parent eceb500 commit e8d5483
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 39 deletions.
54 changes: 28 additions & 26 deletions x-pack/plugins/apm/common/__snapshots__/apm_telemetry.test.ts.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions x-pack/plugins/apm/common/apm_telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export function getApmTelemetryMapping() {
properties: {
expected_metric_document_count: long,
transaction_count: long,
ratio: long,
},
};

Expand All @@ -102,12 +103,14 @@ export function getApmTelemetryMapping() {
properties: {
current_implementation: aggregatedTransactionsProperties,
no_observer_name: aggregatedTransactionsProperties,
no_rum: aggregatedTransactionsProperties,
no_rum_no_observer_name: aggregatedTransactionsProperties,
only_rum: aggregatedTransactionsProperties,
only_rum_no_observer_name: aggregatedTransactionsProperties,
with_country: aggregatedTransactionsProperties,
},
},
environments: {
services_without_environment: long,
services_with_multiple_environments: long,
top_enviroments: keyword,
},
cloud: {
properties: {
availability_zone: keyword,
Expand Down Expand Up @@ -227,6 +230,7 @@ export function getApmTelemetryMapping() {
agents: tookProperties,
cardinality: tookProperties,
cloud: tookProperties,
environments: tookProperties,
groupings: tookProperties,
indices_stats: tookProperties,
integrations: tookProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

import { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import { tasks } from './tasks';
import {
SERVICE_NAME,
SERVICE_ENVIRONMENT,
} from '../../../../common/elasticsearch_fieldnames';

describe('data telemetry collection tasks', () => {
const indices = {
Expand All @@ -17,6 +21,63 @@ describe('data telemetry collection tasks', () => {
/* eslint-enable @typescript-eslint/naming-convention */
} as ApmIndicesConfig;

describe('environments', () => {
const task = tasks.find((t) => t.name === 'environments');

it('returns environment information', async () => {
const search = jest.fn().mockResolvedValueOnce({
aggregations: {
environments: {
buckets: [
{
key: 'production',
},
{
key: 'testing',
},
],
},
service_environments: {
buckets: [
{
key: {
[SERVICE_NAME]: 'opbeans-node',
[SERVICE_ENVIRONMENT]: 'production',
},
},
{
key: {
[SERVICE_NAME]: 'opbeans-node',
[SERVICE_ENVIRONMENT]: null,
},
},
{
key: {
[SERVICE_NAME]: 'opbeans-java',
[SERVICE_ENVIRONMENT]: 'production',
},
},
{
key: {
[SERVICE_NAME]: 'opbeans-rum',
[SERVICE_ENVIRONMENT]: null,
},
},
],
},
},
});

expect(await task?.executor({ search, indices } as any)).toEqual({
environments: {
services_with_multiple_environments: 1,
services_without_environment: 2,
top_environments: ['production', 'testing'],
},
});
});
});

describe('aggregated_transactions', () => {
const task = tasks.find((t) => t.name === 'aggregated_transactions');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { ValuesType } from 'utility-types';
import { flatten, merge, sortBy, sum } from 'lodash';
import { flatten, merge, sortBy, sum, pickBy } from 'lodash';
import { AggregationOptionsByType } from '../../../../typings/elasticsearch/aggregations';
import { ProcessorEvent } from '../../../../common/processor_event';
import { TelemetryTask } from '.';
Expand Down Expand Up @@ -294,6 +294,87 @@ export const tasks: TelemetryTask[] = [
return { cloud };
},
},
{
name: 'environments',
executor: async ({ indices, search }) => {
const response = await search({
index: [indices['apm_oss.transactionIndices']],
body: {
query: {
bool: {
filter: [{ range: { '@timestamp': { gte: 'now-1d' } } }],
},
},
aggs: {
environments: {
terms: {
field: SERVICE_ENVIRONMENT,
size: 5,
},
},
service_environments: {
composite: {
size: 1000,
sources: [
{
[SERVICE_ENVIRONMENT]: {
terms: {
field: SERVICE_ENVIRONMENT,
missing_bucket: true,
},
},
},
{
[SERVICE_NAME]: {
terms: {
field: SERVICE_NAME,
},
},
},
],
},
},
},
},
});

const topEnvironments =
response.aggregations?.environments.buckets.map(
(bucket) => bucket.key
) ?? [];
const serviceEnvironments: Record<string, Array<string | null>> = {};

const buckets = response.aggregations?.service_environments.buckets ?? [];

buckets.forEach((bucket) => {
const serviceName = bucket.key['service.name'];
const environment = bucket.key['service.environment'] as string | null;

const environments = serviceEnvironments[serviceName] ?? [];

serviceEnvironments[serviceName] = environments.concat(environment);
});

const servicesWithoutEnvironment = Object.keys(
pickBy(serviceEnvironments, (environments) =>
environments.includes(null)
)
);

const servicesWithMultipleEnvironments = Object.keys(
pickBy(serviceEnvironments, (environments) => environments.length > 1)
);

return {
environments: {
services_without_environment: servicesWithoutEnvironment.length,
services_with_multiple_environments:
servicesWithMultipleEnvironments.length,
top_environments: topEnvironments as string[],
},
};
},
},
{
name: 'processor_events',
executor: async ({ indices, search }) => {
Expand Down
37 changes: 33 additions & 4 deletions x-pack/plugins/apm/server/lib/apm_telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { DeepRequired } from 'utility-types';
import {
CoreSetup,
Logger,
Expand All @@ -27,6 +28,7 @@ import {
collectDataTelemetry,
CollectTelemetryParams,
} from './collect_data_telemetry';
import { APMDataTelemetry } from './types';

const APM_TELEMETRY_TASK_NAME = 'apm-telemetry-task';

Expand All @@ -36,12 +38,14 @@ export async function createApmTelemetry({
usageCollector,
taskManager,
logger,
kibanaVersion,
}: {
core: CoreSetup;
config$: Observable<APMConfig>;
usageCollector: UsageCollectionSetup;
taskManager: TaskManagerSetupContract;
logger: Logger;
kibanaVersion: string;
}) {
taskManager.registerTaskDefinitions({
[APM_TELEMETRY_TASK_NAME]: {
Expand Down Expand Up @@ -95,7 +99,10 @@ export async function createApmTelemetry({

await savedObjectsClient.create(
APM_TELEMETRY_SAVED_OBJECT_TYPE,
dataTelemetry,
{
...dataTelemetry,
kibanaVersion,
},
{ id: APM_TELEMETRY_SAVED_OBJECT_TYPE, overwrite: true }
);
};
Expand All @@ -105,12 +112,14 @@ export async function createApmTelemetry({
schema: getApmTelemetryMapping(),
fetch: async () => {
try {
const data = (
const { kibanaVersion: storedKibanaVersion, ...data } = (
await savedObjectsClient.get(
APM_TELEMETRY_SAVED_OBJECT_TYPE,
APM_TELEMETRY_SAVED_OBJECT_ID
)
).attributes;
).attributes as { kibanaVersion: string } & DeepRequired<
APMDataTelemetry
>;

return data;
} catch (err) {
Expand All @@ -126,7 +135,7 @@ export async function createApmTelemetry({

usageCollector.registerCollector(collector);

core.getStartServices().then(([_coreStart, pluginsStart]) => {
core.getStartServices().then(async ([_coreStart, pluginsStart]) => {
const { taskManager: taskManagerStart } = pluginsStart as {
taskManager: TaskManagerStartContract;
};
Expand All @@ -141,5 +150,25 @@ export async function createApmTelemetry({
params: {},
state: {},
});

try {
const currentData = (
await savedObjectsClient.get(
APM_TELEMETRY_SAVED_OBJECT_TYPE,
APM_TELEMETRY_SAVED_OBJECT_ID
)
).attributes as { kibanaVersion?: string };

if (currentData.kibanaVersion !== kibanaVersion) {
logger.debug(
`Stored telemetry is out of date. Task will run immediately. Stored: ${currentData.kibanaVersion}, expected: ${kibanaVersion}`
);
taskManagerStart.runNow(APM_TELEMETRY_TASK_NAME);
}
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
logger.warn('Failed to fetch saved telemetry data.');
}
}
});
}
Loading

0 comments on commit e8d5483

Please sign in to comment.