Merged
21 changes: 10 additions & 11 deletions arbalister/routes.py
@@ -80,10 +80,10 @@ def get_file_options(self, file_format: ff.FileFormat) -> FileReadOptions:
 class IpcParams:
     """Query parameter for IPC data."""
 
-    row_chunk_size: int | None = None
-    row_chunk: int | None = None
-    col_chunk_size: int | None = None
-    col_chunk: int | None = None
+    start_row: int | None = None
+    end_row: int | None = None
+    start_col: int | None = None
+    end_col: int | None = None
 
 
 class IpcRouteHandler(BaseRouteHandler):
@@ -98,15 +98,14 @@ async def get(self, path: str) -> None:

         df: dn.DataFrame = self.dataframe(path)
 
-        if params.row_chunk_size is not None and params.row_chunk is not None:
-            offset: int = params.row_chunk * params.row_chunk_size
-            df = df.limit(count=params.row_chunk_size, offset=offset)
+        if params.start_row is not None and params.end_row is not None:
+            offset: int = params.start_row
+            count: int = params.end_row - params.start_row
+            df = df.limit(count=count, offset=offset)
 
-        if params.col_chunk_size is not None and params.col_chunk is not None:
+        if params.start_col is not None and params.end_col is not None:
             col_names = df.schema().names
-            start: int = params.col_chunk * params.col_chunk_size
-            end: int = start + params.col_chunk_size
-            df = df.select(*col_names[start:end])
+            df = df.select(*col_names[params.start_col : params.end_col])
 
         table: pa.Table = df.to_arrow_table()
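The server-side translation is now trivial because the parameters form a half-open interval: `offset = start_row` and `count = end_row - start_row`. A minimal TypeScript sketch of the arithmetic on both sides of the wire (the chunk size and index are hypothetical example values, not from this PR):

```typescript
// Client side: a chunk index maps onto a half-open row range.
const rowChunkSize = 100; // hypothetical chunk size
const chunkRowIdx = 2;    // hypothetical chunk index

const startRow = chunkRowIdx * rowChunkSize; // 200, inclusive
const endRow = startRow + rowChunkSize;      // 300, exclusive

// Server side (routes.py) recovers DataFusion's limit() arguments:
const offset = startRow;         // 200
const count = endRow - startRow; // 100
console.log({ offset, count });
```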
50 changes: 24 additions & 26 deletions arbalister/tests/test_routes.py
@@ -124,32 +124,32 @@ def table_file(
         # No limits
         lambda table: arb.routes.IpcParams(),
         # Limit only number of rows
-        lambda table: arb.routes.IpcParams(row_chunk=0, row_chunk_size=3),
-        lambda table: arb.routes.IpcParams(row_chunk=1, row_chunk_size=2),
-        lambda table: arb.routes.IpcParams(row_chunk=0, row_chunk_size=table.num_rows),
-        lambda table: arb.routes.IpcParams(row_chunk=1, row_chunk_size=table.num_rows // 2 + 1),
+        lambda table: arb.routes.IpcParams(start_row=0, end_row=3),
+        lambda table: arb.routes.IpcParams(start_row=2, end_row=4),
+        lambda table: arb.routes.IpcParams(start_row=0, end_row=table.num_rows),
+        lambda table: arb.routes.IpcParams(start_row=table.num_rows // 2, end_row=table.num_rows),
         # Limit only number of cols
-        lambda table: arb.routes.IpcParams(col_chunk=0, col_chunk_size=3),
-        lambda table: arb.routes.IpcParams(col_chunk=1, col_chunk_size=2),
-        lambda table: arb.routes.IpcParams(col_chunk=0, col_chunk_size=table.num_columns),
-        lambda table: arb.routes.IpcParams(col_chunk=1, col_chunk_size=table.num_columns // 2 + 1),
+        lambda table: arb.routes.IpcParams(start_col=0, end_col=3),
+        lambda table: arb.routes.IpcParams(start_col=2, end_col=4),
+        lambda table: arb.routes.IpcParams(start_col=0, end_col=table.num_columns),
+        lambda table: arb.routes.IpcParams(start_col=table.num_columns // 2, end_col=table.num_columns),
         # Limit both
         lambda table: arb.routes.IpcParams(
-            row_chunk=0,
-            row_chunk_size=3,
-            col_chunk=1,
-            col_chunk_size=table.num_columns // 2 + 1,
+            start_row=0,
+            end_row=3,
+            start_col=table.num_columns // 2,
+            end_col=table.num_columns,
         ),
         lambda table: arb.routes.IpcParams(
-            row_chunk=0,
-            row_chunk_size=table.num_rows,
-            col_chunk=1,
-            col_chunk_size=2,
+            start_row=0,
+            end_row=table.num_rows,
+            start_col=2,
+            end_col=4,
         ),
         # Schema only
         lambda table: arb.routes.IpcParams(
-            row_chunk=0,
-            row_chunk_size=0,
+            start_row=0,
+            end_row=0,
         ),
     ]
 )
@@ -185,19 +185,17 @@ async def test_ipc_route_limit(
     expected = full_table
 
     # Row slicing
-    if (size := ipc_params.row_chunk_size) is not None and (cidx := ipc_params.row_chunk) is not None:
-        expected_num_rows = min((size * (cidx + 1)), expected.num_rows) - (size * cidx)
+    if (start_row := ipc_params.start_row) is not None and (end_row := ipc_params.end_row) is not None:
+        expected_num_rows = min(end_row, expected.num_rows) - start_row
         assert payload.num_rows == expected_num_rows
-        expected = expected.slice(cidx * size, size)
+        expected = expected.slice(start_row, end_row - start_row)
 
     # Col slicing
-    if (size := ipc_params.col_chunk_size) is not None and (cidx := ipc_params.col_chunk) is not None:
-        expected_num_cols = min((size * (cidx + 1)), len(expected.schema)) - (size * cidx)
+    if (start_col := ipc_params.start_col) is not None and (end_col := ipc_params.end_col) is not None:
+        expected_num_cols = min(end_col, len(expected.schema)) - start_col
         assert len(payload.schema) == expected_num_cols
         col_names = expected.schema.names
-        start = cidx * size
-        end = start + size
-        expected = expected.select(col_names[start:end])
+        expected = expected.select(col_names[start_col:end_col])
 
     assert expected.cast(payload.schema) == payload
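The expected-shape arithmetic mirrors slice semantics: clamp `end_row` to the table length, then subtract `start_row`. A tiny sketch with made-up sizes, chosen so the clamp actually fires (it only matters when the requested end overshoots the table):

```typescript
// Hypothetical sizes, not taken from the test fixtures.
const numRows = 5;
const startRow = 3;
const endRow = 10; // deliberately past the end of the table

const expectedNumRows = Math.min(endRow, numRows) - startRow;
console.log(expectedNumRows); // 2 — only rows 3 and 4 exist past startRow
```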
12 changes: 4 additions & 8 deletions src/__tests__/model.spec.ts
@@ -27,17 +27,13 @@ async function fetchStatsMocked(_params: Req.StatsOptions): Promise<Req.StatsRes
 async function fetchTableMocked(params: Req.TableOptions): Promise<Arrow.Table> {
   let table: Arrow.Table = MOCK_TABLE;
 
-  if (params.row_chunk !== undefined && params.row_chunk_size !== undefined) {
-    const start = params.row_chunk * params.row_chunk_size;
-    const end = start + params.row_chunk_size;
-    table = table.slice(start, end);
+  if (params.start_row !== undefined && params.end_row !== undefined) {
+    table = table.slice(params.start_row, params.end_row);
   }
 
-  if (params.col_chunk !== undefined && params.col_chunk_size !== undefined) {
+  if (params.start_col !== undefined && params.end_col !== undefined) {
     const colNames = table.schema.fields.map((field) => field.name);
-    const start = params.col_chunk * params.col_chunk_size;
-    const end = start + params.col_chunk_size;
-    const selectedCols = colNames.slice(start, end);
+    const selectedCols = colNames.slice(params.start_col, params.end_col);
     table = table.select(selectedCols);
   }

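The mock shrinks because Arrow JS already speaks half-open ranges: `Table.slice(begin, end)` and name-based `Table.select` line up one-to-one with the new parameters. A self-contained sketch with a toy table (standing in for the PR's `MOCK_TABLE`):

```typescript
import { tableFromArrays } from "apache-arrow";

// Hypothetical toy table standing in for MOCK_TABLE.
const table = tableFromArrays({
  a: [1, 2, 3, 4],
  b: [5, 6, 7, 8],
  c: [9, 10, 11, 12],
});

const rows = table.slice(1, 3);       // rows [1, 3), like start_row/end_row
const cols = rows.select(["a", "b"]); // columns by name, like start_col/end_col
console.log(cols.numRows, cols.numCols); // 2 2
```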
74 changes: 60 additions & 14 deletions src/model.ts
@@ -7,12 +7,18 @@ import { fetchFileInfo, fetchStats, fetchTable } from "./requests";
 import type { FileInfo, FileReadOptions } from "./file-options";
 
 export namespace ArrowModel {
+  export interface PrefetchFactors {
+    rowPrefetchFactor?: number;
+    colPrefetchFactor?: number;
+  }
+
   export interface LoadingOptions {
     path: string;
     rowChunkSize?: number;
     colChunkSize?: number;
     loadingRepr?: string;
     nullRepr?: string;
+    prefetchFactors?: PrefetchFactors;
   }
 }

@@ -32,11 +38,16 @@ export class ArrowModel extends DataModel {
     super();
 
     this._loadingParams = {
-      rowChunkSize: 512,
+      rowChunkSize: 100,
       colChunkSize: 24,
       loadingRepr: "",
       nullRepr: "",
       ...loadingOptions,
+      prefetchFactors: {
+        rowPrefetchFactor: 16,
+        colPrefetchFactor: 16,
+        ...loadingOptions.prefetchFactors,
+      },
     };
     this._fileOptions = fileOptions;
     this._fileInfo = fileInfo;
@@ -147,28 +158,51 @@ export class ArrowModel extends DataModel {
     return this._loadingParams.loadingRepr;
   }
 
-  private async fetchThenStoreChunk(chunkIdx: ChunkMap.ChunkIdx): Promise<void> {
+  private async fetchThenStoreChunk(
+    chunkIdx: ChunkMap.ChunkIdx,
+    factors: Required<ArrowModel.PrefetchFactors> = { rowPrefetchFactor: 1, colPrefetchFactor: 1 },
+  ): Promise<void> {
     const { chunkRowIdx, chunkColIdx } = chunkIdx;
 
+    const startRow = chunkRowIdx * this._loadingParams.rowChunkSize;
+    const endRow = Math.min(
+      startRow + this._loadingParams.rowChunkSize * factors.rowPrefetchFactor,
+      this._numRows,
+    );
+    const startCol = chunkColIdx * this._loadingParams.colChunkSize;
+    const endCol = Math.min(
+      startCol + this._loadingParams.colChunkSize * factors.colPrefetchFactor,
+      this._numCols,
+    );
+
     const table = await fetchTable({
       path: this._loadingParams.path,
-      row_chunk_size: this._loadingParams.rowChunkSize,
-      row_chunk: chunkRowIdx,
-      col_chunk_size: this._loadingParams.colChunkSize,
-      col_chunk: chunkColIdx,
+      start_row: startRow,
+      end_row: endRow,
+      start_col: startCol,
+      end_col: endCol,
       ...this._fileOptions,
     });
     const chunk: ChunkMap.Chunk = ChunkMap.makeChunk({
       data: table,
-      startRow: chunkRowIdx * this._loadingParams.rowChunkSize,
-      startCol: chunkColIdx * this._loadingParams.colChunkSize,
+      startRow,
+      startCol,
     });
 
-    this.storeChunkData(chunkIdx, chunk);
+    this.storeChunkData(chunkIdx, chunk, factors);
   }
 
-  private storeChunkData(chunkIdx: ChunkMap.ChunkIdx, data: ChunkMap.ChunkData) {
-    this._chunks.set(chunkIdx, data);
+  private storeChunkData(
+    chunkIdx: ChunkMap.ChunkIdx,
+    data: ChunkMap.ChunkData,
+    factors: Required<ArrowModel.PrefetchFactors> = { rowPrefetchFactor: 1, colPrefetchFactor: 1 },
+  ) {
+    const { chunkRowIdx, chunkColIdx } = chunkIdx;
+    for (let r = 0; r < factors.rowPrefetchFactor; r++) {
+      for (let c = 0; c < factors.colPrefetchFactor; c++) {
+        this._chunks.set({ chunkRowIdx: chunkRowIdx + r, chunkColIdx: chunkColIdx + c }, data);
+      }
+    }
   }
 
   private emitChangedChunk(chunkIdx: ChunkMap.ChunkIdx) {
@@ -203,24 +237,34 @@

     const nextRowsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx: chunkRowIdx + 1, chunkColIdx };
     if (!this._chunks.has(nextRowsChunkIdx) && this._chunks.chunkIsValid(nextRowsChunkIdx)) {
-      promise = promise.then((_) => this.fetchThenStoreChunk(nextRowsChunkIdx));
+      const rowFactors = {
+        rowPrefetchFactor: this._loadingParams.prefetchFactors.rowPrefetchFactor,
+        colPrefetchFactor: 1,
+      };
+      promise = promise.then((_) => this.fetchThenStoreChunk(nextRowsChunkIdx, rowFactors));
       this.storeChunkData(
         nextRowsChunkIdx,
         ChunkMap.makePendingChunk({ promise, reason: "prefetch" }),
+        rowFactors,
       );
     }
 
     const nextColsChunkIdx: ChunkMap.ChunkIdx = { chunkRowIdx, chunkColIdx: chunkColIdx + 1 };
     if (!this._chunks.has(nextColsChunkIdx) && this._chunks.chunkIsValid(nextColsChunkIdx)) {
-      promise = promise.then((_) => this.fetchThenStoreChunk(nextColsChunkIdx));
+      const colFactors = {
+        rowPrefetchFactor: 1,
+        colPrefetchFactor: this._loadingParams.prefetchFactors.colPrefetchFactor,
+      };
+      promise = promise.then((_) => this.fetchThenStoreChunk(nextColsChunkIdx, colFactors));
       this.storeChunkData(
         nextColsChunkIdx,
         ChunkMap.makePendingChunk({ promise, reason: "prefetch" }),
+        colFactors,
       );
     }
   }
 
-  private readonly _loadingParams: Required<ArrowModel.LoadingOptions>;
+  private readonly _loadingParams: DeepRequired<ArrowModel.LoadingOptions>;
   private readonly _fileInfo: FileInfo;
   private _fileOptions: FileReadOptions;

@@ -351,3 +395,5 @@ namespace ChunkMap {

   export type ChunkData = Chunk | PendingChunk;
 }
+
+type DeepRequired<T> = T extends object ? { [K in keyof T]-?: DeepRequired<T[K]> } : T;
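`Required<T>` only strips optionality one level deep; with the nested `prefetchFactors` object, `_loadingParams.prefetchFactors.rowPrefetchFactor` would still type as `number | undefined`. The recursive helper fixes that. A sketch of the difference (`Opts` is a made-up reduction of `LoadingOptions`):

```typescript
type DeepRequired<T> = T extends object ? { [K in keyof T]-?: DeepRequired<T[K]> } : T;

// Made-up reduction of ArrowModel.LoadingOptions.
interface Opts {
  path: string;
  prefetchFactors?: { rowPrefetchFactor?: number };
}

// Required<Opts> would make prefetchFactors itself mandatory but leave
// its inner rowPrefetchFactor optional; DeepRequired applies -? at every level.
const opts: DeepRequired<Opts> = {
  path: "data.parquet",
  prefetchFactors: { rowPrefetchFactor: 16 },
};
const factor: number = opts.prefetchFactors.rowPrefetchFactor; // no undefined check needed
console.log(factor);
```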
16 changes: 8 additions & 8 deletions src/requests.ts
@@ -122,10 +122,10 @@ export async function fetchStats(

 export interface TableOptions {
   path: string;
-  row_chunk_size?: number;
-  row_chunk?: number;
-  col_chunk_size?: number;
-  col_chunk?: number;
+  start_row?: number;
+  end_row?: number;
+  start_col?: number;
+  end_col?: number;
 }
 
 /**
@@ -137,10 +137,10 @@ export async function fetchTable(
   params: Readonly<TableOptions & FileReadOptions>,
 ): Promise<Arrow.Table> {
   const queryKeys = [
-    "row_chunk_size",
-    "row_chunk",
-    "col_chunk_size",
-    "col_chunk",
+    "start_row",
+    "end_row",
+    "start_col",
+    "end_col",
     "delimiter",
     "table_name",
   ] as const;
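Callers now pass explicit half-open ranges instead of chunk coordinates. A hedged usage sketch — the path and ranges are invented, and it assumes the format-specific `FileReadOptions` fields can be omitted for this file type:

```typescript
import { fetchTable } from "./requests";

// Fetch rows [0, 100) and columns [0, 24) of a hypothetical dataset.
async function loadFirstScreen(): Promise<void> {
  const table = await fetchTable({
    path: "data/example.parquet",
    start_row: 0,
    end_row: 100,
    start_col: 0,
    end_col: 24,
  });
  console.log(table.numRows); // at most 100
}
```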