diff --git a/.github/actions/gradle-command-action b/.github/actions/gradle-command-action deleted file mode 160000 index 90ccf054e6b9..000000000000 --- a/.github/actions/gradle-command-action +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 90ccf054e6b9905f30f98c938bce4c6acd323b6b diff --git a/.github/actions/gradle-command-self-hosted-action/action.yml b/.github/actions/gradle-command-self-hosted-action/action.yml index 906b35169d9d..e2fd768220a3 100644 --- a/.github/actions/gradle-command-self-hosted-action/action.yml +++ b/.github/actions/gradle-command-self-hosted-action/action.yml @@ -35,9 +35,9 @@ runs: - name: Run Gradle Command shell: bash run: | - # Removing settings.xml is a workaround to avoid a decryption issue - # of Beam's gradle-command-action plugin and github's provided - # maven settings.xml file + # This step is a workaround to avoid a decryption issue of Beam's + # net.linguica.gradle.maven.settings plugin and github's provided maven + # settings.xml file if [ -f ~/.m2/settings.xml ]; then rm ~/.m2/settings.xml fi diff --git a/.github/workflows/java_tests.yml b/.github/workflows/java_tests.yml index 1132ba1c196b..30808563aaed 100644 --- a/.github/workflows/java_tests.yml +++ b/.github/workflows/java_tests.yml @@ -82,16 +82,12 @@ jobs: with: java-version: 8 go-version: 1.21 - - name: Remove default github maven configuration - # This step is a workaround to avoid a decryption issue of Beam's - # net.linguica.gradle.maven.settings plugin and github's provided maven - # settings.xml file - run: rm ~/.m2/settings.xml # :sdks:java:core:test - name: Run :sdks:java:core:test - uses: ./.github/actions/gradle-command-action + uses: ./.github/actions/gradle-command-self-hosted-action with: - arguments: -p sdks/java/core/ test + gradle-command: test + arguments: -p sdks/java/core/ - name: Upload test logs for :sdks:java:core:test uses: actions/upload-artifact@v3 if: always() @@ -100,9 +96,10 @@ jobs: path: sdks/java/core/build/reports/tests/test # 
:sdks:java:harness:test - name: Run :sdks:java:harness:test - uses: ./.github/actions/gradle-command-action + uses: ./.github/actions/gradle-command-self-hosted-action with: - arguments: -p sdks/java/harness/ test + gradle-command: test + arguments: -p sdks/java/harness/ if: always() - name: Upload test logs for :sdks:java:harness:test uses: actions/upload-artifact@v3 @@ -112,9 +109,10 @@ jobs: path: sdks/java/harness/build/reports/tests/test # :runners:core-java:test - name: Run :runners:core-java:test - uses: ./.github/actions/gradle-command-action + uses: ./.github/actions/gradle-command-self-hosted-action with: - arguments: -p runners/core-java/ test + gradle-command: test + arguments: -p runners/core-java/ if: always() - name: Upload test logs for :runners:core-java:test uses: actions/upload-artifact@v3 @@ -141,17 +139,13 @@ jobs: with: java-version: 8 go-version: 1.21 - - name: Remove default github maven configuration - # This step is a workaround to avoid a decryption issue of Beam's - # net.linguica.gradle.maven.settings plugin and github's provided maven - # settings.xml file - run: rm ~/.m2/settings.xml - name: Run WordCount Unix - uses: ./.github/actions/gradle-command-action + uses: ./.github/actions/gradle-command-self-hosted-action with: - arguments: -p examples/ integrationTest --tests org.apache.beam.examples.WordCountIT + gradle-command: integrationTest + arguments: -p examples/ --tests org.apache.beam.examples.WordCountIT -DintegrationTestRunner=direct - -DintegrationTestPipelineOptions=["--runner=DirectRunner","--tempRoot=./tmp"] + -DintegrationTestPipelineOptions=[\"--runner=DirectRunner\",\"--tempRoot=./tmp\"] - name: Upload test logs uses: actions/upload-artifact@v3 if: always() @@ -191,16 +185,12 @@ jobs: service_account_key: ${{ secrets.GCP_SA_KEY }} project_id: ${{ secrets.GCP_PROJECT_ID }} export_default_credentials: true - - name: Remove default github maven configuration - # This step is a workaround to avoid a decryption issue of 
Beam's - # gradle-command-action plugin and github's provided maven - # settings.xml file - run: rm ~/.m2/settings.xml - name: Run WordCount - uses: ./.github/actions/gradle-command-action + uses: ./.github/actions/gradle-command-self-hosted-action with: - arguments: -p examples/ integrationTest --tests org.apache.beam.examples.WordCountIT - -DintegrationTestPipelineOptions=["--runner=DataflowRunner","--project=${{ secrets.GCP_PROJECT_ID }}","--tempRoot=gs://${{ secrets.GCP_TESTING_BUCKET }}/tmp/"] + gradle-command: integrationTest + arguments: -p examples/ --tests org.apache.beam.examples.WordCountIT + -DintegrationTestPipelineOptions=[\"--runner=DataflowRunner\",\"--project=${{ secrets.GCP_PROJECT_ID }}\",\"--tempRoot=gs://${{ secrets.GCP_TESTING_BUCKET }}/tmp/\"] -DintegrationTestRunner=dataflow - name: Upload test logs uses: actions/upload-artifact@v3 diff --git a/.gitmodules b/.gitmodules index fa6e30a8850a..3a6406c405f3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,6 +7,3 @@ [submodule ".github/actions/github-push-action"] path = .github/actions/github-push-action url = https://github.com/ad-m/github-push-action -[submodule ".github/actions/gradle-command-action"] - path = .github/actions/gradle-command-action - url = https://github.com/eskatos/gradle-command-action diff --git a/CHANGES.md b/CHANGES.md index e48ee7f1d51c..4b977bf3790d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -87,6 +87,8 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed MLTransform dropping duplicate elements in the output PCollection ([#29600](https://github.com/apache/beam/issues/29600)). + ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). 
@@ -135,6 +137,10 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a * Fixed [CVE-2023-39325](https://www.cve.org/CVERecord?id=CVE-2023-39325) (Java/Python/Go) ([#29118](https://github.com/apache/beam/issues/29118)). * Mitigated [CVE-2023-47248](https://nvd.nist.gov/vuln/detail/CVE-2023-47248) (Python) [#29392](https://github.com/apache/beam/issues/29392). +## Known issues + +* MLTransform drops the identical elements in the output PCollection. For any duplicate elements, a single element will be emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)). + # [2.51.0] - 2023-10-03 ## New Features / Improvements @@ -169,6 +175,8 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a * Long-running Python pipelines might experience a memory leak: [#28246](https://github.com/apache/beam/issues/28246). * Python pipelines using BigQuery Storage Read API might need to pin `fastavro` dependency to 1.8.3 or earlier on some runners that don't use Beam Docker containers: [#28811](https://github.com/apache/beam/issues/28811) +* MLTransform drops the identical elements in the output PCollection. For any duplicate elements, a single element will be emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)). 
+ # [2.50.0] - 2023-08-30 @@ -230,6 +238,7 @@ as a workaround, a copy of "old" `CountingSource` class should be placed into a * Beam Python containers rely on a version of Debian/aom that has several security vulnerabilities: [CVE-2021-30474](https://nvd.nist.gov/vuln/detail/CVE-2021-30474), [CVE-2021-30475](https://nvd.nist.gov/vuln/detail/CVE-2021-30475), [CVE-2021-30473](https://nvd.nist.gov/vuln/detail/CVE-2021-30473), [CVE-2020-36133](https://nvd.nist.gov/vuln/detail/CVE-2020-36133), [CVE-2020-36131](https://nvd.nist.gov/vuln/detail/CVE-2020-36131), [CVE-2020-36130](https://nvd.nist.gov/vuln/detail/CVE-2020-36130), and [CVE-2020-36135](https://nvd.nist.gov/vuln/detail/CVE-2020-36135) * Python SDK's cross-language Bigtable sink mishandles records that don't have an explicit timestamp set: [#28632](https://github.com/apache/beam/issues/28632). To avoid this issue, set explicit timestamps for all records before writing to Bigtable. * Python SDK worker start-up logs, particularly PIP dependency installations, that are not logged at warning or higher are suppressed. This suppression is reverted in 2.51.0. +* MLTransform drops the identical elements in the output PCollection. For any duplicate elements, a single element will be emitted downstream. ([#29600](https://github.com/apache/beam/issues/29600)). 
# [2.49.0] - 2023-07-17 diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 359aeea55a2f..41aed5174c66 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -731,7 +731,7 @@ class BeamModulePlugin implements Plugin { // Keep version consistent with the version in google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version", google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version", - google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20231003-$google_clients_version", + google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20231101-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", // Keep version consistent with the version in google_cloud_nio, managed by google_cloud_platform_libraries_bom google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20230907-$google_clients_version", diff --git a/contributor-docs/rc-testing-guide.md b/contributor-docs/rc-testing-guide.md new file mode 100644 index 000000000000..0ac932711206 --- /dev/null +++ b/contributor-docs/rc-testing-guide.md @@ -0,0 +1,60 @@ + + +# Release Candidate (RC) Testing Guide + +This guide is meant for anybody who is interested in testing Beam Release Candidates against downstream projects. Note +that one need not have any status on the Apache Beam project (eg. PMC Member, Committer) to vote; all are welcome. 
+Please subscribe to the [dev list](https://lists.apache.org/list.html?dev@beam.apache.org), and vote on the RC Vote email thread. + + +## RC Testing Objectives + +The RC testing process aims to: + + - Test new release candidates against existing code bases utilizing Apache Beam, to ensure there are no unexpected behaviors downstream. + - Incorporate a breadth of perspectives (including validation on multiple SDKs and multiple runners), before releasing a new version. + - Allow Beam Contributors to dogfood their changes and verify that they work as intended. + + +## Beam Release process overview +- For a comprehensive overview on the Beam release process, please take a look at our [release guide](https://github.com/apache/beam/blob/master/contributor-docs/release-guide.md). +- Note that release candidate votes will be open for 72 hours after the voting email is sent. + + +## Ideas for Python SDK Validators + +_Note: Do the following in a dev-like environment._ +- If you are a Python SDK user that utilizes notebooks (eg. Jupyter Notebooks, or Colab Notebooks), change `pip install` +to point to the new RC (e.g. `pip install apache_beam[gcp]==2.52.0rc1`). Re-execute the workflow to ensure everything +works as intended. +- If your workflow utilizes [Dataflow Templates](https://github.com/GoogleCloudPlatform/DataflowTemplates), or another way of launching your job, modify your `requirements.txt` file, `setup.py` file, or `DockerFile` to point to the new Beam RC. +- _Tip_: Run your pipeline both against Direct Runner, and another runner of your choice by modifying your job's `PipelineOptions`. + + +## Ideas for Java SDK Validators +_Note: Do the following in a dev-like environment._ +- If you are a Java SDK user that utilizes Maven or Gradle in your workflow, in your `pom.xml` file, modify the `beam.version` to reflect the newest RC, and modify the `<url>` tag under `<repository>`, to point to the repository given in the vote email. 
[Example Changes](https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1090/files). + + +## Ideas for Go SDK Validators +_Note: Do the following in a dev-like environment._ +- If you utilize the Go SDK, use `go get` to fetch the desired RC, for example, `go get -d github.com/apache/beam/sdks/v2@v2.xx.0-RC1` +- Utilize the `--environment_config`, to point to the new release, for example, `--environment_config=apache/beam_go_sdk:2.xx.0rc1` + + +## After validation + +- Reply to [dev list](https://lists.apache.org/list.html?dev@beam.apache.org) vote thread with your [vote](https://www.apache.org/foundation/voting.html) and an explanation of the use case you tested. +- [Optional]: If your use case can be well represented by a test, consider contributing a test to Beam! However, note that much of the value of validation is manual testing outside of the Beam CI workflow, so it is still recommended (and highly encouraged!) to validate these test cases in future releases. \ No newline at end of file diff --git a/contributor-docs/release-guide.md b/contributor-docs/release-guide.md index faa8ad5927cf..e00ecd694336 100644 --- a/contributor-docs/release-guide.md +++ b/contributor-docs/release-guide.md @@ -817,7 +817,7 @@ template; please adjust as you see fit. Reviewers are encouraged to test their own use cases with the release candidate, and vote +1 if no issues are found. Only PMC member votes will count towards the final vote, but votes from all community members is encouraged and helpful for finding regressions; you can either test your own - use cases or use cases from the validation sheet [10]. + use cases [13] or use cases from the validation sheet [10]. The complete staging area is available for your review, which includes: * GitHub Release notes [1], @@ -833,7 +833,7 @@ template; please adjust as you see fit. The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. 
- For guidelines on how to try the release in your projects, check out our blog post at https://beam.apache.org/blog/validate-beam-release/. + For guidelines on how to try the release in your projects, check out our RC testing guide [13]. Thanks, Release Manager @@ -850,6 +850,7 @@ template; please adjust as you see fit. [10] https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=... [11] https://hub.docker.com/search?q=apache%2Fbeam&type=image [12] https://github.com/apache/beam/pull/... + [13] https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md If there are any issues found in the release candidate, reply on the vote thread to cancel the vote. There’s no need to wait 72 hours. Go back to @@ -860,7 +861,8 @@ pull request, just correct it on the spot and the vote can continue as-is. ### Run validation tests The community is responsible for performing validation, but as release manager -you are expected to contribute as well. +you are expected to contribute as well. Please see the [RC Testing Guide](https://github.com/apache/beam/blob/master/contributor-docs/rc-testing-guide.md) +for ideas on helping validate testing on downstream projects. 
Before accepting an RC, as a community we try to exercise most (if not all) of the tests listed in this diff --git a/sdks/go.mod b/sdks/go.mod index 3223814d86b6..b55c0bb474b8 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -28,9 +28,9 @@ require ( cloud.google.com/go/datastore v1.15.0 cloud.google.com/go/profiler v0.4.0 cloud.google.com/go/pubsub v1.33.0 - cloud.google.com/go/spanner v1.51.0 + cloud.google.com/go/spanner v1.53.1 cloud.google.com/go/storage v1.35.1 - github.com/aws/aws-sdk-go-v2 v1.23.4 + github.com/aws/aws-sdk-go-v2 v1.23.5 github.com/aws/aws-sdk-go-v2/config v1.25.8 github.com/aws/aws-sdk-go-v2/credentials v1.16.8 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.13.8 @@ -54,13 +54,13 @@ require ( github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c go.mongodb.org/mongo-driver v1.12.1 - golang.org/x/net v0.18.0 - golang.org/x/oauth2 v0.13.0 + golang.org/x/net v0.19.0 + golang.org/x/oauth2 v0.14.0 golang.org/x/sync v0.5.0 - golang.org/x/sys v0.14.0 + golang.org/x/sys v0.15.0 golang.org/x/text v0.14.0 - google.golang.org/api v0.151.0 - google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b + google.golang.org/api v0.152.0 + google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 gopkg.in/retry.v1 v1.0.3 @@ -89,15 +89,15 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect - golang.org/x/time v0.4.0 // indirect + golang.org/x/time v0.5.0 // indirect ) require ( - cloud.google.com/go v0.110.8 // indirect - cloud.google.com/go/compute v1.23.1 // indirect + cloud.google.com/go v0.110.10 // indirect + cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/iam v1.1.3 // indirect - cloud.google.com/go/longrunning v0.5.2 // indirect + 
cloud.google.com/go/iam v1.1.5 // indirect + cloud.google.com/go/longrunning v0.5.4 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/andybalholm/brotli v1.0.4 // indirect @@ -173,11 +173,11 @@ require ( github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/crypto v0.15.0 // indirect + golang.org/x/crypto v0.16.0 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/tools v0.10.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect ) diff --git a/sdks/go.sum b/sdks/go.sum index b9914ab1b931..ed4921bfacc5 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -8,8 +8,8 @@ cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6To= cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= -cloud.google.com/go v0.110.8 h1:tyNdfIxjzaWctIiLYOTalaLKZ17SI44SKFW26QbOhME= -cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= +cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= +cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= 
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -17,20 +17,20 @@ cloud.google.com/go/bigquery v1.57.1 h1:FiULdbbzUxWD0Y4ZGPSVCDLvqRSyCIO6zKV7E2nf cloud.google.com/go/bigquery v1.57.1/go.mod h1:iYzC0tGVWt1jqSzBHqCr3lrRn0u13E8e+AqowBsDgug= cloud.google.com/go/bigtable v1.20.0 h1:NqZC/WcesSn4O8L0I2JmuNsUigSyBQifVLYgM9LMQeQ= cloud.google.com/go/bigtable v1.20.0/go.mod h1:upJDn8frsjzpRMfybiWkD1PG6WCCL7CRl26MgVeoXY4= -cloud.google.com/go/compute v1.23.1 h1:V97tBoDaZHb6leicZ1G6DLK2BAaZLJ/7+9BB/En3hR0= -cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= -cloud.google.com/go/datacatalog v1.18.1 h1:xJp9mZrc2HPaoxIz3sP9pCmf/impifweQ/yGG9VBfio= +cloud.google.com/go/datacatalog v1.18.3 h1:zmdxP6nOjN5Qb1rtu9h4kbEVwerQ6Oshf+t747QJUew= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/datastore v1.15.0 h1:0P9WcsQeTWjuD1H14JIY7XQscIPQ4Laje8ti96IC5vg= cloud.google.com/go/datastore v1.15.0/go.mod h1:GAeStMBIt9bPS7jMJA85kgkpsMkvseWWXiaHya9Jes8= -cloud.google.com/go/iam v1.1.3 h1:18tKG7DzydKWUnLjonWcJO6wjSCAtzh4GcRKlH/Hrzc= -cloud.google.com/go/iam v1.1.3/go.mod h1:3khUlaBXfPKKe7huYgEpDn6FtgRyMEqbkvBxrQyY5SE= -cloud.google.com/go/kms v1.15.3 h1:RYsbxTRmk91ydKCzekI2YjryO4c5Y2M80Zwcs9/D/cI= -cloud.google.com/go/longrunning v0.5.2 h1:u+oFqfEwwU7F9dIELigxbe0XVnBAo9wqMuQLA50CZ5k= 
-cloud.google.com/go/longrunning v0.5.2/go.mod h1:nqo6DQbNV2pXhGDbDMoN2bWz68MjZUzqv2YttZiveCs= +cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= +cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= +cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= +cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= cloud.google.com/go/profiler v0.4.0 h1:ZeRDZbsOBDyRG0OiK0Op1/XWZ3xeLwJc9zjkzczUxyY= cloud.google.com/go/profiler v0.4.0/go.mod h1:RvPlm4dilIr3oJtAOeFQU9Lrt5RoySHSDj4pTd6TWeU= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -38,8 +38,8 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g= cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc= -cloud.google.com/go/spanner v1.51.0 h1:l3exhhsVMKsx1E7Xd1QajYSvHmI1KZoWPW5tRxIIdvQ= -cloud.google.com/go/spanner v1.51.0/go.mod h1:c5KNo5LQ1X5tJwma9rSQZsXNBDNvj4/n8BVc3LNahq0= +cloud.google.com/go/spanner v1.53.1 h1:xNmE0SXMSxNBuk7lRZ5G/S+A49X91zkSTt7Jn5Ptlvw= +cloud.google.com/go/spanner v1.53.1/go.mod h1:liG4iCeLqm5L3fFLU5whFITqP0e0orsAW1uUSrd4rws= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -81,8 +81,8 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve github.com/aws/aws-sdk-go v1.34.0 h1:brux2dRrlwCF5JhTL7MUT3WUwo9zfDHZZp3+g3Mvlmo= github.com/aws/aws-sdk-go v1.34.0/go.mod 
h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250= -github.com/aws/aws-sdk-go-v2 v1.23.4 h1:2P20ZjH0ouSAu/6yZep8oCmTReathLuEu6dwoqEgjts= -github.com/aws/aws-sdk-go-v2 v1.23.4/go.mod h1:t3szzKfP0NeRU27uBFczDivYJjsmSnqI8kIvKyWb9ds= +github.com/aws/aws-sdk-go-v2 v1.23.5 h1:xK6C4udTyDMd82RFvNkDQxtAd00xlzFUtX4fF2nMZyg= +github.com/aws/aws-sdk-go-v2 v1.23.5/go.mod h1:t3szzKfP0NeRU27uBFczDivYJjsmSnqI8kIvKyWb9ds= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 h1:ZY3108YtBNq96jNZTICHxN1gSBSbnvIdYwwqnvCV4Mc= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1/go.mod h1:t8PYl/6LzdAqsU4/9tz28V/kU+asFePvpOMkdul0gEQ= github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA= @@ -495,8 +495,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= -golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -555,15 +555,15 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v 
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= -golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= +golang.org/x/oauth2 v0.14.0 h1:P0Vrf/2538nmC0H+pEQ3MNFRRnVR7RlqyVw+bvm26z0= +golang.org/x/oauth2 v0.14.0/go.mod h1:lAtNWgaWfL4cm7j2OV8TxGi9Qb7ECORx8DktCY74OwM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -616,8 +616,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -634,8 +634,8 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= -golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -687,8 +687,8 @@ google.golang.org/api v0.14.0/go.mod 
h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsb google.golang.org/api v0.15.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI= google.golang.org/api v0.17.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.18.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= -google.golang.org/api v0.151.0 h1:FhfXLO/NFdJIzQtCqjpysWwqKk8AzGWBUhMIx67cVDU= -google.golang.org/api v0.151.0/go.mod h1:ccy+MJ6nrYFgE3WgRx/AMXOxOmU8Q4hSa+jjibzhxcg= +google.golang.org/api v0.152.0 h1:t0r1vPnfMc260S2Ci+en7kfCZaLOPs5KI0sVV/6jZrY= +google.golang.org/api v0.152.0/go.mod h1:3qNJX5eOmhiWYc67jRA/3GsDw97UFb5ivv7Y2PrriAY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -714,12 +714,12 @@ google.golang.org/genproto v0.0.0-20200204135345-fa8e72b47b90/go.mod h1:GmwEX6Z4 google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= -google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b h1:CIC2YMXmIhYw6evmhPxBKJ4fmLbOFtXQN/GV3XOZR8k= -google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:IBQ646DjkDkvUIsVq/cc03FUFQ9wbZu7yE396YcL870= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= 
-google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= +google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= +google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= +google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo= +google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 7b43ba78f054..ca701979497a 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -35,6 +35,7 @@ import ( "cloud.google.com/go/storage" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" @@ -442,7 +443,17 @@ func getContainerImage(ctx context.Context) string { if *image != "" { return *image } - return jobopts.GetEnvironmentConfig(ctx) + envConfig := jobopts.GetEnvironmentConfig(ctx) + if envConfig == core.DefaultDockerImage { + // It's possible the 
user set the image exactly manually, but unlikely. + // Prefer using the gcr.io image by default. + // Note: This doesn't change the dev experience, which requires a user + // to have a dev image. + // However, RC versions should automatically be picked up, since + // they are never tagged the RC number, just the main version. + return "gcr.io/cloud-dataflow/v1beta3/beam_go_sdk:" + core.SdkVersion + } + return envConfig } panic(fmt.Sprintf("Unsupported environment %v", urn)) } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 663695f00c8e..2e0f28f263d6 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -22,6 +22,7 @@ import ( "sort" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow/dataflowlib" @@ -313,6 +314,15 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) { } } +func TestGetJobOptions_DockerGCROverride(t *testing.T) { + resetGlobals() + *jobopts.EnvironmentType = "docker" + + if got, want := getContainerImage(context.Background()), "gcr.io/cloud-dataflow/v1beta3/beam_go_sdk:"+core.SdkVersion; got != want { + t.Fatalf("getContainerImage() = %q, want %q", got, want) + } +} + func TestGetJobOptions_TransformMapping(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py index 0db10718295b..261b480b1083 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py @@ -57,6 +57,7 @@ def 
check_mltransform_compute_and_apply_vocabulary_with_scalar(): Row(x=array([4])) Row(x=array([1])) Row(x=array([0])) +Row(x=array([0])) Row(x=array([2])) Row(x=array([3])) [END mltransform_compute_and_apply_vocabulary_with_scalar] '''.splitlines( diff --git a/sdks/python/apache_beam/ml/transforms/handlers.py b/sdks/python/apache_beam/ml/transforms/handlers.py index 8695d5146efa..e7d4f52ded85 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers.py +++ b/sdks/python/apache_beam/ml/transforms/handlers.py @@ -17,9 +17,9 @@ # pytype: skip-file import collections -import hashlib import os import typing +import uuid from typing import Dict from typing import List from typing import Optional @@ -49,6 +49,8 @@ 'TFTProcessHandler', ] +_ID_COLUMN = 'tmp_uuid' # Name for a temporary column. + RAW_DATA_METADATA_DIR = 'raw_data_metadata' SCHEMA_FILE = 'schema.pbtxt' # tensorflow transform doesn't support the types other than tf.int64, @@ -80,12 +82,12 @@ tft_process_handler_output_type = typing.Union[beam.Row, Dict[str, np.ndarray]] -class ConvertScalarValuesToListValues(beam.DoFn): +class _ConvertScalarValuesToListValues(beam.DoFn): def process( self, element, ): - hash_key, element = element + id, element = element new_dict = {} for key, value in element.items(): if isinstance(value, @@ -93,10 +95,10 @@ def process( new_dict[key] = [value] else: new_dict[key] = value - yield (hash_key, new_dict) + yield (id, new_dict) -class ConvertNamedTupleToDict( +class _ConvertNamedTupleToDict( beam.PTransform[beam.PCollection[typing.Union[beam.Row, typing.NamedTuple]], beam.PCollection[Dict[str, common_types.InstanceDictType]]]): @@ -121,76 +123,75 @@ def expand( return pcoll | beam.Map(lambda x: x._asdict()) -class ComputeAndAttachHashKey(beam.DoFn): +class _ComputeAndAttachUniqueID(beam.DoFn): """ - Computes and attaches a hash key to the element. - Only for internal use. No backwards compatibility guarantees. + Computes and attaches a unique id to each element in the PCollection. 
""" def process(self, element): - hash_object = hashlib.sha256() - for _, value in element.items(): - # handle the case where value is a list or numpy array - if isinstance(value, (list, np.ndarray)): - hash_object.update(str(list(value)).encode()) - else: # assume value is a primitive that can be turned into str - hash_object.update(str(value).encode()) - yield (hash_object.hexdigest(), element) + # UUID1 includes machine-specific bits and has a counter. As long as not too + # many are generated at the same time, they should be unique. + # UUID4 generation should be unique in practice as long as underlying random + # number generation is not compromised. + # A combination of both should avoid the anecdotal pitfalls where + # replacing one with the other has helped some users. + # UUID collision will result in data loss, but we can detect that and fail. + + # TODO(https://github.com/apache/beam/issues/29593): Evaluate MLTransform + # implementation without CoGBK. + unique_key = uuid.uuid1().bytes + uuid.uuid4().bytes + yield (unique_key, element) -class GetMissingColumnsPColl(beam.DoFn): +class _GetMissingColumns(beam.DoFn): """ Returns data containing only the columns that are not present in the schema. This is needed since TFT only outputs columns that are transformed by any of the data processing transforms. - - Only for internal use. No backwards compatibility guarantees. """ def __init__(self, existing_columns): self.existing_columns = existing_columns def process(self, element): - new_dict = {} - hash_key, element = element - for key, value in element.items(): - if key not in self.existing_columns: - new_dict[key] = value - yield (hash_key, new_dict) + id, row_dict = element + new_dict = { + k: v + for k, v in row_dict.items() if k not in self.existing_columns + } + yield (id, new_dict) -class MakeHashKeyAsColumn(beam.DoFn): +class _MakeIdAsColumn(beam.DoFn): """ - Extracts the hash key from the element and adds it as a column. - - Only for internal use. 
No backwards compatibility guarantees. + Extracts the id from the element and adds it as a column instead. """ def process(self, element): - hash_key, element = element - element['hash_key'] = hash_key + id, element = element + element[_ID_COLUMN] = id yield element -class ExtractHashAndKeyPColl(beam.DoFn): +class _ExtractIdAndKeyPColl(beam.DoFn): """ - Extracts the hash key and return hashkey and element as a tuple. - - Only for internal use. No backwards compatibility guarantees. + Extracts the id and returns the id and element as a tuple. """ def process(self, element): - hashkey = element['hash_key'][0] - del element['hash_key'] - yield (hashkey.decode('utf-8'), element) + id = element[_ID_COLUMN][0] + del element[_ID_COLUMN] + yield (id, element) -class MergeDicts(beam.DoFn): +class _MergeDicts(beam.DoFn): """ - Merges the dictionaries in the PCollection. - - Only for internal use. No backwards compatibility guarantees. + Merges processed and unprocessed columns from CoGBK result into a single row. """ def process(self, element): - _, element = element + unused_row_id, row_dicts_tuple = element new_dict = {} - for d in element: + for d in row_dicts_tuple: + # After CoGBK, dicts with processed and unprocessed portions of each row + # are wrapped in 1-element lists, since all rows have a unique id. + # Assertion could fail due to UUID collision. + assert len(d) == 1, f"Expected 1 element, got: {len(d)}." 
new_dict.update(d[0]) yield new_dict @@ -323,7 +324,7 @@ def _get_raw_data_feature_spec_per_column( def get_raw_data_metadata( self, input_types: Dict[str, type]) -> dataset_metadata.DatasetMetadata: raw_data_feature_spec = self.get_raw_data_feature_spec(input_types) - raw_data_feature_spec['hash_key'] = tf.io.VarLenFeature(dtype=tf.string) + raw_data_feature_spec[_ID_COLUMN] = tf.io.VarLenFeature(dtype=tf.string) return self.convert_raw_data_feature_spec_to_dataset_metadata( raw_data_feature_spec) @@ -417,14 +418,14 @@ def process_data( # convert Row or NamedTuple to Dict raw_data = ( raw_data - | ConvertNamedTupleToDict().with_output_types( + | _ConvertNamedTupleToDict().with_output_types( Dict[str, typing.Union[tuple(column_type_mapping.values())]])) # type: ignore # AnalyzeAndTransformDataset raise type hint since this is # schema'd PCollection and the current output type would be a # custom type(NamedTuple) or a beam.Row type. else: column_type_mapping = self._map_column_names_to_types_from_transforms() - # Add hash key so TFT can output hash_key as output but as a no-op. + # Add id so TFT can output id as output but as a no-op. 
raw_data_metadata = self.get_raw_data_metadata( input_types=column_type_mapping) # Write untransformed metadata to a file so that it can be re-used @@ -445,21 +446,21 @@ def process_data( raw_data_metadata = metadata_io.read_metadata( os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR)) - keyed_raw_data = (raw_data | beam.ParDo(ComputeAndAttachHashKey())) + keyed_raw_data = (raw_data | beam.ParDo(_ComputeAndAttachUniqueID())) feature_set = [feature.name for feature in raw_data_metadata.schema.feature] - columns_not_in_schema_with_hash = ( + keyed_columns_not_in_schema = ( keyed_raw_data - | beam.ParDo(GetMissingColumnsPColl(feature_set))) + | beam.ParDo(_GetMissingColumns(feature_set))) # To maintain consistency by outputting numpy array all the time, # whether a scalar value or list or np array is passed as input, # we will convert scalar values to list values and TFT will ouput # numpy array all the time. keyed_raw_data = keyed_raw_data | beam.ParDo( - ConvertScalarValuesToListValues()) + _ConvertScalarValuesToListValues()) - raw_data_list = (keyed_raw_data | beam.ParDo(MakeHashKeyAsColumn())) + raw_data_list = (keyed_raw_data | beam.ParDo(_MakeIdAsColumn())) with tft_beam.Context(temp_dir=self.artifact_location): data = (raw_data_list, raw_data_metadata) @@ -467,7 +468,7 @@ def process_data( transform_fn = ( data | "AnalyzeDataset" >> tft_beam.AnalyzeDataset(self.process_data_fn)) - # TODO: Remove the 'hash_key' column from the transformed + # TODO: Remove the 'id' column from the transformed # dataset schema generated by TFT. self.write_transform_artifacts(transform_fn, self.artifact_location) else: @@ -490,7 +491,7 @@ def process_data( # So we will use a RowTypeConstraint to create a schema'd PCollection. # this is needed since new columns are included in the # transformed_dataset. 
- del self.transformed_schema['hash_key'] + del self.transformed_schema[_ID_COLUMN] row_type = RowTypeConstraint.from_fields( list(self.transformed_schema.items())) @@ -498,17 +499,17 @@ def process_data( # is not transformed by any of the transforms, then the output will # not have that column. So we will join the missing columns from the # raw_data to the transformed_dataset. - transformed_dataset = ( - transformed_dataset | beam.ParDo(ExtractHashAndKeyPColl())) + keyed_transformed_dataset = ( + transformed_dataset | beam.ParDo(_ExtractIdAndKeyPColl())) # The grouping is needed here since tensorflow transform only outputs # columns that are transformed by any of the transforms. So we will # join the missing columns from the raw_data to the transformed_dataset - # using the hash key. + # using the id. transformed_dataset = ( - (transformed_dataset, columns_not_in_schema_with_hash) + (keyed_transformed_dataset, keyed_columns_not_in_schema) | beam.CoGroupByKey() - | beam.ParDo(MergeDicts())) + | beam.ParDo(_MergeDicts())) # The schema only contains the columns that are transformed. 
transformed_dataset = ( diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py index 327c8c76c0e9..d67d8ec3e705 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers_test.py +++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py @@ -569,6 +569,52 @@ def test_consume_mode_with_extra_columns_in_the_input(self): equal_to(expected_test_data_z, equals_fn=np.array_equal), label='unused column: z') + def test_handler_with_same_input_elements(self): + with beam.Pipeline() as p: + data = [ + { + 'x': 'I' + }, + { + 'x': 'love' + }, + { + 'x': 'Beam' + }, + { + 'x': 'Beam' + }, + { + 'x': 'is' + }, + { + 'x': 'awesome' + }, + ] + raw_data = (p | beam.Create(data)) + process_handler = handlers.TFTProcessHandler( + transforms=[tft.ComputeAndApplyVocabulary(columns=['x'])], + artifact_location=self.artifact_location, + ) + transformed_data = process_handler.process_data(raw_data) + + expected_data = [ + beam.Row(x=np.array([4])), + beam.Row(x=np.array([1])), + beam.Row(x=np.array([0])), + beam.Row(x=np.array([0])), + beam.Row(x=np.array([2])), + beam.Row(x=np.array([3])), + ] + + expected_data_x = [row.x for row in expected_data] + actual_data_x = transformed_data | beam.Map(lambda x: x.x) + + assert_that( + actual_data_x, + equal_to(expected_data_x, equals_fn=np.array_equal), + label='transformed data') + if __name__ == '__main__': unittest.main()