From 839cd1eb0de68615b1e7135c73d6f7635b6a94b4 Mon Sep 17 00:00:00 2001 From: Timothee Guerin Date: Thu, 28 Jun 2018 14:51:06 -0700 Subject: [PATCH 1/4] Job graph home cancel list all --- .../job-graphs-home.component.ts | 97 ++++++++++++------- app/services/core/data/list-getter.ts | 11 ++- .../refresh-btn/refresh-btn.component.ts | 14 ++- 3 files changed, 78 insertions(+), 44 deletions(-) diff --git a/app/components/job/graphs/job-graphs-home/job-graphs-home.component.ts b/app/components/job/graphs/job-graphs-home/job-graphs-home.component.ts index bdd99a1ced..c032383243 100644 --- a/app/components/job/graphs/job-graphs-home/job-graphs-home.component.ts +++ b/app/components/job/graphs/job-graphs-home/job-graphs-home.component.ts @@ -1,16 +1,16 @@ -import { Component, OnDestroy, OnInit } from "@angular/core"; +import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from "@angular/core"; import { ActivatedRoute } from "@angular/router"; import { List } from "immutable"; import * as path from "path"; -import { Observable } from "rxjs"; +import { Observable, Subscription } from "rxjs"; import { FilterBuilder, autobind } from "@batch-flask/core"; import { ElectronShell } from "@batch-flask/ui"; -import { log } from "@batch-flask/utils"; import { tasksToCsv } from "app/components/job/graphs/job-graphs-home/helpers"; import { Job, Task, TaskState } from "app/models"; -import { CacheDataService, FileSystemService, JobParams, JobService, TaskService } from "app/services"; +import { CacheDataService, FileSystemService, JobParams, JobService, TaskService } from "app/services"; import { EntityView } from "app/services/core"; +import { flatMap, share, tap } from "rxjs/operators"; import "./job-graphs-home.scss"; enum AvailableGraph { @@ -20,6 +20,7 @@ enum AvailableGraph { @Component({ selector: "bl-job-graphs-home", templateUrl: "job-graphs-home.html", + changeDetection: ChangeDetectionStrategy.OnPush, }) export class JobGraphsComponent implements OnInit, OnDestroy { public static breadcrumb(params, queryParams) { @@ -38,6 +39,7 @@ export class JobGraphsComponent implements OnInit, OnDestroy { public taskCount: number; private _data: EntityView; + private _updateTasksSub: Subscription; constructor( private route: ActivatedRoute, @@ -46,11 +48,13 @@ export class JobGraphsComponent implements OnInit, OnDestroy { private cacheDataService: CacheDataService, private shell: ElectronShell, private fs: FileSystemService, + private changeDetector: ChangeDetectorRef, ) { this._data = this.jobService.view(); this._data.item.subscribe((job) => { this.job = job; + this.changeDetector.markForCheck(); }); this._updateDescription(); } @@ -60,48 +64,44 @@ export class JobGraphsComponent implements OnInit, OnDestroy { this.jobId = params["jobId"]; this._data.params = { id: this.jobId }; this._data.fetch(); - this.updateTasks().subscribe(); + this.updateTasks(); }); this._updateTaskCount(); } + public ngOnDestroy() { + this._data.dispose(); + if (this._updateTasksSub) { + this._updateTasksSub.unsubscribe(); + } + } + public updateTasks(force = false): Observable { + if (this._updateTasksSub) { + this._updateTasksSub.unsubscribe(); + } this.loading = true; + this.changeDetector.markForCheck(); - const obs = Observable.fromPromise(this._tryLoadTasksFromCache(force)).flatMap((success) => { - if (success) { - this.loading = false; - return Observable.of(null); - } - this.taskLoadedProgress = 0; - this._updateTaskCount(); - const obs = this.taskService.listAll(this.jobId, { - select: "id,executionInfo,nodeInfo", - filter: FilterBuilder.prop("state").eq(TaskState.completed), - pageSize: 1000, - }, (x) => { - this.taskLoadedProgress = x; - }); - - obs.subscribe({ - next: (tasks) => { + const obs = Observable.fromPromise(this._tryLoadTasksFromCache(force)).pipe( + flatMap((success) => { + if (success) { this.loading = false; - this.tasks = tasks; - this.cacheDataService.cache(this._cacheKey, tasks.toJS()); - }, - error: (error) => { - log.error(`Error retrieving all tasks for job ${this.job.id}`, error); - }, - }); - return obs; - - }).share(); - obs.subscribe(); - return obs; - } + this.changeDetector.markForCheck(); - public ngOnDestroy() { - this._data.dispose(); + return Observable.of(null); + } + this.taskLoadedProgress = 0; + this.changeDetector.markForCheck(); + + this._updateTaskCount(); + return this._loadAllTasks(); + + }), + share(), + ); + this._updateTasksSub = obs.subscribe(); + return obs; } public updateGraph(newGraph: AvailableGraph) { @@ -127,9 +127,28 @@ export class JobGraphsComponent implements OnInit, OnDestroy { private _updateTaskCount() { this.jobService.getTaskCounts(this.jobId).subscribe((taskCount) => { this.taskCount = taskCount.completed; + this.changeDetector.markForCheck(); }); } + private _loadAllTasks() { + return this.taskService.listAll(this.jobId, { + select: "id,executionInfo,nodeInfo", + filter: FilterBuilder.prop("state").eq(TaskState.completed), + pageSize: 1000, + }, (x) => { + this.taskLoadedProgress = x; + this.changeDetector.markForCheck(); + }).pipe( + tap((tasks) => { + this.loading = false; + this.tasks = tasks; + this.cacheDataService.cache(this._cacheKey, tasks.toJS()); + this.changeDetector.markForCheck(); + }), + ); + } + private _updateDescription() { switch (this.currentGraph) { case AvailableGraph.runningTime: @@ -142,6 +161,7 @@ export class JobGraphsComponent implements OnInit, OnDestroy { default: this.description = "Unkown graph type."; } + this.changeDetector.markForCheck(); } private get _cacheKey() { @@ -153,6 +173,8 @@ export class JobGraphsComponent implements OnInit, OnDestroy { return false; } this.loadingFromCache = true; + this.changeDetector.markForCheck(); + const data = await this.cacheDataService.read(this._cacheKey); if (data) { this.tasks = List(data.map(x => new Task(x))); @@ -160,6 +182,7 @@ export class JobGraphsComponent implements OnInit, OnDestroy { return true; } this.loadingFromCache = false; + this.changeDetector.markForCheck(); return false; } } diff --git a/app/services/core/data/list-getter.ts b/app/services/core/data/list-getter.ts index 6fd12bbce9..1713c80283 100644 --- a/app/services/core/data/list-getter.ts +++ b/app/services/core/data/list-getter.ts @@ -38,9 +38,14 @@ export abstract class ListGetter extends GenericGetter> { - return this._fetch(params, new ListOptions(options), true).flatMap(({ items, nextLink }) => { - return this._fetchRemaining(nextLink, items.size, progress) - .map(remainingItems => List(items.concat(remainingItems))); + return this._fetch(params, new ListOptions(options), true).expand(({ items, nextLink }) => { + return nextLink ? this._fetchNext(nextLink) : Observable.empty(); + }).reduce((items: TEntity[], response: ListResponse) => { + const array = [...items, ...response.items.toJS()]; + if (progress) { progress(array.length); } + return array; + }, []).map((items) => { + return List(items); }).share(); } diff --git a/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts b/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts index 2808719ae9..6fed0e2121 100644 --- a/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts +++ b/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts @@ -1,9 +1,9 @@ import { animate, style, transition, trigger } from "@angular/animations"; import { - ChangeDetectionStrategy, ChangeDetectorRef, Component, Input, + ChangeDetectionStrategy, ChangeDetectorRef, Component, Input, OnDestroy, } from "@angular/core"; import { autobind } from "@batch-flask/core"; -import { Observable } from "rxjs"; +import { Observable, Subscription } from "rxjs"; export enum RefreshStatus { Idle, @@ -25,7 +25,7 @@ export enum RefreshStatus { ], changeDetection: ChangeDetectionStrategy.OnPush, }) -export class RefreshButtonComponent { +export class RefreshButtonComponent implements OnDestroy { public statuses = RefreshStatus; @Input() public refresh: () => Observable; @@ -42,14 +42,20 @@ export class RefreshButtonComponent { public get status() { return this._status; } private _status = RefreshStatus.Idle; + private _refreshSub: Subscription; constructor(private changeDetector: ChangeDetectorRef) { } + public ngOnDestroy() { + if (this._refreshSub) { + this._refreshSub.unsubscribe(); + } + } @autobind() public onClick() { this.status = RefreshStatus.Refreshing; - this.refresh().subscribe( + this._refreshSub = this.refresh().subscribe( () => { this.status = RefreshStatus.Succeeded; setTimeout(() => { From 8276958401eae772a5884a43199c1ced62ba5dcd Mon Sep 17 00:00:00 2001 From: Timothee Guerin Date: Thu, 28 Jun 2018 14:59:35 -0700 Subject: [PATCH 2/4] Wip --- .../ui/buttons/refresh-btn/refresh-btn.component.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts b/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts index 6fed0e2121..5c46e0b91c 100644 --- a/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts +++ b/src/@batch-flask/ui/buttons/refresh-btn/refresh-btn.component.ts @@ -52,6 +52,7 @@ export class RefreshButtonComponent implements OnDestroy { this._refreshSub.unsubscribe(); } } + @autobind() public onClick() { this.status = RefreshStatus.Refreshing; @@ -70,4 +71,5 @@ export class RefreshButtonComponent implements OnDestroy { }, ); } + } From c6b81ec4bec8e6d382b467c6f1ff8d90dc537ef2 Mon Sep 17 00:00:00 2001 From: Timothee Guerin Date: Fri, 29 Jun 2018 09:48:51 -0700 Subject: [PATCH 3/4] Added specs --- app/services/core/data/list-getter/index.ts | 1 + .../data/list-getter}/list-getter.spec.ts | 28 ++++++++++++++++++- .../data/{ => list-getter}/list-getter.ts | 23 ++------------- 3 files changed, 31 insertions(+), 21 deletions(-) create mode 100644 app/services/core/data/list-getter/index.ts rename {test/app/services/core/data => app/services/core/data/list-getter}/list-getter.spec.ts (77%) rename app/services/core/data/{ => list-getter}/list-getter.ts (84%) diff --git a/app/services/core/data/list-getter/index.ts b/app/services/core/data/list-getter/index.ts new file mode 100644 index 0000000000..3990344546 --- /dev/null +++ b/app/services/core/data/list-getter/index.ts @@ -0,0 +1 @@ +export * from "./list-getter"; diff --git a/test/app/services/core/data/list-getter.spec.ts b/app/services/core/data/list-getter/list-getter.spec.ts similarity index 77% rename from test/app/services/core/data/list-getter.spec.ts rename to app/services/core/data/list-getter/list-getter.spec.ts index b2400f070c..3152177f18 100644 --- a/test/app/services/core/data/list-getter.spec.ts +++ b/app/services/core/data/list-getter/list-getter.spec.ts @@ -1,6 +1,7 @@ +import { fakeAsync, tick } from "@angular/core/testing"; import { BasicListGetter, DataCache } from "app/services/core"; import { Observable } from "rxjs"; -import { FakeModel } from "./fake-model"; +import { FakeModel } from "test/app/services/core/data/fake-model"; const firstPage = [ { id: "1", state: "active", name: "Fake1" }, @@ -95,4 +96,29 @@ describe("ListGetter", () => { }); }); }); + + it("cancel the request when unsubscribing fetchall", fakeAsync(() => { + dataSpy = jasmine.createSpy("supplyDataSpy").and.callFake(() => { + return Observable.timer(100).map(x => ({ + data: [{ id: "1", state: "active", name: "Fake1" }], + nextLink: "more-to-load", + })); + }); + + getter = new BasicListGetter(FakeModel, { + cache: () => cache, + supplyData: dataSpy, + }); + + const sub = getter.fetchAll({}).subscribe(); + expect(dataSpy).toHaveBeenCalledTimes(1); + tick(100); + expect(dataSpy).toHaveBeenCalledTimes(2); + sub.unsubscribe(); + tick(100); + // Should not have been called anymore + expect(dataSpy).toHaveBeenCalledTimes(2); + tick(1000); + expect(dataSpy).toHaveBeenCalledTimes(2); + })); }); diff --git a/app/services/core/data/list-getter.ts b/app/services/core/data/list-getter/list-getter.ts similarity index 84% rename from app/services/core/data/list-getter.ts rename to app/services/core/data/list-getter/list-getter.ts index 1713c80283..ba84583ad4 100644 --- a/app/services/core/data/list-getter.ts +++ b/app/services/core/data/list-getter/list-getter.ts @@ -1,10 +1,10 @@ import { Type } from "@angular/core"; -import { Iterable, List, OrderedSet } from "immutable"; +import { List, OrderedSet } from "immutable"; import { Observable } from "rxjs"; import { DataCache } from "app/services/core/data-cache"; -import { GenericGetter, GenericGetterConfig } from "./generic-getter"; -import { ContinuationToken, ListOptions, ListOptionsAttributes } from "./list-options"; +import { GenericGetter, GenericGetterConfig } from "../generic-getter"; +import { ContinuationToken, ListOptions, ListOptionsAttributes } from "../list-options"; export type FetchAllProgressCallback = (count: number) => void; @@ -72,23 +72,6 @@ export abstract class ListGetter extends GenericGetter this._processItems(cache, x, token.params, token.options, false)); } - private _fetchRemaining( - nextLink: ContinuationToken, - currentCount: number, - progress?: FetchAllProgressCallback): Observable> { - if (progress) { - progress(currentCount); - } - if (!nextLink) { - return Observable.of(List([])); - } - return this._fetchNext(nextLink).flatMap((response) => { - const newCount = currentCount + response.items.size; - return this._fetchRemaining(response.nextLink, newCount, progress) - .map(remainingItems => response.items.concat(remainingItems)); - }).share(); - } - private _processItems( cache: DataCache, response: any, From fe178088cfb81283d39d053eba181ff5960c5585 Mon Sep 17 00:00:00 2001 From: Timothee Guerin Date: Fri, 29 Jun 2018 10:00:21 -0700 Subject: [PATCH 4/4] Update job progress status message --- .../job-progress-status.component.ts | 13 +++++++++++++ .../job-progress-status/job-progress-status.html | 4 +++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/app/components/job/details/job-progress-status/job-progress-status.component.ts b/app/components/job/details/job-progress-status/job-progress-status.component.ts index 87359bc01c..fefb9b1f7c 100644 --- a/app/components/job/details/job-progress-status/job-progress-status.component.ts +++ b/app/components/job/details/job-progress-status/job-progress-status.component.ts @@ -36,6 +36,19 @@ export class JobProgressStatusComponent implements OnChanges, OnDestroy { return this.jobTaskCounts.validationStatus === JobTaskCountsValidationStatus.validated; } + public get taskCountTooHigh() { + return this.jobTaskCounts.total > 175000; + } + + public get taskCountUnvalidatedTooltip() { + if (this.taskCountTooHigh) { + return "For performance reasons, Batch will not perform the consistency" + + " check if the job includes more than 200,000 tasks. Use with caution."; + } else { + return "Batch was unable to check the validity of the task counts. Use with caution."; + } + } + private data: ListView; private poolData: EntityView; diff --git a/app/components/job/details/job-progress-status/job-progress-status.html b/app/components/job/details/job-progress-status/job-progress-status.html index 81b0aa168e..a9adc2d718 100644 --- a/app/components/job/details/job-progress-status/job-progress-status.html +++ b/app/components/job/details/job-progress-status/job-progress-status.html @@ -22,7 +22,9 @@
-
+
Task count might not be accurate