Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
4 changes: 1 addition & 3 deletions .github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
github.event_name == 'workflow_dispatch' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam')
runs-on: ubuntu-latest
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 100
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
Expand All @@ -70,8 +70,6 @@ jobs:
comment_phrase: ${{ matrix.job_phrase }} ${{ matrix.test_set }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} ${{ matrix.test_set }} (${{ matrix.job_phrase }} ${{ matrix.test_set }})
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@v1.3.0
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
Expand Down
91 changes: 46 additions & 45 deletions sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# limitations under the License.
#

fixtures:
- name: TEMP_BOOTSTAP_SERVER
type: "apache_beam.yaml.integration_tests.temp_kafka_server"
# fixtures:
# - name: TEMP_BOOTSTAP_SERVER
# type: "apache_beam.yaml.integration_tests.temp_kafka_server"

pipelines:
# Kafka write pipeline
Expand All @@ -30,51 +30,52 @@ pipelines:
- {value: 123}
- {value: 456}
- {value: 789}
- type: MapToFields
config:
language: python
fields:
value:
callable: |
lambda row: str(row.value).encode('utf-8')
output_type: bytes
- type: WriteToKafka
config:
format: "RAW"
topic: "silly_topic"
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
producer_config_updates:
linger.ms: "0"
# TODO(#35272) - fix kafka test
# - type: MapToFields
# config:
# language: python
# fields:
# value:
# callable: |
# lambda row: str(row.value).encode('utf-8')
# output_type: bytes
# - type: WriteToKafka
# config:
# format: "RAW"
# topic: "silly_topic"
# bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
# producer_config_updates:
# linger.ms: "0"

# Kafka read pipeline
# Need a separate read pipeline to make sure the write pipeline is flushed
- pipeline:
type: chain
transforms:
- type: ReadFromKafka
config:
format: "RAW"
topic: "silly_topic"
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
consumer_config:
auto.offset.reset: "earliest"
group.id: "yaml-kafka-test-group"
max_read_time_seconds: 10 # will read forever if not set
- type: MapToFields
config:
language: python
fields:
value:
callable: |
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row
lambda row: row.payload.decode('utf-8')
output_type: string
- type: AssertEqual
config:
elements:
- {value: "123"}
- {value: "456"}
- {value: "789"}
# - pipeline:
# type: chain
# transforms:
# - type: ReadFromKafka
# config:
# format: "RAW"
# topic: "silly_topic"
# bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
# consumer_config:
# auto.offset.reset: "earliest"
# group.id: "yaml-kafka-test-group"
# max_read_time_seconds: 10 # will read forever if not set
# - type: MapToFields
# config:
# language: python
# fields:
# value:
# callable: |
# # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
# lambda row: row.payload.decode('utf-8')
# output_type: string
# - type: AssertEqual
# config:
# elements:
# - {value: "123"}
# - {value: "456"}
# - {value: "789"}

# TODO: Error handling hard to trigger upon initial investigations. Need to
# investigate more.
Loading