[Bug]: When using beam.io.kinesis.ReadDataFromKinesis, a java error is raised in DataFlowRunner #27165

Open
4 of 15 tasks
vishwesh0409 opened this issue Jun 19, 2023 · 14 comments

Comments

@vishwesh0409

vishwesh0409 commented Jun 19, 2023

What happened?

Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:887)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:824)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:142)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2506)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1034)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:799)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:999)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2415)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:540)

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@vishwesh0409 vishwesh0409 changed the title from "[Bug]: " to "[Bug]: When using beam.io.kinesis.ReadDataFromKinesis, a java error is raised in DataFlowRunner" Jun 19, 2023
@github-actions github-actions bot added this to the 2.50.0 Release milestone Jul 6, 2023
@benvit92

Hello @vishwesh0409, what was the fix that was applied for this? I tried to look for it but couldn't find it.

@vishwesh0409
Author

I'll reopen this. I thought it had been fixed by #26953, but after updating Beam to 2.49.0 I'm still getting the same problem.

@vishwesh0409 vishwesh0409 reopened this Jul 19, 2023
@benvit92

Yep, I also tested this against the current master branch and got the same error when running a test job locally, so it's definitely an issue IMO.

@benvit92

benvit92 commented Jul 20, 2023

Also @vishwesh0409, #26953 was applied to the v2 AWS Kinesis reader, and I think the current implementation may still rely on v1 (I mean the Java package that the Python code ends up calling).

@benvit92

benvit92 commented Jul 20, 2023

OK, I'm going to structure my thoughts a little better here :)

So there are two KinesisReader.java files, one in v1 and one in v2, and as far as I can tell KinesisTransformRegistrar.java, which maps the Python interface to the underlying Java package, relies on the v1 one.

#26953 fixed the issue in the v2 KinesisReader.java but not in v1. As a test, I cloned master and applied the same fix to the v1 files, and after that the job runs without error.
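A minimal sketch of the kind of null guard involved, assuming (from the stack trace above) that `shardReadersPool` is only assigned in `start()`, while the SDF wrapper calls `getSplitBacklogBytes()` through `getProgress()` during `splitRestriction`, i.e. before the reader has been started. The classes below are hypothetical stand-ins, not Beam's actual `KinesisReader` and not the #26953 patch; `BACKLOG_UNKNOWN = -1` mirrors the constant on Beam's `UnboundedSource.UnboundedReader`.

```java
import java.time.Instant;

// Hypothetical stand-in for Beam's ShardReadersPool (illustration only).
class ShardReadersPoolStub {
    Instant getLatestRecordTimestamp() {
        return Instant.EPOCH;
    }
}

// Hypothetical reader mirroring the lifecycle suggested by the stack trace:
// the pool is created in start(), but getSplitBacklogBytes() can be called
// earlier, from getProgress() during splitRestriction.
class GuardedKinesisReaderSketch {
    // Same value as UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN in Beam.
    static final long BACKLOG_UNKNOWN = -1L;

    private ShardReadersPoolStub shardReadersPool; // stays null until start()

    boolean start() {
        shardReadersPool = new ShardReadersPoolStub();
        return true;
    }

    long getSplitBacklogBytes() {
        // Guard: before start() there is no pool to query, so report "unknown"
        // instead of dereferencing null and throwing a NullPointerException.
        if (shardReadersPool == null) {
            return BACKLOG_UNKNOWN;
        }
        return estimateBacklogBytesSince(shardReadersPool.getLatestRecordTimestamp());
    }

    // Hypothetical placeholder for the real backlog query against Kinesis.
    private long estimateBacklogBytesSince(Instant since) {
        return 0L;
    }
}
```

On a fresh instance, `getSplitBacklogBytes()` returns `-1` instead of throwing; after `start()` it delegates to the pool (here, the placeholder returns `0`).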

I do hit another issue, though: when running on my laptop with the DirectRunner, nothing is read, even though the source stream contains data and I'm using InitialPositionInStream.TRIM_HORIZON. The job runs, but no data are read.

I also tried to make KinesisTransformRegistrar.java point to v2, since the job emits deprecation warnings (v1 seems to be deprecated), but I don't know the project well enough to make it work yet (plus my Java is a bit rusty 😓).

@vishwesh0409
Author

Looks like the KinesisReader modules are not properly fixed yet. I saw the same deprecation warnings when I ran my code on Google Cloud. It should be a simple enough fix, though.

@benvit92

Basically, a new KinesisTransformRegistrar.java needs to be written on top of the aws2 Kinesis package instead of the current one, exposing a new v2 transform URN to Python. Hopefully that would let the Python pipeline work as expected with the latest modules.
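To illustrate the idea, here is a minimal, hypothetical sketch of a registrar that exposes v1 and v2 read transforms under separate URNs, so Python callers could opt into v2 without breaking existing v1 users. The URN strings, the `ReadBuilder` interface and the class names are made up for this example; Beam's real mechanism is the `ExternalTransformRegistrar` interface, whose builder types are not reproduced here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder abstraction; stands in for Beam's ExternalTransformBuilder.
interface ReadBuilder {
    String describe(String streamName);
}

// Hypothetical registrar mapping cross-language URNs to builders.
final class KinesisRegistrarSketch {
    // Made-up URNs for illustration only; not Beam's actual URN strings.
    static final String READ_V1_URN = "example:transform:kinesis_read:v1";
    static final String READ_V2_URN = "example:transform:kinesis_read:v2";

    static Map<String, ReadBuilder> knownBuilders() {
        Map<String, ReadBuilder> builders = new LinkedHashMap<>();
        // Existing entry: keeps current Python pipelines on the v1 reader.
        builders.put(READ_V1_URN, stream -> "aws v1 KinesisIO.read() on " + stream);
        // New entry: a separate URN lets Python opt into the aws2 reader explicitly.
        builders.put(READ_V2_URN, stream -> "aws2 KinesisIO.read() on " + stream);
        return Collections.unmodifiableMap(builders);
    }

    // Looks up a builder by URN, roughly as an expansion service would.
    static ReadBuilder resolve(String urn) {
        ReadBuilder builder = knownBuilders().get(urn);
        if (builder == null) {
            throw new IllegalArgumentException("Unknown URN: " + urn);
        }
        return builder;
    }
}
```

Keeping both URNs registered side by side is what would let the Python wrapper switch to v2 deliberately, rather than silently changing the behavior of the existing transform.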

@lostluck
Contributor

lostluck commented Aug 2, 2023

2.50 release manager here.
This issue is currently tagged for the 2.50.0 release, which cuts in a week on August 9th.

Please complete work and get it into the main branch in that time, or move this issue to the 2.51 Milestone: https://github.com/apache/beam/milestone/15

@Abacn Abacn removed this from the 2.50.0 Release milestone Aug 3, 2023
@Abacn
Contributor

Abacn commented Aug 3, 2023

This was due to the issue being reopened without the milestone being removed.

@benvit92

benvit92 commented Sep 8, 2023

@Abacn @lostluck But if an issue is open and neither the person who opened it nor those participating in the discussion know how to fix it, will the issue be forgotten, or will someone in the community pick it up?
I'm just trying to understand whether there will be any resolution to this or whether it will just stick around.

@lostluck
Contributor

lostluck commented Sep 8, 2023

As an open source project, the primary means of fixes is via community and user involvement. As a rule, no one can expect someone else to fix an issue just because it's filed.

The story changes somewhat if judged to be a P1 or P0, a regression from the previous release version, or if it's affecting a large Dataflow customer, at which point it's likely that someone from the Dataflow team will drive it to resolution. That does require the customer to have a GCP support ticket in place as well.

P3s are typically not release blockers, so it's more likely this remains in the backlog until someone motivated to fix it comes along. Beam welcomes contributions.

@mmxgn

mmxgn commented Nov 29, 2023

This issue seems to also affect FlinkRunner.

@benvit92

@mxm It should affect all runners, since the issue is at the source-code level. I tried to explain the core issue in more detail here: #27165 (comment)

@nahplay

nahplay commented Jan 20, 2024

The same issue on my side.

Projects
None yet
Development

No branches or pull requests

6 participants