Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleaned up types in standard_coders.ts #22316

Merged
merged 2 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdks/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ Install node.js, and then from within `sdks/typescript`.
npm install
```

To install without rewriting **package-lock.json**, run:
pcoet marked this conversation as resolved.
Show resolved Hide resolved

```
npm ci
```

### Running tests

```
Expand Down
85 changes: 39 additions & 46 deletions sdks/typescript/src/apache_beam/coders/standard_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,44 @@ import {
} from "./coders";
import { BytesCoder, InstantCoder } from "./required_coders";
import Long from "long";
import {
Window,
Instant,
IntervalWindow,
KV,
PaneInfo,
Timing,
WindowedValue,
} from "../values";
import { IntervalWindow } from "../values";

// Historical
export * from "./required_coders";

/**
* @fileoverview Defines all of the Apache Beam standard coders.
*
* Beyond required coders, standard coders provide a efficient ways of encode
* Beyond required coders, standard coders provide an efficient way to encode
* data for communication between the runner and various Beam workers for
* types that commonly cross process boundaries. Though none of these coders
* are strictly necessary, if encodings are given for these types it is highly
* is strictly necessary, if encodings are given for these types it is highly
* advised to use these definitions that are interoperable with runners and
* other SDKs.
*
* For schema-aware transforms RowCoder, which is a coder for rows of data
* with a predetermined schema, is also advised.
* For the schema-aware transform RowCoder, which is a coder for rows of data
* with a predetermined schema, it is also advised.
*
* The formal specifications for these coders can be found in
* model/pipeline/src/main/proto/beam_runner_api.proto
*/

export class StrUtf8Coder implements Coder<String> {
static URN: string = "beam:coder:string_utf8:v1";
type: string = "stringutf8coder";
export class StrUtf8Coder implements Coder<string> {
static URN = "beam:coder:string_utf8:v1";
type = "stringutf8coder";
encoder = new TextEncoder();
decoder = new TextDecoder();

encode(element: String, writer: Writer, context: Context) {
const encodedElement = this.encoder.encode(element as string);
encode(element: string, writer: Writer, context: Context) {
const encodedElement = this.encoder.encode(element);
BytesCoder.INSTANCE.encode(encodedElement, writer, context);
}

decode(reader: Reader, context: Context): String {
decode(reader: Reader, context: Context): string {
return this.decoder.decode(BytesCoder.INSTANCE.decode(reader, context));
}

toProto(pipelineContext: ProtoContext): runnerApi.Coder {
toProto(): runnerApi.Coder {
damccorm marked this conversation as resolved.
Show resolved Hide resolved
return {
spec: {
urn: StrUtf8Coder.URN,
Expand All @@ -86,22 +78,20 @@ export class StrUtf8Coder implements Coder<String> {
globalRegistry().register(StrUtf8Coder.URN, StrUtf8Coder);

export class VarIntCoder implements Coder<number> {
static URN: string = "beam:coder:varint:v1";
static URN = "beam:coder:varint:v1";
static INSTANCE = new VarIntCoder();

type: string = "varintcoder";
type = "varintcoder";

encode(element: Number | Long | BigInt, writer: Writer, context: Context) {
var numEl = element as number;
writer.int32(numEl);
return;
encode(element: number, writer: Writer) {
writer.int32(element);
}

decode(reader: Reader, context: Context): number {
decode(reader: Reader): number {
return reader.int32();
}

toProto(pipelineContext: ProtoContext): runnerApi.Coder {
toProto(): runnerApi.Coder {
return {
spec: {
urn: VarIntCoder.URN,
Expand All @@ -114,22 +104,22 @@ export class VarIntCoder implements Coder<number> {
globalRegistry().register(VarIntCoder.URN, VarIntCoder);

export class DoubleCoder implements Coder<number> {
static URN: string = "beam:coder:double:v1";
static URN = "beam:coder:double:v1";

encode(element: number, writer: Writer, context: Context) {
encode(element: number, writer: Writer) {
const farr = new Float64Array([element]);
const barr = new Uint8Array(farr.buffer).reverse();
writeRawBytes(barr, writer);
}

decode(reader: Reader, context: Context): number {
decode(reader: Reader): number {
const barr = new Uint8Array(reader.buf);
const dView = new DataView(barr.buffer.slice(reader.pos, reader.pos + 8));
reader.double();
return dView.getFloat64(0, false);
}

toProto(pipelineContext: ProtoContext): runnerApi.Coder {
toProto(): runnerApi.Coder {
return {
spec: {
urn: DoubleCoder.URN,
Expand All @@ -141,19 +131,19 @@ export class DoubleCoder implements Coder<number> {
}
globalRegistry().register(DoubleCoder.URN, DoubleCoder);

export class BoolCoder implements Coder<Boolean> {
static URN: string = "beam:coder:bool:v1";
type: string = "boolcoder";
export class BoolCoder implements Coder<boolean> {
static URN = "beam:coder:bool:v1";
type = "boolcoder";

encode(element: Boolean, writer: Writer, context: Context) {
writer.bool(element as boolean);
encode(element: boolean, writer: Writer) {
writer.bool(element);
}

decode(reader: Reader, context: Context): Boolean {
decode(reader: Reader): boolean {
return reader.bool();
}

toProto(pipelineContext: ProtoContext): runnerApi.Coder {
toProto(): runnerApi.Coder {
return {
spec: {
urn: BoolCoder.URN,
Expand All @@ -166,8 +156,8 @@ export class BoolCoder implements Coder<Boolean> {
globalRegistry().register(BoolCoder.URN, BoolCoder);

export class NullableCoder<T> implements Coder<T | undefined> {
static URN: string = "beam:coder:nullable:v1";
type: string = "nullablecoder";
static URN = "beam:coder:nullable:v1";
type = "nullablecoder";

elementCoder: Coder<T>;

Expand Down Expand Up @@ -205,7 +195,7 @@ export class NullableCoder<T> implements Coder<T | undefined> {
globalRegistry().register(NullableCoder.URN, NullableCoder);

export class IntervalWindowCoder implements Coder<IntervalWindow> {
static URN: string = "beam:coder:interval_window:v1";
static URN = "beam:coder:interval_window:v1";
static INSTANCE: IntervalWindowCoder = new IntervalWindowCoder();

encode(value: IntervalWindow, writer: Writer, context: Context) {
Expand All @@ -214,12 +204,12 @@ export class IntervalWindowCoder implements Coder<IntervalWindow> {
}

decode(reader: Reader, context: Context) {
var end = InstantCoder.INSTANCE.decode(reader, context);
var duration = <Long>reader.int64();
const end = InstantCoder.INSTANCE.decode(reader, context);
const duration = <Long>reader.int64();
return new IntervalWindow(end.sub(duration), end);
}

toProto(pipelineContext: ProtoContext): runnerApi.Coder {
toProto(): runnerApi.Coder {
return {
spec: {
urn: IntervalWindowCoder.URN,
Expand All @@ -233,4 +223,7 @@ export class IntervalWindowCoder implements Coder<IntervalWindow> {
globalRegistry().register(IntervalWindowCoder.URN, IntervalWindowCoder);

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/coders/standard_coders", exports);
requireForSerialization(
"apache-beam/coders/standard_coders",
exports as Record<string, unknown>
);