Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Entities are refreshing every 10 seconds automatically #436

Merged
merged 7 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class ApplicationListComponent extends ListOrTableBase implements OnInit,

this.status = this.data.status;
this._subs.push(applicationService.onApplicationAdded.subscribe((applicationId) => {
this.data.loadNewItem(applicationService.get(applicationId));
this.data.loadNewItem(applicationService.getOnce(applicationId));
}));
}

Expand Down
18 changes: 6 additions & 12 deletions app/components/job/action/delete/delete-job-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,17 @@ export class WaitForDeleteJobPoller {
@autobind()
public start(progress: BehaviorSubject<any>): Observable<any> {
const obs = new AsyncSubject();
const data = this.jobService.get(this.jobId);
const errorCallback = (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
};

let interval = setInterval(() => {
data.fetch().subscribe({
error: errorCallback,
this.jobService.getOnce(this.jobId).subscribe({
error: (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
},
});
}, 5000);

progress.next(-1);
data.item.subscribe({
error: errorCallback,
});

return obs;
}
Expand Down
2 changes: 1 addition & 1 deletion app/components/job/browse/job-list.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class JobListComponent extends ListOrTableBase implements OnInit, OnDestr
this.data = this.jobService.list(this._baseOptions);
this.status = this.data.status;
this._onJobAddedSub = jobService.onJobAdded.subscribe((jobId) => {
this.data.loadNewItem(jobService.get(jobId));
this.data.loadNewItem(jobService.getOnce(jobId));
});
}

Expand Down
1 change: 1 addition & 0 deletions app/components/job/details/job-details.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class JobDetailsComponent implements OnInit, OnDestroy {

public ngOnDestroy() {
this._paramsSubscriber.unsubscribe();
this.data.dispose();
}

public get filterPlaceholderText() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class JobProgressStatusComponent implements OnChanges, OnDestroy {
public ngOnDestroy() {
this._polls.forEach(x => x.destroy());
this._subs.forEach(x => x.unsubscribe());
this.poolData.dispose();
}

public countRunningTasks() {
Expand Down
2 changes: 2 additions & 0 deletions app/components/node/details/node-details.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ export class NodeDetailsComponent implements OnInit, OnDestroy {

public ngOnDestroy() {
this._paramsSubscribers.forEach(x => x.unsubscribe());
this.poolData.dispose();
this.data.dispose();
}

@autobind()
Expand Down
32 changes: 13 additions & 19 deletions app/components/pool/action/delete/delete-pool-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PoolService } from "app/services";
import { LongRunningDeleteAction } from "app/services/core";

export class DeletePoolTask extends LongRunningDeleteAction {
constructor(private poolService: PoolService, poolIds: string[]) {
constructor(private poolService: PoolService, poolIds: string[]) {
super("pool", poolIds);
}

Expand Down Expand Up @@ -53,31 +53,25 @@ export class WaitForDeletePoolPollTask {
@autobind()
public start(progress: BehaviorSubject<any>): Observable<any> {
const obs = new AsyncSubject();
const data = this.poolService.get(this.poolId);
let interval;

const errorCallback = (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
};

interval = setInterval(() => {
data.fetch().subscribe({
error: errorCallback,
this.poolService.getOnce(this.poolId).subscribe({
next: (pool: Pool) => {
if (pool) {
const currentNodes = pool.currentNodes;
progress.next(this._getProgress(currentNodes));
}
},
error: (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
},
});
}, this.refreshRate);

progress.next(10);
data.item.subscribe({
next: (pool: Pool) => {
if (pool) {
const currentNodes = pool.currentNodes;
progress.next(this._getProgress(currentNodes));
}
},
error: errorCallback,
});

return obs;
}
Expand Down
2 changes: 1 addition & 1 deletion app/components/pool/browse/pool-list.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class PoolListComponent extends ListOrTableBase implements OnInit, OnDest
this.data = this.poolService.list();
this.status = this.data.status;
this._subs.push(poolService.onPoolAdded.subscribe((poolId) => {
this.data.loadNewItem(poolService.get(poolId));
this.data.loadNewItem(poolService.getOnce(poolId));
}));
this._subs.push(this.data.items.subscribe((pools) => {
this.pools = List<PoolDecorator>(pools.map(x => new PoolDecorator(x)));
Expand Down
1 change: 1 addition & 0 deletions app/components/pool/details/pool-details.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class PoolDetailsComponent implements OnInit, OnDestroy {

public ngOnDestroy() {
this._paramsSubscriber.unsubscribe();
this.data.dispose();
}

public get filterPlaceholderText() {
Expand Down
19 changes: 6 additions & 13 deletions app/components/task/action/delete/delete-task-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,17 @@ export class WaitForDeleteTaskPoller {
@autobind()
public start(progress: BehaviorSubject<any>): Observable<any> {
const obs = new AsyncSubject();
const data = this.taskService.get(this.jobId, this.taskId);
const errorCallback = (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
};

let interval = setInterval(() => {
data.fetch().subscribe({
error: errorCallback,
this.taskService.getOnce(this.jobId, this.taskId).subscribe({
error: (e) => {
progress.next(100);
clearInterval(interval);
obs.complete();
},
});
}, 5000);

progress.next(-1);
data.item.subscribe({
error: errorCallback,
});

return obs;
}
}
2 changes: 1 addition & 1 deletion app/components/task/browse/task-list.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class TaskListComponent extends SelectableList implements OnInit {
super();

this._onTaskAddedSub = taskService.onTaskAdded.subscribe((item: TaskParams) => {
this.data.loadNewItem(taskService.get(item.jobId, item.id));
this.data.loadNewItem(taskService.getOnce(item.jobId, item.id));
});
}

Expand Down
2 changes: 2 additions & 0 deletions app/components/task/details/task-details.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export class TaskDetailsComponent implements OnInit, OnDestroy {

public ngOnDestroy() {
this._paramsSubscribers.forEach(x => x.unsubscribe());
this.jobData.dispose();
this.data.dispose();
}

@autobind()
Expand Down
64 changes: 56 additions & 8 deletions app/services/core/rx-entity-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ import { AsyncSubject, BehaviorSubject, Observable } from "rxjs";

import { LoadingStatus } from "app/components/base/loading";
import { ServerError } from "app/models";
import { PollObservable } from "app/services/core";
import { HttpCode } from "app/utils/constants";
import { RxProxyBase, RxProxyBaseConfig } from "./rx-proxy-base";

export interface RxEntityProxyConfig<TParams, TEntity> extends RxProxyBaseConfig<TParams, TEntity> {
/**
* If you want to have the entity proxy poll automatically for you every given milliseconds.
* @default Disabled
*/
poll?: number;
}

export abstract class RxEntityProxy<TParams, TEntity> extends RxProxyBase<TParams, any, TEntity> {
Expand All @@ -17,6 +23,7 @@ export abstract class RxEntityProxy<TParams, TEntity> extends RxProxyBase<TParam
public item: Observable<TEntity>;

private _itemKey = new BehaviorSubject<string>(null);
private _pollTracker: PollObservable;

/**
* @param _type Class for TEntity used to instantiate
Expand All @@ -27,11 +34,15 @@ export abstract class RxEntityProxy<TParams, TEntity> extends RxProxyBase<TParam
constructor(type: Type<TEntity>, config: RxEntityProxyConfig<TParams, TEntity>) {
super(type, config);
this.params = config.initialParams || {} as TParams;
this.item = this._itemKey.map((key) => {
this.item = this._itemKey.distinctUntilChanged().map((key) => {
return this.cache.items.map((items) => {
return items.get(key);
});
}).switch();
}).switch().distinctUntilChanged().takeUntil(this.isDisposed);

if (config.poll) {
this._pollTracker = this.startPoll(5000);
}
}

/**
Expand Down Expand Up @@ -59,10 +70,31 @@ export abstract class RxEntityProxy<TParams, TEntity> extends RxProxyBase<TParam
});
}

/**
* @see RxProxyBase#dispose()
*/
public dispose() {
super.dispose();
this._itemKey.complete();
this.stopPoll();
}

public refresh(): Observable<any> {
return this.fetch();
}

/**
* Stop the automatically started poll if applicable
*/
public stopPoll() {
if (this._pollTracker) {
this._pollTracker.destroy();
}
}

/**
* Abstract method implementation of what to do when the polling calls.
*/
protected pollRefresh() {
return this.refresh();
}
Expand All @@ -83,22 +115,38 @@ export abstract class RxEntityProxy<TParams, TEntity> extends RxProxyBase<TParam

export function getOnceProxy<TEntity>(getProxy: RxEntityProxy<any, TEntity>): Observable<TEntity> {
const obs = new AsyncSubject<TEntity>();
getProxy.stopPoll();

getProxy.fetch().subscribe({
next: () => {
getProxy.item.first().subscribe((item: TEntity) => {
if (item) {
obs.next(item);
obs.complete();
getProxy.dispose();
}
getProxy.item.subscribe((item: TEntity) => {
obs.next(item);
obs.complete();
getProxy.dispose();
});
},
error: (e) => {
obs.error(e);
obs.complete();
getProxy.dispose();
},
});

return obs.asObservable();
}

// const sub = new BehaviorSubject(0);
// const until = new AsyncSubject();
// let i = 0;
// setInterval(() => {
// sub.next(i++);
// if (i === 4) {
// until.next(true);
// until.complete();
// }
// }, 2000);

// const obs = sub.takeUntil(until);
// obs.subscribe((x) => {
// console.log("New value", x);
// });
9 changes: 3 additions & 6 deletions app/services/core/rx-list-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { AsyncSubject, BehaviorSubject, Observable } from "rxjs";
import { LoadingStatus } from "app/components/base/loading";
import { log } from "app/utils";
import { ListOptions, ListOptionsAttributes } from "./list-options";
import { RxEntityProxy } from "./rx-entity-proxy";
import { RxProxyBase, RxProxyBaseConfig } from "./rx-proxy-base";

export interface RxListProxyConfig<TParams, TEntity> extends RxProxyBaseConfig<TParams, TEntity> {
Expand Down Expand Up @@ -129,18 +128,16 @@ export abstract class RxListProxy<TParams, TEntity> extends RxProxyBase<TParams,
* It should not add the new item if already present.
* The cache system will handle updating it already.
*/
public loadNewItem(entityProxy: RxEntityProxy<any, TEntity>): Observable<any> {
const obs = entityProxy.fetch().flatMap(() => entityProxy.item.first());
obs.subscribe({
public loadNewItem(getOnceObs: Observable<TEntity>): Observable<any> {
getOnceObs.subscribe({
next: (newItem) => {
this._addItemToList(newItem);
}, error: (error) => {
log.error("Error loading new item into RxListProxy",
{ error, params: this._params, options: this._options });
},
});

return obs;
return getOnceObs;
}

/**
Expand Down
15 changes: 13 additions & 2 deletions app/services/core/rx-proxy-base.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Type } from "@angular/core";
import { BehaviorSubject, Observable, Subject, Subscription } from "rxjs";
import { AsyncSubject, BehaviorSubject, Observable, Subject, Subscription } from "rxjs";

import { LoadingStatus } from "app/components/base/loading";
import { ServerError } from "app/models";
Expand Down Expand Up @@ -62,6 +62,11 @@ export abstract class RxProxyBase<TParams, TOptions extends ProxyOptions, TEntit
*/
public newDataStatus: Observable<LoadingStatus>;

/**
* Sets to after calling dispose()
*/
public isDisposed: AsyncSubject<boolean>;

protected _status = new BehaviorSubject<LoadingStatus>(LoadingStatus.Loading);
protected _newDataStatus = new BehaviorSubject<LoadingStatus>(LoadingStatus.Loading);
protected _error = new BehaviorSubject<ServerError>(null);
Expand All @@ -86,6 +91,7 @@ export abstract class RxProxyBase<TParams, TOptions extends ProxyOptions, TEntit
this.status = this._status.asObservable();
this.newDataStatus = this._newDataStatus.asObservable();
this.error = this._error.asObservable();
this.isDisposed = new AsyncSubject();

this.status.subscribe((status) => {
if (status === LoadingStatus.Loading) {
Expand Down Expand Up @@ -163,7 +169,12 @@ export abstract class RxProxyBase<TParams, TOptions extends ProxyOptions, TEntit
* otherwise internal subscribe will never get cleared and the list porxy will not get GC
*/
public dispose() {
this.isDisposed.next(true);
this.isDisposed.complete();
this._clearDeleteSub();
this._deleted.complete();
this._status.complete();
this._error.complete();
}

protected set cache(cache: DataCache<TEntity>) {
Expand Down Expand Up @@ -265,7 +276,7 @@ export abstract class RxProxyBase<TParams, TOptions extends ProxyOptions, TEntit

private _key() {
const paramsKey = ObjectUtils.serialize(this._params);
const optionsKey = ObjectUtils.serialize(this._options.original);
const optionsKey = this._options && ObjectUtils.serialize(this._options.original);
return `${paramsKey}|${optionsKey}`;
}
}
Loading