Skip to content

Commit

Permalink
Use async as a suffix rather than a prefix for asynchronous variants. (
Browse files Browse the repository at this point in the history
…#22134)

This is better aligned with standard javascript convention.

Often libraries have the default version be asynchronous, and
name the Sync one explicitly, we are marking the async ones as
they are by far the most common and usable for pipeline
construction.
  • Loading branch information
robertwb authored Jul 11, 2022
1 parent da84804 commit 6e16941
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 48 deletions.
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async function main() {
const filtered = await lines
.map((w) => ({ word: w }))
.apply(beam.withRowCoder({ word: "str" }))
.asyncApply(
.applyAsync(
sqlTransform(
"SELECT word, count(*) as c from PCOLLECTION group by word"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner("localhost:3333").run(async (root) => {
const lines = await root.asyncApply(
const lines = await root.applyAsync(
textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
);

Expand Down
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ export class Pipeline {
return this.postApplyTransform(transform, transformProto, result);
}

async asyncApplyTransform<
async applyAsyncTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
Expand All @@ -190,7 +190,7 @@ export class Pipeline {
let result: OutputT;
try {
this.transformStack.push(transformId);
result = await transform.asyncExpandInternal(input, this, transformProto);
result = await transform.expandInternalAsync(input, this, transformProto);
} finally {
this.transformStack.pop();
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/io/avroio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export function writeToAvro<T>(filePath: string, options: { schema: Schema }) {
withCoderInternal(RowCoder.fromSchema(options.schema))
);
}
return pcoll.asyncApply(
return pcoll.applyAsync(
schemaio<beam.PCollection<T>, {}>(
"writeToAvro",
"beam:transform:org.apache.beam:schemaio_avro_write:v1",
Expand Down
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/io/parquetio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function readFromParquet(
} = {}
): (root: beam.Root) => Promise<beam.PCollection<any>> {
return async function readFromParquet(root: beam.Root) {
return root.asyncApply(
return root.applyAsync(
pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
path: filePattern,
format: "parquet",
Expand All @@ -57,7 +57,7 @@ export function writeToParquet(
delete options.schema;
}
return {
filesWritten: await toWrite.asyncApply(
filesWritten: await toWrite.applyAsync(
pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
path: filePathPrefix,
format: "parquet",
Expand Down
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/io/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export function readFromPubSubWithAttributes(
> {
return async function readFromPubSubWithAttributes(root: beam.Root) {
return (
await root.asyncApply(readFromPubSubWithAttributesRaw(options))
await root.applyAsync(readFromPubSubWithAttributesRaw(options))
).map((encoded) =>
PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded)
);
Expand Down Expand Up @@ -126,7 +126,7 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) {
PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish()
)
.apply(internal.withCoderInternal(new BytesCoder()))
.asyncApply(writeToPubSubRaw(topic, options));
.applyAsync(writeToPubSubRaw(topic, options));
};
}

Expand Down
12 changes: 6 additions & 6 deletions sdks/typescript/src/apache_beam/io/textio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function readFromText(
filePattern: string
): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
return async function readFromText(root: beam.Root) {
return root.asyncApply(
return root.applyAsync(
pythonTransform<beam.Root, beam.PCollection<string>>(
"apache_beam.io.ReadFromText",
{
Expand Down Expand Up @@ -59,7 +59,7 @@ export function writeToText(
filesWritten: await pcoll
.map((e) => (typeof e == "string" ? e : "" + e))
.apply(withCoderInternal(new StrUtf8Coder()))
.asyncApply(
.applyAsync(
pythonTransform("apache_beam.io.WriteToText", {
file_path_prefix: filePathPrefix,
...camelToSnakeOptions(options),
Expand All @@ -74,7 +74,7 @@ export function readFromCsv(
options: {} = {}
): (root: beam.Root) => Promise<beam.PCollection<any>> {
return async function readFromCsv(root: beam.Root) {
return root.asyncApply(
return root.applyAsync(
pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
path: filePattern,
format: "csv",
Expand All @@ -96,7 +96,7 @@ export function writeToCsv(
toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
}
return {
filesWritten: await toWrite.asyncApply(
filesWritten: await toWrite.applyAsync(
pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
path: filePathPrefix,
format: "csv",
Expand All @@ -113,7 +113,7 @@ export function readFromJson(
options: {} = {}
): (root: beam.Root) => Promise<beam.PCollection<any>> {
return async function readFromJson(root: beam.Root) {
return root.asyncApply(
return root.applyAsync(
pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
path: filePattern,
format: "json",
Expand All @@ -137,7 +137,7 @@ export function writeToJson(
toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
}
return {
filesWritten: await toWrite.asyncApply(
filesWritten: await toWrite.applyAsync(
pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
path: filePathPrefix,
format: "json",
Expand Down
14 changes: 7 additions & 7 deletions sdks/typescript/src/apache_beam/pvalue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ export class Root {
return this.pipeline.applyTransform(transform, this);
}

async asyncApply<OutputT extends PValue<any>>(
async applyAsync<OutputT extends PValue<any>>(
transform: AsyncPTransform<Root, OutputT>
) {
if (!(transform instanceof AsyncPTransformClass)) {
transform = new AsyncPTransformClassFromCallable(transform);
}
return await this.pipeline.asyncApplyTransform(transform, this);
return await this.pipeline.applyAsyncTransform(transform, this);
}
}

Expand Down Expand Up @@ -91,13 +91,13 @@ export class PCollection<T> {
return this.pipeline.applyTransform(transform, this);
}

asyncApply<OutputT extends PValue<any>>(
applyAsync<OutputT extends PValue<any>>(
transform: AsyncPTransform<PCollection<T>, OutputT>
) {
if (!(transform instanceof AsyncPTransformClass)) {
transform = new AsyncPTransformClassFromCallable(transform);
}
return this.pipeline.asyncApplyTransform(transform, this);
return this.pipeline.applyAsyncTransform(transform, this);
}

map<OutputT, ContextT>(
Expand Down Expand Up @@ -228,14 +228,14 @@ class PValueWrapper<T extends PValue<any>> {
return this.pipeline(root).applyTransform(transform, this.pvalue);
}

async asyncApply<O extends PValue<any>>(
async applyAsync<O extends PValue<any>>(
transform: AsyncPTransform<T, O>,
root: Root | null = null
) {
if (!(transform instanceof AsyncPTransformClass)) {
transform = new AsyncPTransformClassFromCallable(transform);
}
return await this.pipeline(root).asyncApplyTransform(
return await this.pipeline(root).applyAsyncTransform(
transform,
this.pvalue
);
Expand Down Expand Up @@ -302,7 +302,7 @@ class AsyncPTransformClassFromCallable<
this.expander = expander;
}

async asyncExpandInternal(
async expandInternalAsync(
input: InputT,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/transforms/external.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class RawExternalTransform<
}
}

async asyncExpandInternal(
async expandInternalAsync(
input: InputT,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/transforms/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export function sqlTransform<
) as InputT;
}

return await P(input).asyncApply(
return await P(input).applyAsync(
external.rawExternalTransform(
"beam:external:java:sql:v1",
{ query: query },
Expand Down
10 changes: 5 additions & 5 deletions sdks/typescript/src/apache_beam/transforms/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ export class AsyncPTransformClass<
this.beamName = name || this.constructor.name;
}

async asyncExpand(input: InputT): Promise<OutputT> {
async expandAsync(input: InputT): Promise<OutputT> {
throw new Error("Method expand has not been implemented.");
}

async asyncExpandInternal(
async expandInternalAsync(
input: InputT,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
): Promise<OutputT> {
return this.asyncExpand(input);
return this.expandAsync(input);
}
}

Expand All @@ -94,7 +94,7 @@ export class PTransformClass<
throw new Error("Method expand has not been implemented.");
}

async asyncExpand(input: InputT): Promise<OutputT> {
async expandAsync(input: InputT): Promise<OutputT> {
return this.expand(input);
}

Expand All @@ -106,7 +106,7 @@ export class PTransformClass<
return this.expand(input);
}

async asyncExpandInternal(
async expandInternalAsync(
input: InputT,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/test/docs/programming_guide.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe("Programming Guide Tested Samples", function () {
// [START pipelines_constructing_reading]
async function pipeline(root: beam.Root) {
// Note that textio.ReadFromText is an AsyncPTransform.
const pcoll: PCollection<string> = await root.asyncApply(
const pcoll: PCollection<string> = await root.applyAsync(
textio.ReadFromText("path/to/text_pattern")
);
}
Expand Down
32 changes: 16 additions & 16 deletions sdks/typescript/test/io_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ xdescribe("IO Tests", function () {
await createRunner().run(async (root) => {
await root //
.apply(beam.create(lines))
.asyncApply(textio.writeToText(path.join(tempDir, "out.txt")));
.applyAsync(textio.writeToText(path.join(tempDir, "out.txt")));
});

await createRunner().run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
textio.readFromText(path.join(tempDir, "out.txt*"))
)
).apply(testing.assertDeepEqual(lines));
Expand All @@ -90,13 +90,13 @@ xdescribe("IO Tests", function () {
await root //
.apply(beam.create(elements))
.apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
.asyncApply(textio.writeToCsv(path.join(tempDir, "out.csv")));
.applyAsync(textio.writeToCsv(path.join(tempDir, "out.csv")));
});
console.log(tempDir);

await createRunner().run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
textio.readFromCsv(path.join(tempDir, "out.csv*"))
)
).apply(testing.assertDeepEqual(elements));
Expand All @@ -110,12 +110,12 @@ xdescribe("IO Tests", function () {
await root //
.apply(beam.create(elements))
.apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
.asyncApply(textio.writeToJson(path.join(tempDir, "out.json")));
.applyAsync(textio.writeToJson(path.join(tempDir, "out.json")));
});

await createRunner().run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
textio.readFromJson(path.join(tempDir, "out.json*"))
)
).apply(testing.assertDeepEqual(elements));
Expand All @@ -129,22 +129,22 @@ xdescribe("IO Tests", function () {
await root //
.apply(beam.create(elements))
.apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
.asyncApply(
.applyAsync(
parquetio.writeToParquet(path.join(tempDir, "out.parquet"))
);
});

await createRunner().run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
parquetio.readFromParquet(path.join(tempDir, "out.parquet*"))
)
).apply(testing.assertDeepEqual(elements));
});

await createRunner().run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
parquetio.readFromParquet(path.join(tempDir, "out.parquet*"), {
columns: ["label", "rank"],
})
Expand Down Expand Up @@ -176,14 +176,14 @@ xdescribe("IO Tests", function () {
await createRunner(options).run(async (root) => {
await root //
.apply(beam.create(elements))
.asyncApply(
.applyAsync(
avroio.writeToAvro(path_join(tempDir, "out.avro"), { schema })
);
});

await createRunner(options).run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
avroio.readFromAvro(path_join(tempDir, "out.avro*"), { schema })
)
).apply(testing.assertDeepEqual(elements));
Expand Down Expand Up @@ -239,19 +239,19 @@ xdescribe("IO Tests", function () {
await root //
.apply(beam.create(elements))
.apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
.asyncApply(
.applyAsync(
bigqueryio.writeToBigQuery(table, { createDisposition: "IfNeeded" })
);
});

await createRunner(options).run(async (root) => {
(await root.asyncApply(bigqueryio.readFromBigQuery({ table }))) //
(await root.applyAsync(bigqueryio.readFromBigQuery({ table }))) //
.apply(testing.assertDeepEqual(elements));
});

await createRunner(options).run(async (root) => {
(
await root.asyncApply(
await root.applyAsync(
bigqueryio.readFromBigQuery({
query: `SELECT label, rank FROM ${table}`,
})
Expand Down Expand Up @@ -286,7 +286,7 @@ xdescribe("IO Tests", function () {
try {
pipelineHandle = await createRunner(options).runAsync(async (root) => {
await (
await root.asyncApply(
await root.applyAsync(
pubsub.readFromPubSub({
subscription: readSubscription.name,
})
Expand All @@ -296,7 +296,7 @@ xdescribe("IO Tests", function () {
.map((msg) => msg.toUpperCase())
.map((msg) => new TextEncoder().encode(msg))
.apply(internal.withCoderInternal(new BytesCoder()))
.asyncApply(pubsub.writeToPubSub(writeTopic.name));
.applyAsync(pubsub.writeToPubSub(writeTopic.name));
});
console.log("Pipeline started", pipelineHandle.jobId);

Expand Down
Loading

0 comments on commit 6e16941

Please sign in to comment.