Send acknowledgment on Spec Update only after sinks are ready #822

Merged
merged 3 commits into from
Jun 25, 2020

pyalex
Collaborator

@pyalex pyalex commented Jun 23, 2020

What this PR does / why we need it:

Some sinks need time to update their internal state for a new FeatureSetSpec version. For example, the BigQuery sink has to merge the new schema generated from the FeatureSetSpec into the existing table in BQ, and calling the BQ API takes time.

In this PR, IngestionJob uses the output of the sink's prepareWrite to make sending the ack synchronous with the sink's internal state update.
The ACK message is not sent until all sinks have finished their preparations, which prevents the inconsistency where a FeatureRow arrives before the FeatureSetSpec view is updated.
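
The ordering described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from this PR: the `SpecSink` interface, the `updateSpecAndAck` method, and the `"ACK ..."` message format are hypothetical, and `CompletableFuture` stands in for the Beam PCollection that the real `prepareWrite` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AckAfterSinksReady {
  // Hypothetical stand-in for a sink (not the Feast interface): the returned
  // future completes once the sink has applied the new spec internally.
  interface SpecSink {
    CompletableFuture<String> prepareWrite(String featureSetRef);
  }

  // Returns a future that yields the ack message only after every sink
  // has finished preparing for the given feature set reference.
  static CompletableFuture<String> updateSpecAndAck(String ref, List<SpecSink> sinks) {
    List<CompletableFuture<String>> prepared = new ArrayList<>();
    for (SpecSink sink : sinks) {
      prepared.add(sink.prepareWrite(ref));
    }
    return CompletableFuture
        .allOf(prepared.toArray(new CompletableFuture[0]))
        .thenApply(ignored -> "ACK " + ref);
  }

  public static void main(String[] args) {
    // A slow sink (think: a schema merge through a remote API) controlled by hand.
    CompletableFuture<String> slowSinkReady = new CompletableFuture<>();
    SpecSink fast = ref -> CompletableFuture.completedFuture(ref);
    SpecSink slow = ref -> slowSinkReady;

    List<SpecSink> sinks = new ArrayList<>();
    sinks.add(fast);
    sinks.add(slow);

    CompletableFuture<String> ack = updateSpecAndAck("project/feature_set", sinks);
    System.out.println("ack sent before slow sink is ready? " + ack.isDone());

    slowSinkReady.complete("project/feature_set");
    System.out.println(ack.join());
  }
}
```

The point of the design is that the ack depends on the slowest sink, so rows written after the ack can rely on every sink already knowing the new spec.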

Which issue(s) this PR fixes:

Fixes #

Does this PR introduce a user-facing change?:


@pyalex
Collaborator Author

pyalex commented Jun 23, 2020

/retest

@pyalex
Collaborator Author

pyalex commented Jun 23, 2020

/test test-end-to-end-batch-dataflow

@woop
Member

woop commented Jun 24, 2020

Let me know once this is ready for review, still missing a PR description.

@pyalex
Collaborator Author

pyalex commented Jun 24, 2020

/retest

<!--compileOnly 'org.projectlombok:lombok:1.18.12'-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Member

We need to make a decision on Lombok. Personally I am open to using it, but the team previously decided to phase it out and to use purely AutoValue. Apparently they are not compatible.

Collaborator Author

@pyalex pyalex Jun 25, 2020

Right, so I think AutoValue is more heavily used, since it's everywhere in Beam and I also applied it a lot in ingestion. So I changed to AutoValue.
Unfortunately AutoValue cannot be used in this specific case, since Avro needs an empty constructor on the object, which is hard, if not impossible, to have on an AutoValue object. In general I like AutoValue more.

Member

OK, so let's continue to use Lombok for the time being. I hope this doesn't interfere with the rest of the code base.

Collaborator

But there is no need to use AvroCoder, right? (@DefaultSchema(AutoValueSchema.class) can be used for serializable AutoValue types.) And even if you do want to use AvroCoder, the object has only 3 parameters in this case, so the methods can easily be written out by hand instead of using Lombok.

Collaborator Author

@pyalex pyalex Jun 25, 2020

@zhilingc do you mean SerializableCoder? It's not deterministic, so such a class can't be used as a key in a KV.
Regarding the second point, I also need equality and hash to use it as a key, and I'm not a fan of generating boilerplate code.
I've already produced several enormous PRs over the past few weeks, and I don't think they were easy to review.
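
For reference, the boilerplate under discussion (a three-field value class with hand-written equality and hash, usable as a grouping key) might look roughly like the sketch below. The class name `SpecKey`, its field names, and the `groupByKey` helper are hypothetical and chosen only for illustration; the grouping here uses a plain Java map, whereas Beam additionally needs a deterministic coder for the key, which this sketch does not cover.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class GroupingKeyDemo {
  // Hypothetical three-field key; the field names are assumptions.
  static final class SpecKey {
    private final String project;
    private final String name;
    private final int version;

    SpecKey(String project, String name, int version) {
      this.project = Objects.requireNonNull(project);
      this.name = Objects.requireNonNull(name);
      this.version = version;
    }

    // Structural equality over all three fields.
    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof SpecKey)) return false;
      SpecKey other = (SpecKey) o;
      return version == other.version
          && project.equals(other.project)
          && name.equals(other.name);
    }

    // Must agree with equals so equal keys land in the same bucket.
    @Override
    public int hashCode() {
      return Objects.hash(project, name, version);
    }
  }

  // Collects values under structurally equal keys, preserving first-seen order.
  static Map<SpecKey, List<String>> groupByKey(List<Map.Entry<SpecKey, String>> pairs) {
    Map<SpecKey, List<String>> grouped = new LinkedHashMap<>();
    for (Map.Entry<SpecKey, String> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Map.Entry<SpecKey, String>> pairs = new ArrayList<>();
    pairs.add(Map.entry(new SpecKey("project", "feature_set", 1), "row-a"));
    pairs.add(Map.entry(new SpecKey("project", "feature_set", 1), "row-b"));
    pairs.add(Map.entry(new SpecKey("project", "feature_set", 2), "row-c"));
    System.out.println("distinct keys: " + groupByKey(pairs).size());
  }
}
```

Lombok or AutoValue would generate equivalent `equals` and `hashCode` methods, which is the trade-off being debated in this thread.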

Collaborator Author

By "as key" I meant for grouping operations, of course.

Collaborator Author

OK, I guess @zhilingc is right. I could use SchemaCoder. Not through the @DefaultSchema() annotation, of course, since this is the common module and we don't want a dependency on Beam here. But that's still feasible. I guess we can move towards AutoValue.

@woop
Member

woop commented Jun 25, 2020

/kind housekeeping

@woop
Member

woop commented Jun 25, 2020

/lgtm

@feast-ci-bot
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: pyalex, woop

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot feast-ci-bot merged commit 5f77d7d into feast-dev:master Jun 25, 2020

Window.<FeatureSetReference>into(new GlobalWindows())
.accumulatingFiredPanes()
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO))
Collaborator

I'm wondering, what's the effect of AllowedLateness on global windows?

Collaborator Author

I'm not quite sure, but Beam demands that lateness be specified.

Collaborator

oke

@@ -99,7 +102,7 @@ public void processElement(
createTable(specKey, schema);
Collaborator

This comment probably comes late, but since table updates are purely additive (and backward compatible), is there a downside to running createTable even if the table already exists?

Collaborator Author

I suppose there's no point in running createTable if the table already exists. I guess you meant updateTable, but we're already doing that as part of the batch load process, which is 100% guaranteed to be a consistent approach. If we did it here, I'm not sure about consistency.

Collaborator

Yeah, I meant updateTable. By sketchy consistency, do you mean the race condition between in-progress writes and feature-set updates?


4 participants