diff --git a/arbalister/routes.py b/arbalister/routes.py index 8b0d2ea..009df18 100644 --- a/arbalister/routes.py +++ b/arbalister/routes.py @@ -80,10 +80,10 @@ def get_file_options(self, file_format: ff.FileFormat) -> FileReadOptions: class IpcParams: """Query parameter for IPC data.""" - row_chunk_size: int | None = None - row_chunk: int | None = None - col_chunk_size: int | None = None - col_chunk: int | None = None + start_row: int | None = None + end_row: int | None = None + start_col: int | None = None + end_col: int | None = None class IpcRouteHandler(BaseRouteHandler): @@ -98,15 +98,14 @@ async def get(self, path: str) -> None: df: dn.DataFrame = self.dataframe(path) - if params.row_chunk_size is not None and params.row_chunk is not None: - offset: int = params.row_chunk * params.row_chunk_size - df = df.limit(count=params.row_chunk_size, offset=offset) + if params.start_row is not None and params.end_row is not None: + offset: int = params.start_row + count: int = params.end_row - params.start_row + df = df.limit(count=count, offset=offset) - if params.col_chunk_size is not None and params.col_chunk is not None: + if params.start_col is not None and params.end_col is not None: col_names = df.schema().names - start: int = params.col_chunk * params.col_chunk_size - end: int = start + params.col_chunk_size - df = df.select(*col_names[start:end]) + df = df.select(*col_names[params.start_col : params.end_col]) table: pa.Table = df.to_arrow_table() diff --git a/arbalister/tests/test_routes.py b/arbalister/tests/test_routes.py index a40e277..756e0b8 100644 --- a/arbalister/tests/test_routes.py +++ b/arbalister/tests/test_routes.py @@ -124,32 +124,32 @@ def table_file( # No limits lambda table: arb.routes.IpcParams(), # Limit only number of rows - lambda table: arb.routes.IpcParams(row_chunk=0, row_chunk_size=3), - lambda table: arb.routes.IpcParams(row_chunk=1, row_chunk_size=2), - lambda table: arb.routes.IpcParams(row_chunk=0, row_chunk_size=table.num_rows), - lambda table: arb.routes.IpcParams(row_chunk=1, row_chunk_size=table.num_rows // 2 + 1), + lambda table: arb.routes.IpcParams(start_row=0, end_row=3), + lambda table: arb.routes.IpcParams(start_row=2, end_row=4), + lambda table: arb.routes.IpcParams(start_row=0, end_row=table.num_rows), + lambda table: arb.routes.IpcParams(start_row=table.num_rows // 2, end_row=table.num_rows), # Limit only number of cols - lambda table: arb.routes.IpcParams(col_chunk=0, col_chunk_size=3), - lambda table: arb.routes.IpcParams(col_chunk=1, col_chunk_size=2), - lambda table: arb.routes.IpcParams(col_chunk=0, col_chunk_size=table.num_columns), - lambda table: arb.routes.IpcParams(col_chunk=1, col_chunk_size=table.num_columns // 2 + 1), + lambda table: arb.routes.IpcParams(start_col=0, end_col=3), + lambda table: arb.routes.IpcParams(start_col=2, end_col=4), + lambda table: arb.routes.IpcParams(start_col=0, end_col=table.num_columns), + lambda table: arb.routes.IpcParams(start_col=table.num_columns // 2, end_col=table.num_columns), # Limit both lambda table: arb.routes.IpcParams( - row_chunk=0, - row_chunk_size=3, - col_chunk=1, - col_chunk_size=table.num_columns // 2 + 1, + start_row=0, + end_row=3, + start_col=table.num_columns // 2, + end_col=table.num_columns, ), lambda table: arb.routes.IpcParams( - row_chunk=0, - row_chunk_size=table.num_rows, - col_chunk=1, - col_chunk_size=2, + start_row=0, + end_row=table.num_rows, + start_col=2, + end_col=4, ), # Schema only lambda table: arb.routes.IpcParams( - row_chunk=0, - row_chunk_size=0, + start_row=0, + end_row=0, ), ] ) @@ -185,19 +185,17 @@ async def test_ipc_route_limit( expected = full_table # Row slicing - if (size := ipc_params.row_chunk_size) is not None and (cidx := ipc_params.row_chunk) is not None: - expected_num_rows = min((size * (cidx + 1)), expected.num_rows) - (size * cidx) + if (start_row := ipc_params.start_row) is not None and (end_row := ipc_params.end_row) is not None: + expected_num_rows = min(end_row, expected.num_rows) - start_row assert payload.num_rows == expected_num_rows - expected = expected.slice(cidx * size, size) + expected = expected.slice(start_row, end_row - start_row) # Col slicing - if (size := ipc_params.col_chunk_size) is not None and (cidx := ipc_params.col_chunk) is not None: - expected_num_cols = min((size * (cidx + 1)), len(expected.schema)) - (size * cidx) + if (start_col := ipc_params.start_col) is not None and (end_col := ipc_params.end_col) is not None: + expected_num_cols = min(end_col, len(expected.schema)) - start_col assert len(payload.schema) == expected_num_cols col_names = expected.schema.names - start = cidx * size - end = start + size - expected = expected.select(col_names[start:end]) + expected = expected.select(col_names[start_col:end_col]) assert expected.cast(payload.schema) == payload diff --git a/src/__tests__/model.spec.ts b/src/__tests__/model.spec.ts index 4b84eb2..5d773ba 100644 --- a/src/__tests__/model.spec.ts +++ b/src/__tests__/model.spec.ts @@ -27,17 +27,13 @@ async function fetchStatsMocked(_params: Req.StatsOptions): Promise { let table: Arrow.Table = MOCK_TABLE; - if (params.row_chunk !== undefined && params.row_chunk_size !== undefined) { - const start = params.row_chunk * params.row_chunk_size; - const end = start + params.row_chunk_size; - table = table.slice(start, end); + if (params.start_row !== undefined && params.end_row !== undefined) { + table = table.slice(params.start_row, params.end_row); } - if (params.col_chunk !== undefined && params.col_chunk_size !== undefined) { + if (params.start_col !== undefined && params.end_col !== undefined) { const colNames = table.schema.fields.map((field) => field.name); - const start = params.col_chunk * params.col_chunk_size; - const end = start + params.col_chunk_size; - const selectedCols = colNames.slice(start, end); + const selectedCols = colNames.slice(params.start_col, params.end_col); table = table.select(selectedCols); } diff --git a/src/model.ts b/src/model.ts index 32d6a01..adae484 100644 --- a/src/model.ts +++ b/src/model.ts @@ -7,12 +7,18 @@ import { fetchFileInfo, fetchStats, fetchTable } from "./requests"; import type { FileInfo, FileReadOptions } from "./file-options"; export namespace ArrowModel { + export interface PrefetchFactors { + rowPrefetchFactor?: number; + colPrefetchFactor?: number; + } + export interface LoadingOptions { path: string; rowChunkSize?: number; colChunkSize?: number; loadingRepr?: string; nullRepr?: string; + prefetchFactors?: PrefetchFactors; } } @@ -32,11 +38,16 @@ export class ArrowModel extends DataModel { super(); this._loadingParams = { - rowChunkSize: 512, + rowChunkSize: 100, colChunkSize: 24, loadingRepr: "", nullRepr: "", ...loadingOptions, + prefetchFactors: { + rowPrefetchFactor: 16, + colPrefetchFactor: 16, + ...loadingOptions.prefetchFactors, + }, }; this._fileOptions = fileOptions; this._fileInfo = fileInfo; @@ -147,28 +158,51 @@ export class ArrowModel extends DataModel { return this._loadingParams.loadingRepr; } - private async fetchThenStoreChunk(chunkIdx: ChunkMap.ChunkIdx): Promise { + private async fetchThenStoreChunk( + chunkIdx: ChunkMap.ChunkIdx, + factors: Required = { rowPrefetchFactor: 1, colPrefetchFactor: 1 }, + ): Promise { const { chunkRowIdx, chunkColIdx } = chunkIdx; + const startRow = chunkRowIdx * this._loadingParams.rowChunkSize; + const endRow = Math.min( + startRow + this._loadingParams.rowChunkSize * factors.rowPrefetchFactor, + this._numRows, + ); + const startCol = chunkColIdx * this._loadingParams.colChunkSize; + const endCol = Math.min( + startCol + this._loadingParams.colChunkSize * factors.colPrefetchFactor, + this._numCols, + ); + const table = await fetchTable({ path: this._loadingParams.path, - row_chunk_size: this._loadingParams.rowChunkSize, - row_chunk: chunkRowIdx, - col_chunk_size: this._loadingParams.colChunkSize, - col_chunk: chunkColIdx, + start_row: startRow, + end_row: endRow, + start_col: startCol, + end_col: endCol, ...this._fileOptions, }); const chunk: ChunkMap.Chunk = ChunkMap.makeChunk({ data: table, - startRow: chunkRowIdx * this._loadingParams.rowChunkSize, - startCol: chunkColIdx * this._loadingParams.colChunkSize, + startRow, + startCol, }); - this.storeChunkData(chunkIdx, chunk); + this.storeChunkData(chunkIdx, chunk, factors); } - private storeChunkData(chunkIdx: ChunkMap.ChunkIdx, data: ChunkMap.ChunkData) { - this._chunks.set(chunkIdx, data); + private storeChunkData( + chunkIdx: ChunkMap.ChunkIdx, + data: ChunkMap.ChunkData, + factors: Required = { rowPrefetchFactor: 1, colPrefetchFactor: 1 }, + ) { + const { chunkRowIdx, chunkColIdx } = chunkIdx; + for (let r = 0; r < factors.rowPrefetchFactor; r++) { + for (let c = 0; c < factors.colPrefetchFactor; c++) { + this._chunks.set({ chunkRowIdx: chunkRowIdx + r, chunkColIdx: chunkColIdx + c }, data); + } + } } private emitChangedChunk(chunkIdx: ChunkMap.ChunkIdx) { @@ -203,24 +237,34 @@ export class ArrowModel extends DataModel { const nextRowsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx: chunkRowIdx + 1, chunkColIdx }; if (!this._chunks.has(nextRowsChunkIdx) && this._chunks.chunkIsValid(nextRowsChunkIdx)) { - promise = promise.then((_) => this.fetchThenStoreChunk(nextRowsChunkIdx)); + const rowFactors = { + rowPrefetchFactor: this._loadingParams.prefetchFactors.rowPrefetchFactor, + colPrefetchFactor: 1, + }; + promise = promise.then((_) => this.fetchThenStoreChunk(nextRowsChunkIdx, rowFactors)); this.storeChunkData( nextRowsChunkIdx, ChunkMap.makePendingChunk({ promise, reason: "prefetch" }), + rowFactors, ); } const nextColsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx, chunkColIdx: chunkColIdx + 1 }; if (!this._chunks.has(nextColsChunkIdx) && this._chunks.chunkIsValid(nextColsChunkIdx)) { - promise = promise.then((_) => this.fetchThenStoreChunk(nextColsChunkIdx)); + const colFactors = { + rowPrefetchFactor: 1, + colPrefetchFactor: this._loadingParams.prefetchFactors.colPrefetchFactor, + }; + promise = promise.then((_) => this.fetchThenStoreChunk(nextColsChunkIdx, colFactors)); this.storeChunkData( nextColsChunkIdx, ChunkMap.makePendingChunk({ promise, reason: "prefetch" }), + colFactors, ); } } - private readonly _loadingParams: Required; + private readonly _loadingParams: DeepRequired; private readonly _fileInfo: FileInfo; private _fileOptions: FileReadOptions; @@ -351,3 +395,5 @@ namespace ChunkMap { export type ChunkData = Chunk | PendingChunk; } + +type DeepRequired = T extends object ? { [K in keyof T]-?: DeepRequired } : T; diff --git a/src/requests.ts b/src/requests.ts index aefb106..d8e1061 100644 --- a/src/requests.ts +++ b/src/requests.ts @@ -122,10 +122,10 @@ export async function fetchStats( export interface TableOptions { path: string; - row_chunk_size?: number; - row_chunk?: number; - col_chunk_size?: number; - col_chunk?: number; + start_row?: number; + end_row?: number; + start_col?: number; + end_col?: number; } /** @@ -137,10 +137,10 @@ export async function fetchTable( params: Readonly, ): Promise { const queryKeys = [ - "row_chunk_size", - "row_chunk", - "col_chunk_size", - "col_chunk", + "start_row", + "end_row", + "start_col", + "end_col", "delimiter", "table_name", ] as const;