Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Job graph cancel list all #1478

Merged
merged 6 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ export class JobProgressStatusComponent implements OnChanges, OnDestroy {
return this.jobTaskCounts.validationStatus === JobTaskCountsValidationStatus.validated;
}

public get taskCountTooHigh() {
return this.jobTaskCounts.total > 175000;
}

public get taskCountUnvalidatedTooltip() {
if (this.taskCountTooHigh) {
return "For performance reasons, Batch will not perform the consistency"
+ " check if the job includes more than 200,000 tasks. Use with caution.";
} else {
return "Batch was unable to check the validity of the task counts. Use with caution.";
}
}

private data: ListView<Node, NodeListParams>;
private poolData: EntityView<Pool, PoolParams>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
</div>
</div>
<div class="warning-container">
<div class="invalidated-data" matTooltip="The batch service was unable to check the validity of the task counts. Use with caution." *ngIf="!taskCountValidated">
<div class="invalidated-data"
[matTooltip]="taskCountUnvalidatedTooltip"
*ngIf="!taskCountValidated">
<i class="fa fa-warning"></i>Task count might not be accurate
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Component, OnDestroy, OnInit } from "@angular/core";
import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from "@angular/core";
import { ActivatedRoute } from "@angular/router";
import { List } from "immutable";
import * as path from "path";
import { Observable } from "rxjs";
import { Observable, Subscription } from "rxjs";

import { FilterBuilder, autobind } from "@batch-flask/core";
import { ElectronShell } from "@batch-flask/ui";
import { log } from "@batch-flask/utils";
import { tasksToCsv } from "app/components/job/graphs/job-graphs-home/helpers";
import { Job, Task, TaskState } from "app/models";
import { CacheDataService, FileSystemService, JobParams, JobService, TaskService } from "app/services";
import { CacheDataService, FileSystemService, JobParams, JobService, TaskService } from "app/services";
import { EntityView } from "app/services/core";
import { flatMap, share, tap } from "rxjs/operators";
import "./job-graphs-home.scss";

enum AvailableGraph {
Expand All @@ -20,6 +20,7 @@ enum AvailableGraph {
@Component({
selector: "bl-job-graphs-home",
templateUrl: "job-graphs-home.html",
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class JobGraphsComponent implements OnInit, OnDestroy {
public static breadcrumb(params, queryParams) {
Expand All @@ -38,6 +39,7 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
public taskCount: number;

private _data: EntityView<Job, JobParams>;
private _updateTasksSub: Subscription;

constructor(
private route: ActivatedRoute,
Expand All @@ -46,11 +48,13 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
private cacheDataService: CacheDataService,
private shell: ElectronShell,
private fs: FileSystemService,
private changeDetector: ChangeDetectorRef,
) {

this._data = this.jobService.view();
this._data.item.subscribe((job) => {
this.job = job;
this.changeDetector.markForCheck();
});
this._updateDescription();
}
Expand All @@ -60,48 +64,44 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
this.jobId = params["jobId"];
this._data.params = { id: this.jobId };
this._data.fetch();
this.updateTasks().subscribe();
this.updateTasks();
});
this._updateTaskCount();
}

public ngOnDestroy() {
this._data.dispose();
if (this._updateTasksSub) {
this._updateTasksSub.unsubscribe();
}
}

public updateTasks(force = false): Observable<any> {
if (this._updateTasksSub) {
this._updateTasksSub.unsubscribe();
}
this.loading = true;
this.changeDetector.markForCheck();

const obs = Observable.fromPromise(this._tryLoadTasksFromCache(force)).flatMap((success) => {
if (success) {
this.loading = false;
return Observable.of(null);
}
this.taskLoadedProgress = 0;
this._updateTaskCount();
const obs = this.taskService.listAll(this.jobId, {
select: "id,executionInfo,nodeInfo",
filter: FilterBuilder.prop("state").eq(TaskState.completed),
pageSize: 1000,
}, (x) => {
this.taskLoadedProgress = x;
});

obs.subscribe({
next: (tasks) => {
const obs = Observable.fromPromise(this._tryLoadTasksFromCache(force)).pipe(
flatMap((success) => {
if (success) {
this.loading = false;
this.tasks = tasks;
this.cacheDataService.cache(this._cacheKey, tasks.toJS());
},
error: (error) => {
log.error(`Error retrieving all tasks for job ${this.job.id}`, error);
},
});
return obs;

}).share();
obs.subscribe();
return obs;
}
this.changeDetector.markForCheck();

public ngOnDestroy() {
this._data.dispose();
return Observable.of(null);
}
this.taskLoadedProgress = 0;
this.changeDetector.markForCheck();

this._updateTaskCount();
return this._loadAllTasks();

}),
share(),
);
this._updateTasksSub = obs.subscribe();
return obs;
}

public updateGraph(newGraph: AvailableGraph) {
Expand All @@ -127,9 +127,28 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
private _updateTaskCount() {
this.jobService.getTaskCounts(this.jobId).subscribe((taskCount) => {
this.taskCount = taskCount.completed;
this.changeDetector.markForCheck();
});
}

private _loadAllTasks() {
return this.taskService.listAll(this.jobId, {
select: "id,executionInfo,nodeInfo",
filter: FilterBuilder.prop("state").eq(TaskState.completed),
pageSize: 1000,
}, (x) => {
this.taskLoadedProgress = x;
this.changeDetector.markForCheck();
}).pipe(
tap((tasks) => {
this.loading = false;
this.tasks = tasks;
this.cacheDataService.cache(this._cacheKey, tasks.toJS());
this.changeDetector.markForCheck();
}),
);
}

private _updateDescription() {
switch (this.currentGraph) {
case AvailableGraph.runningTime:
Expand All @@ -142,6 +161,7 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
default:
this.description = "Unkown graph type.";
}
this.changeDetector.markForCheck();
}

private get _cacheKey() {
Expand All @@ -153,13 +173,16 @@ export class JobGraphsComponent implements OnInit, OnDestroy {
return false;
}
this.loadingFromCache = true;
this.changeDetector.markForCheck();

const data = await this.cacheDataService.read(this._cacheKey);
if (data) {
this.tasks = List(data.map(x => new Task(x)));
this.loadingFromCache = false;
return true;
}
this.loadingFromCache = false;
this.changeDetector.markForCheck();
return false;
}
}
1 change: 1 addition & 0 deletions app/services/core/data/list-getter/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./list-getter";
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { fakeAsync, tick } from "@angular/core/testing";
import { BasicListGetter, DataCache } from "app/services/core";
import { Observable } from "rxjs";
import { FakeModel } from "./fake-model";
import { FakeModel } from "test/app/services/core/data/fake-model";

const firstPage = [
{ id: "1", state: "active", name: "Fake1" },
Expand Down Expand Up @@ -95,4 +96,29 @@ describe("ListGetter", () => {
});
});
});

it("cancel the request when unsubscribing fetchall", fakeAsync(() => {
dataSpy = jasmine.createSpy("supplyDataSpy").and.callFake(() => {
return Observable.timer(100).map(x => ({
data: [{ id: "1", state: "active", name: "Fake1" }],
nextLink: "more-to-load",
}));
});

getter = new BasicListGetter(FakeModel, {
cache: () => cache,
supplyData: dataSpy,
});

const sub = getter.fetchAll({}).subscribe();
expect(dataSpy).toHaveBeenCalledTimes(1);
tick(100);
expect(dataSpy).toHaveBeenCalledTimes(2);
sub.unsubscribe();
tick(100);
// Should not have been called anymore
expect(dataSpy).toHaveBeenCalledTimes(2);
tick(1000);
expect(dataSpy).toHaveBeenCalledTimes(2);
}));
});
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Type } from "@angular/core";
import { Iterable, List, OrderedSet } from "immutable";
import { List, OrderedSet } from "immutable";
import { Observable } from "rxjs";

import { DataCache } from "app/services/core/data-cache";
import { GenericGetter, GenericGetterConfig } from "./generic-getter";
import { ContinuationToken, ListOptions, ListOptionsAttributes } from "./list-options";
import { GenericGetter, GenericGetterConfig } from "../generic-getter";
import { ContinuationToken, ListOptions, ListOptionsAttributes } from "../list-options";

export type FetchAllProgressCallback = (count: number) => void;

Expand Down Expand Up @@ -38,9 +38,14 @@ export abstract class ListGetter<TEntity, TParams> extends GenericGetter<TEntity
options?: ListOptionsAttributes | ListOptions,
progress?: FetchAllProgressCallback): Observable<List<TEntity>> {

return this._fetch(params, new ListOptions(options), true).flatMap(({ items, nextLink }) => {
return this._fetchRemaining(nextLink, items.size, progress)
.map(remainingItems => List<TEntity>(items.concat(remainingItems)));
return this._fetch(params, new ListOptions(options), true).expand(({ items, nextLink }) => {
return nextLink ? this._fetchNext(nextLink) : Observable.empty();
}).reduce((items: TEntity[], response: ListResponse<TEntity>) => {
const array = [...items, ...response.items.toJS()];
if (progress) { progress(array.length); }
return array;
}, []).map((items) => {
return List(items);
}).share();
}

Expand All @@ -67,23 +72,6 @@ export abstract class ListGetter<TEntity, TParams> extends GenericGetter<TEntity
return this.listNext(token).map(x => this._processItems(cache, x, token.params, token.options, false));
}

private _fetchRemaining(
nextLink: ContinuationToken,
currentCount: number,
progress?: FetchAllProgressCallback): Observable<Iterable<any, TEntity>> {
if (progress) {
progress(currentCount);
}
if (!nextLink) {
return Observable.of(List([]));
}
return this._fetchNext(nextLink).flatMap((response) => {
const newCount = currentCount + response.items.size;
return this._fetchRemaining(response.nextLink, newCount, progress)
.map(remainingItems => response.items.concat(remainingItems));
}).share();
}

private _processItems(
cache: DataCache<TEntity>,
response: any,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { animate, style, transition, trigger } from "@angular/animations";
import {
ChangeDetectionStrategy, ChangeDetectorRef, Component, Input,
ChangeDetectionStrategy, ChangeDetectorRef, Component, Input, OnDestroy,
} from "@angular/core";
import { autobind } from "@batch-flask/core";
import { Observable } from "rxjs";
import { Observable, Subscription } from "rxjs";

export enum RefreshStatus {
Idle,
Expand All @@ -25,7 +25,7 @@ export enum RefreshStatus {
],
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class RefreshButtonComponent {
export class RefreshButtonComponent implements OnDestroy {
public statuses = RefreshStatus;

@Input() public refresh: () => Observable<any>;
Expand All @@ -42,14 +42,21 @@ export class RefreshButtonComponent {
public get status() { return this._status; }

private _status = RefreshStatus.Idle;
private _refreshSub: Subscription;

constructor(private changeDetector: ChangeDetectorRef) {
}

public ngOnDestroy() {
if (this._refreshSub) {
this._refreshSub.unsubscribe();
}
}

@autobind()
public onClick() {
this.status = RefreshStatus.Refreshing;
this.refresh().subscribe(
this._refreshSub = this.refresh().subscribe(
() => {
this.status = RefreshStatus.Succeeded;
setTimeout(() => {
Expand All @@ -64,4 +71,5 @@ export class RefreshButtonComponent {
},
);
}

}