diff --git a/app/components/application/browse/application-list.component.ts b/app/components/application/browse/application-list.component.ts index 40d4abd8ec..9680e58077 100644 --- a/app/components/application/browse/application-list.component.ts +++ b/app/components/application/browse/application-list.component.ts @@ -53,7 +53,7 @@ export class ApplicationListComponent extends ListOrTableBase implements OnInit, this.status = this.data.status; this._subs.push(applicationService.onApplicationAdded.subscribe((applicationId) => { - this.data.loadNewItem(applicationService.get(applicationId)); + this.data.loadNewItem(applicationService.getOnce(applicationId)); })); } diff --git a/app/components/job/action/delete/delete-job-action.ts b/app/components/job/action/delete/delete-job-action.ts index eaa99f322f..7255d2d326 100644 --- a/app/components/job/action/delete/delete-job-action.ts +++ b/app/components/job/action/delete/delete-job-action.ts @@ -47,23 +47,17 @@ export class WaitForDeleteJobPoller { @autobind() public start(progress: BehaviorSubject): Observable { const obs = new AsyncSubject(); - const data = this.jobService.get(this.jobId); - const errorCallback = (e) => { - progress.next(100); - clearInterval(interval); - obs.complete(); - }; - let interval = setInterval(() => { - data.fetch().subscribe({ - error: errorCallback, + this.jobService.getOnce(this.jobId).subscribe({ + error: (e) => { + progress.next(100); + clearInterval(interval); + obs.complete(); + }, }); }, 5000); progress.next(-1); - data.item.subscribe({ - error: errorCallback, - }); return obs; } diff --git a/app/components/job/browse/job-list.component.ts b/app/components/job/browse/job-list.component.ts index d5eedc1e9c..7f67b04aa6 100644 --- a/app/components/job/browse/job-list.component.ts +++ b/app/components/job/browse/job-list.component.ts @@ -73,7 +73,7 @@ export class JobListComponent extends ListOrTableBase implements OnInit, OnDestr this.data = this.jobService.list(this._baseOptions); this.status = this.data.status; this._onJobAddedSub = jobService.onJobAdded.subscribe((jobId) => { - this.data.loadNewItem(jobService.get(jobId)); + this.data.loadNewItem(jobService.getOnce(jobId)); }); } diff --git a/app/components/job/details/job-details.component.ts b/app/components/job/details/job-details.component.ts index 0b10dda18f..ae8f933fd3 100644 --- a/app/components/job/details/job-details.component.ts +++ b/app/components/job/details/job-details.component.ts @@ -73,6 +73,7 @@ export class JobDetailsComponent implements OnInit, OnDestroy { public ngOnDestroy() { this._paramsSubscriber.unsubscribe(); + this.data.dispose(); } public get filterPlaceholderText() { diff --git a/app/components/job/details/job-progress-status/job-progress-status.component.ts b/app/components/job/details/job-progress-status/job-progress-status.component.ts index a460d0ba6a..12fd3db344 100644 --- a/app/components/job/details/job-progress-status/job-progress-status.component.ts +++ b/app/components/job/details/job-progress-status/job-progress-status.component.ts @@ -73,6 +73,7 @@ export class JobProgressStatusComponent implements OnChanges, OnDestroy { public ngOnDestroy() { this._polls.forEach(x => x.destroy()); this._subs.forEach(x => x.unsubscribe()); + this.poolData.dispose(); } public countRunningTasks() { diff --git a/app/components/node/details/node-details.component.ts b/app/components/node/details/node-details.component.ts index 0548be0cd2..7e0516e9a6 100644 --- a/app/components/node/details/node-details.component.ts +++ b/app/components/node/details/node-details.component.ts @@ -77,6 +77,8 @@ export class NodeDetailsComponent implements OnInit, OnDestroy { public ngOnDestroy() { this._paramsSubscribers.forEach(x => x.unsubscribe()); + this.poolData.dispose(); + this.data.dispose(); } @autobind() diff --git a/app/components/pool/action/delete/delete-pool-task.ts b/app/components/pool/action/delete/delete-pool-task.ts index 3bd4b91e73..36076b8c45 100644 --- a/app/components/pool/action/delete/delete-pool-task.ts +++ b/app/components/pool/action/delete/delete-pool-task.ts @@ -7,7 +7,7 @@ import { PoolService } from "app/services"; import { LongRunningDeleteAction } from "app/services/core"; export class DeletePoolTask extends LongRunningDeleteAction { - constructor(private poolService: PoolService, poolIds: string[]) { + constructor(private poolService: PoolService, poolIds: string[]) { super("pool", poolIds); } @@ -53,31 +53,25 @@ export class WaitForDeletePoolPollTask { @autobind() public start(progress: BehaviorSubject): Observable { const obs = new AsyncSubject(); - const data = this.poolService.get(this.poolId); let interval; - const errorCallback = (e) => { - progress.next(100); - clearInterval(interval); - obs.complete(); - }; - interval = setInterval(() => { - data.fetch().subscribe({ - error: errorCallback, + this.poolService.getOnce(this.poolId).subscribe({ + next: (pool: Pool) => { + if (pool) { + const currentNodes = pool.currentNodes; + progress.next(this._getProgress(currentNodes)); + } + }, + error: (e) => { + progress.next(100); + clearInterval(interval); + obs.complete(); + }, }); }, this.refreshRate); progress.next(10); - data.item.subscribe({ - next: (pool: Pool) => { - if (pool) { - const currentNodes = pool.currentNodes; - progress.next(this._getProgress(currentNodes)); - } - }, - error: errorCallback, - }); return obs; } diff --git a/app/components/pool/browse/pool-list.component.ts b/app/components/pool/browse/pool-list.component.ts index a80a0e7031..27c82d86ee 100644 --- a/app/components/pool/browse/pool-list.component.ts +++ b/app/components/pool/browse/pool-list.component.ts @@ -75,7 +75,7 @@ export class PoolListComponent extends ListOrTableBase implements OnInit, OnDest this.data = this.poolService.list(); this.status = this.data.status; this._subs.push(poolService.onPoolAdded.subscribe((poolId) => { - this.data.loadNewItem(poolService.get(poolId)); + this.data.loadNewItem(poolService.getOnce(poolId)); })); this._subs.push(this.data.items.subscribe((pools) => { this.pools = List(pools.map(x => new PoolDecorator(x))); diff --git a/app/components/pool/details/pool-details.component.ts b/app/components/pool/details/pool-details.component.ts index 0f2383ec4e..c0c087141d 100644 --- a/app/components/pool/details/pool-details.component.ts +++ b/app/components/pool/details/pool-details.component.ts @@ -69,6 +69,7 @@ export class PoolDetailsComponent implements OnInit, OnDestroy { public ngOnDestroy() { this._paramsSubscriber.unsubscribe(); + this.data.dispose(); } public get filterPlaceholderText() { diff --git a/app/components/task/action/delete/delete-task-action.ts b/app/components/task/action/delete/delete-task-action.ts index a787c27748..5500bb210d 100644 --- a/app/components/task/action/delete/delete-task-action.ts +++ b/app/components/task/action/delete/delete-task-action.ts @@ -48,24 +48,17 @@ export class WaitForDeleteTaskPoller { @autobind() public start(progress: BehaviorSubject): Observable { const obs = new AsyncSubject(); - const data = this.taskService.get(this.jobId, this.taskId); - const errorCallback = (e) => { - progress.next(100); - clearInterval(interval); - obs.complete(); - }; - let interval = setInterval(() => { - data.fetch().subscribe({ - error: errorCallback, + this.taskService.getOnce(this.jobId, this.taskId).subscribe({ + error: (e) => { + progress.next(100); + clearInterval(interval); + obs.complete(); + }, }); }, 5000); progress.next(-1); - data.item.subscribe({ - error: errorCallback, - }); - return obs; } } diff --git a/app/components/task/browse/task-list.component.ts b/app/components/task/browse/task-list.component.ts index 45cbcca3b5..2d9f878f85 100644 --- a/app/components/task/browse/task-list.component.ts +++ b/app/components/task/browse/task-list.component.ts @@ -68,7 +68,7 @@ export class TaskListComponent extends SelectableList implements OnInit { super(); this._onTaskAddedSub = taskService.onTaskAdded.subscribe((item: TaskParams) => { - this.data.loadNewItem(taskService.get(item.jobId, item.id)); + this.data.loadNewItem(taskService.getOnce(item.jobId, item.id)); }); } diff --git a/app/components/task/details/task-details.component.ts b/app/components/task/details/task-details.component.ts index 9ede2f01e3..bec8fa1d50 100644 --- a/app/components/task/details/task-details.component.ts +++ b/app/components/task/details/task-details.component.ts @@ -82,6 +82,8 @@ export class TaskDetailsComponent implements OnInit, OnDestroy { public ngOnDestroy() { this._paramsSubscribers.forEach(x => x.unsubscribe()); + this.jobData.dispose(); + this.data.dispose(); } @autobind() diff --git a/app/services/core/rx-entity-proxy.ts b/app/services/core/rx-entity-proxy.ts index a408b76ed8..fa069c2cc9 100644 --- a/app/services/core/rx-entity-proxy.ts +++ b/app/services/core/rx-entity-proxy.ts @@ -3,10 +3,16 @@ import { AsyncSubject, BehaviorSubject, Observable } from "rxjs"; import { LoadingStatus } from "app/components/base/loading"; import { ServerError } from "app/models"; +import { PollObservable } from "app/services/core"; import { HttpCode } from "app/utils/constants"; import { RxProxyBase, RxProxyBaseConfig } from "./rx-proxy-base"; export interface RxEntityProxyConfig extends RxProxyBaseConfig { + /** + * If you want to have the entity proxy poll automatically for you every given milliseconds. + * @default Disabled + */ + poll?: number; } export abstract class RxEntityProxy extends RxProxyBase { @@ -17,6 +23,7 @@ export abstract class RxEntityProxy extends RxProxyBase; private _itemKey = new BehaviorSubject(null); + private _pollTracker: PollObservable; /** * @param _type Class for TEntity used to instantiate @@ -27,11 +34,15 @@ export abstract class RxEntityProxy extends RxProxyBase, config: RxEntityProxyConfig) { super(type, config); this.params = config.initialParams || {} as TParams; - this.item = this._itemKey.map((key) => { + this.item = this._itemKey.distinctUntilChanged().map((key) => { return this.cache.items.map((items) => { return items.get(key); }); - }).switch(); + }).switch().distinctUntilChanged().takeUntil(this.isDisposed); + + if (config.poll) { + this._pollTracker = this.startPoll(5000); + } } /** @@ -59,10 +70,31 @@ export abstract class RxEntityProxy extends RxProxyBase { return this.fetch(); } + /** + * Stop the automatically started poll if applicable + */ + public stopPoll() { + if (this._pollTracker) { + this._pollTracker.destroy(); + } + } + + /** + * Abstract method implementation of what to do when the polling calls. + */ protected pollRefresh() { return this.refresh(); } @@ -83,22 +115,38 @@ export abstract class RxEntityProxy extends RxProxyBase(getProxy: RxEntityProxy): Observable { const obs = new AsyncSubject(); + getProxy.stopPoll(); getProxy.fetch().subscribe({ next: () => { - getProxy.item.first().subscribe((item: TEntity) => { - if (item) { - obs.next(item); - obs.complete(); - getProxy.dispose(); - } + getProxy.item.subscribe((item: TEntity) => { + obs.next(item); + obs.complete(); + getProxy.dispose(); }); }, error: (e) => { obs.error(e); obs.complete(); + getProxy.dispose(); }, }); return obs.asObservable(); } + +// const sub = new BehaviorSubject(0); +// const until = new AsyncSubject(); +// let i = 0; +// setInterval(() => { +// sub.next(i++); +// if (i === 4) { +// until.next(true); +// until.complete(); +// } +// }, 2000); + +// const obs = sub.takeUntil(until); +// obs.subscribe((x) => { +// console.log("New value", x); +// }); diff --git a/app/services/core/rx-list-proxy.ts b/app/services/core/rx-list-proxy.ts index 572ef16df4..dc603228d0 100644 --- a/app/services/core/rx-list-proxy.ts +++ b/app/services/core/rx-list-proxy.ts @@ -5,7 +5,6 @@ import { AsyncSubject, BehaviorSubject, Observable } from "rxjs"; import { LoadingStatus } from "app/components/base/loading"; import { log } from "app/utils"; import { ListOptions, ListOptionsAttributes } from "./list-options"; -import { RxEntityProxy } from "./rx-entity-proxy"; import { RxProxyBase, RxProxyBaseConfig } from "./rx-proxy-base"; export interface RxListProxyConfig extends RxProxyBaseConfig { @@ -129,9 +128,8 @@ export abstract class RxListProxy extends RxProxyBase): Observable { - const obs = entityProxy.fetch().flatMap(() => entityProxy.item.first()); - obs.subscribe({ + public loadNewItem(getOnceObs: Observable): Observable { + getOnceObs.subscribe({ next: (newItem) => { this._addItemToList(newItem); }, error: (error) => { @@ -139,8 +137,7 @@ export abstract class RxListProxy extends RxProxyBase; + /** + * Sets to after calling dispose() + */ + public isDisposed: AsyncSubject; + protected _status = new BehaviorSubject(LoadingStatus.Loading); protected _newDataStatus = new BehaviorSubject(LoadingStatus.Loading); protected _error = new BehaviorSubject(null); @@ -86,6 +91,7 @@ export abstract class RxProxyBase { if (status === LoadingStatus.Loading) { @@ -163,7 +169,12 @@ export abstract class RxProxyBase) { @@ -265,7 +276,7 @@ export abstract class RxProxyBase this._cache, getFn: (client, params: PoolParams) => client.pool.get(params.id, options), initialParams: { id: poolId }, + poll: Constants.PollRate.entity, }); } diff --git a/app/services/task-service.ts b/app/services/task-service.ts index 6ed723e84e..ce212bf86f 100644 --- a/app/services/task-service.ts +++ b/app/services/task-service.ts @@ -4,7 +4,7 @@ import { Observable, Subject } from "rxjs"; import { SubtaskInformation, Task } from "app/models"; import { TaskCreateDto } from "app/models/dtos"; -import { log } from "app/utils"; +import { Constants, log } from "app/utils"; import { FilterBuilder } from "app/utils/filter-builder"; import { BatchClientService } from "./batch-client.service"; import { @@ -96,6 +96,7 @@ export class TaskService extends ServiceBase { return client.task.get(params.jobId, params.id, options); }, initialParams: { id: taskId, jobId: initialJobId }, + poll: Constants.PollRate.entity, }); } diff --git a/app/utils/constants.ts b/app/utils/constants.ts index c37bb667dc..528954c464 100644 --- a/app/utils/constants.ts +++ b/app/utils/constants.ts @@ -143,3 +143,7 @@ export const APIErrorCodes = { export const MetadataInternalKey = { tags: "_bl_tags", }; + +export const PollRate = { + entity: 10000, +}; diff --git a/test/app/services/core/rx-batch-list-proxy.spec.ts b/test/app/services/core/rx-batch-list-proxy.spec.ts index 90eae3eb01..3cc4135124 100644 --- a/test/app/services/core/rx-batch-list-proxy.spec.ts +++ b/test/app/services/core/rx-batch-list-proxy.spec.ts @@ -6,7 +6,7 @@ import { List, OrderedSet } from "immutable"; import { LoadingStatus } from "app/components/base/loading"; import { BatchError, ServerError } from "app/models"; -import { DataCache, RxBatchEntityProxy, RxBatchListProxy } from "app/services/core"; +import { DataCache, RxBatchEntityProxy, RxBatchListProxy, getOnceProxy } from "app/services/core"; import { BatchClientServiceMock } from "test/utils/mocks"; import { FakeModel } from "./fake-model"; @@ -209,7 +209,7 @@ describe("RxBatchListProxy", () => { { id: "3", state: "running", name: "Fake3" }, ]; - proxy.loadNewItem(entityProxy as any).subscribe(() => { + proxy.loadNewItem(getOnceProxy(entityProxy)).subscribe(() => { expect(items).toEqualImmutable(List(expected.map((x) => new FakeModel(x)))); done(); }); @@ -225,7 +225,7 @@ describe("RxBatchListProxy", () => { { id: "4", state: "running", name: "Fake4" }, ].concat(data[0]); - proxy.loadNewItem(entityProxy as any).subscribe(() => { + proxy.loadNewItem(getOnceProxy(entityProxy)).subscribe(() => { expect(items).toEqualImmutable(List(expected.map((x) => new FakeModel(x)))); done(); });