apacheGH-23576: [JS] Support DictionaryMessage replacement (apache#41965)

This PR makes the stream reader/writer support replacement dictionary
messages.

Closes apache#23576
Closes apache#41683

* GitHub Issue: apache#23576
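
As a rough illustration of what this enables through the public API (not part of this commit): the field name `tags`, the example values, and the explicit dictionary id `0` below are arbitrary choices, `makeVector` with `data`/`dictionary` props follows the documented pattern for constructing dictionary-encoded vectors, and the snippet assumes it runs in an async context.

```ts
import {
    Dictionary, Int32, Utf8, makeVector, vectorFromArray,
    RecordBatch, RecordBatchReader, RecordBatchStreamWriter, Table,
} from 'apache-arrow';

// Both batches use the same Dictionary type (and therefore the same dictionary id 0),
// but each carries a different dictionary vector, so the stream writer emits a
// replacement DictionaryMessage for the second batch and the reader applies it.
const type = new Dictionary(new Utf8(), new Int32(), 0);

const batchWith = (values: string[], indices: number[]) =>
    new RecordBatch({
        tags: makeVector({
            type,
            data: Int32Array.from(indices),                  // indexes into the dictionary
            dictionary: vectorFromArray(values, new Utf8()), // the dictionary values
        }).data[0],
    });

const writer = RecordBatchStreamWriter.writeAll([
    batchWith(['red', 'green'], [0, 1, 1, 0]),
    batchWith(['blue', 'yellow'], [1, 0, 0, 1]), // replacement, not a delta
]);

const table = new Table(RecordBatchReader.from(await writer.toUint8Array()));
// The first batch decodes against ['red', 'green'], the second against ['blue', 'yellow'].
```

The file format still rejects this: as shown in the diff below, `RecordBatchFileWriter` now throws if a non-delta dictionary batch is written for an id it has already seen.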
trxcllnt authored Jun 11, 2024
1 parent 6597467 commit fd11b7a
Showing 7 changed files with 149 additions and 46 deletions.
1 change: 1 addition & 0 deletions js/gulp/argv.js
@@ -22,6 +22,7 @@ export const argv = args([
{ name: `target`, type: String, defaultValue: `` },
{ name: `module`, type: String, defaultValue: `` },
{ name: `coverage`, type: Boolean, defaultValue: false },
{ name: `tests`, type: String, multiple: true, defaultValue: [`test/unit/`] },
{ name: `targets`, alias: `t`, type: String, multiple: true, defaultValue: [] },
{ name: `modules`, alias: `m`, type: String, multiple: true, defaultValue: [] },
], { partial: true });
17 changes: 3 additions & 14 deletions js/gulp/test-task.js
@@ -36,19 +36,6 @@ import { createRequire } from 'node:module';
const require = createRequire(import.meta.url);

const jestArgv = [];
const testFiles = [
`test/unit/`,
// `test/unit/bit-tests.ts`,
// `test/unit/int-tests.ts`,
// `test/unit/bn-tests.ts`,
// `test/unit/math-tests.ts`,
// `test/unit/table-tests.ts`,
// `test/unit/generated-data-tests.ts`,
// `test/unit/builders/`,
// `test/unit/recordbatch/`,
// `test/unit/table/`,
// `test/unit/ipc/`,
];

if (argv.verbose) {
jestArgv.push(`--verbose`);
@@ -80,8 +67,10 @@ export const testTask = ((cache, execArgv, testOptions) => memoizeTask(cache, fu
args.push(`-c`, `jestconfigs/jest.coverage.config.js`);
} else {
const cfgname = [target, format].filter(Boolean).join('.');
args.push(`-c`, `jestconfigs/jest.${cfgname}.config.js`, ...testFiles);
args.push(`-c`, `jestconfigs/jest.${cfgname}.config.js`);
}
args.push(...(argv._unknown || []).filter((x) => x !== 'test'));
args.push(...argv.tests);
opts.env = {
...opts.env,
TEST_TARGET: target,
13 changes: 5 additions & 8 deletions js/src/ipc/reader.ts
@@ -363,14 +363,11 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
const { id, isDelta } = header;
const { dictionaries, schema } = this;
const dictionary = dictionaries.get(id);
if (isDelta || !dictionary) {
const type = schema.dictionaries.get(id)!;
const data = this._loadVectors(header.data, body, [type]);
return (dictionary && isDelta ? dictionary.concat(
new Vector(data)) :
new Vector(data)).memoize() as Vector;
}
return dictionary.memoize();
const type = schema.dictionaries.get(id)!;
const data = this._loadVectors(header.data, body, [type]);
return (dictionary && isDelta ? dictionary.concat(
new Vector(data)) :
new Vector(data)).memoize() as Vector;
}
protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries, this.schema.metadataVersion).visitMany(types);
44 changes: 29 additions & 15 deletions js/src/ipc/writer.ts
@@ -84,6 +84,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
protected _schema: Schema | null = null;
protected _dictionaryBlocks: FileBlock[] = [];
protected _recordBatchBlocks: FileBlock[] = [];
protected _seenDictionaries = new Map<number, Vector>();
protected _dictionaryDeltaOffsets = new Map<number, number>();

public toString(sync: true): string;
@@ -144,6 +145,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
this._started = false;
this._dictionaryBlocks = [];
this._recordBatchBlocks = [];
this._seenDictionaries = new Map();
this._dictionaryDeltaOffsets = new Map();

if (!schema || !(compareSchemas(schema, this._schema))) {
@@ -259,7 +261,6 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
}

protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
@@ -284,14 +285,21 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
}

protected _writeDictionaries(batch: RecordBatch<T>) {
for (let [id, dictionary] of batch.dictionaries) {
let offset = this._dictionaryDeltaOffsets.get(id) || 0;
if (offset === 0 || (dictionary = dictionary?.slice(offset)).length > 0) {
for (const data of dictionary.data) {
this._writeDictionaryBatch(data, id, offset > 0);
offset += data.length;
}
for (const [id, dictionary] of batch.dictionaries) {
const chunks = dictionary?.data ?? [];
const prevDictionary = this._seenDictionaries.get(id);
const offset = this._dictionaryDeltaOffsets.get(id) ?? 0;
// * If no previous dictionary was written, write an initial DictionaryMessage.
// * If the current dictionary does not share chunks with the previous dictionary, write a replacement DictionaryMessage.
if (!prevDictionary || prevDictionary.data[0] !== chunks[0]) {
// * If `index > 0`, then `isDelta` is true.
// * If `index = 0`, then `isDelta` is false, because this is either the initial or a replacement DictionaryMessage.
for (const [index, chunk] of chunks.entries()) this._writeDictionaryBatch(chunk, id, index > 0);
} else if (offset < chunks.length) {
for (const chunk of chunks.slice(offset)) this._writeDictionaryBatch(chunk, id, true);
}
this._seenDictionaries.set(id, dictionary);
this._dictionaryDeltaOffsets.set(id, chunks.length);
}
return this;
}
@@ -342,6 +350,13 @@ export class RecordBatchFileWriter<T extends TypeMap = any> extends RecordBatchW
return this._writeMagic()._writePadding(2);
}

protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
if (!isDelta && this._seenDictionaries.has(id)) {
throw new Error('The Arrow File format does not support replacement dictionaries. ');
}
return super._writeDictionaryBatch(dictionary, id, isDelta);
}

protected _writeFooter(schema: Schema<T>) {
const buffer = Footer.encode(new Footer(
schema, MetadataVersion.V5,
@@ -369,13 +384,13 @@ export class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchW
}

private _recordBatches: RecordBatch[];
private _dictionaries: RecordBatch[];
private _recordBatchesWithDictionaries: RecordBatch[];

constructor() {
super();
this._autoDestroy = true;
this._recordBatches = [];
this._dictionaries = [];
this._recordBatchesWithDictionaries = [];
}

protected _writeMessage() { return this; }
@@ -386,12 +401,11 @@ export class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchW
}
protected _writeDictionaries(batch: RecordBatch<T>) {
if (batch.dictionaries.size > 0) {
this._dictionaries.push(batch);
this._recordBatchesWithDictionaries.push(batch);
}
return this;
}
protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
this._write(this._dictionaryBlocks.length === 0 ? ` ` : `,\n `);
this._write(dictionaryBatchToJSON(dictionary, id, isDelta));
this._dictionaryBlocks.push(new FileBlock(0, 0, 0));
@@ -403,9 +417,9 @@ export class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchW
return this;
}
public close() {
if (this._dictionaries.length > 0) {
if (this._recordBatchesWithDictionaries.length > 0) {
this._write(`,\n "dictionaries": [\n`);
for (const batch of this._dictionaries) {
for (const batch of this._recordBatchesWithDictionaries) {
super._writeDictionaries(batch);
}
this._write(`\n ]`);
@@ -424,7 +438,7 @@ export class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchW
this._write(`\n}`);
}

this._dictionaries = [];
this._recordBatchesWithDictionaries = [];
this._recordBatches = [];

return super.close();
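
To restate the `_writeDictionaries` change in isolation: per dictionary id the writer now remembers the last `Vector` it wrote (`_seenDictionaries`) and how many of its chunks it has emitted (`_dictionaryDeltaOffsets`), and from those decides whether to emit an initial, replacement, or delta dictionary message. A simplified, standalone paraphrase of that decision (illustrative only, not the actual implementation) could read:

```ts
import { Data, Vector } from 'apache-arrow';

type DictionaryWrite = 'initial' | 'replacement' | 'delta' | 'none';

// `prev` is the Vector last written for this dictionary id (if any) and
// `offset` is the number of its chunks already written to the stream.
function classifyDictionaryWrite(prev: Vector | undefined, current: Vector, offset: number): DictionaryWrite {
    const chunks: Data[] = current.data;
    if (!prev) {
        return 'initial';     // first dictionary batch for this id
    }
    if (prev.data[0] !== chunks[0]) {
        return 'replacement'; // shares no chunks with what was already written
    }
    return offset < chunks.length ? 'delta' : 'none'; // only new trailing chunks remain
}
```

In the writer itself, the initial and replacement cases write every chunk of the dictionary (chunks after the first flagged as deltas), while the delta case writes only the chunks beyond the recorded offset; `RecordBatchFileWriter` additionally throws on the replacement case because the Arrow file format does not allow it.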
15 changes: 10 additions & 5 deletions js/test/unit/ipc/message-reader-tests.ts
@@ -24,11 +24,16 @@ for (const table of generateRandomTables([10, 20, 30])) {

const io = ArrowIOTestHelper.stream(table);
const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
const numMessages = table.batches.reduce((numMessages, batch) => {
return numMessages +
/* recordBatch message */ 1 +
/* dictionary messages */ batch.dictionaries.size;
}, /* schema message */ 1);

const numDictionaries = table.batches.reduce((dictionaries, batch) => {
return [...batch.dictionaries.values()]
.flatMap((dictionary) => dictionary.data)
.reduce((dictionaries, data) => dictionaries.add(data), dictionaries);
}, new Set()).size;

const numMessages = /* schema message */ 1 +
/* recordBatch messages */ table.batches.length +
/* dictionary messages */ numDictionaries;

const validate = validateMessageReader.bind(0, numMessages);
const validateAsync = validateAsyncMessageReader.bind(0, numMessages);
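
Put concretely, the old count assumed one dictionary message per dictionary id per record batch, while the new count is one message per distinct dictionary `Data` chunk across the whole stream: for a hypothetical stream of three record batches sharing one dictionary written once up front, that is 1 schema + 3 record batch + 1 dictionary = 5 messages, and a single delta (or replacement) chunk for that dictionary would raise the total to 6.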
62 changes: 61 additions & 1 deletion js/test/unit/ipc/writer/file-writer-tests.ts
@@ -18,9 +18,20 @@
import {
generateDictionaryTables, generateRandomTables
} from '../../../data/tables.js';
import * as generate from '../../../generate-test-data.js';
import { validateRecordBatchIterator } from '../validate.js';

import { RecordBatchFileWriter, RecordBatchReader, Table } from 'apache-arrow';
import {
builderThroughIterable,
Dictionary,
Int32,
RecordBatch,
RecordBatchFileWriter,
RecordBatchReader,
Table,
Uint32,
Vector
} from 'apache-arrow';

describe('RecordBatchFileWriter', () => {
for (const table of generateRandomTables([10, 20, 30])) {
@@ -29,6 +40,55 @@ describe('RecordBatchFileWriter', () => {
for (const table of generateDictionaryTables([10, 20, 30])) {
testFileWriter(table, `${table.schema.fields[0]}`);
}

it('should throw if attempting to write replacement dictionary batches', async () => {
const type = new Dictionary<Uint32, Int32>(new Uint32, new Int32, 0);
const writer = new RecordBatchFileWriter();
writer.write(new RecordBatch({
// Clone the data with the original Dictionary type so the cloned chunk has id 0
dictionary_encoded_uint32: generate.dictionary(50, 20, new Uint32, new Int32).vector.data[0].clone(type)
}));
expect(() => {
writer.write(new RecordBatch({
// Clone the data with the original Dictionary type so the cloned chunk has id 0
dictionary_encoded_uint32: generate.dictionary(50, 20, new Uint32, new Int32).vector.data[0].clone(type)
}));
}).toThrow();
});

it('should write delta dictionary batches', async () => {

const name = 'dictionary_encoded_uint32';
const resultChunks: Vector<Dictionary<Uint32, Int32>>[] = [];
const {
vector: sourceVector, values: sourceValues,
} = generate.dictionary(1000, 20, new Uint32(), new Int32());

const writer = RecordBatchFileWriter.writeAll((function* () {
const transform = builderThroughIterable({
type: sourceVector.type, nullValues: [null],
queueingStrategy: 'count', highWaterMark: 50,
});
for (const chunk of transform(sourceValues())) {
resultChunks.push(chunk);
yield new RecordBatch({ [name]: chunk.data[0] });
}
})());

expect(new Vector(resultChunks)).toEqualVector(sourceVector);

type T = { [name]: Dictionary<Uint32, Int32> };
const sourceTable = new Table({ [name]: sourceVector });
const resultTable = new Table(RecordBatchReader.from<T>(await writer.toUint8Array()));

const child = resultTable.getChild(name)!;
const dicts = child.data.map(({ dictionary }) => dictionary!);
const dictionary = dicts[child.data.length - 1];

expect(resultTable).toEqualTable(sourceTable);
expect(dictionary).toBeInstanceOf(Vector);
expect(dictionary.data).toHaveLength(20);
});
});

function testFileWriter(table: Table, name: string) {
43 changes: 40 additions & 3 deletions js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -25,6 +25,7 @@ import { validateRecordBatchIterator } from '../validate.js';
import type { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
import {
builderThroughIterable,
Data,
Dictionary,
Field,
Int32,
@@ -81,10 +82,46 @@ describe('RecordBatchStreamWriter', () => {
await validate;
});

it('should write replacement dictionary batches', async () => {

const name = 'dictionary_encoded_uint32';
const type = new Dictionary<Uint32, Int32>(new Uint32, new Int32, 0);
const sourceChunks: Data<Dictionary<Uint32, Int32>>[] = [];
const resultChunks: Data<Dictionary<Uint32, Int32>>[] = [];

const writer = RecordBatchStreamWriter.writeAll((function* () {
for (let i = 0; i < 1000; i += 50) {
const { vector: { data: [chunk] } } = generate.dictionary(50, 20, type.dictionary, type.indices);
sourceChunks.push(chunk);
// Clone the data with the original Dictionary type so the cloned chunk has id 0
resultChunks.push(chunk.clone(type));
yield new RecordBatch({ [name]: resultChunks.at(-1)! });
}
})());

expect(new Vector(resultChunks)).toEqualVector(new Vector(sourceChunks));

type T = { [name]: Dictionary<Uint32, Int32> };
const sourceTable = new Table({ [name]: new Vector(sourceChunks) });
const resultTable = new Table(RecordBatchReader.from<T>(await writer.toUint8Array()));

// 1000 / 50 = 20
expect(sourceTable.batches).toHaveLength(20);
expect(resultTable.batches).toHaveLength(20);
expect(resultTable).toEqualTable(sourceTable);

for (const batch of resultTable.batches) {
for (const [_, dictionary] of batch.dictionaries) {
expect(dictionary).toBeInstanceOf(Vector);
expect(dictionary.data).toHaveLength(1);
}
}
});

it('should write delta dictionary batches', async () => {

const name = 'dictionary_encoded_uint32';
const chunks: Vector<Dictionary<Uint32, Int32>>[] = [];
const resultChunks: Vector<Dictionary<Uint32, Int32>>[] = [];
const {
vector: sourceVector, values: sourceValues,
} = generate.dictionary(1000, 20, new Uint32(), new Int32());
@@ -95,12 +132,12 @@ describe('RecordBatchStreamWriter', () => {
queueingStrategy: 'count', highWaterMark: 50,
});
for (const chunk of transform(sourceValues())) {
chunks.push(chunk);
resultChunks.push(chunk);
yield new RecordBatch({ [name]: chunk.data[0] });
}
})());

expect(new Vector(chunks)).toEqualVector(sourceVector);
expect(new Vector(resultChunks)).toEqualVector(sourceVector);

type T = { [name]: Dictionary<Uint32, Int32> };
const sourceTable = new Table({ [name]: sourceVector });
