Use async as a suffix rather than a prefix for asynchronous variants. #22134

Merged · 3 commits · Jul 11, 2022
Changes from 1 commit
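The rename makes each asynchronous variant mirror its synchronous counterpart: `asyncApply` becomes `applyAsync`, `asyncExpand` becomes `expandAsync`, and `asyncExpandInternal` becomes `expandInternalAsync`. A minimal before/after sketch of a call site (import paths are illustrative; `readFromText` is the textio transform touched below):

import * as beam from "apache_beam";
import * as textio from "apache_beam/io/textio";

async function pipeline(root: beam.Root) {
  // Before this PR: the async variant carried a prefix.
  // const lines = await root.asyncApply(textio.readFromText("in.txt"));

  // After this PR: the async variant carries a suffix, mirroring apply().
  const lines = await root.applyAsync(textio.readFromText("in.txt"));
  return lines.map((line) => line.toUpperCase());
}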
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
@@ -37,7 +37,7 @@ async function main() {
   const filtered = await lines
     .map((w) => ({ word: w }))
     .apply(beam.withRowCoder({ word: "str" }))
-    .asyncApply(
+    .applyAsync(
       sqlTransform(
         "SELECT word, count(*) as c from PCOLLECTION group by word"
       )
@@ -37,7 +37,7 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
 async function main() {
   // python apache_beam/runners/portability/local_job_service_main.py --port 3333
   await new PortableRunner("localhost:3333").run(async (root) => {
-    const lines = await root.asyncApply(
+    const lines = await root.applyAsync(
       textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
     );
 
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
@@ -179,7 +179,7 @@ export class Pipeline {
     return this.postApplyTransform(transform, transformProto, result);
   }
 
-  async asyncApplyTransform<
+  async applyAsyncTransform<
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
   >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
@@ -190,7 +190,7 @@
     let result: OutputT;
     try {
       this.transformStack.push(transformId);
-      result = await transform.asyncExpandInternal(input, this, transformProto);
+      result = await transform.expandInternalAsync(input, this, transformProto);
     } finally {
       this.transformStack.pop();
     }
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/io/avroio.ts
@@ -44,7 +44,7 @@ export function writeToAvro<T>(filePath: string, options: { schema: Schema }) {
       withCoderInternal(RowCoder.fromSchema(options.schema))
     );
   }
-  return pcoll.asyncApply(
+  return pcoll.applyAsync(
     schemaio<beam.PCollection<T>, {}>(
       "writeToAvro",
       "beam:transform:org.apache.beam:schemaio_avro_write:v1",
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/io/parquetio.ts
@@ -33,7 +33,7 @@ export function readFromParquet(
   } = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromParquet(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "parquet",
@@ -57,7 +57,7 @@ export function writeToParquet(
     delete options.schema;
   }
   return {
-    filesWritten: await toWrite.asyncApply(
+    filesWritten: await toWrite.applyAsync(
       pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
         path: filePathPrefix,
         format: "parquet",
4 changes: 2 additions & 2 deletions sdks/typescript/src/apache_beam/io/pubsub.ts
@@ -96,7 +96,7 @@ export function readFromPubSubWithAttributes(
 > {
   return async function readFromPubSubWithAttributes(root: beam.Root) {
     return (
-      await root.asyncApply(readFromPubSubWithAttributesRaw(options))
+      await root.applyAsync(readFromPubSubWithAttributesRaw(options))
     ).map((encoded) =>
       PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded)
     );
@@ -126,7 +126,7 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) {
         PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish()
       )
       .apply(internal.withCoderInternal(new BytesCoder()))
-      .asyncApply(writeToPubSubRaw(topic, options));
+      .applyAsync(writeToPubSubRaw(topic, options));
   };
 }
 
12 changes: 6 additions & 6 deletions sdks/typescript/src/apache_beam/io/textio.ts
@@ -30,7 +30,7 @@ export function readFromText(
   filePattern: string
 ): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
   return async function readFromText(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform<beam.Root, beam.PCollection<string>>(
         "apache_beam.io.ReadFromText",
         {
@@ -59,7 +59,7 @@ export function writeToText(
     filesWritten: await pcoll
       .map((e) => (typeof e == "string" ? e : "" + e))
       .apply(withCoderInternal(new StrUtf8Coder()))
-      .asyncApply(
+      .applyAsync(
         pythonTransform("apache_beam.io.WriteToText", {
           file_path_prefix: filePathPrefix,
           ...camelToSnakeOptions(options),
@@ -74,7 +74,7 @@ export function readFromCsv(
   options: {} = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromCsv(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "csv",
@@ -96,7 +96,7 @@ export function writeToCsv(
     toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
   }
   return {
-    filesWritten: await toWrite.asyncApply(
+    filesWritten: await toWrite.applyAsync(
       pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
         path: filePathPrefix,
         format: "csv",
@@ -113,7 +113,7 @@ export function readFromJson(
   options: {} = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromJson(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "json",
@@ -137,7 +137,7 @@ export function writeToJson(
     toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
   }
   return {
-    filesWritten: await toWrite.asyncApply(
+    filesWritten: await toWrite.applyAsync(
       pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
         path: filePathPrefix,
         format: "json",
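The IO modules above all follow one shape: export a factory that returns an async callable on `Root` (or a `PCollection`), which callers hand straight to `applyAsync`. A condensed sketch of that pattern; `readFromWidget` and the `"widget"` format are hypothetical, the rest mirrors the hunks above:

// Factory returning an async callable, consumable by root.applyAsync(...).
export function readFromWidget(
  filePattern: string
): (root: beam.Root) => Promise<beam.PCollection<any>> {
  return async function readFromWidget(root: beam.Root) {
    return root.applyAsync(
      pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
        path: filePattern,
        format: "widget", // hypothetical format, for illustration only
      })
    );
  };
}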
14 changes: 7 additions & 7 deletions sdks/typescript/src/apache_beam/pvalue.ts
@@ -47,13 +47,13 @@ export class Root {
     return this.pipeline.applyTransform(transform, this);
   }
 
-  async asyncApply<OutputT extends PValue<any>>(
+  async applyAsync<OutputT extends PValue<any>>(
     transform: AsyncPTransform<Root, OutputT>
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return await this.pipeline.asyncApplyTransform(transform, this);
+    return await this.pipeline.applyAsyncTransform(transform, this);
   }
 }
 
@@ -91,13 +91,13 @@ export class PCollection<T> {
     return this.pipeline.applyTransform(transform, this);
   }
 
-  asyncApply<OutputT extends PValue<any>>(
+  applyAsync<OutputT extends PValue<any>>(
     transform: AsyncPTransform<PCollection<T>, OutputT>
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return this.pipeline.asyncApplyTransform(transform, this);
+    return this.pipeline.applyAsyncTransform(transform, this);
   }
 
   map<OutputT, ContextT>(
@@ -228,14 +228,14 @@ class PValueWrapper<T extends PValue<any>> {
     return this.pipeline(root).applyTransform(transform, this.pvalue);
   }
 
-  async asyncApply<O extends PValue<any>>(
+  async applyAsync<O extends PValue<any>>(
     transform: AsyncPTransform<T, O>,
     root: Root | null = null
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return await this.pipeline(root).asyncApplyTransform(
+    return await this.pipeline(root).applyAsyncTransform(
       transform,
       this.pvalue
     );
@@ -302,7 +302,7 @@ class AsyncPTransformClassFromCallable<
     this.expander = expander;
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
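Note from the pvalue.ts hunks above that `applyAsync` accepts either an `AsyncPTransformClass` instance or a plain async function, wrapping the latter in `AsyncPTransformClassFromCallable` before expansion. A short sketch of the callable form; `words` is a placeholder `PCollection<string>`:

// An async callable is itself a valid transform; applyAsync wraps it
// in AsyncPTransformClassFromCallable before expanding it.
const shouted = await words.applyAsync(
  async (input: beam.PCollection<string>) => input.map((w) => w.toUpperCase())
);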
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/transforms/external.ts
@@ -101,7 +101,7 @@ class RawExternalTransform<
     }
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/transforms/sql.ts
@@ -84,7 +84,7 @@ export function sqlTransform<
     ) as InputT;
   }
 
-  return await P(input).asyncApply(
+  return await P(input).applyAsync(
     external.rawExternalTransform(
       "beam:external:java:sql:v1",
       { query: query },
10 changes: 5 additions & 5 deletions sdks/typescript/src/apache_beam/transforms/transform.ts
@@ -73,16 +73,16 @@ export class AsyncPTransformClass<
     this.beamName = name || this.constructor.name;
   }
 
-  async asyncExpand(input: InputT): Promise<OutputT> {
+  async expandAsync(input: InputT): Promise<OutputT> {
     throw new Error("Method expand has not been implemented.");
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
   ): Promise<OutputT> {
-    return this.asyncExpand(input);
+    return this.expandAsync(input);
   }
 }
 
@@ -94,7 +94,7 @@ export class PTransformClass<
     throw new Error("Method expand has not been implemented.");
   }
 
-  async asyncExpand(input: InputT): Promise<OutputT> {
+  async expandAsync(input: InputT): Promise<OutputT> {
     return this.expand(input);
   }
 
@@ -106,7 +106,7 @@
     return this.expand(input);
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
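For transform authors, the renames in transform.ts mean a subclass now overrides `expandAsync` rather than `asyncExpand`. A toy subclass under the new names; `PrefixWords` and its async work are hypothetical, while the base-class API is taken from the hunks above:

import * as beam from "apache_beam";
// Path illustrative; matches the module shown in this diff.
import { AsyncPTransformClass } from "apache_beam/transforms/transform";

class PrefixWords extends AsyncPTransformClass<
  beam.PCollection<string>,
  beam.PCollection<string>
> {
  // Formerly asyncExpand; awaited by the pipeline during expansion.
  async expandAsync(input: beam.PCollection<string>) {
    const prefix = await Promise.resolve("word: "); // stand-in for real async work
    return input.map((w) => prefix + w);
  }
}

// Pairs with the renamed call site:
//   const prefixed = await words.applyAsync(new PrefixWords());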
32 changes: 16 additions & 16 deletions sdks/typescript/test/io_test.ts
@@ -71,12 +71,12 @@ xdescribe("IO Tests", function () {
     await createRunner().run(async (root) => {
       await root //
         .apply(beam.create(lines))
-        .asyncApply(textio.writeToText(path.join(tempDir, "out.txt")));
+        .applyAsync(textio.writeToText(path.join(tempDir, "out.txt")));
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromText(path.join(tempDir, "out.txt*"))
         )
       ).apply(testing.assertDeepEqual(lines));
@@ -90,13 +90,13 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(textio.writeToCsv(path.join(tempDir, "out.csv")));
+        .applyAsync(textio.writeToCsv(path.join(tempDir, "out.csv")));
     });
     console.log(tempDir);
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromCsv(path.join(tempDir, "out.csv*"))
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -110,12 +110,12 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(textio.writeToJson(path.join(tempDir, "out.json")));
+        .applyAsync(textio.writeToJson(path.join(tempDir, "out.json")));
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromJson(path.join(tempDir, "out.json*"))
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -129,22 +129,22 @@ xdescribe("IO Tests", function () {
      await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(
+        .applyAsync(
           parquetio.writeToParquet(path.join(tempDir, "out.parquet"))
         );
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           parquetio.readFromParquet(path.join(tempDir, "out.parquet*"))
         )
       ).apply(testing.assertDeepEqual(elements));
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           parquetio.readFromParquet(path.join(tempDir, "out.parquet*"), {
             columns: ["label", "rank"],
           })
@@ -176,14 +176,14 @@ xdescribe("IO Tests", function () {
     await createRunner(options).run(async (root) => {
       await root //
         .apply(beam.create(elements))
-        .asyncApply(
+        .applyAsync(
           avroio.writeToAvro(path_join(tempDir, "out.avro"), { schema })
         );
     });
 
     await createRunner(options).run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           avroio.readFromAvro(path_join(tempDir, "out.avro*"), { schema })
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -239,19 +239,19 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(
+        .applyAsync(
           bigqueryio.writeToBigQuery(table, { createDisposition: "IfNeeded" })
         );
     });
 
     await createRunner(options).run(async (root) => {
-      (await root.asyncApply(bigqueryio.readFromBigQuery({ table }))) //
+      (await root.applyAsync(bigqueryio.readFromBigQuery({ table }))) //
         .apply(testing.assertDeepEqual(elements));
     });
 
     await createRunner(options).run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           bigqueryio.readFromBigQuery({
             query: `SELECT label, rank FROM ${table}`,
           })
@@ -286,7 +286,7 @@ xdescribe("IO Tests", function () {
     try {
       pipelineHandle = await createRunner(options).runAsync(async (root) => {
         await (
-          await root.asyncApply(
+          await root.applyAsync(
             pubsub.readFromPubSub({
               subscription: readSubscription.name,
             })
@@ -296,7 +296,7 @@
           .map((msg) => msg.toUpperCase())
           .map((msg) => new TextEncoder().encode(msg))
           .apply(internal.withCoderInternal(new BytesCoder()))
-          .asyncApply(pubsub.writeToPubSub(writeTopic.name));
+          .applyAsync(pubsub.writeToPubSub(writeTopic.name));
       });
       console.log("Pipeline started", pipelineHandle.jobId);
 