
Commit 52e1b3f

Merge pull request #21723: Add several IOs to the typescript SDK.

robertwb authored Jun 23, 2022
2 parents: bf2ac41 + da9bce6
Showing 26 changed files with 2,220 additions and 171 deletions.
1,012 changes: 893 additions & 119 deletions sdks/typescript/package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions sdks/typescript/package.json
@@ -2,6 +2,7 @@
   "name": "apache_beam",
   "version": "0.38.0",
   "devDependencies": {
+    "@google-cloud/bigquery": "^5.12.0",
     "@types/mocha": "^9.0.0",
     "@typescript-eslint/eslint-plugin": "^5.24.0",
     "@typescript-eslint/parser": "^5.24.0",
@@ -28,6 +29,7 @@
     "lint": "eslint . --ext .ts"
   },
   "dependencies": {
+    "@google-cloud/pubsub": "^2.19.4",
     "@grpc/grpc-js": "^1.4.6",
     "@protobuf-ts/grpc-transport": "^2.1.0",
     "@protobuf-ts/plugin": "^2.1.0",
24 changes: 19 additions & 5 deletions sdks/typescript/src/apache_beam/coders/row_coder.ts
@@ -28,6 +28,7 @@ import {
   BoolCoder,
   BytesCoder,
   IterableCoder,
+  NullableCoder,
   StrUtf8Coder,
   VarIntCoder,
 } from "./standard_coders";
@@ -74,16 +75,18 @@ export class RowCoder implements Coder<any> {
       // case "logicalType":
       default:
         throw new Error(
-          `Encountered a type that is not currently supported by RowCoder: ${f.type}`
+          `Encountered a type that is not currently supported by RowCoder: ${JSON.stringify(
+            f.type
+          )}`
         );
     }
     return obj;
   }
 }
 
-  private static inferTypeFromJSON(obj: any): FieldType {
+  static inferTypeFromJSON(obj: any, nullable: boolean = true): FieldType {
     let fieldType: FieldType = {
-      nullable: true,
+      nullable: nullable,
       typeInfo: {
         oneofKind: undefined,
       },
@@ -164,6 +167,15 @@
     };
   }
 
+  getCoderFromType(t: FieldType): any {
+    const nonNullCoder = this.getNonNullCoderFromType(t);
+    if (t.nullable) {
+      return new NullableCoder(nonNullCoder);
+    } else {
+      return nonNullCoder;
+    }
+  }
+
   getNonNullCoderFromType(t: FieldType): any {
     let typeInfo = t.typeInfo;
 
@@ -193,7 +205,7 @@
       case "arrayType":
         if (typeInfo.arrayType.elementType !== undefined) {
           return new IterableCoder(
-            this.getNonNullCoderFromType(typeInfo.arrayType.elementType)
+            this.getCoderFromType(typeInfo.arrayType.elementType)
           );
         } else {
           throw new Error("ElementType missing on ArrayType");
@@ -210,7 +222,9 @@
       // case "logicalType":
       default:
         throw new Error(
-          `Encountered a type that is not currently supported by RowCoder: ${t}`
+          `Encountered a type that is not currently supported by RowCoder: ${JSON.stringify(
+            t
+          )}`
         );
     }
   }
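
Note on the row_coder.ts change: coders built from a schema now honor field nullability. getCoderFromType wraps the non-null coder in a NullableCoder whenever the field (or array element) is declared nullable, and inferTypeFromJSON marks inferred fields nullable by default. A minimal sketch of the effect, assuming the inferSchemaOfJSON and fromSchema helpers used elsewhere in this PR (import path illustrative):

import { RowCoder } from "apache_beam/coders/row_coder";

// Fields inferred from JSON default to nullable (nullable = true), so each
// field coder is a NullableCoder wrapping the base coder, and array fields
// now get IterableCoder(NullableCoder(elementCoder)) rather than
// IterableCoder(elementCoder).
const schema = RowCoder.inferSchemaOfJSON({ word: "str", counts: [1] });
const coder = RowCoder.fromSchema(schema);
// coder can now round-trip rows with missing values,
// e.g. { word: null, counts: [1, null] }.
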
55 changes: 55 additions & 0 deletions sdks/typescript/src/apache_beam/io/avroio.ts
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as beam from "../../apache_beam";
import { RowCoder } from "../coders/row_coder";
import { schemaio } from "./schemaio";
import { Schema } from "../proto/schema";
import { withCoderInternal } from "../transforms/internal";

export function readFromAvro<T>(
filePattern: string,
// TODO: Allow schema to be inferred.
options: { schema: Schema }
): beam.AsyncPTransform<beam.Root, beam.PCollection<T>> {
return schemaio<beam.Root, beam.PCollection<T>>(
"readFromTable",
"beam:transform:org.apache.beam:schemaio_avro_read:v1",
{ location: filePattern, schema: options.schema }
);
}

export function writeToAvro<T>(filePath: string, options: { schema: Schema }) {
return async function writeToAvro(
pcoll: beam.PCollection<Object>
): Promise<{}> {
// TODO: Allow schema to be inferred.
if (options.schema) {
pcoll = pcoll.apply(
withCoderInternal(RowCoder.fromSchema(options.schema))
);
}
return pcoll.asyncApply(
schemaio<beam.PCollection<T>, {}>(
"writeToAvro",
"beam:transform:org.apache.beam:schemaio_avro_write:v1",
{ location: filePath, schema: options.schema }
)
);
};
}
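
For orientation, a sketch of how these Avro transforms compose in a pipeline. The file patterns are placeholders, runner wiring is elided, and the schema-inference helper is the one added to RowCoder in this PR:

import * as beam from "apache_beam";
import { readFromAvro, writeToAvro } from "apache_beam/io/avroio";
import { RowCoder } from "apache_beam/coders/row_coder";

async function copyAvro(root: beam.Root) {
  // Infer a row schema from an example element.
  const schema = RowCoder.inferSchemaOfJSON({ word: "str", count: 1 });
  const records = await root.asyncApply(
    readFromAvro("gs://my-bucket/records-*.avro", { schema })
  );
  // writeToAvro re-encodes with the row coder before delegating to the
  // schema-aware external write transform.
  await records.asyncApply(writeToAvro("gs://my-bucket/copy.avro", { schema }));
}
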
58 changes: 58 additions & 0 deletions sdks/typescript/src/apache_beam/io/bigqueryio.ts
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as beam from "../../apache_beam";
import { RowCoder } from "../coders/row_coder";
import { schemaio } from "./schemaio";
import { Schema } from "../proto/schema";

// TODO: Read and write should use a different schema for v2.
const bigqueryIOConfigSchema = RowCoder.inferSchemaOfJSON({
table: "string",
query: "string",
queryLocation: "string",
createDisposition: "string",
});

export function readFromBigQuery<T>(
options:
| { table: string; schema?: Schema }
| { query: string; schema?: Schema }
): beam.AsyncPTransform<beam.Root, beam.PCollection<T>> {
return schemaio<beam.Root, beam.PCollection<T>>(
"readFromBigQuery",
"beam:transform:org.apache.beam:schemaio_bigquery_read:v1",
options,
bigqueryIOConfigSchema
);
}

export function writeToBigQuery<T>(
  table: string,
  options: { createDisposition?: "Never" | "IfNeeded" } = {}
): beam.AsyncPTransform<beam.PCollection<T>, {}> {
  if (options.createDisposition === undefined) {
    options.createDisposition = "IfNeeded";
  }
  return schemaio<beam.PCollection<T>, {}>(
    "writeToBigQuery",
    "beam:transform:org.apache.beam:schemaio_bigquery_write:v1",
    { table, createDisposition: options.createDisposition },
    bigqueryIOConfigSchema
  );
}
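
A hedged usage sketch; the project, dataset, and table names are placeholders:

import * as beam from "apache_beam";
import { readFromBigQuery, writeToBigQuery } from "apache_beam/io/bigqueryio";

async function wordCountsToBigQuery(root: beam.Root) {
  // Read from a query; a { table: ... } option works the same way.
  const rows = await root.asyncApply(
    readFromBigQuery({ query: "SELECT word, count FROM my_dataset.my_table" })
  );
  // createDisposition defaults to "IfNeeded", so the destination table is
  // created when it does not already exist.
  await rows.asyncApply(writeToBigQuery("my_project:my_dataset.word_counts"));
}
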
124 changes: 124 additions & 0 deletions sdks/typescript/src/apache_beam/io/kafka.ts
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as beam from "../../apache_beam";
import * as external from "../transforms/external";
import { Schema } from "../proto/schema";
import { RowCoder } from "../coders/row_coder";
import { serviceProviderFromJavaGradleTarget } from "../utils/service";
import * as protobufjs from "protobufjs";
import { camelToSnakeOptions } from "../utils/utils";

const KAFKA_EXPANSION_GRADLE_TARGET =
"sdks:java:io:expansion-service:shadowJar";

export type ReadFromKafkaOptions = {
keyDeserializer?: string;
valueDeserializer?: string;
startReadTime?: number;
maxNumRecords?: number;
maxReadTime?: number;
commitOffsetInFinalize?: boolean;
timestampPolicy?: "ProcessingTime" | "CreateTime" | "LogAppendTime";
};

const defaultReadFromKafkaOptions = {
keyDeserializer:
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
valueDeserializer:
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
timestampPolicy: "ProcessingTime",
};

export function readFromKafka<T>(
consumerConfig: { [key: string]: string }, // TODO: Or a map?
topics: string[],
options: ReadFromKafkaOptions = {}
): beam.AsyncPTransform<beam.Root, beam.PCollection<T>> {
return readFromKafkaMaybeWithMetadata(
"readFromKafkaWithMetadata",
"beam:transform:org.apache.beam:kafka_read_without_metadata:v1",
consumerConfig,
topics,
options
);
}

export function readFromKafkaWithMetadata<T>(
consumerConfig: { [key: string]: string }, // TODO: Or a map?
topics: string[],
options: ReadFromKafkaOptions = {}
): beam.AsyncPTransform<beam.Root, beam.PCollection<T>> {
return readFromKafkaMaybeWithMetadata<T>(
"readFromKafkaWithMetadata",
"beam:transform:org.apache.beam:kafka_read_with_metadata:v1",
consumerConfig,
topics,
options
);
}

function readFromKafkaMaybeWithMetadata<T>(
name: string,
urn: string,
consumerConfig: { [key: string]: string }, // TODO: Or a map?
topics: string[],
options: ReadFromKafkaOptions = {}
): beam.AsyncPTransform<beam.Root, beam.PCollection<T>> {
return beam.withName(
name,
external.rawExternalTransform<beam.Root, beam.PCollection<T>>(
urn,
{
topics,
consumerConfig,
...camelToSnakeOptions({ ...defaultReadFromKafkaOptions, ...options }),
},
serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET)
)
);
}

export type WriteToKafkaOptions = {
keySerializer?: string;
valueSerializer?: string;
};

const defaultWriteToKafkaOptions = {
keySerializer: "org.apache.kafka.common.serialization.ByteArraySerializer",
valueSerializer: "org.apache.kafka.common.serialization.ByteArraySerializer",
};

export function writeToKafka<K = Uint8Array, V = Uint8Array>(
producerConfig: { [key: string]: string }, // TODO: Or a map?
topics: string[],
options: WriteToKafkaOptions = {}
): beam.AsyncPTransform<beam.PCollection<{ key: K; value: V }>, {}> {
return beam.withName(
"writeToKafka",
external.rawExternalTransform<beam.PCollection<{ key: K; value: V }>, {}>(
"beam:transform:org.apache.beam:kafka_write:v1",
{
topics,
producerConfig,
...camelToSnakeOptions({ ...defaultWriteToKafkaOptions, ...options }),
},
serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET)
)
);
}
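
A usage sketch; the broker address and topic names are placeholders, and the element shape of the read is assumed to mirror writeToKafka's { key, value } records (both byte arrays under the default deserializers):

import * as beam from "apache_beam";
import { readFromKafka, writeToKafka } from "apache_beam/io/kafka";

async function kafkaPassthrough(root: beam.Root) {
  // With the default ByteArrayDeserializer, keys and values are raw bytes.
  const records = await root.asyncApply(
    readFromKafka({ "bootstrap.servers": "localhost:9092" }, ["in-topic"], {
      commitOffsetInFinalize: true,
    })
  );
  // Forward each record to the output topic.
  await records
    .map((kv: any) => ({ key: kv.key, value: kv.value }))
    .asyncApply(
      writeToKafka({ "bootstrap.servers": "localhost:9092" }, ["out-topic"])
    );
}
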
69 changes: 69 additions & 0 deletions sdks/typescript/src/apache_beam/io/parquetio.ts
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as beam from "../../apache_beam";
import { StrUtf8Coder } from "../coders/standard_coders";
import * as external from "../transforms/external";
import { withCoderInternal } from "../transforms/internal";
import { pythonTransform } from "../transforms/python";
import { PythonService } from "../utils/service";
import { camelToSnakeOptions } from "../utils/utils";
import { Schema } from "../proto/schema";
import { RowCoder } from "../coders/row_coder";

export function readFromParquet(
filePattern: string,
options: {
columns?: string[];
} = {}
): (root: beam.Root) => Promise<beam.PCollection<any>> {
return async function readFromParquet(root: beam.Root) {
return root.asyncApply(
pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
path: filePattern,
format: "parquet",
...camelToSnakeOptions(options),
})
);
};
}

export function writeToParquet(
filePathPrefix: string,
options: { schema?: Schema } = {}
): (
toWrite: beam.PCollection<Object>
) => Promise<{ filesWritten: beam.PCollection<string> }> {
  return async function writeToParquet(toWrite: beam.PCollection<Object>) {
if (options.schema) {
toWrite = toWrite.apply(
withCoderInternal(RowCoder.fromSchema(options.schema))
);
delete options.schema;
}
return {
filesWritten: await toWrite.asyncApply(
pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
path: filePathPrefix,
format: "parquet",
...camelToSnakeOptions(options),
})
),
};
};
}
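
Unlike the other IOs in this PR, the Parquet transforms expand to the Python ReadViaPandas/WriteViaPandas transforms, so a Python environment must be available when the pipeline is constructed. A sketch with placeholder paths:

import * as beam from "apache_beam";
import { readFromParquet, writeToParquet } from "apache_beam/io/parquetio";

async function copyParquet(root: beam.Root) {
  // Only the requested columns are read.
  const rows = await root.asyncApply(
    readFromParquet("gs://my-bucket/data-*.parquet", {
      columns: ["word", "count"],
    })
  );
  // filesWritten is a PCollection of the output shard paths.
  const { filesWritten } = await rows.asyncApply(
    writeToParquet("gs://my-bucket/out/words")
  );
}
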