Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 194 additions & 55 deletions src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,19 @@ export class ArrowModel extends DataModel {
}

protected async initialize(): Promise<void> {
const [stats, chunk00] = await Promise.all([
fetchStats({ path: this._loadingParams.path, ...this._fileOptions }),
this.fetchChunk([0, 0]),
]);

const stats = await fetchStats({ path: this._loadingParams.path, ...this._fileOptions });
this._schema = stats.schema;
this._numCols = stats.num_cols;
this._numRows = stats.num_rows;
this._chunks = new PairMap();
this._chunks.set([0, 0], chunk00);
this._chunks = new ChunkMap({
rowChunkSize: this._loadingParams.rowChunkSize,
numRows: this._numRows,
colChunkSize: this._loadingParams.colChunkSize,
numCols: this._numCols,
});

const chunkIdx00 = this._chunks.getChunkIdx({ rowIdx: 0, colIdx: 0 });
await this.fetchThenStoreChunk(chunkIdx00);
}

get fileInfo(): Readonly<FileInfo> {
Expand Down Expand Up @@ -110,62 +113,71 @@ export class ArrowModel extends DataModel {
}

private dataBody(row: number, col: number): string {
const chunkIdx = this.chunkIdx(row, col);
const chunkIdx = this._chunks.getChunkIdx({ rowIdx: row, colIdx: col });

if (this._chunks.has(chunkIdx)) {
const chunk = this._chunks.get(chunkIdx)!;
if (chunk instanceof Promise) {
// Wait for Promise to complete and mark data as modified
if (chunk.type === "pending") {
// Wait for Promise to complete, and let it mark data as modified.
// If it was created through a prefetch, it does not emit a change so we add it.
if (chunk.reason === "prefetch") {
const promise = chunk.promise.then((_) => this.emitChangedChunk(chunkIdx));
this.storeChunkData(chunkIdx, ChunkMap.makePendingChunk({ promise, reason: "query" }));
}
return this._loadingParams.loadingRepr;
}

// We have data
const row_idx_in_chunk = row % this._loadingParams.rowChunkSize;
const col_idx_in_chunk = col % this._loadingParams.colChunkSize;
const val = chunk.getChildAt(col_idx_in_chunk)?.get(row_idx_in_chunk);
const rowIdxInChunk = row - chunk.startRow;
const colIdxInChunk = col - chunk.startCol;
const val = chunk.data.getChildAt(colIdxInChunk)?.get(rowIdxInChunk);
const out = val?.toString() || this._loadingParams.nullRepr;

// Prefetch next chunks only once we have data for the current chunk.
// We chain the Promise because this can be considered a low priority operation so we want
// to reduce load on the server
const [rowChunk, colChunk] = chunkIdx;
this.prefetchChunkIfNeeded([rowChunk + 1, colChunk]).then((_) => {
this.prefetchChunkIfNeeded([rowChunk, colChunk + 1]);
});
// Prefetch next chunks only once we have the current data to prioritize current view
this.prefetchAsNeededForChunk(chunkIdx);

return out;
}

// Fetch data, however we cannot await it due to the interface required by the DataGrid.
// Instead, we fire the request, and notify of change upon completion.
const promise = this.fetchChunk(chunkIdx).then((table) => {
this._chunks.set(chunkIdx, table);
this.emitChangedChunk(chunkIdx);
});
this._chunks.set(chunkIdx, promise);
const promise = this.fetchThenStoreChunk(chunkIdx).then((_) => this.emitChangedChunk(chunkIdx));
this.storeChunkData(chunkIdx, ChunkMap.makePendingChunk({ promise, reason: "query" }));

return this._loadingParams.loadingRepr;
}

private async fetchChunk(chunkIdx: [number, number]) {
const [rowChunk, colChunk] = chunkIdx;
return await fetchTable({
private async fetchThenStoreChunk(chunkIdx: ChunkMap.ChunkIdx): Promise<void> {
const { chunkRowIdx, chunkColIdx } = chunkIdx;

const table = await fetchTable({
path: this._loadingParams.path,
row_chunk_size: this._loadingParams.rowChunkSize,
row_chunk: rowChunk,
row_chunk: chunkRowIdx,
col_chunk_size: this._loadingParams.colChunkSize,
col_chunk: colChunk,
col_chunk: chunkColIdx,
...this._fileOptions,
});
const chunk: ChunkMap.Chunk = ChunkMap.makeChunk({
data: table,
startRow: chunkRowIdx * this._loadingParams.rowChunkSize,
startCol: chunkColIdx * this._loadingParams.colChunkSize,
});

this.storeChunkData(chunkIdx, chunk);
}

private emitChangedChunk(chunkIdx: [number, number]) {
const [rowChunk, colChunk] = chunkIdx;
private storeChunkData(chunkIdx: ChunkMap.ChunkIdx, data: ChunkMap.ChunkData) {
this._chunks.set(chunkIdx, data);
}

private emitChangedChunk(chunkIdx: ChunkMap.ChunkIdx) {
const { chunkRowIdx, chunkColIdx } = chunkIdx;

// We must ensure the range is within the bounds
const rowStart = rowChunk * this._loadingParams.rowChunkSize;
const rowStart = chunkRowIdx * this._loadingParams.rowChunkSize;
const rowEnd = Math.min(rowStart + this._loadingParams.rowChunkSize, this._numRows);
const colStart = colChunk * this._loadingParams.colChunkSize;
const colStart = chunkColIdx * this._loadingParams.colChunkSize;
const colEnd = Math.min(colStart + this._loadingParams.colChunkSize, this._numCols);

this.emitChanged({
Expand All @@ -178,28 +190,34 @@ export class ArrowModel extends DataModel {
});
}

private async prefetchChunkIfNeeded(chunkIdx: [number, number]) {
if (this._chunks.has(chunkIdx) || !this.chunkIsValid(chunkIdx)) {
return;
/**
* Prefetch next chunks if available.
*
* We chain the Promise because this can be considered a low priority operation so we want
* to reduce load on the server.
*/
private prefetchAsNeededForChunk(chunkIdx: ChunkMap.ChunkIdx) {
const { chunkRowIdx, chunkColIdx } = chunkIdx;

let promise = Promise.resolve();

const nextRowsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx: chunkRowIdx + 1, chunkColIdx };
if (!this._chunks.has(nextRowsChunkIdx) && this._chunks.chunkIsValid(nextRowsChunkIdx)) {
promise = promise.then((_) => this.fetchThenStoreChunk(nextRowsChunkIdx));
this.storeChunkData(
nextRowsChunkIdx,
ChunkMap.makePendingChunk({ promise, reason: "prefetch" }),
);
}

const promise = this.fetchChunk(chunkIdx).then((table) => {
this._chunks.set(chunkIdx, table);
});
this._chunks.set(chunkIdx, promise);
}

private chunkIdx(row: number, col: number): [number, number] {
return [
Math.floor(row / this._loadingParams.rowChunkSize),
Math.floor(col / this._loadingParams.colChunkSize),
];
}

private chunkIsValid(chunkIdx: [number, number]): boolean {
const [rowChunk, colChunk] = chunkIdx;
const [max_rowChunk, max_colChunk] = this.chunkIdx(this._numRows - 1, this._numCols - 1);
return rowChunk >= 0 && rowChunk <= max_rowChunk && colChunk >= 0 && colChunk <= max_colChunk;
const nextColsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx, chunkColIdx: chunkColIdx + 1 };
if (!this._chunks.has(nextColsChunkIdx) && this._chunks.chunkIsValid(nextColsChunkIdx)) {
promise = promise.then((_) => this.fetchThenStoreChunk(nextColsChunkIdx));
this.storeChunkData(
nextColsChunkIdx,
ChunkMap.makePendingChunk({ promise, reason: "prefetch" }),
);
}
}

private readonly _loadingParams: Required<ArrowModel.LoadingOptions>;
Expand All @@ -209,6 +227,127 @@ export class ArrowModel extends DataModel {
private _numRows: number = 0;
private _numCols: number = 0;
private _schema!: Arrow.Schema;
private _chunks: PairMap<number, number, Arrow.Table | Promise<void>> = new PairMap();
private _chunks!: ChunkMap;
private _ready: Promise<void>;
}

/**
 * Map of table chunks addressed by `ChunkMap.ChunkIdx` (row-chunk / column-chunk pair).
 *
 * Values are kept in a `PairMap` keyed by the `[chunkRowIdx, chunkColIdx]` tuple; the
 * `set`/`get`/`has`/`delete`/`clear`/`size`/`forEach` members forward to that map after
 * translating between the object-shaped index and the tuple key.
 */
class ChunkMap {
  /**
   * @param parameters Chunk sizes and overall table dimensions used to convert cell
   *   indices to chunk indices and to bound-check chunk indices.
   */
  constructor(parameters: ChunkMap.Parameters) {
    this._parameters = parameters;
  }

  /**
   * Compute which chunk a cell belongs to.
   *
   * @param cellIdx Row and column index of the cell.
   * @returns The cell indices integer-divided (floored) by the matching chunk size.
   */
  getChunkIdx(cellIdx: ChunkMap.CellIdx): ChunkMap.ChunkIdx {
    const { rowIdx, colIdx } = cellIdx;
    const { rowChunkSize, colChunkSize } = this._parameters;
    const chunkRowIdx = Math.floor(rowIdx / rowChunkSize);
    const chunkColIdx = Math.floor(colIdx / colChunkSize);
    return { chunkRowIdx, chunkColIdx };
  }

  /**
   * Tell whether a chunk index lies between chunk (0, 0) and the chunk that holds the
   * last cell `(numRows - 1, numCols - 1)`, both inclusive.
   */
  chunkIsValid(chunkIdx: ChunkMap.ChunkIdx): boolean {
    const { numRows, numCols } = this._parameters;
    const lastChunk = this.getChunkIdx({ rowIdx: numRows - 1, colIdx: numCols - 1 });

    // Written as a positive range test so that a NaN index is rejected.
    const within = (idx: number, last: number): boolean => idx >= 0 && idx <= last;

    return (
      within(chunkIdx.chunkRowIdx, lastChunk.chunkRowIdx) &&
      within(chunkIdx.chunkColIdx, lastChunk.chunkColIdx)
    );
  }

  /** Store `value` under `chunkIdx` and return this map to allow chaining. */
  set(chunkIdx: ChunkMap.ChunkIdx, value: ChunkMap.ChunkData): this {
    const key = ChunkMap._chunkIdxToKey(chunkIdx);
    this._map.set(key, value);
    return this;
  }

  /** Look up the entry stored under `chunkIdx`, if any. */
  get(chunkIdx: ChunkMap.ChunkIdx): ChunkMap.ChunkData | undefined {
    const key = ChunkMap._chunkIdxToKey(chunkIdx);
    return this._map.get(key);
  }

  /** Remove every entry. */
  clear(): void {
    this._map.clear();
  }

  /** Remove the entry stored under `chunkIdx`; the underlying map's result is returned. */
  delete(chunkIdx: ChunkMap.ChunkIdx): boolean {
    const key = ChunkMap._chunkIdxToKey(chunkIdx);
    return this._map.delete(key);
  }

  /** Tell whether an entry (loaded or pending) is stored under `chunkIdx`. */
  has(chunkIdx: ChunkMap.ChunkIdx): boolean {
    const key = ChunkMap._chunkIdxToKey(chunkIdx);
    return this._map.has(key);
  }

  /** Number of entries reported by the underlying map. */
  get size(): number {
    return this._map.size;
  }

  /**
   * Invoke `callbackfn` once per stored entry.
   *
   * @param callbackfn Receives the entry, its chunk index, and this map.
   * @param thisArg Value bound to `this` inside `callbackfn`.
   */
  forEach(
    callbackfn: (value: ChunkMap.ChunkData, key: ChunkMap.ChunkIdx, map: ChunkMap) => void,
    // biome-ignore lint/suspicious/noExplicitAny: This is in the Map signature
    thisArg?: any,
  ): void {
    this._map.forEach((entry, pairKey) => {
      const idx = ChunkMap._keyToChunkIdx(pairKey);
      callbackfn.call(thisArg, entry, idx, this);
    });
  }

  /** Convert an object-shaped chunk index to the tuple key used by the underlying map. */
  private static _chunkIdxToKey(chunkIdx: ChunkMap.ChunkIdx): [number, number] {
    const { chunkRowIdx, chunkColIdx } = chunkIdx;
    return [chunkRowIdx, chunkColIdx];
  }

  /** Convert a tuple key of the underlying map back to an object-shaped chunk index. */
  private static _keyToChunkIdx(key: [number, number]): ChunkMap.ChunkIdx {
    const [chunkRowIdx, chunkColIdx] = key;
    return { chunkRowIdx, chunkColIdx };
  }

  private _map = new PairMap<number, number, ChunkMap.ChunkData>();
  private _parameters: Required<ChunkMap.Parameters>;
}

/**
 * Types and small factories that accompany the `ChunkMap` class.
 */
namespace ChunkMap {
  /** Chunk sizes together with the overall number of rows and columns. */
  export type Parameters = {
    rowChunkSize: number;
    numRows: number;
    colChunkSize: number;
    numCols: number;
  };

  /** Position of a chunk, counted in chunks along each axis. */
  export type ChunkIdx = {
    chunkRowIdx: number;
    chunkColIdx: number;
  };

  /** Position of a single cell, counted in rows and columns. */
  export type CellIdx = {
    rowIdx: number;
    colIdx: number;
  };

  /**
   * A chunk whose data is available.
   *
   * `startRow` / `startCol` are the row and column offsets that are subtracted from a
   * cell's indices to address it inside `data`.
   */
  export type Chunk = {
    data: Arrow.Table;
    startRow: number;
    startCol: number;
    readonly type: "chunk";
  };

  /** Build a `Chunk` by adding the `"chunk"` discriminant to the given fields. */
  export function makeChunk(chunk: Omit<Chunk, "type">): Chunk {
    // The tag is written last so it always wins over whatever the input carries.
    const tagged: Chunk = { ...chunk, type: "chunk" };
    return tagged;
  }

  /**
   * A chunk whose data is still being fetched.
   *
   * `reason` records whether the fetch was started for a cell that was asked for
   * (`"query"`) or ahead of time (`"prefetch"`).
   */
  export type PendingChunk = {
    promise: Promise<void>;
    reason: "query" | "prefetch";
    readonly type: "pending";
  };

  /** Build a `PendingChunk` by adding the `"pending"` discriminant to the given fields. */
  export function makePendingChunk(chunk: Omit<PendingChunk, "type">): PendingChunk {
    // The tag is written last so it always wins over whatever the input carries.
    const tagged: PendingChunk = { ...chunk, type: "pending" };
    return tagged;
  }

  /** Anything that can be stored in a `ChunkMap`: loaded data or an in-flight fetch. */
  export type ChunkData = Chunk | PendingChunk;
}
Loading