Skip to content

Commit

Permalink
Merge pull request #22330 Support combiner lifting.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jul 21, 2022
2 parents 367173f + 12754bb commit 9697c13
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 45 deletions.
6 changes: 5 additions & 1 deletion sdks/typescript/src/apache_beam/coders/js_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
StrUtf8Coder,
VarIntCoder,
} from "./standard_coders";
import { IterableCoder } from "./required_coders";
import * as runnerApi from "../proto/beam_runner_api";

export class BsonObjectCoder<T> implements Coder<T> {
Expand Down Expand Up @@ -94,6 +95,7 @@ export class GeneralObjectCoder<T> implements Coder<T> {
number: new NumberOrFloatCoder(),
object: new BsonObjectCoder(),
boolean: new BoolCoder(),
array: new IterableCoder(this),
};

// This is a map of type names to type markers. It maps a type name to its
Expand All @@ -103,6 +105,7 @@ export class GeneralObjectCoder<T> implements Coder<T> {
number: "N",
object: "O",
boolean: "B",
array: "A",
};

// This is a map of type markers to type names. It maps a type marker to its
Expand All @@ -112,14 +115,15 @@ export class GeneralObjectCoder<T> implements Coder<T> {
N: "number",
O: "object",
B: "boolean",
A: "array",
};

encode(element: T, writer: Writer, context: Context) {
if (element === null || element === undefined) {
// typeof is "object" but BSON can't handle it.
writer.string("Z");
} else {
const type = typeof element;
const type = Array.isArray(element) ? "array" : typeof element;
// TODO: Perf. Write a single byte (no need for the length prefix).
writer.string(this.typeMarkers[type]);
this.codersByType[type].encode(element, writer, context);
Expand Down
2 changes: 2 additions & 0 deletions sdks/typescript/src/apache_beam/internal/urns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1";
export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1";
export const JS_ASSIGN_TIMESTAMPS_DOFN_URN =
"beam:dofn:js_assign_timestamps:v1";
// URN identifying a CombineFn spec whose payload is a serialized JS object
// wrapping the CombineFn (it is paired with `serializeFn({ combineFn })` when
// the combine-per-key payload is built).
export const SERIALIZED_JS_COMBINEFN_INFO =
"beam:dofn:serialized_js_combinefn_info:v1";

// Everything maps to the global window.
export const GLOBAL_WINDOW_MAPPING_FN_URN = "beam:window_mapping_fn:global:v1";
Expand Down
39 changes: 18 additions & 21 deletions sdks/typescript/src/apache_beam/runners/direct_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ export class DirectRunner extends Runner {
) {
yield "MergeStatus=" + windowing.mergeStatus;
}
if (windowing.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW) {
yield "OutputTime=" + windowing.outputTime;
}
}
}

Expand Down Expand Up @@ -194,24 +197,29 @@ class DirectGbkOperator implements operators.IOperator {
);
const windowingStrategy =
context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
// TODO: (Cleanup) Check or implement triggers, etc.
if (
windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
) {
throw new Error("Non-merging WindowFn: " + windowingStrategy);
throw new Error("Unsupported non-merging WindowFn: " + windowingStrategy);
}
if (
windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW
) {
throw new Error(
"Unsupported windowing output time: " + windowingStrategy
);
}
this.windowCoder = context.pipelineContext.getCoder(
windowingStrategy.windowCoderId
);
}

process(wvalue: WindowedValue<any>) {
// TODO: (Cleanup) Assert non-merging, EOW timestamp, etc.
for (const window of wvalue.windows) {
const wkey =
encodeToBase64(window, this.windowCoder) +
operators.encodeToBase64(window, this.windowCoder) +
" " +
encodeToBase64(wvalue.value.key, this.keyCoder);
operators.encodeToBase64(wvalue.value.key, this.keyCoder);
if (!this.groups.has(wkey)) {
this.groups.set(wkey, []);
}
Expand All @@ -226,14 +234,16 @@ class DirectGbkOperator implements operators.IOperator {

async finishBundle() {
for (const [wkey, values] of this.groups) {
// const [encodedWindow, encodedKey] = wkey.split(" ");
const parts = wkey.split(" ");
const encodedWindow = parts[0];
const encodedKey = parts[1];
const window = decodeFromBase64(encodedWindow, this.windowCoder);
const window = operators.decodeFromBase64(
encodedWindow,
this.windowCoder
);
const maybePromise = this.receiver.receive({
value: {
key: decodeFromBase64(encodedKey, this.keyCoder),
key: operators.decodeFromBase64(encodedKey, this.keyCoder),
value: values,
},
windows: [window],
Expand Down Expand Up @@ -507,19 +517,6 @@ class InMemoryStateProvider implements state.StateProvider {

/////

/**
 * Encodes `element` with `coder` in the whole-stream context and returns the
 * resulting bytes as a base64 string.
 *
 * @param element The value to encode.
 * @param coder The coder used to write `element`.
 * @returns The base64 text of the encoded bytes.
 */
export function encodeToBase64<T>(element: T, coder: Coder<T>): string {
  const sink = new protobufjs.Writer();
  coder.encode(element, sink, CoderContext.wholeStream);
  const encodedBytes = sink.finish();
  return Buffer.from(encodedBytes).toString("base64");
}

/**
 * Turns a base64 string back into bytes and decodes one element from them
 * with `coder` in the whole-stream context.
 *
 * @param s The base64 text to decode.
 * @param coder The coder used to read the element.
 * @returns The decoded element.
 */
export function decodeFromBase64<T>(s: string, coder: Coder<T>): T {
  const rawBytes = Buffer.from(s, "base64");
  const source = new protobufjs.Reader(rawBytes);
  return coder.decode(source, CoderContext.wholeStream);
}

function onlyElement<T>(arg: T[]): T {
if (arg.length > 1) {
Error("Expecting exactly one element.");
Expand Down
4 changes: 4 additions & 0 deletions sdks/typescript/src/apache_beam/transforms/combiners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/

import { CombineFn } from "./group_and_combine";
import { Coder } from "../coders/coders";
import { VarIntCoder } from "../coders/standard_coders";

// TODO(cleanup): These reductions only work on Arrays, not Iterables.

Expand All @@ -26,6 +28,7 @@ export const count: CombineFn<any, number, number> = {
mergeAccumulators: (accumulators: number[]) =>
accumulators.reduce((prev, current) => prev + current),
extractOutput: (acc) => acc,
accumulatorCoder: () => new VarIntCoder(),
};

export const sum: CombineFn<number, number, number> = {
Expand All @@ -34,6 +37,7 @@ export const sum: CombineFn<number, number, number> = {
mergeAccumulators: (accumulators: number[]) =>
accumulators.reduce((prev, current) => prev + current),
extractOutput: (acc: number) => acc,
accumulatorCoder: (inputCoder: Coder<number>) => inputCoder,
};

export const max: CombineFn<any, any, any> = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
import { flatten } from "./flatten";
import { PCollection } from "../pvalue";
import { PValue, P } from "../pvalue";
import { Coder } from "../coders/coders";
import * as internal from "./internal";
import { count } from "./combiners";

Expand All @@ -41,6 +42,7 @@ export interface CombineFn<I, A, O> {
addInput: (A, I) => A;
mergeAccumulators: (accumulators: Iterable<A>) => A;
extractOutput: (A) => O;
accumulatorCoder?(inputCoder: Coder<I>): Coder<A>;
}

// TODO: (Typescript) When typing this as ((a: I, b: I) => I), types are not inferred well.
Expand Down Expand Up @@ -263,9 +265,7 @@ function binaryCombineFn<I>(
createAccumulator: () => undefined,
addInput: (a, b) => (a === undefined ? b : combiner(a, b)),
mergeAccumulators: (accs) =>
[...accs]
.filter((a) => a !== null && a !== undefined)
.reduce(combiner, undefined),
[...accs].filter((a) => a !== null && a !== undefined).reduce(combiner),
extractOutput: (a) => a,
};
}
Expand Down
85 changes: 75 additions & 10 deletions sdks/typescript/src/apache_beam/transforms/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import { GeneralObjectCoder } from "../coders/js_coders";
import { RowCoder } from "../coders/row_coder";
import { KV } from "../values";
import { CombineFn } from "./group_and_combine";
import { serializeFn } from "../internal/serialize";
import { CombinePerKeyPrecombineOperator } from "../worker/operators";

/**
* `Impulse` is the basic *source* primitive `PTransformClass`. It receives a Beam
Expand Down Expand Up @@ -170,21 +172,84 @@ groupByKey.urn = "beam:transform:group_by_key:v1";
export function combinePerKey<K, InputT, AccT, OutputT>(
combineFn: CombineFn<InputT, AccT, OutputT>
): PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
function expandInternal(input: PCollection<KV<any, InputT>>) {
function expandInternal(
input: PCollection<KV<any, InputT>>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
const pipelineComponents: runnerApi.Components =
pipeline.getProto().components!;
const inputProto = pipelineComponents.pcollections[input.getId()];

try {
// If this fails, we cannot lift, so we skip setting the liftable URN.
CombinePerKeyPrecombineOperator.checkSupportsWindowing(
pipelineComponents.windowingStrategies[inputProto.windowingStrategyId]
);

// Ensure the input is using the KV coder.
const inputCoderProto = pipelineComponents.coders[inputProto.coderId];
if (inputCoderProto.spec!.urn !== KVCoder.URN) {
return input
.apply(
withCoderInternal(
new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder())
)
)
.apply(combinePerKey(combineFn));
}

const inputValueCoder = pipeline.context.getCoder<InputT>(
inputCoderProto.componentCoderIds[1]
);

transformProto.spec = runnerApi.FunctionSpec.create({
urn: combinePerKey.urn,
payload: runnerApi.CombinePayload.toBinary({
combineFn: {
urn: urns.SERIALIZED_JS_COMBINEFN_INFO,
payload: serializeFn({ combineFn }),
},
accumulatorCoderId: pipeline.context.getCoderId(
combineFn.accumulatorCoder
? combineFn.accumulatorCoder(inputValueCoder)
: new GeneralObjectCoder()
),
}),
});
} catch (err) {
// Execute this as an unlifted combine.
}

return input //
.apply(groupByKey())
.map(
withName("applyCombine", (kv) => ({
key: kv.key,
value: combineFn.extractOutput(
kv.value.reduce(
combineFn.addInput.bind(combineFn),
combineFn.createAccumulator()
)
),
}))
withName("applyCombine", (kv) => {
  // Artificially use multiple accumulators to emulate what would
  // happen in a distributed combine for better testing.
  const accumulators = [
    combineFn.createAccumulator(),
    combineFn.createAccumulator(),
    combineFn.createAccumulator(),
  ];
  // Spread the grouped values round-robin over the accumulators. The index
  // has to advance on every element: if it stays at 0, every value lands in
  // the first accumulator and mergeAccumulators never sees more than one
  // populated accumulator, which defeats the emulation above.
  let ix = 0;
  for (const value of kv.value) {
    const slot = ix % accumulators.length;
    accumulators[slot] = combineFn.addInput(accumulators[slot], value);
    ix++;
  }
  // Merge the partial accumulators and extract the final per-key output.
  return {
    key: kv.key,
    value: combineFn.extractOutput(
      combineFn.mergeAccumulators(accumulators)
    ),
  };
})
);
}

return withName(`combinePerKey(${extractName(combineFn)})`, expandInternal);
}

combinePerKey.urn = "beam:transform:combine_per_key:v1";
Loading

0 comments on commit 9697c13

Please sign in to comment.