diff --git a/packages/kbn-shared-ux-components/src/toolbar/buttons/icon_button_group/__snapshots__/icon_button_group.test.tsx.snap b/packages/kbn-shared-ux-components/src/toolbar/buttons/icon_button_group/__snapshots__/icon_button_group.test.tsx.snap
index b49efda2a1cf78..b167a3b22e9cb9 100644
--- a/packages/kbn-shared-ux-components/src/toolbar/buttons/icon_button_group/__snapshots__/icon_button_group.test.tsx.snap
+++ b/packages/kbn-shared-ux-components/src/toolbar/buttons/icon_button_group/__snapshots__/icon_button_group.test.tsx.snap
@@ -19,9 +19,7 @@ exports[`
is rendered 1`] = `
is rendered 1`] = `
(iterable: IterableInput, fn: AsyncMapFn) {
- await lastValueFrom(mapWithLimit$(iterable, Infinity, fn).pipe(defaultIfEmpty()));
+ await lastValueFrom(mapWithLimit$(iterable, Infinity, fn).pipe(defaultIfEmpty(undefined)));
}
/**
@@ -40,5 +38,5 @@ export async function asyncForEachWithLimit(
limit: number,
fn: AsyncMapFn
) {
- await lastValueFrom(mapWithLimit$(iterable, limit, fn).pipe(defaultIfEmpty()));
+ await lastValueFrom(mapWithLimit$(iterable, limit, fn).pipe(defaultIfEmpty(undefined)));
}
diff --git a/packages/kbn-std/src/iteration/map.ts b/packages/kbn-std/src/iteration/map.ts
index 4c8d65df57f378..44765d69e1bd0d 100644
--- a/packages/kbn-std/src/iteration/map.ts
+++ b/packages/kbn-std/src/iteration/map.ts
@@ -6,9 +6,8 @@
* Side Public License, v 1.
*/
-import { from } from 'rxjs';
+import { from, lastValueFrom } from 'rxjs';
import { toArray } from 'rxjs/operators';
-import { lastValueFrom } from '../rxjs_7';
import { IterableInput, AsyncMapFn, AsyncMapResult } from './types';
import { mapWithLimit$ } from './observable';
diff --git a/packages/kbn-std/src/iteration/observable.test.ts b/packages/kbn-std/src/iteration/observable.test.ts
index e84750e08148d3..b2c5d5d75e07c4 100644
--- a/packages/kbn-std/src/iteration/observable.test.ts
+++ b/packages/kbn-std/src/iteration/observable.test.ts
@@ -8,7 +8,6 @@
import * as Rx from 'rxjs';
import { toArray } from 'rxjs/operators';
-import { lastValueFrom } from '../rxjs_7';
import { map$, mapWithLimit$ } from './observable';
import { list, sleep, generator } from './test_helpers';
@@ -23,7 +22,7 @@ describe('mapWithLimit$', () => {
let active = 0;
const limit = Math.random() > 0.5 ? 20 : 40;
- const results = await lastValueFrom(
+ const results = await Rx.lastValueFrom(
mapWithLimit$(list(100), limit, async (n) => {
active += 1;
if (active > maxConcurrency) {
@@ -50,7 +49,7 @@ describe('mapWithLimit$', () => {
['observable', Rx.of(1, 2, 3, 4, 5), [1, 2, 3, 4, 5]] as const,
])('works with %p', async (_, iter, expected) => {
const mock = jest.fn(async (n) => n);
- const results = await lastValueFrom(mapWithLimit$(iter, 1, mock).pipe(toArray()));
+ const results = await Rx.lastValueFrom(mapWithLimit$(iter, 1, mock).pipe(toArray()));
expect(results).toEqual(expected);
});
});
@@ -60,7 +59,7 @@ describe('map$', () => {
let maxConcurrency = 0;
let active = 0;
- const results = await lastValueFrom(
+ const results = await Rx.lastValueFrom(
map$(list(100), async (n) => {
active += 1;
if (active > maxConcurrency) {
diff --git a/packages/kbn-std/src/iteration/types.ts b/packages/kbn-std/src/iteration/types.ts
index 6e0bfd9f22d7fa..a36522af5feac4 100644
--- a/packages/kbn-std/src/iteration/types.ts
+++ b/packages/kbn-std/src/iteration/types.ts
@@ -6,8 +6,8 @@
* Side Public License, v 1.
*/
-import { Subscribable } from 'rxjs';
+import { ObservableInput } from 'rxjs';
-export type IterableInput = Iterable | Subscribable;
-export type AsyncMapResult = Promise | Subscribable;
+export type IterableInput = Iterable | ObservableInput;
+export type AsyncMapResult = Promise | ObservableInput;
export type AsyncMapFn = (item: T1, i: number) => AsyncMapResult;
diff --git a/packages/kbn-std/src/rxjs_7.test.ts b/packages/kbn-std/src/rxjs_7.test.ts
deleted file mode 100644
index 939f853394aeea..00000000000000
--- a/packages/kbn-std/src/rxjs_7.test.ts
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-import * as Rx from 'rxjs';
-
-import { firstValueFrom, lastValueFrom } from './rxjs_7';
-
-// create an empty observable that completes with no notifications
-// after a delay to ensure helpers aren't checking for the EMPTY constant
-function empty() {
- return new Rx.Observable((subscriber) => {
- setTimeout(() => {
- subscriber.complete();
- }, 0);
- });
-}
-
-describe('firstValueFrom()', () => {
- it('resolves to the first value from the observable', async () => {
- await expect(firstValueFrom(Rx.of(1, 2, 3))).resolves.toBe(1);
- });
-
- it('rejects if the observable is empty', async () => {
- await expect(firstValueFrom(empty())).rejects.toThrowErrorMatchingInlineSnapshot(
- `"no elements in sequence"`
- );
- });
-
- it('unsubscribes from a source observable that emits synchronously', async () => {
- const values = [1, 2, 3, 4];
- let unsubscribed = false;
- const source = new Rx.Observable((subscriber) => {
- while (!subscriber.closed && values.length) {
- subscriber.next(values.shift()!);
- }
- unsubscribed = subscriber.closed;
- subscriber.complete();
- });
-
- await expect(firstValueFrom(source)).resolves.toMatchInlineSnapshot(`1`);
- if (!unsubscribed) {
- throw new Error('expected source to be unsubscribed');
- }
- expect(values).toEqual([2, 3, 4]);
- });
-
- it('unsubscribes from the source observable after first async notification', async () => {
- const values = [1, 2, 3, 4];
- let unsubscribed = false;
- const source = new Rx.Observable((subscriber) => {
- setTimeout(() => {
- while (!subscriber.closed) {
- subscriber.next(values.shift()!);
- }
- unsubscribed = subscriber.closed;
- });
- });
-
- await expect(firstValueFrom(source)).resolves.toMatchInlineSnapshot(`1`);
- if (!unsubscribed) {
- throw new Error('expected source to be unsubscribed');
- }
- expect(values).toEqual([2, 3, 4]);
- });
-});
-
-describe('lastValueFrom()', () => {
- it('resolves to the last value from the observable', async () => {
- await expect(lastValueFrom(Rx.of(1, 2, 3))).resolves.toBe(3);
- });
-
- it('rejects if the observable is empty', async () => {
- await expect(lastValueFrom(empty())).rejects.toThrowErrorMatchingInlineSnapshot(
- `"no elements in sequence"`
- );
- });
-});
diff --git a/packages/kbn-std/src/rxjs_7.ts b/packages/kbn-std/src/rxjs_7.ts
deleted file mode 100644
index f31c864de53133..00000000000000
--- a/packages/kbn-std/src/rxjs_7.ts
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-import { Observable } from 'rxjs';
-import { first, last } from 'rxjs/operators';
-
-export function firstValueFrom(source: Observable) {
- // we can't use SafeSubscriber the same way that RxJS 7 does, so instead we
- return source.pipe(first()).toPromise();
-}
-
-export function lastValueFrom(source: Observable) {
- return source.pipe(last()).toPromise();
-}
diff --git a/packages/kbn-test/src/kbn_client/kbn_client_saved_objects.ts b/packages/kbn-test/src/kbn_client/kbn_client_saved_objects.ts
index 02861fcb27fdbe..97292b0268bc48 100644
--- a/packages/kbn-test/src/kbn_client/kbn_client_saved_objects.ts
+++ b/packages/kbn-test/src/kbn_client/kbn_client_saved_objects.ts
@@ -10,7 +10,6 @@ import { inspect } from 'util';
import * as Rx from 'rxjs';
import { mergeMap } from 'rxjs/operators';
-import { lastValueFrom } from '@kbn/std';
import { ToolingLog, isAxiosResponseError, createFailError } from '@kbn/dev-utils';
import { KbnClientRequester, uriencode } from './kbn_client_requester';
@@ -83,7 +82,7 @@ interface DeleteObjectsOptions {
async function concurrently(maxConcurrency: number, arr: T[], fn: (item: T) => Promise) {
if (arr.length) {
- await lastValueFrom(
+ await Rx.lastValueFrom(
Rx.from(arr).pipe(mergeMap(async (item) => await fn(item), maxConcurrency))
);
}
diff --git a/src/cli/serve/integration_tests/reload_logging_config.test.ts b/src/cli/serve/integration_tests/reload_logging_config.test.ts
index 4cee7dfae41267..7ad3495c39cd62 100644
--- a/src/cli/serve/integration_tests/reload_logging_config.test.ts
+++ b/src/cli/serve/integration_tests/reload_logging_config.test.ts
@@ -117,7 +117,7 @@ describe.skip('Server logging configuration', function () {
)
.toPromise();
- const lastMessage = await message$.pipe(take(1)).toPromise();
+ const lastMessage = await Rx.firstValueFrom(message$);
expect(containsJsonOnly(lastMessage)).toBe(true);
createConfigManager(configFilePath).modify((oldConfig) => {
diff --git a/src/core/public/application/__snapshots__/application_service.test.ts.snap b/src/core/public/application/__snapshots__/application_service.test.ts.snap
index 17de9503bd5332..712ed1a9f38f14 100644
--- a/src/core/public/application/__snapshots__/application_service.test.ts.snap
+++ b/src/core/public/application/__snapshots__/application_service.test.ts.snap
@@ -4,15 +4,15 @@ exports[`#start() getComponent returns renderable JSX tree 1`] = `
{
);
const { applications$ } = await service.start(startDeps);
- let applications = await applications$.pipe(take(1)).toPromise();
+ let applications = await firstValueFrom(applications$);
expect(applications.size).toEqual(2);
expect(applications.get('app1')).toEqual(
expect.objectContaining({
@@ -125,7 +125,7 @@ describe('#setup()', () => {
deepLinks: [{ id: 'subapp2', title: 'Subapp 2', path: '/subapp2' }],
}));
- applications = await applications$.pipe(take(1)).toPromise();
+ applications = await firstValueFrom(applications$);
expect(applications.size).toEqual(2);
expect(applications.get('app1')).toEqual(
expect.objectContaining({
@@ -205,7 +205,7 @@ describe('#setup()', () => {
})
);
const start = await service.start(startDeps);
- const applications = await start.applications$.pipe(take(1)).toPromise();
+ const applications = await firstValueFrom(start.applications$);
expect(applications.size).toEqual(2);
expect(applications.get('app1')).toEqual(
@@ -252,7 +252,7 @@ describe('#setup()', () => {
);
const { applications$ } = await service.start(startDeps);
- const applications = await applications$.pipe(take(1)).toPromise();
+ const applications = await firstValueFrom(applications$);
expect(applications.size).toEqual(2);
expect(applications.get('app1')).toEqual(
@@ -295,7 +295,7 @@ describe('#setup()', () => {
);
const start = await service.start(startDeps);
- const applications = await start.applications$.pipe(take(1)).toPromise();
+ const applications = await firstValueFrom(start.applications$);
expect(applications.size).toEqual(1);
expect(applications.get('app1')).toEqual(
@@ -410,7 +410,7 @@ describe('#setup()', () => {
updater$.next((app) => ({ defaultPath: '/foo' }));
- let appInfos = await applications$.pipe(take(1)).toPromise();
+ let appInfos = await firstValueFrom(applications$);
expect(appInfos.get('app1')!.deepLinks).toEqual([
{
@@ -445,7 +445,7 @@ describe('#setup()', () => {
],
}));
- appInfos = await applications$.pipe(take(1)).toPromise();
+ appInfos = await firstValueFrom(applications$);
expect(appInfos.get('app1')!.deepLinks).toEqual([
{
@@ -496,7 +496,7 @@ describe('#start()', () => {
register(Symbol(), createApp({ id: 'app2' }));
const { applications$ } = await service.start(startDeps);
- const availableApps = await applications$.pipe(take(1)).toPromise();
+ const availableApps = await firstValueFrom(applications$);
expect(availableApps.size).toEqual(2);
expect([...availableApps.keys()]).toEqual(['app1', 'app2']);
@@ -548,7 +548,7 @@ describe('#start()', () => {
register(Symbol(), createApp({ id: 'app2' }));
const { applications$ } = await service.start(startDeps);
- const availableApps = await applications$.pipe(take(1)).toPromise();
+ const availableApps = await firstValueFrom(applications$);
expect([...availableApps.keys()]).toEqual(['app1']);
});
@@ -802,7 +802,7 @@ describe('#start()', () => {
service.setup(setupDeps);
const { currentAppId$, navigateToApp } = await service.start(startDeps);
- const stop$ = new Subject();
+ const stop$ = new Subject();
const promise = currentAppId$.pipe(bufferCount(4), takeUntil(stop$)).toPromise();
await navigateToApp('alpha');
@@ -827,7 +827,7 @@ describe('#start()', () => {
service.setup(setupDeps);
const { currentAppId$, navigateToApp } = await service.start(startDeps);
- const stop$ = new Subject();
+ const stop$ = new Subject();
const promise = currentAppId$.pipe(bufferCount(4), takeUntil(stop$)).toPromise();
await navigateToApp('delta', { openInNewTab: true });
@@ -871,7 +871,7 @@ describe('#start()', () => {
const { navigateToApp, getComponent } = await service.start(startDeps);
const httpLoadingCount$ = startDeps.http.addLoadingCountSource.mock.calls[0][0];
- const stop$ = new Subject();
+ const stop$ = new Subject();
const currentLoadingCount$ = new BehaviorSubject(0);
httpLoadingCount$.pipe(takeUntil(stop$)).subscribe(currentLoadingCount$);
const loadingPromise = httpLoadingCount$.pipe(bufferCount(5), takeUntil(stop$)).toPromise();
diff --git a/src/core/public/application/application_service.tsx b/src/core/public/application/application_service.tsx
index 68807323c0a635..9a2703eb748720 100644
--- a/src/core/public/application/application_service.tsx
+++ b/src/core/public/application/application_service.tsx
@@ -7,7 +7,7 @@
*/
import React from 'react';
-import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
+import { BehaviorSubject, firstValueFrom, Observable, Subject, Subscription } from 'rxjs';
import { map, shareReplay, takeUntil, distinctUntilChanged, filter, take } from 'rxjs/operators';
import { createBrowserHistory, History } from 'history';
@@ -99,7 +99,7 @@ export class ApplicationService {
private currentActionMenu$ = new BehaviorSubject(undefined);
private readonly statusUpdaters$ = new BehaviorSubject