Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(SchemaTransformProvider.class)
public class GenerateSequenceSchemaTransformProvider
extends TypedSchemaTransformProvider<GenerateSequenceConfiguration> {
private static final Logger LOG =
LoggerFactory.getLogger(GenerateSequenceSchemaTransformProvider.class);
public static final String OUTPUT_ROWS_TAG = "output";
public static final Schema OUTPUT_SCHEMA = Schema.builder().addInt64Field("value").build();

Expand Down Expand Up @@ -130,6 +134,18 @@ public static Builder builder() {
@Nullable
public abstract Rate getRate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Rate object already has the fields necessary (elements and seconds) to control elements per period. Can you check if this works?

If it doesn't work, I think we can just remove Rate and introduce them as top-level fields

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. something like provider.GenerateSequence(start=0, rate={elements: 3, seconds: 1})

I don't remember if "rate" is provided as a dict or a beam.Row()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested. The dict works well. I also added the test test_run_generate_sequence_with_rate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the current Rate dict works then is there a need for this PR?

I don't think it makes sense to duplicate the API. Maybe we should keep the new rate test though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original idea is to create the similar syntax to

public GenerateSequence buildExternal(External.ExternalConfiguration config) {
for #35034. If you think we should just keep rate, I am also fine with that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@SchemaFieldDescription(
"Number of elements to generate per period. Alternative to using the 'rate' object. "
+ "If set, 'period' must also be set. Takes precedence over 'rate'.")
@Nullable
public abstract Long getElementsPerPeriod();

@SchemaFieldDescription(
"The period in seconds for generating elements. Alternative to using the 'rate' object. "
+ "If set, 'elementsPerPeriod' must also be set. Takes precedence over 'rate'.")
@Nullable
public abstract Long getPeriod();

@AutoValue.Builder
public abstract static class Builder {

Expand All @@ -139,6 +155,10 @@ public abstract static class Builder {

public abstract Builder setRate(Rate rate);

public abstract Builder setElementsPerPeriod(Long elementsPerPeriod);

public abstract Builder setPeriod(Long period);

public abstract GenerateSequenceConfiguration build();
}

Expand All @@ -149,8 +169,33 @@ public void validate() {
if (end != null) {
checkArgument(end == -1 || end >= start, "Invalid range [%s, %s)", start, end);
}
Rate rate = this.getRate();
if (rate != null) {

Long elementsPerPeriod = getElementsPerPeriod();
Long period = getPeriod();
Rate rate = getRate();

if (elementsPerPeriod != null || period != null) {
// Ensure both are specified if one is.
if (elementsPerPeriod == null || period == null) {
throw new IllegalArgumentException(
"If either 'elementsPerPeriod' or 'period' is specified, both must be specified.");
}
// At this point, both elementsPerPeriod and period are guaranteed to be non-null.
checkArgument(
elementsPerPeriod > 0,
"Invalid 'elementsPerPeriod' specification. Expected positive value but received %s.",
elementsPerPeriod);
checkArgument(
period > 0,
"Invalid 'period' specification. Expected positive value but received %s.",
period);
if (rate != null) {
// Consider logging a warning if rate is also set, as it will be ignored.
// For now, we just prioritize elementsPerPeriod/period.
LOG.warn(
"Configuration includes both 'elementsPerPeriod'/'period' and 'rate'. 'rate' will be ignored.");
}
} else if (rate != null) {
checkArgument(
rate.getElements() > 0,
"Invalid rate specification. Expected positive elements component but received %s.",
Expand All @@ -159,6 +204,10 @@ public void validate() {
Optional.ofNullable(rate.getSeconds()).orElse(1L) > 0,
"Invalid rate specification. Expected positive seconds component but received %s.",
rate.getSeconds());
// Ensure seconds is present if elements is, to match the original issue's concern
checkArgument(
!(rate.getElements() != null && rate.getSeconds() == null),
"Invalid rate specification. If rate.elements is specified, rate.seconds must also be specified.");
}
}
}
Expand All @@ -177,14 +226,25 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
input.getAll().isEmpty(), "Expected no inputs but got: %s", input.getAll().keySet());

Long end = Optional.ofNullable(configuration.getEnd()).orElse(-1L);
GenerateSequenceConfiguration.Rate rate = configuration.getRate();

GenerateSequence sequence = GenerateSequence.from(configuration.getStart()).to(end);
if (rate != null) {
sequence =
sequence.withRate(
rate.getElements(),
Duration.standardSeconds(Optional.ofNullable(rate.getSeconds()).orElse(1L)));

Long elementsPerPeriod = configuration.getElementsPerPeriod();
Long period = configuration.getPeriod();

if (elementsPerPeriod != null && period != null) {
// elementsPerPeriod and period are validated to be non-null and positive by validate()
sequence = sequence.withRate(elementsPerPeriod, Duration.standardSeconds(period));
} else {
GenerateSequenceConfiguration.Rate rate = configuration.getRate();
if (rate != null) {
// rate.getElements() is validated to be positive.
// rate.getSeconds() is validated to be positive if present, defaults to 1L if null.
// The additional check in validate() ensures getSeconds() is present if getElements() is.
sequence =
sequence.withRate(
rate.getElements(),
Duration.standardSeconds(Optional.ofNullable(rate.getSeconds()).orElse(1L)));
}
}

return PCollectionRowTuple.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,34 @@ def test_run_generate_sequence(self):

assert_that(numbers, equal_to([i for i in range(10)]))

def test_run_generate_sequence_with_elements_per_period(self):
provider = ExternalTransformProvider(
BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))

# We expect this to produce 0, 1, 2, 3.
# The rate limiting (2 elements per 1 second) is primarily to ensure
# these parameters are accepted and the pipeline runs.
# Exact timing is hard to assert in an IT.
# The end parameter ensures the sequence is bounded for the test.
with beam.Pipeline() as p:
numbers = p | provider.GenerateSequence(
start=0, end=4, elements_per_period=2,
period=1) | beam.Map(lambda row: row.value)

assert_that(numbers, equal_to([0, 1, 2, 3]))

def test_run_generate_sequence_with_rate(self):
provider = ExternalTransformProvider(
BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))

with beam.Pipeline() as p:
numbers = p | provider.GenerateSequence(
start=0, end=3, rate={
'elements': 1, 'seconds': 1
}) | beam.Map(lambda row: row.value)

assert_that(numbers, equal_to([0, 1, 2]))


@pytest.mark.xlang_wrapper_generation
@unittest.skipUnless(
Expand Down
13 changes: 12 additions & 1 deletion sdks/standard_external_transforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# configuration in /sdks/standard_expansion_services.yaml.
# Refer to gen_xlang_wrappers.py for more info.
#
# Last updated on: 2025-04-24
# Last updated on: 2025-05-30

- default_service: sdks:java:io:expansion-service:shadowJar
description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
Expand All @@ -34,11 +34,22 @@
destinations:
python: apache_beam/io
fields:
- description: Number of elements to generate per period. Alternative to using the
'rate' object. If set, 'period' must also be set. Takes precedence over 'rate'.
name: elements_per_period
nullable: true
type: int64
- description: The maximum number to generate (exclusive). Will be an unbounded
sequence if left unspecified.
name: end
nullable: true
type: int64
- description: The period in seconds for generating elements. Alternative to using
the 'rate' object. If set, 'elementsPerPeriod' must also be set. Takes precedence
over 'rate'.
name: period
nullable: true
type: int64
- description: Specifies the rate to generate a given number of elements per a given
number of seconds. Applicable only to unbounded sequences.
name: rate
Expand Down
Loading