Merged
148 commits
1f52da7
Add support for streaming writes in IOBase (Python)
razvanculea Jun 3, 2025
d00dbc2
Merge branch 'apache:master' into iobase-streaming
razvanculea Jun 3, 2025
fe9eab2
add triggering_frequency in iobase.Sink
razvanculea Jun 3, 2025
e4ff191
Merge branch 'apache:master' into iobase-streaming
razvanculea Jun 3, 2025
db83273
Merge branch 'iobase-streaming' of https://github.com/razvanculea/bea…
razvanculea Jun 3, 2025
9cc97c6
fix whitespaces/newlines
razvanculea Jun 3, 2025
6d1aa55
fixes per https://github.com/apache/beam/pull/35137#pullrequestreview…
razvanculea Jun 10, 2025
4399005
implement pre_finalize_windowed and finalize_windowed_write in FileBa…
razvanculea Jun 12, 2025
8de75c0
update changes.md
razvanculea Jun 12, 2025
b04e1ec
fix formatting
razvanculea Jun 12, 2025
4c6dbc7
space formatting
razvanculea Jun 12, 2025
42e2cbc
space nagging
razvanculea Jun 12, 2025
339d3cf
keep in sync with refactor in https://github.com/apache/beam/pull/35137
razvanculea Jun 16, 2025
c08c643
keep in sync with iobase changes in https://github.com/apache/beam/pu…
razvanculea Jun 17, 2025
4695f1c
Merge branch 'master' into iobase_streaming_textio
razvanculea Jun 17, 2025
193dbe2
[Website] add akvelon case study (#34943)
bullet03 Jun 2, 2025
f64732c
added the rate test for GenerateSequence (#35108)
liferoad Jun 2, 2025
96cf6d2
Re-enable logging after importing vllm. (#35103)
claudevdm Jun 2, 2025
c8adaeb
Deprecate Java 8 (#35064)
Abacn Jun 2, 2025
7d30cc2
Remove beam college banners (#35123)
damccorm Jun 2, 2025
6dbc531
feat: change text (#35130)
bullet03 Jun 3, 2025
943ebf9
Update Custom Remote Inference example to use RemoteModelHandler (#35…
jrmccluskey Jun 3, 2025
3da6da8
Remove Java 8 container (#35125)
Abacn Jun 3, 2025
e755d48
add extra_transforms block documentation to chain transform documenta…
derrickaw Jun 3, 2025
0dcc0d4
add note about testing (#35075)
derrickaw Jun 3, 2025
82b134f
fix whitespaces/newlines
razvanculea Jun 3, 2025
7089387
fixes per https://github.com/apache/beam/pull/35137#pullrequestreview…
razvanculea Jun 10, 2025
e6fbcb0
implement pre_finalize_windowed and finalize_windowed_write in FileBa…
razvanculea Jun 12, 2025
ebfd9f1
update changes.md
razvanculea Jun 12, 2025
66f67a1
fix formatting
razvanculea Jun 12, 2025
87d0fac
space formatting
razvanculea Jun 12, 2025
6c2da61
space nagging
razvanculea Jun 12, 2025
ad70d66
keep in sync with refactor in https://github.com/apache/beam/pull/35137
razvanculea Jun 16, 2025
f5796ca
keep in sync with iobase changes in https://github.com/apache/beam/pu…
razvanculea Jun 17, 2025
dfc6cfc
[Website] update akvelon case study: update text and fix landing pag…
bullet03 Jun 3, 2025
b888c89
Fix PostCommit Python Xlang IO Dataflow job (#35131)
Amar3tto Jun 3, 2025
0ef6c73
Bump google.golang.org/grpc from 1.72.0 to 1.72.2 in /sdks (#35113)
dependabot[bot] Jun 3, 2025
3cf1858
Bump cloud.google.com/go/bigquery from 1.67.0 to 1.69.0 in /sdks (#35…
dependabot[bot] Jun 3, 2025
b87416e
Add known issues. (#35138)
claudevdm Jun 3, 2025
dc9c62b
Bump @octokit/plugin-paginate-rest and @octokit/rest (#34167)
dependabot[bot] Jun 3, 2025
d74e3bc
Explicitly handle singleton iterators instead of using helper and cat…
scwhittle Jun 3, 2025
300e02b
Build last snapshot against RC00 tag instead of release branch (#35142)
Abacn Jun 3, 2025
82765fe
Bump nodemailer from 6.7.5 to 6.9.9 in /scripts/ci/issue-report (#35143)
dependabot[bot] Jun 3, 2025
522b8bf
Fix tests affected by Java 8 container turn down (#35145)
Abacn Jun 3, 2025
2ef7eec
Bump github.com/aws/aws-sdk-go-v2/service/s3 in /sdks (#35146)
dependabot[bot] Jun 4, 2025
e5e02c0
fix jdbc transform validation (#35141)
stankiewicz Jun 4, 2025
f7f6608
Fix Java Example ARM PostCommit
Abacn Jun 4, 2025
e440709
fix: add missed word (#35163)
bullet03 Jun 5, 2025
3619ca2
Add postcommit yaml xlang workflow and split tests accordingly (#35119)
derrickaw Jun 5, 2025
f6faa68
Replace usages of deprecated pkg_resources package (#35153)
jrmccluskey Jun 5, 2025
d083f15
Improve error message when accidentally using PBegin/Pipeline (#35156)
hjtran Jun 5, 2025
9f485b8
add friendly error message for when transform is applied to no output…
hjtran Jun 5, 2025
e283b4b
Add warning if temp location bucket has soft delete enabled for Go SD…
TanuSharma2511 Jun 5, 2025
96cc9c9
Constrain DequeCoder type correctly, as it does not support nulls
kennknowles May 29, 2025
e2a63af
Do not overwrite class states if a cached dynamic class is returned i…
baeminbo Jun 5, 2025
0dcc4fc
Make SDK harness change effective on Iceberg Dataflow test (#35173)
Abacn Jun 5, 2025
a926d97
Fix beam_PostCommit_Java_Examples_Dataflow_V2 (#35172)
Abacn Jun 5, 2025
9e9f102
[YAML]: Update postgres IT test and readme (#35169)
derrickaw Jun 5, 2025
a9e11d7
Bump Java beam-master container (#35170)
Abacn Jun 5, 2025
d11ff01
Make WindowedValue a public interface
kennknowles Jun 3, 2025
6f424bf
Run integration tests for moving WindowedValue and making public
kennknowles Jun 5, 2025
85d6de3
Add timer tests to make sure event-time timer firing at the right tim…
shunping Jun 6, 2025
d8fa868
change to ubuntu-20.04 (#35182)
derrickaw Jun 6, 2025
c9bcec8
Fix IntelliJ sync project failure due to circular Beam dependency (#3…
Abacn Jun 6, 2025
d14eb7b
Update workflows categories (#35162)
Amar3tto Jun 6, 2025
918472f
Add cloudpickle coder. (#35166)
claudevdm Jun 6, 2025
94b9032
Move examples from sql package (#35183)
Amar3tto Jun 6, 2025
344a59b
Fix the beam interactive install problem when on Google Colab (#35148)
Chenzo1001 Jun 6, 2025
6ffe4cf
Bump github.com/docker/docker in /sdks (#35112)
dependabot[bot] Jun 6, 2025
d5353e8
Bump github.com/nats-io/nats-server/v2 from 2.11.3 to 2.11.4 in /sdks…
dependabot[bot] Jun 6, 2025
67cdf59
Touch JPMS test trigger file
kennknowles Jun 6, 2025
1d17368
Use staged SDK harness & Dataflow worker jar in JPMS tests
kennknowles Jun 6, 2025
1bfabc7
Bump cloud.google.com/go/storage from 1.52.0 to 1.55.0 in /sdks (#35114)
dependabot[bot] Jun 6, 2025
5bfa884
Bump github.com/nats-io/nats.go from 1.42.0 to 1.43.0 in /sdks (#35147)
dependabot[bot] Jun 6, 2025
ed204f4
Move non-dill specific code out of dill_pickler.py (#35139)
kristynsmith Jun 6, 2025
1088a3a
Fix some lints introduced in a recent PR. (#35193)
shunping Jun 6, 2025
379c15d
small filesystem fixes (#34956)
hjtran Jun 6, 2025
5a02cde
[YAML] Add a spec provider for transforms taking specifiable argument…
shunping Jun 7, 2025
a2fe66b
Touch trigger files to test WindowedValueReceiver in runners
kennknowles Jun 6, 2025
c4da6c6
Introduce WindowedValue receivers and consolidate runner code to them
kennknowles Jun 3, 2025
d08a063
Eliminate nullness errors from ByteBuddyDoFnInvokerFactory and DoFnOu…
kennknowles May 21, 2025
e8f6f05
Fix null check when fetching driverJars from value provider
Abacn Jun 6, 2025
ae65ea9
Fix PostCommit Python ValidatesRunner Samza / Spark jobs (#35210)
Amar3tto Jun 9, 2025
d0c3af6
Update pypi documentation 30145 (#34329)
rakeshcusat Jun 9, 2025
a1381ad
Add more YAML examples involving Kafka and Iceberg (#35151)
charlespnh Jun 9, 2025
25ebd66
Evaluate namedTuples as equivalent to rows (#35218)
damccorm Jun 10, 2025
90f328c
Add a new experiment flag to enable real-time clock as processing tim…
shunping Jun 10, 2025
1beaaac
Touch trigger files for lightweight runners
kennknowles Jun 9, 2025
ab9e2af
Eliminate WindowedValue.getPane() in preparation for making it a user…
kennknowles Jun 9, 2025
6aa4fa3
Do not fail if there were failed deletes (#35222)
Amar3tto Jun 10, 2025
d80cd9a
Force FnApiRunner in cases where prism can't handle use case (#35219)
damccorm Jun 10, 2025
5fc0402
Bump golang.org/x/net from 0.40.0 to 0.41.0 in /sdks (#35206)
dependabot[bot] Jun 10, 2025
67b9c6e
Fix incorrect typehints generated by FlatMap with default identity fu…
hjtran Jun 10, 2025
f3a4384
Parse values returned from Dataflow API to BoundedTrieData (#34738)
Abacn Jun 10, 2025
f69b82d
Remove breaking PDone change (#35224)
damccorm Jun 11, 2025
63aedcf
Generic Postgres + Cloudsql postgres embeddings. (#35215)
claudevdm Jun 11, 2025
9776f5d
Allow only one thread at a time to start the VLLM server. (#35234)
tvalentyn Jun 11, 2025
8282e0c
[IcebergIO] Create namespaces if needed (#35228)
ahmedabu98 Jun 11, 2025
8d17c6a
Support configuring flush_count and max_row_bytes of WriteToBigTable …
nguymin4 Jun 11, 2025
e8fe493
Update CHANGES.md (#35242)
tvalentyn Jun 11, 2025
88a6270
[Beam SQL] Implement Catalog and CatalogManager (#35223)
ahmedabu98 Jun 11, 2025
d51b27d
[IcebergIO] Fix conversion logic for arrays of structs and maps of st…
ahmedabu98 Jun 11, 2025
b635698
fix long_description when the md file cannot be found (#35246)
liferoad Jun 11, 2025
d435782
[IcebergIO] Create tables with a partition spec (#34966)
ahmedabu98 Jun 11, 2025
a4b3a56
Fix typo java-dependencies.md (#35251)
baeminbo Jun 12, 2025
2ec2fb8
Adding project and database support in write transform for firestoreI…
TanuSharma2511 Jun 12, 2025
5f46c87
Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
shunping Jun 12, 2025
5e95675
Remove internal code. (#35239)
claudevdm Jun 12, 2025
aff00db
enable setting max_writer_per_bundle for avroIO and other IO (#35092)
TanuSharma2511 Jun 12, 2025
5b79048
[Java FnApi] Fix race in BeamFnStatusClient by initializing all field…
scwhittle Jun 12, 2025
7bd7f35
[IcebergIO] Integrate with Beam SQL (#34799)
ahmedabu98 Jun 12, 2025
d6b3182
update changes (#35256)
ahmedabu98 Jun 12, 2025
6a12c4e
[YAML]: fix postcommit oracle bug and reorganize postcommit tests (#3…
derrickaw Jun 12, 2025
53feaf8
update hadoop prefix (#35257)
ahmedabu98 Jun 12, 2025
ac15c12
Set go version 1.24.4 (#35261)
Amar3tto Jun 12, 2025
60d4854
upgrade org.apache.parquet:parquet-avro to to 1.15.2 (#35037)
liferoad Jun 12, 2025
1fe3aee
Bump google.golang.org/grpc from 1.72.2 to 1.73.0 in /sdks (#35236)
dependabot[bot] Jun 12, 2025
22a4799
add more doc strings for integration tests (#35171)
derrickaw Jun 12, 2025
bfc2244
[IcebergIO] extend table partitioning to Beam SQL (#35268)
ahmedabu98 Jun 13, 2025
59f8a84
Moving to 2.67.0-SNAPSHOT on master branch.
Jun 13, 2025
9715848
Update CHANGES.md to have fields for 2.67.0 release
Amar3tto Jun 13, 2025
b9e6a48
add free disk space step (#35260)
derrickaw Jun 13, 2025
f8bc1b7
[yaml]: Fix post commit yaml once more (#35273)
derrickaw Jun 13, 2025
3fc5653
Polish anomaly detection notebook and get ready to be imported in pub…
shunping Jun 13, 2025
7dad0f0
Suppress Findbugs (#35274)
Abacn Jun 13, 2025
422f214
Support Java 17+ compiled Beam components for build, release, and xla…
Abacn Jun 14, 2025
8c9c3e7
[IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
tkaymak Jun 14, 2025
bfe510f
Document BQ Storage API pipeline options (#35259)
VeronicaWasson Jun 16, 2025
827caf6
Speed up StreamingDataflowWorkerTest by removing 10 second wait from …
scwhittle Jun 16, 2025
d62a636
Bump org.ajoberstar.grgit:grgit-gradle from 4.1.1 to 5.3.2 (#35301)
dependabot[bot] Jun 16, 2025
337156e
Bump google.golang.org/api from 0.235.0 to 0.237.0 in /sdks (#35302)
dependabot[bot] Jun 16, 2025
86a0947
Bump github.com/linkedin/goavro/v2 from 2.13.1 to 2.14.0 in /sdks (#3…
dependabot[bot] Jun 16, 2025
e3062b0
Update spotbugs version, fix runner ubuntu version, fix found spotbug…
Amar3tto Jun 16, 2025
8e13061
feat: Add option to control resource cleanup failure for IT (#35287)
liferoad Jun 16, 2025
efb5831
Revert "Bump org.ajoberstar.grgit:grgit-gradle from 4.1.1 to 5.3.2 (#…
damccorm Jun 16, 2025
f840882
Add PeriodicStream in the new time series folder. (#35300)
shunping Jun 16, 2025
19b4de1
try buildah to replace kaniko (#35289)
liferoad Jun 16, 2025
cb80c16
Adding error handler for SpannerReadSchemaTransformProvider and missi…
TanuSharma2511 Jun 16, 2025
029b886
requests vulnerability. (#35308)
claudevdm Jun 16, 2025
f8cb258
[IcebergIO] create custom java container image for tests (#35307)
ahmedabu98 Jun 16, 2025
738abd5
add streaming support to iobase (python) (#35137)
razvanculea Jun 17, 2025
4120eab
Merge branch 'iobase_streaming_textio' of https://github.com/razvancu…
razvanculea Jun 17, 2025
fa8e102
add streaming to AvroIO, ParquetIO, TFRecordsIO
razvanculea Jun 24, 2025
8d063fe
reformat
razvanculea Jun 24, 2025
d9bb9e0
typo and spaces
razvanculea Jun 24, 2025
77f955d
Merge branch 'master' into iobase_streaming_others
razvanculea Jul 31, 2025
dd013d1
carry on the refactor from #35253
razvanculea Jul 31, 2025
6680874
Merge branch 'master' into iobase_streaming_others
razvanculea Aug 6, 2025
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -107,8 +107,12 @@
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).
* Add support for streaming writes in IOBase (Python)
* Add IT test for streaming writes for IOBase (Python)
* Implement support for streaming writes in FileBasedSink (Python)
* Expose support for streaming writes in AvroIO (Python)
* Expose support for streaming writes in ParquetIO (Python)
* Expose support for streaming writes in TextIO (Python)
* Expose support for streaming writes in TFRecordsIO (Python)

## New Features / Improvements

60 changes: 44 additions & 16 deletions sdks/python/apache_beam/io/avroio.py
@@ -354,8 +354,7 @@ def split_points_unclaimed(stop_position):
while range_tracker.try_claim(next_block_start):
block = next(blocks)
next_block_start = block.offset + block.size
for record in block:
yield record
yield from block


_create_avro_source = _FastAvroSource
@@ -375,7 +374,8 @@ def __init__(
num_shards=0,
shard_name_template=None,
mime_type='application/x-avro',
use_fastavro=True):
use_fastavro=True,
triggering_frequency=None):
"""Initialize a WriteToAvro transform.

Args:
@@ -393,25 +393,38 @@ def __init__(
Constraining the number of shards is likely to reduce
the performance of a pipeline. Setting this value is not recommended
unless you require a specific number of output files.
In streaming, if not set, the service writes one file per bundle.
shard_name_template: A template string containing placeholders for
the shard number and shard count. When constructing a filename for a
particular shard number, the upper-case letters 'S' and 'N' are
replaced with the 0-padded shard number and shard count respectively.
This argument can be '' in which case it behaves as if num_shards was
set to 1 and only one file will be generated. The default pattern used
is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template.
the shard number and shard count. Currently only ``''``,
``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and
``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service.
When constructing a filename for a particular shard number, the
upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
shard number and shard count respectively. This argument can be ``''``
in which case it behaves as if num_shards was set to 1 and only one file
will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for
bounded PCollections and ``'-W-SSSSS-of-NNNNN'`` for unbounded
PCollections.
W is used for windowed shard naming and is replaced with
``[window.start, window.end)``
V is used for windowed shard naming and is replaced with
``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"),
window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"))``
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
use_fastavro (bool): This flag is left for API backwards compatibility
and no longer has an effect. Do not use.
triggering_frequency: (int) Every triggering_frequency duration, a window
will be triggered and all bundles in the window will be written.
If set, it overrides user windowing. Mandatory for GlobalWindow.

Returns:
A WriteToAvro transform usable for writing.
"""
self._schema = schema
self._sink_provider = lambda avro_schema: _create_avro_sink(
file_path_prefix, avro_schema, codec, file_name_suffix, num_shards,
shard_name_template, mime_type)
shard_name_template, mime_type, triggering_frequency)

def expand(self, pcoll):
if self._schema:
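
The `shard_name_template` docstring in the hunk above describes how the `S`, `N`, `W` and `V` placeholders are substituted. The following is a minimal, standalone sketch of that description, not the sink's actual implementation: `render_shard_name` is a hypothetical helper, window bounds are assumed to be plain Unix timestamps in seconds, and the exact string Beam produces for a window may differ.

```python
import re
from datetime import datetime, timezone

# Timestamp format quoted in the docstring for the 'V' placeholder.
_V_FORMAT = "%Y-%m-%dT%H-%M-%S"


def render_shard_name(template, shard, num_shards,
                      window_start=None, window_end=None):
    """Hypothetical helper: expand S/N/W/V placeholders in a shard template.

    window_start and window_end are assumed to be Unix timestamps (seconds).
    """
    def substitute(match):
        token = match.group(0)
        if token[0] == "S":
            # A run of S is the shard number, zero-padded to the run's length.
            return str(shard).zfill(len(token))
        if token[0] == "N":
            # A run of N is the shard count, zero-padded to the run's length.
            return str(num_shards).zfill(len(token))
        if window_start is None or window_end is None:
            raise ValueError("windowed templates need window bounds")
        if token == "W":
            # Raw window bounds, half-open interval.
            return f"[{window_start}, {window_end})"
        # 'V': window bounds rendered as UTC datetimes.
        start = datetime.fromtimestamp(window_start, tz=timezone.utc)
        end = datetime.fromtimestamp(window_end, tz=timezone.utc)
        return f"[{start.strftime(_V_FORMAT)}, {end.strftime(_V_FORMAT)})"

    # One pass over the template so substituted text is never re-scanned.
    return re.sub(r"S+|N+|W|V", substitute, template)


print(render_shard_name("-SSSSS-of-NNNNN", 3, 10))
print(render_shard_name("-V-SSSSS-of-NNNNN", 0, 1, 0, 60))
```

The single `re.sub` pass is a deliberate choice in this sketch: it avoids a second replacement accidentally matching letters inside an already-inserted window label.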
@@ -428,6 +441,15 @@ def expand(self, pcoll):
records = pcoll | beam.Map(
beam_row_to_avro_dict(avro_schema, beam_schema))
self._sink = self._sink_provider(avro_schema)
if (not pcoll.is_bounded and self._sink.shard_name_template
== filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE):
self._sink.shard_name_template = (
filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE)
self._sink.shard_name_format = self._sink._template_to_format(
self._sink.shard_name_template)
self._sink.shard_name_glob_format = self._sink._template_to_glob_format(
self._sink.shard_name_template)

return records | beam.io.iobase.Write(self._sink)

def display_data(self):
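
The change to `expand` in the hunk above swaps the default shard template for the windowed default when the input PCollection is unbounded, and leaves any other template alone. A minimal sketch of that selection rule in isolation follows. The two constant values are assumptions taken from the defaults named in the docstring, and `effective_template` is a hypothetical function, not part of `filebasedsink`.

```python
# Assumed values, taken from the defaults named in the WriteToAvro docstring.
DEFAULT_SHARD_NAME_TEMPLATE = "-SSSSS-of-NNNNN"
DEFAULT_WINDOW_SHARD_NAME_TEMPLATE = "-W-SSSSS-of-NNNNN"


def effective_template(template, is_bounded):
    """Hypothetical helper mirroring the template swap done in expand()."""
    # Only the untouched default is replaced; a template the caller chose
    # explicitly is kept as-is, even for unbounded input.
    if not is_bounded and template == DEFAULT_SHARD_NAME_TEMPLATE:
        return DEFAULT_WINDOW_SHARD_NAME_TEMPLATE
    return template


print(effective_template(DEFAULT_SHARD_NAME_TEMPLATE, is_bounded=False))
print(effective_template("-V-SSSSS-of-NNNNN", is_bounded=False))
```

In the diff itself the sink's `shard_name_format` and `shard_name_glob_format` are also recomputed after the swap, since both are derived from the template.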
@@ -441,7 +463,8 @@ def _create_avro_sink(
file_name_suffix,
num_shards,
shard_name_template,
mime_type):
mime_type,
triggering_frequency=60):
if "class 'avro.schema" in str(type(schema)):
raise ValueError(
'You are using Avro IO with fastavro (default with Beam on '
@@ -454,7 +477,8 @@ def _create_avro_sink(
file_name_suffix,
num_shards,
shard_name_template,
mime_type)
mime_type,
triggering_frequency)


class _BaseAvroSink(filebasedsink.FileBasedSink):
@@ -467,7 +491,8 @@ def __init__(
file_name_suffix,
num_shards,
shard_name_template,
mime_type):
mime_type,
triggering_frequency):
super().__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
@@ -477,7 +502,8 @@ def __init__(
mime_type=mime_type,
# Compression happens at the block level using the supplied codec, and
# not at the file level.
compression_type=CompressionTypes.UNCOMPRESSED)
compression_type=CompressionTypes.UNCOMPRESSED,
triggering_frequency=triggering_frequency)
self._schema = schema
self._codec = codec

@@ -498,15 +524,17 @@ def __init__(
file_name_suffix,
num_shards,
shard_name_template,
mime_type):
mime_type,
triggering_frequency):
super().__init__(
file_path_prefix,
schema,
codec,
file_name_suffix,
num_shards,
shard_name_template,
mime_type)
mime_type,
triggering_frequency)
self.file_handle = None

def open(self, temp_path):