Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable prefix to Consumer Group in IngestionJob's Kafka reader #969

Merged
merged 3 commits into from
Sep 2, 2020

Conversation

terryyylim
Copy link
Member

What this PR does / why we need it:
To allow multiple deployments with jobs running in parallel, we should separate their Kafka consumer groups which is based on the JobId. This would prevent overwriting each other's offsets.

Which issue(s) this PR fixes:

Fixes #

Does this PR introduce a user-facing change?:

NONE

@terryyylim
Copy link
Member Author

/retest

@terryyylim
Copy link
Member Author

/retest

1 similar comment
@terryyylim
Copy link
Member Author

/retest

@@ -23,6 +23,9 @@ feast:
# Enabling JobManagement
enabled: true

# Prefix for JobId
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says absolutely nothing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated comment.

@@ -71,6 +74,9 @@ private String createJobId(SourceProto.Source source) {
source.getKafkaSourceConfig().getBootstrapServers(),
source.getKafkaSourceConfig().getTopic()),
dateSuffix);
if (!this.jobProperties.getJobIdPrefix().isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when jobProperties is null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed case.

@@ -87,9 +87,9 @@ public JobGroupingStrategy getJobGroupingStrategy(
Boolean shouldConsolidateJobs =
feastProperties.getJobs().getController().getConsolidateJobsPerSource();
if (shouldConsolidateJobs) {
return new ConsolidatedJobStrategy(jobRepository);
return new ConsolidatedJobStrategy(jobRepository, feastProperties.getJobs());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract the jobProperties before passing it? It's a convention that I am trying to get us to follow, even if it takes more lines of code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted before passing jobProperties.

@feast-ci-bot
Copy link
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: pyalex, terryyylim

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@pyalex
Copy link
Collaborator

pyalex commented Sep 2, 2020

/lgtm

@pyalex pyalex changed the title Add job id prefix Add configurable prefix to Consumer Group in IngestionJob's Kafka reader Sep 2, 2020
@pyalex pyalex added the kind/feature New feature or request label Sep 2, 2020
@feast-ci-bot feast-ci-bot merged commit da812c2 into feast-dev:master Sep 2, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants