Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some unhandledRejections #168009

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dev_docs/tutorials/testing_plugins.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ The more interesting logic is in `renderApp`:
/** public/application.ts */
import React from 'react';
import ReactDOM from 'react-dom';
import { switchMap } from 'rxjs';

import { AppMountParameters, CoreStart } from 'src/core/public';
import { AppRoot } from './components/app_root';
Expand All @@ -493,10 +494,10 @@ export const renderApp = (

// uiSettings subscription
const uiSettingsClient = core.uiSettings.client;
const pollingSubscription = uiSettingClient.get$('mysetting1').subscribe(async mySetting1 => {
const pollingSubscription = uiSettingClient.get$('mysetting1').pipe(switchMap(async (mySetting1) => {
const value = core.http.fetch(/** use `mySetting1` in request **/);
// ...
});
})).subscribe();

// Render app
ReactDOM.render(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import { takeUntil, finalize, map } from 'rxjs/operators';
import { Observable, timer } from 'rxjs';
import { Observable, timer, switchMap } from 'rxjs';
import type { ISavedObjectsRepository } from '@kbn/core/server';
import type { IEventLoopDelaysMonitor, IntervalHistogram } from '@kbn/core/server';
import {
Expand Down Expand Up @@ -46,17 +46,18 @@ export function startTrackingEventLoopDelaysUsage(
.pipe(
map((i) => (i + 1) % resetOnCount === 0),
takeUntil(stopMonitoringEventLoop$),
finalize(() => eventLoopDelaysMonitor.stop())
finalize(() => eventLoopDelaysMonitor.stop()),
switchMap(async (shouldReset) => {
const histogram = eventLoopDelaysMonitor.collect();
if (shouldReset) {
eventLoopDelaysMonitor.reset();
}
try {
await storeHistogram(histogram, internalRepository, instanceUuid);
} catch (e) {
// do not crash if cannot store a histogram.
}
})
)
.subscribe(async (shouldReset) => {
const histogram = eventLoopDelaysMonitor.collect();
if (shouldReset) {
eventLoopDelaysMonitor.reset();
}
try {
await storeHistogram(histogram, internalRepository, instanceUuid);
} catch (e) {
// do not crash if cannot store a histogram.
}
});
.subscribe();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export function startTrackingEventLoopDelaysThreshold(
takeUntil(stopMonitoringEventLoop$),
finalize(() => eventLoopDelaysMonitor.stop())
)
.subscribe(async () => {
.subscribe(() => {
const { mean: meanMS } = eventLoopDelaysMonitor.collect();

if (meanMS > warnThreshold) {
Expand Down
10 changes: 7 additions & 3 deletions src/plugins/links/public/embeddable/links_embeddable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import React, { createContext, useContext } from 'react';
import { Subscription, distinctUntilChanged, skip } from 'rxjs';
import { Subscription, distinctUntilChanged, skip, switchMap } from 'rxjs';
import deepEqual from 'fast-deep-equal';

import {
Expand Down Expand Up @@ -104,8 +104,12 @@ export class LinksEmbeddable
// By-value panels should update the componentState when input changes
this.subscriptions.add(
this.getInput$()
.pipe(distinctUntilChanged(deepEqual), skip(1))
.subscribe(async () => await this.initializeSavedLinks())
.pipe(
distinctUntilChanged(deepEqual),
skip(1),
switchMap(async () => await this.initializeSavedLinks())
)
.subscribe()
);
}

Expand Down
48 changes: 27 additions & 21 deletions src/plugins/telemetry/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type {
import type { HomePublicPluginSetup } from '@kbn/home-plugin/public';
import { ElasticV3BrowserShipper } from '@kbn/analytics-shippers-elastic-v3-browser';

import { BehaviorSubject, map, tap } from 'rxjs';
import { BehaviorSubject, map, switchMap, tap } from 'rxjs';
import type { TelemetryConfigLabels } from '../server/config';
import { FetchTelemetryConfigRoute, INTERNAL_VERSION } from '../common/routes';
import type { v2 } from '../common/types';
Expand Down Expand Up @@ -246,26 +246,32 @@ export class TelemetryPlugin
});
this.telemetryNotifications = telemetryNotifications;

application.currentAppId$.subscribe(async () => {
// Refresh and get telemetry config
const updatedConfig = await this.refreshConfig(http);

analytics.optIn({
global: { enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode() },
});

const isUnauthenticated = this.getIsUnauthenticated(http);
if (isUnauthenticated) {
return;
}

const telemetryBanner = updatedConfig?.banner;

this.maybeStartTelemetryPoller();
if (telemetryBanner) {
this.maybeShowOptedInNotificationBanner();
}
});
application.currentAppId$
.pipe(
switchMap(async () => {
// Refresh and get telemetry config
const updatedConfig = await this.refreshConfig(http);

analytics.optIn({
global: {
enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode(),
},
});

const isUnauthenticated = this.getIsUnauthenticated(http);
if (isUnauthenticated) {
return;
}

const telemetryBanner = updatedConfig?.banner;

this.maybeStartTelemetryPoller();
if (telemetryBanner) {
this.maybeShowOptedInNotificationBanner();
}
})
)
.subscribe();

return {
telemetryService: this.getTelemetryServicePublicApis(),
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/aiops/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AiopsPlugin
// This way we can pass on license changes to the route factory having always
// the current license because it's stored in a mutable attribute.
const aiopsLicense: AiopsLicense = { isActivePlatinumLicense: false };
this.licenseSubscription = plugins.licensing.license$.subscribe(async (license) => {
this.licenseSubscription = plugins.licensing.license$.subscribe((license) => {
aiopsLicense.isActivePlatinumLicense = isActiveLicense('platinum', license);

if (aiopsLicense.isActivePlatinumLicense) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import querystring from 'querystring';
import React from 'react';
import { renderToString } from 'react-dom/server';
import type { Observable, Subscription } from 'rxjs';
import { switchMap } from 'rxjs';

import type {
CapabilitiesSetup,
Expand Down Expand Up @@ -209,18 +210,22 @@ export class AuthorizationService {
validateFeaturePrivileges(allFeatures);
validateReservedPrivileges(allFeatures);

this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => {
try {
await registerPrivilegesWithCluster(
this.logger,
this.privileges,
this.applicationName,
clusterClient
);
} catch (err) {
scheduleRetry();
}
});
this.statusSubscription = online$
.pipe(
switchMap(async ({ scheduleRetry }) => {
try {
await registerPrivilegesWithCluster(
this.logger,
this.privileges,
this.applicationName,
clusterClient
);
} catch (err) {
scheduleRetry();
}
})
)
.subscribe();
}

stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { Observable, Subscription } from 'rxjs';
import { switchMap } from 'rxjs';

import type { ElasticsearchClient, HttpServiceSetup, Logger } from '@kbn/core/server';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
Expand Down Expand Up @@ -90,13 +91,20 @@ export class SessionManagementService {
auditLogger: audit.withoutRequest,
});

this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => {
try {
await Promise.all([this.sessionIndex.initialize(), this.scheduleCleanupTask(taskManager)]);
} catch (err) {
scheduleRetry();
}
});
this.statusSubscription = online$
.pipe(
switchMap(async ({ scheduleRetry }) => {
try {
await Promise.all([
this.sessionIndex.initialize(),
this.scheduleCleanupTask(taskManager),
]);
} catch (err) {
scheduleRetry();
}
})
)
.subscribe();

return {
session: new Session({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { Subject } from 'rxjs';
import { concatMap, Subject } from 'rxjs';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import pMap from 'p-map';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common';
Expand Down Expand Up @@ -377,45 +377,49 @@ export class SyntheticsService {

let output: ServiceData['output'] | null = null;

subject.subscribe(async (monitors) => {
try {
if (monitors.length === 0 || !this.config.manifestUrl) {
return;
}
subject
.pipe(
concatMap(async (monitors) => {
try {
if (monitors.length === 0 || !this.config.manifestUrl) {
return;
}

if (!output) {
output = await this.getOutput();
if (!output) {
output = await this.getOutput();

if (!output) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'API key is not valid.',
message: 'Failed to push configs. API key is not valid.',
type: 'invalidApiKey',
stackVersion: service.server.stackVersion,
});
return;
}
}

if (!output) {
this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

service.syncErrors = await this.apiClient.syncMonitors({
monitors,
output,
license,
});
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'API key is not valid.',
message: 'Failed to push configs. API key is not valid.',
type: 'invalidApiKey',
reason: 'Failed to push configs to service',
message: e?.message,
type: 'pushConfigsError',
code: e?.code,
status: e.status,
stackVersion: service.server.stackVersion,
});
return;
this.logger.error(e);
}
}

this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

service.syncErrors = await this.apiClient.syncMonitors({
monitors,
output,
license,
});
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'Failed to push configs to service',
message: e?.message,
type: 'pushConfigsError',
code: e?.code,
status: e.status,
stackVersion: service.server.stackVersion,
});
this.logger.error(e);
}
});
})
)
.subscribe();

await this.getMonitorConfigs(subject);
}
Expand Down Expand Up @@ -479,25 +483,29 @@ export class SyntheticsService {
const license = await this.getLicense();
const subject = new Subject<MonitorFields[]>();

subject.subscribe(async (monitors) => {
const hasPublicLocations = monitors.some((config) =>
config.locations.some(({ isServiceManaged }) => isServiceManaged)
);

if (hasPublicLocations) {
const output = await this.getOutput();
if (!output) {
return;
}

const data = {
output,
monitors,
license,
};
return await this.apiClient.delete(data);
}
});
subject
.pipe(
concatMap(async (monitors) => {
const hasPublicLocations = monitors.some((config) =>
config.locations.some(({ isServiceManaged }) => isServiceManaged)
);

if (hasPublicLocations) {
const output = await this.getOutput();
if (!output) {
return;
}

const data = {
output,
monitors,
license,
};
return await this.apiClient.delete(data);
}
})
)
.subscribe();

await this.getMonitorConfigs(subject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class EphemeralTaskLifecycle {
);
})
)
.subscribe(async (e) => {
.subscribe((e) => {
let overallCapacity = this.getCapacity();
const capacityByType = new Map<string, number>();
const tasksWithinCapacity = [...this.ephemeralTaskQueue]
Expand Down