Skip to content

Commit 40fcebe

Browse files
trxcllnt authored and TheNeuralBit committed
ARROW-5537: [JS] Support delta dictionaries in RecordBatchWriter and DictionaryBuilder
Adds support for building and writing delta dictionaries. Moves the `dictionary` Vector pointer to the Data class, similar to #4316.

Forked from #4476 since this adds support for delta dictionaries to the DictionaryBuilder. Will rebase this PR after that's merged. All the work is in the last commit, here: b12d842

Author: ptaylor <paul.e.taylor@me.com>

Closes #4502 from trxcllnt/js/delta-dictionaries and squashes the following commits:

6a70a25 <ptaylor> make dictionarybuilder and recordbatchwriter support delta dictionaries
1 parent 71deb04 commit 40fcebe

26 files changed

+300
-263
lines changed

js/bin/print-buffer-alignment.js

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -73,9 +73,9 @@ const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`)
7373
})().catch((e) => { console.error(e); process.exit(1); });
7474

7575
function loadRecordBatch(schema, header, body) {
76-
return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers).visitMany(schema.fields));
76+
return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields));
7777
}
7878

7979
function loadDictionaryBatch(header, body, dictionaryType) {
80-
return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers).visitMany([dictionaryType]));
80+
return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]));
8181
}

js/src/builder.ts

Lines changed: 2 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -157,13 +157,7 @@ export abstract class Builder<T extends DataType = any, TNull = any> {
157157
* @nocollapse
158158
*/
159159
public static throughIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
160-
const build = throughIterable(options);
161-
if (!DataType.isDictionary(options.type)) {
162-
return build;
163-
}
164-
return function*(source: Iterable<T['TValue'] | TNull>) {
165-
const chunks = []; for (const chunk of build(source)) { chunks.push(chunk); } yield* chunks;
166-
};
160+
return throughIterable(options);
167161
}
168162

169163
/**
@@ -192,13 +186,7 @@ export abstract class Builder<T extends DataType = any, TNull = any> {
192186
* @nocollapse
193187
*/
194188
public static throughAsyncIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
195-
const build = throughAsyncIterable(options);
196-
if (!DataType.isDictionary(options.type)) {
197-
return build;
198-
}
199-
return async function* (source: Iterable<T['TValue'] | TNull> | AsyncIterable<T['TValue'] | TNull>) {
200-
const chunks = []; for await (const chunk of build(source)) { chunks.push(chunk); } yield* chunks;
201-
};
189+
return throughAsyncIterable(options);
202190
}
203191

204192
/**

js/src/builder/dictionary.ts

Lines changed: 25 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -29,13 +29,17 @@ export interface DictionaryBuilderOptions<T extends DataType = any, TNull = any>
2929
/** @ignore */
3030
export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builder<T, TNull> {
3131

32-
protected _codes = Object.create(null);
32+
protected _dictionaryOffset: number;
33+
protected _dictionary?: Vector<T['dictionary']>;
34+
protected _keysToIndices: { [key: string]: number };
3335
public readonly indices: IntBuilder<T['indices']>;
3436
public readonly dictionary: Builder<T['dictionary']>;
3537

3638
constructor({ 'type': type, 'nullValues': nulls, 'dictionaryHashFunction': hashFn }: DictionaryBuilderOptions<T, TNull>) {
37-
super({ type });
39+
super({ type: new Dictionary(type.dictionary, type.indices, type.id, type.isOrdered) as T });
3840
this._nulls = <any> null;
41+
this._dictionaryOffset = 0;
42+
this._keysToIndices = Object.create(null);
3943
this.indices = Builder.new({ 'type': this.type.indices, 'nullValues': nulls }) as IntBuilder<T['indices']>;
4044
this.dictionary = Builder.new({ 'type': this.type.dictionary, 'nullValues': null }) as Builder<T['dictionary']>;
4145
if (typeof hashFn === 'function') {
@@ -46,9 +50,9 @@ export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builde
4650
public get values() { return this.indices.values; }
4751
public get nullCount() { return this.indices.nullCount; }
4852
public get nullBitmap() { return this.indices.nullBitmap; }
49-
public get byteLength() { return this.indices.byteLength; }
50-
public get reservedLength() { return this.indices.reservedLength; }
51-
public get reservedByteLength() { return this.indices.reservedByteLength; }
53+
public get byteLength() { return this.indices.byteLength + this.dictionary.byteLength; }
54+
public get reservedLength() { return this.indices.reservedLength + this.dictionary.reservedLength; }
55+
public get reservedByteLength() { return this.indices.reservedByteLength + this.dictionary.reservedByteLength; }
5256
public isValid(value: T['TValue'] | TNull) { return this.indices.isValid(value); }
5357
public setValid(index: number, valid: boolean) {
5458
const indices = this.indices;
@@ -57,25 +61,35 @@ export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builde
5761
return valid;
5862
}
5963
public setValue(index: number, value: T['TValue']) {
60-
let keysToCodesMap = this._codes;
64+
let keysToIndices = this._keysToIndices;
6165
let key = this.valueToKey(value);
62-
let idx = keysToCodesMap[key];
66+
let idx = keysToIndices[key];
6367
if (idx === undefined) {
64-
keysToCodesMap[key] = idx = this.dictionary.append(value).length - 1;
68+
keysToIndices[key] = idx = this._dictionaryOffset + this.dictionary.append(value).length - 1;
6569
}
6670
return this.indices.setValue(index, idx);
6771
}
6872
public flush() {
69-
const chunk = this.indices.flush().clone(this.type);
73+
const type = this.type;
74+
const prev = this._dictionary;
75+
const curr = this.dictionary.toVector();
76+
const data = this.indices.flush().clone(type);
77+
data.dictionary = prev ? prev.concat(curr) : curr;
78+
this.finished || (this._dictionaryOffset += curr.length);
79+
this._dictionary = data.dictionary as Vector<T['dictionary']>;
7080
this.clear();
71-
return chunk;
81+
return data;
7282
}
7383
public finish() {
74-
this.type.dictionaryVector = Vector.new(this.dictionary.finish().flush());
84+
this.indices.finish();
85+
this.dictionary.finish();
86+
this._dictionaryOffset = 0;
87+
this._keysToIndices = Object.create(null);
7588
return super.finish();
7689
}
7790
public clear() {
7891
this.indices.clear();
92+
this.dictionary.clear();
7993
return super.clear();
8094
}
8195
public valueToKey(val: any): string | number {

js/src/column.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ export class Column<T extends DataType = any>
4949

5050
if (typeof field === 'string') {
5151
const type = chunks[0].data.type;
52-
field = new Field(field, type, chunks.some(({ nullCount }) => nullCount > 0));
52+
field = new Field(field, type, true);
5353
} else if (!field.nullable && chunks.some(({ nullCount }) => nullCount > 0)) {
5454
field = field.clone({ nullable: true });
5555
}

js/src/data.ts

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,12 @@ export class Data<T extends DataType = DataType> {
6464
public readonly offset: number;
6565
public readonly stride: number;
6666
public readonly childData: Data[];
67+
68+
/**
69+
* The dictionary for this Vector, if any. Only used for Dictionary type.
70+
*/
71+
public dictionary?: Vector;
72+
6773
public readonly values: Buffers<T>[BufferType.DATA];
6874
// @ts-ignore
6975
public readonly typeIds: Buffers<T>[BufferType.TYPE];
@@ -98,8 +104,9 @@ export class Data<T extends DataType = DataType> {
98104
return nullCount;
99105
}
100106

101-
constructor(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[]) {
107+
constructor(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[], dictionary?: Vector) {
102108
this.type = type;
109+
this.dictionary = dictionary;
103110
this.offset = Math.floor(Math.max(offset || 0, 0));
104111
this.length = Math.floor(Math.max(length || 0, 0));
105112
this._nullCount = Math.floor(Math.max(nullCount || 0, -1));
@@ -123,7 +130,7 @@ export class Data<T extends DataType = DataType> {
123130
}
124131

125132
public clone<R extends DataType>(type: R, offset = this.offset, length = this.length, nullCount = this._nullCount, buffers: Buffers<R> = <any> this, childData: (Data | Vector)[] = this.childData) {
126-
return new Data(type, offset, length, nullCount, buffers, childData);
133+
return new Data(type, offset, length, nullCount, buffers, childData, this.dictionary);
127134
}
128135

129136
public slice(offset: number, length: number): Data<T> {
@@ -173,12 +180,12 @@ export class Data<T extends DataType = DataType> {
173180
// Convenience methods for creating Data instances for each of the Arrow Vector types
174181
//
175182
/** @nocollapse */
176-
public static new<T extends DataType>(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[]): Data<T> {
183+
public static new<T extends DataType>(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[], dictionary?: Vector): Data<T> {
177184
if (buffers instanceof Data) { buffers = buffers.buffers; } else if (!buffers) { buffers = [] as Partial<Buffers<T>>; }
178185
switch (type.typeId) {
179186
case Type.Null: return <unknown> Data.Null( <unknown> type as Null, offset, length, nullCount || 0, buffers[BufferType.VALIDITY]) as Data<T>;
180187
case Type.Int: return <unknown> Data.Int( <unknown> type as Int, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
181-
case Type.Dictionary: return <unknown> Data.Dictionary( <unknown> type as Dictionary, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
188+
case Type.Dictionary: return <unknown> Data.Dictionary( <unknown> type as Dictionary, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || [], dictionary!) as Data<T>;
182189
case Type.Float: return <unknown> Data.Float( <unknown> type as Float, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
183190
case Type.Bool: return <unknown> Data.Bool( <unknown> type as Bool, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
184191
case Type.Decimal: return <unknown> Data.Decimal( <unknown> type as Decimal, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
@@ -207,8 +214,8 @@ export class Data<T extends DataType = DataType> {
207214
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView(type.ArrayType, data), toUint8Array(nullBitmap)]);
208215
}
209216
/** @nocollapse */
210-
public static Dictionary<T extends Dictionary>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>) {
211-
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView<T['TArray']>(type.indices.ArrayType, data), toUint8Array(nullBitmap)]);
217+
public static Dictionary<T extends Dictionary>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>, dictionary: Vector<T['dictionary']>) {
218+
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView<T['TArray']>(type.indices.ArrayType, data), toUint8Array(nullBitmap)], [], dictionary);
212219
}
213220
/** @nocollapse */
214221
public static Float<T extends Float>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>) {

js/src/interfaces.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ export type BuilderType<T extends Type | DataType = any, TNull = any> =
141141

142142
/** @ignore */
143143
export type VectorCtor<T extends Type | DataType | VectorType> =
144-
T extends VectorType ? VectorCtorType<T> :
144+
T extends VectorType ? VectorCtorType<T> :
145145
T extends Type ? VectorCtorType<VectorType<T>> :
146146
T extends DataType ? VectorCtorType<VectorType<T['TType']>> :
147147
VectorCtorType<vecs.BaseVector>
@@ -157,7 +157,7 @@ export type BuilderCtor<T extends Type | DataType = any> =
157157
/** @ignore */
158158
export type DataTypeCtor<T extends Type | DataType | VectorType = any> =
159159
T extends DataType ? ConstructorType<T> :
160-
T extends VectorType ? ConstructorType<T['type']> :
160+
T extends VectorType ? ConstructorType<T['type']> :
161161
T extends Type ? ConstructorType<TypeToDataType<T>> :
162162
never
163163
;

js/src/io/node/builder.ts

Lines changed: 0 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,6 @@ class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
4747

4848
constructor(builder: Builder<T, TNull>, options: BuilderDuplexOptions<T, TNull>) {
4949

50-
const isDictionary = DataType.isDictionary(builder.type);
5150
const { queueingStrategy = 'count', autoDestroy = true } = options;
5251
const { highWaterMark = queueingStrategy !== 'bytes' ? 1000 : 2 ** 14 } = options;
5352

@@ -58,20 +57,6 @@ class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
5857
this._builder = builder;
5958
this._desiredSize = highWaterMark;
6059
this._getSize = queueingStrategy !== 'bytes' ? builderLength : builderByteLength;
61-
62-
if (isDictionary) {
63-
let chunks: any[] = [];
64-
this.push = (chunk: any, _?: string) => {
65-
if (chunk !== null) {
66-
chunks.push(chunk);
67-
return true;
68-
}
69-
const chunks_ = chunks;
70-
chunks = [];
71-
chunks_.forEach((x) => super.push(x));
72-
return super.push(null) && false;
73-
};
74-
}
7560
}
7661
_read(size: number) {
7762
this._maybeFlush(this._builder, this._desiredSize = size);

js/src/io/whatwg/builder.ts

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -82,22 +82,6 @@ export class BuilderTransform<T extends DataType = any, TNull = any> {
8282
'highWaterMark': writableHighWaterMark,
8383
'size': (value: T['TValue'] | TNull) => this._writeValueAndReturnChunkSize(value),
8484
});
85-
86-
if (DataType.isDictionary(builderOptions.type)) {
87-
let chunks: any[] = [];
88-
this._enqueue = (controller: ReadableStreamDefaultController<V<T>>, chunk: V<T> | null) => {
89-
this._bufferedSize = 0;
90-
if (chunk !== null) {
91-
chunks.push(chunk);
92-
} else {
93-
const chunks_ = chunks;
94-
chunks = [];
95-
chunks_.forEach((x) => controller.enqueue(x));
96-
controller.close();
97-
this._controller = null;
98-
}
99-
};
100-
}
10185
}
10286

10387
private _writeValueAndReturnChunkSize(value: T['TValue'] | TNull) {

js/src/ipc/metadata/json.ts

Lines changed: 13 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,11 @@ import { DictionaryBatch, RecordBatch, FieldNode, BufferRegion } from './message
2727
import { TimeUnit, Precision, IntervalUnit, UnionMode, DateUnit } from '../../enum';
2828

2929
/** @ignore */
30-
export function schemaFromJSON(_schema: any, dictionaries: Map<number, DataType> = new Map(), dictionaryFields: Map<number, Field<Dictionary>[]> = new Map()) {
30+
export function schemaFromJSON(_schema: any, dictionaries: Map<number, DataType> = new Map()) {
3131
return new Schema(
32-
schemaFieldsFromJSON(_schema, dictionaries, dictionaryFields),
32+
schemaFieldsFromJSON(_schema, dictionaries),
3333
customMetadataFromJSON(_schema['customMetadata']),
34-
dictionaries, dictionaryFields
34+
dictionaries
3535
);
3636
}
3737

@@ -53,13 +53,13 @@ export function dictionaryBatchFromJSON(b: any) {
5353
}
5454

5555
/** @ignore */
56-
function schemaFieldsFromJSON(_schema: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>) {
57-
return (_schema['fields'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries, dictionaryFields));
56+
function schemaFieldsFromJSON(_schema: any, dictionaries?: Map<number, DataType>) {
57+
return (_schema['fields'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries));
5858
}
5959

6060
/** @ignore */
61-
function fieldChildrenFromJSON(_field: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>): Field[] {
62-
return (_field['children'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries, dictionaryFields));
61+
function fieldChildrenFromJSON(_field: any, dictionaries?: Map<number, DataType>): Field[] {
62+
return (_field['children'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries));
6363
}
6464

6565
/** @ignore */
@@ -93,19 +93,18 @@ function nullCountFromJSON(validity: number[]) {
9393
}
9494

9595
/** @ignore */
96-
export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>) {
96+
export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>) {
9797

9898
let id: number;
9999
let keys: TKeys | null;
100100
let field: Field | void;
101101
let dictMeta: any;
102102
let type: DataType<any>;
103103
let dictType: Dictionary;
104-
let dictField: Field<Dictionary>;
105104

106105
// If no dictionary encoding
107-
if (!dictionaries || !dictionaryFields || !(dictMeta = _field['dictionary'])) {
108-
type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries, dictionaryFields));
106+
if (!dictionaries || !(dictMeta = _field['dictionary'])) {
107+
type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries));
109108
field = new Field(_field['name'], type, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
110109
}
111110
// tslint:disable
@@ -115,19 +114,17 @@ export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>,
115114
else if (!dictionaries.has(id = dictMeta['id'])) {
116115
// a dictionary index defaults to signed 32 bit int if unspecified
117116
keys = (keys = dictMeta['indexType']) ? indexTypeFromJSON(keys) as TKeys : new Int32();
118-
dictionaries.set(id, type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries, dictionaryFields)));
117+
dictionaries.set(id, type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries)));
119118
dictType = new Dictionary(type, keys, id, dictMeta['isOrdered']);
120-
dictField = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
121-
dictionaryFields.set(id, [field = dictField]);
119+
field = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
122120
}
123121
// If dictionary encoded, and have already seen this dictionary Id in the schema, then reuse the
124122
// data type and wrap in a new Dictionary type and field.
125123
else {
126124
// a dictionary index defaults to signed 32 bit int if unspecified
127125
keys = (keys = dictMeta['indexType']) ? indexTypeFromJSON(keys) as TKeys : new Int32();
128126
dictType = new Dictionary(dictionaries.get(id)!, keys, id, dictMeta['isOrdered']);
129-
dictField = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
130-
dictionaryFields.get(id)!.push(field = dictField);
127+
field = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
131128
}
132129
return field || null;
133130
}

0 commit comments

Comments
 (0)