
[BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828

Merged · 5 commits merged into apache:master on Sep 20, 2022

Conversation

@Amar3tto (Contributor) commented Jun 3, 2022

sdf-with-offset

Resolves #24960
Resolves #21680


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Add a link to the appropriate issue in your description, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@asf-ci commented Jun 3, 2022

Can one of the admins verify this patch?

(2 similar comments from @asf-ci followed.)

@codecov (bot) commented Jun 3, 2022

Codecov Report

Merging #17828 (b84ab8c) into master (aa574f5) will decrease coverage by 0.17%.
The diff coverage is n/a.

❗ Current head b84ab8c differs from pull request most recent head 18ac5cc. Consider uploading reports for the commit 18ac5cc to get more accurate results.

@@            Coverage Diff             @@
##           master   #17828      +/-   ##
==========================================
- Coverage   74.17%   74.00%   -0.18%     
==========================================
  Files         711      704       -7     
  Lines       93605    93250     -355     
==========================================
- Hits        69433    69007     -426     
- Misses      22895    22977      +82     
+ Partials     1277     1266      -11     
Flag     Coverage Δ
python   83.54% <ø> (-0.02%) ⬇️

Flags with carried-forward coverage won't be shown.

Impacted Files Coverage Δ
...ython/apache_beam/io/gcp/bigquery_read_internal.py 53.36% <0.00%> (-4.33%) ⬇️
sdks/go/pkg/beam/io/filesystem/memfs/memory.go 92.00% <0.00%> (-4.16%) ⬇️
sdks/python/apache_beam/io/gcp/bigquery.py 70.37% <0.00%> (-3.88%) ⬇️
...ks/go/pkg/beam/runners/dataflow/dataflowlib/job.go 21.55% <0.00%> (-1.13%) ⬇️
sdks/go/pkg/beam/core/runtime/exec/input.go 50.96% <0.00%> (-1.00%) ⬇️
sdks/go/pkg/beam/core/graph/coder/row_encoder.go 67.52% <0.00%> (-0.55%) ⬇️
sdks/python/apache_beam/runners/direct/executor.py 96.46% <0.00%> (-0.55%) ⬇️
sdks/python/apache_beam/dataframe/io.py 88.78% <0.00%> (-0.49%) ⬇️
sdks/python/apache_beam/typehints/row_type.py 96.55% <0.00%> (-0.47%) ⬇️
sdks/python/apache_beam/pipeline.py 92.42% <0.00%> (-0.42%) ⬇️
... and 61 more


@Amar3tto (Contributor, Author): Run Java PreCommit

@Amar3tto (Contributor, Author): Run SQL_Java17 PreCommit

@Amar3tto (Contributor, Author): Run Java PreCommit

@Amar3tto (Contributor, Author): Run Java PreCommit

@Amar3tto Amar3tto marked this pull request as ready for review July 22, 2022 10:01
@elizaveta-lomteva (Contributor):

@aromanenko-dev
Alexey, we've prepared the SparkReceiverIO connector Read interface. This PR is ready to be reviewed.
Thanks for your attention!

@aromanenko-dev (Contributor):

Thanks!
I'm sorry for the delay with the review; I'll try to take a look as soon as I can.

@aromanenko-dev aromanenko-dev self-requested a review August 1, 2022 17:11
@Amar3tto (Contributor, Author):

@aromanenko-dev Kind ping on this.
Did you have time to review this PR?

@aromanenko-dev (Contributor):

Please avoid merge commits and use git rebase instead.

@aromanenko-dev (Contributor) left a review comment:

Thanks for the contribution!

I did a first round of code review (I didn't touch the SDF part); please take a look.

Also, please rebase your feature branch against master and squash your commits.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Streaming sources for Spark {@link Receiver}. */
@aromanenko-dev (Contributor):

Please add more details about this connector and examples of how to use it.

@Amar3tto (Contributor, Author):

We expect to add more detailed documentation and a README in a separate PR.

@aromanenko-dev (Contributor):

Is there any reason not to add it along with this PR? Seems logical to me...

@Amar3tto (Contributor, Author):

Added javadoc
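The added class-level Javadoc presumably reads along these lines (a sketch only; ReceiverBuilder, CustomReceiver, and withGetOffsetFn are assumed names, while withSparkReceiverBuilder appears in a snippet later in this review):

/**
 * Streaming sources for Spark {@link Receiver}.
 *
 * <h3>Reading from a custom Spark {@link Receiver}</h3>
 *
 * <pre>{@code
 * // ReceiverBuilder and withGetOffsetFn are illustrative names.
 * ReceiverBuilder<String, CustomReceiver> receiverBuilder =
 *     new ReceiverBuilder<>(CustomReceiver.class).withConstructorArgs();
 *
 * pipeline.apply(
 *     SparkReceiverIO.<String>read()
 *         .withGetOffsetFn(Long::parseLong)
 *         .withSparkReceiverBuilder(receiverBuilder));
 * }</pre>
 */
public class SparkReceiverIO { ... }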

@aromanenko-dev (Contributor):

Thanks for addressing the review comments. It looks OK to me in general.

I'll be on PTO for the next 2 weeks, so I'd kindly ask @chamikaramj or @lukecwik to take a look at the SDF part.

@chamikaramj (Contributor):

Ack. Will take a look. Thanks @aromanenko-dev

@Amar3tto (Contributor, Author):

Run Java PreCommit

@chamikaramj (Contributor) left a review comment:

Thanks.

objects -> {
V record = (V) objects[0];
recordsQueue.offer(record);
return null;
@chamikaramj (Contributor):

"return null" was intended here ?

@Amar3tto (Contributor, Author):

Yes, it was intended: this is a SerializableFunction<?, Void>, so it must return something, and the common way is to return null here.
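For illustration, the pattern looks roughly like this (a sketch; the class and queue names are hypothetical, not the PR's actual code):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.transforms.SerializableFunction;

// A SerializableFunction<?, Void> must still return a value; null is the convention.
class StoreConsumedRecordFn<V> implements SerializableFunction<Object[], Void> {
  private final Queue<V> recordsQueue = new ConcurrentLinkedQueue<>();

  @Override
  @SuppressWarnings("unchecked")
  public Void apply(Object[] objects) {
    V record = (V) objects[0]; // the consumed record arrives as the first argument
    recordsQueue.offer(record);
    return null; // required by the Void result type
  }
}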

return null;
});
} catch (Exception e) {
LOG.error("Can not init Spark Receiver!", e);
@chamikaramj (Contributor):

Shouldn't we raise an exception here (or retry) instead of logging?

@Amar3tto (Contributor, Author):

Done

try {
TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS);
} catch (InterruptedException e) {
LOG.error("SparkReceiver was interrupted before polling started", e);
@chamikaramj (Contributor):

Ditto.

@Amar3tto (Contributor, Author):

Done
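One common way to address this is sketched below (the actual change that landed may differ): restore the interrupt flag and propagate a failure instead of only logging.

try {
  TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS);
} catch (InterruptedException e) {
  // Restore the interrupt flag and fail fast rather than swallowing the error.
  Thread.currentThread().interrupt();
  throw new IllegalStateException("SparkReceiver was interrupted before polling started", e);
}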

if (sparkReceiver != null) {
sparkReceiver.stop("SparkReceiver is stopped.");
}
recordsQueue.clear();
@chamikaramj (Contributor):

What happens to the existing records in the queue?

@Amar3tto (Contributor, Author):

At this point, all records for the current restriction should already have been read from the queue.
Here we stop the Receiver and clear the queue because it is no longer needed.

checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
this.getOffsetFn = getOffsetFn;

SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
@chamikaramj (Contributor):

This probably should be renamed to timestampFn since it basically provides the timestamp for a given record.

@Amar3tto (Contributor, Author):

Renamed

}

/** A function to calculate watermark after a record. */
public Read<V> withWatermarkFn(SerializableFunction<V, Instant> watermarkFn) {
@chamikaramj (Contributor):

Probably should be renamed to getTimestampFn or similar (and also update the Javadoc).

@Amar3tto (Contributor, Author):

Renamed
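After the rename, the builder method presumably reads along these lines (a sketch assuming an AutoValue-style builder; toBuilder() and setTimestampFn() are assumed names):

/** A function to assign an event timestamp to each record. */
public Read<V> withTimestampFn(SerializableFunction<V, Instant> timestampFn) {
  checkArgument(timestampFn != null, "timestampFn can not be null"); // Guava-style precondition
  return toBuilder().setTimestampFn(timestampFn).build();
}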

LOG.info("{} started reading", ReadFromSparkReceiverWithOffsetDoFn.class.getSimpleName());
return input
.apply(Impulse.create())
.apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
@chamikaramj (Contributor):

Seems like we don't do any kind of splitting here, which will limit reading to a single worker. Is it possible to split the data from the SparkReceiver across multiple workers (for example, Kafka does this by creating splits for different partitions)?

Contributor:

Could you please merge this implementation to start with, to unblock the work on PRs with SparkReceiver integration tests and the integration with CdapIO?
We haven't yet found a way to split the Spark receivers, but we continue to work in this direction.

@chamikaramj (Contributor):

Yeah, this can be pushed to a future PR. Let's add a TODO though.

@Amar3tto (Contributor, Author):

Added TODO
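With the TODO in place, the expansion reads roughly like this (a sketch based on the snippet above):

@Override
public PCollection<V> expand(PBegin input) {
  // TODO: implement splitting so that reading is not limited to a single worker
  // (e.g. the way KafkaIO creates splits per topic partition).
  return input
      .apply(Impulse.create()) // single impulse element to kick off reading
      .apply(ParDo.of(new ReadFromSparkReceiverWithOffsetDoFn<>(sparkReceiverRead)));
}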


private void receive() {
Long currentOffset = startOffset;
while (!isStopped()) {
@chamikaramj (Contributor):

This test operation does not currently appear to be thread-safe (the source might read while a batch is being loaded).

@Amar3tto (Contributor, Author):

Fixed
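One way to make the test receiver's loop thread-safe, as a sketch (the PR's actual fix may differ): have batch loading and emission synchronize on the same monitor.

private final Object lock = new Object();

private void receive() {
  Long currentOffset = startOffset;
  while (!isStopped()) {
    synchronized (lock) { // loading a new batch elsewhere must hold the same lock
      store(String.valueOf(currentOffset++)); // Receiver.store(...) hands data to Spark
    }
  }
}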

@chamikaramj (Contributor) left a review comment:

Thanks. Added a couple more comments which I think are critical issues. Splitting can be pushed to a future change.

V record = sparkConsumer.poll();
if (record != null) {
Long offset = getOffsetFn.apply(record);
if (!tracker.tryClaim(offset)) {
@chamikaramj (Contributor):

When the runner resumes an element from a checkpoint, how will the source start reading from the correct position?

I think there should be logic here to seek to the correct position when "currentRestriction().getFrom()" is not zero.

Note that this has to be a seek (not reading all elements from the beginning) for reading to be efficient.

@Amar3tto (Contributor, Author):

Starting to read from the correct position (the "seek") is implemented here:
sparkConsumer = new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
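Assembling the snippets from this thread, the processing loop looks roughly like this (a sketch: SparkConsumerWithOffset, poll(), and stop() come from the snippets above, while start(), getOffsetFn, and timestampFn are assumed field and method names):

@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<V> receiver) {
  // The "seek": resume consuming from the restriction's starting offset
  // instead of re-reading all elements from the beginning.
  SparkConsumer<V> sparkConsumer =
      new SparkConsumerWithOffset<>(tracker.currentRestriction().getFrom());
  sparkConsumer.start(sparkReceiver);
  while (true) {
    V record = sparkConsumer.poll();
    if (record == null) {
      sparkConsumer.stop();
      return ProcessContinuation.resume(); // no data yet; the runner reschedules
    }
    if (!tracker.tryClaim(getOffsetFn.apply(record))) {
      sparkConsumer.stop();
      return ProcessContinuation.stop(); // checkpointed or restriction exhausted
    }
    receiver.outputWithTimestamp(record, timestampFn.apply(record));
  }
}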

Long offset = getOffsetFn.apply(record);
if (!tracker.tryClaim(offset)) {
sparkConsumer.stop();
LOG.debug("Stop for restriction: {}", tracker.currentRestriction().toString());
@chamikaramj (Contributor):

I don't see any sort of acking here for messages that are already read. How does the SparkConsumer finalize messages so that it does not have to hold on to data forever? Usually this is implemented via a bundle finalizer: https://beam.apache.org/documentation/programming-guide/#bundle-finalization

@Amar3tto (Contributor, Author):

We discussed at the meeting that an additional ack is not needed, since the SparkConsumer is created inside the DoFn. As soon as an element is processed, it is deleted from the queue (not from the source).
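For reference, the bundle-finalizer pattern the reviewer links to looks like this in Beam (not needed here per the discussion above; shown only for context):

@ProcessElement
public void process(ProcessContext context, BundleFinalizer bundleFinalizer) {
  // Register a callback that runs after the runner durably commits the bundle;
  // an acking source would acknowledge its messages inside the callback.
  bundleFinalizer.afterBundleCommit(
      Instant.now().plus(Duration.standardMinutes(5)), // callback expiry
      () -> {
        // acknowledge the processed messages back to the external system
      });
}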

* .withSparkReceiverBuilder(receiverBuilder);
* }</pre>
*/
public class SparkReceiverIO {
@chamikaramj (Contributor):

Please add an experimental tag (see [1] for an example) since this is a new connector and the API can change.

[1]

@Amar3tto (Contributor, Author):

Done
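The tag presumably follows Beam's usual pattern (sketch):

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;

/** Streaming sources for Spark {@link Receiver}. */
@Experimental(Kind.SOURCE_SINK)
public class SparkReceiverIO {
  // ...
}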


@chamikaramj (Contributor):

Any updates?

@elizaveta-lomteva (Contributor):

@chamikaramj
All your comments have been answered. The PR is ready to be merged.
Thank you.

@chamikaramj (Contributor) left a review comment:

Thanks. LGTM.

@chamikaramj chamikaramj merged commit d578e3d into apache:master Sep 20, 2022
@Amar3tto Amar3tto deleted the BEAM-14378-sdf branch September 21, 2022 11:57
@Abacn (Contributor) commented Sep 30, 2022

New test flake seen on SparkReceiverIO test: #23449
