Skip to content
This repository has been archived by the owner on Oct 2, 2024. It is now read-only.

Commit

Permalink
Adding paging
Browse files Browse the repository at this point in the history
  • Loading branch information
rpeach-sag committed Jul 1, 2020
1 parent 1992a7d commit 2a9e1cf
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 34 deletions.
20 changes: 11 additions & 9 deletions src/datahub-widget/datahub-widget.component.html
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
<ngx-datatable
class="bootstrap"
class="material"
[rows]="rows"
[loadingIndicator]="true"
[columns]="cols"
[columnMode]="ColumnMode.force"
[headerHeight]="40"
[summaryRow]="true"
[summaryPosition]="'bottom'"
[footerHeight]="40"
[limit]="10"
rowHeight="auto"
[reorderable]="true"
[headerHeight]="50"
[loadingIndicator]="pagesLoading > 0 || !this.currentJob"
[scrollbarV]="true"
[footerHeight]="50"
[rowHeight]="50"
[externalPaging]="true"
[externalSorting]="true"
[count]="totalRowCount"
[offset]="pageInfo?.offset || 0"
(page)="setPage($event)"
></ngx-datatable>
168 changes: 149 additions & 19 deletions src/datahub-widget/datahub-widget.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,49 @@

import {Component, Input, OnDestroy} from '@angular/core';
import {BehaviorSubject, from, interval, Subscription} from "rxjs";
import {delay, distinctUntilChanged, mapTo, retryWhen, startWith, switchMap} from "rxjs/operators";
import {
concatMap,
delay,
distinctUntilChanged, filter,
map,
mapTo,
reduce,
retryWhen, scan,
startWith,
switchMap
} from "rxjs/operators";
import {IDatahubWidgetConfig} from "./datahub-widget-config.component";
import {QueryService} from "./query.service";
import {Job, JobStatus, QueryService} from "./query.service";
import { ColumnMode } from '@swimlane/ngx-datatable';

interface PageInfo {
offset: number;
pageSize: number;
limit: number;
count: number;
}

@Component({
templateUrl: './datahub-widget.component.html',
styles: [ `.text { transform: scaleX(-1); font-size: 3em ;}` ]
styles: [ `
.ngx-datatable.scroll-vertical {
height: 100%;
}
.ngx-datatable.material .datatable-header .datatable-header-cell {
text-align: left;
padding: .9rem 1.2rem;
padding-top: 0.9rem;
padding-right: 1.2rem;
padding-bottom: 0.9rem;
padding-left: 1.2rem;
font-weight: 400;
background-color: #fff;
color: rgba(0,0,0,.54);
vertical-align: bottom;
font-size: 12px;
font-weight: 500;
}
` ]
})
export class DatahubWidgetComponent implements OnDestroy {
_config: IDatahubWidgetConfig = {
Expand All @@ -34,6 +69,15 @@ export class DatahubWidgetComponent implements OnDestroy {
refreshPeriod: 60000,
columns: []
};
ColumnMode = ColumnMode;
subscriptions = new Subscription();
querySubject = new BehaviorSubject<undefined | string>(undefined);
cols: { prop?: string, name: string, sortable?: boolean }[];
rows: {[colName: string]: any}[] = [];
currentJob: Job;
totalRowCount: number = 0;
pageInfo: PageInfo;
pagesLoading = 0;

@Input() set config(config: IDatahubWidgetConfig) {
this._config = Object.assign(config, {
Expand All @@ -45,40 +89,126 @@ export class DatahubWidgetComponent implements OnDestroy {
.filter(col => col.visibility == 'visible')
.map(col => ({
prop: col.colName,
name: col.displayName
name: col.displayName,
sortable: false
}));
};
get config(): IDatahubWidgetConfig {
return this._config
}

ColumnMode = ColumnMode;

subscriptions = new Subscription();
querySubject = new BehaviorSubject<undefined | string>(undefined);

cols: {
prop: string,
name: string
}[];
rows: {[colName: string]: any}[] = [];

constructor(private queryService: QueryService) {
this.subscriptions.add(
this.querySubject
.pipe(
distinctUntilChanged(),
// Filter out any blank or undefined queries
filter(query => !!query),
// Re-query every refreshPeriod to refresh the data
switchMap(val => interval(this.config.refreshPeriod).pipe(mapTo(val), startWith(val))),
switchMap(query => from(this.queryService.queryForResults(query, {timeout: this.config.refreshPeriod}))),
// Start the job - Keep the responses in order by using concatMap
concatMap(query => from(this.queryService.postQuery(query))),
// Cancel the previous job if we get a new one
scan((previousJob, job) => {
// Ignore the result - if we can't cancel the query it has probably finished already
this.queryService.cancelJob(previousJob.id);
return job;
}),
// Wait for the job to complete - erroring if it does not complete successfully
switchMap(job => this.queryService.waitForJobToComplete$(job.id).pipe(
map(jobStatus => {
if (jobStatus.jobState === 'COMPLETED') {
return [job, jobStatus] as [Job, JobStatus];
} else {
throw new Error("Query job failed");
}
})
)),
// Handle errors by retrying
retryWhen(e => e.pipe(delay(this.config.refreshPeriod)))
)
.subscribe(results => {
this.rows = results.rows
.subscribe(([job, jobStatus]) => {
// We have a new job that has completed successfully

// We're using virtual paging so need to know the total number of rows
this.totalRowCount = jobStatus.rowCount || 0;
this.rows = new Array(this.totalRowCount).fill(undefined);
this.currentJob = job;
this.pagesLoading = 0;
this.setPage(this.pageInfo);
})
);
}

setPage(pageInfo: PageInfo) {
this.pageInfo = pageInfo;

const pageNumber = this.pageInfo.offset;
const pageSize = this.pageInfo.pageSize;

const pageStart = pageNumber * pageSize;

if (!this.areRowsLoaded(pageStart, pageSize)) {
this.loadRows(pageStart, pageSize)
.catch(e => console.error(e));
}
// We also load the previous/next page so that scrolling isn't too slow and so that refreshes don't mess up when on a boundary
const previousPageStart = (pageNumber - 1) * pageSize;
if (previousPageStart >= 0 && !this.areRowsLoaded(previousPageStart, pageSize)) {
this.loadRows(previousPageStart, pageSize)
.catch(e => console.error(e));
}
const nextPageStart = (pageNumber + 1) * pageSize;
if (nextPageStart < this.rows.length && !this.areRowsLoaded(nextPageStart, pageSize)) {
this.loadRows(nextPageStart, pageSize)
.catch(e => console.error(e));
}
}

areRowsLoaded(start: number, count: number, includeLoadingRows: boolean = true) {
// undefined means row not loaded, null means loading, and a value mean it is loaded
return !this.rows.slice(start, start + count).some(row => includeLoadingRows ? row === undefined : (row === undefined || row === null))
}

async loadRows(start: number, count: number) {
if (!this.currentJob) return;

// Store the job we are working on - by the time we get the results back it might have changed
const job = this.currentJob;

this.pagesLoading++;

// Mark the rows as loading (So that they aren't loaded twice)
for (let i = start; i <= start + count && i < this.rows.length; i++) {
// null means loading
this.rows[i] = this.rows[i] || null;
}

const results = await this.queryService.getJobResults(job.id, start, count);

// Check that the job hasn't changed since we started loading the page, if it has then these results are stale
if (job.id !== this.currentJob.id) return;

// Clone the rows array - change detection assumes that the objects + arrays are immutable
const rows = [...this.rows];
// Add the new items
rows.splice(start, results.rows.length, ...results.rows);
// Update the list of rows
this.rows = rows;

// Check if dremio/datahub actually returned all the rows we wanted, if not then request the rows that were missed
if (
results.rows.length < count // Did datahub miss any expected rows?
&& start + count < this.rows.length // Were we actually expecting these rows?
&& results.rows.length > 0 // Did it just return 0 rows? - if so then there's probably something else wrong (maybe the rowCount is currently wrong)
&& !this.areRowsLoaded(start + results.rows.length, count - results.rows.length, false) // Have we already loaded these rows?
) {
console.debug(`DataHub didn't provide ${count - results.rows.length} rows, requesting missing rows...`)
await this.loadRows(start + results.rows.length, count - results.rows.length);
}

this.pagesLoading--;
};

ngOnDestroy() {
this.subscriptions.unsubscribe();
}
Expand Down
27 changes: 21 additions & 6 deletions src/datahub-widget/query.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Injectable, Injector } from '@angular/core';
import { FetchClient } from '@c8y/ngx-components/api';
import { IFetchOptions } from '@c8y/client';
import {IFetchResponse} from "@c8y/client/lib/src/core/IFetchResponse";
import {defer, interval, Observable} from "rxjs";
import {filter, first, repeatWhen} from "rxjs/operators";

export interface QueryConfig {
timeout: number,
Expand All @@ -21,6 +23,10 @@ export interface DatasetField {
}
}

export interface Job {
id: string
}

export interface JobResult<T> {
rowCount: Number,
schema: DatasetField[],
Expand Down Expand Up @@ -56,7 +62,7 @@ export class QueryService {
const fullConfig: QueryConfig = { timeout: Number.POSITIVE_INFINITY, offset: 0, limit: 100, ...config };

//post job to api
const job = await this.postQuery(JSON.stringify({ sql: queryString }));
const job = await this.postQuery(queryString);
const jobId = job.id.toString();

const jobStatusOrTimeout = await Promise.race([this.waitForJobToComplete(jobId), this.sleep(config.timeout)]);
Expand All @@ -73,7 +79,7 @@ export class QueryService {
const jobStatus = jobStatusOrTimeout as JobStatus

if (jobStatus.jobState === "COMPLETED") {
return await this.getJobResults<T>(jobId, fullConfig);
return await this.getJobResults<T>(jobId, fullConfig.offset, fullConfig.limit);
} else if (jobStatus.jobState === "CANCELLED") {
throw new Error(`DataHub Query Job Cancelled`);
} else if (jobStatus.errorMessage) {
Expand All @@ -92,6 +98,15 @@ export class QueryService {
return jobStatus;
}

waitForJobToComplete$(jobId: string): Observable<JobStatus> {
return defer(() => this.getJobStatus(jobId))
.pipe(
repeatWhen(() => interval(500)),
filter(jobStatus => ["COMPLETED", "CANCELLED", "FAILED"].includes(jobStatus.jobState)),
first()
)
}

async getJobStatus(jobId: string): Promise<JobStatus> {
const response = await this.fetchClient.fetch(`${this.dataHubDremioApi}/job/${jobId}`, this.fetchOptions);
if (response.status >= 200 && response.status < 300) {
Expand All @@ -101,17 +116,17 @@ export class QueryService {
}
}

async getJobResults<T = any>(jobId: string, config: QueryConfig): Promise<JobResult<T>> {
const response = await this.fetchClient.fetch(`${this.dataHubDremioApi}/job/${jobId}/results?offset=${config.offset}&limit=${config.limit}`, this.fetchOptions)
async getJobResults<T = any>(jobId: string, offset: number = 0, limit: number = 100): Promise<JobResult<T>> {
const response = await this.fetchClient.fetch(`${this.dataHubDremioApi}/job/${jobId}/results?offset=${offset}&limit=${limit}`, this.fetchOptions)
if (response.status >= 200 && response.status < 300) {
return response.json();
} else {
await this.throwErrorResponse(response);
}
}

async postQuery(query: String): Promise<any> {
const response = await this.fetchClient.fetch(this.dataHubDremioApi + '/sql', { ...this.fetchOptions, method: 'POST', body: query })
async postQuery(query: String): Promise<Job> {
const response = await this.fetchClient.fetch(this.dataHubDremioApi + '/sql', { ...this.fetchOptions, method: 'POST', body: JSON.stringify({ sql: query }) })
if (response.status >= 200 && response.status < 300) {
return response.json();
} else {
Expand Down

0 comments on commit 2a9e1cf

Please sign in to comment.