
implement exactly-once commits to Kafka sinks and read_committed reads to Kafka sources. #218

Merged
merged 3 commits on Jul 28, 2023

Conversation

jacksonrnewhouse (Contributor)

Using the transactional Kafka APIs, we now support writing to Kafka via transactions. For each subtask, a new Kafka producer is created on each checkpoint, and it writes all of the data within that checkpoint for that subtask in one transaction. When a checkpoint barrier is received, the sink initializes a new producer while keeping the old one around until the commit message arrives.
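
A rough sketch of that per-checkpoint producer setup, using rdkafka's transactional producer API (the function name, bootstrap server handling, and 30-second timeout below are illustrative, not the actual sink code):

    use std::time::Duration;

    use rdkafka::config::ClientConfig;
    use rdkafka::error::KafkaResult;
    use rdkafka::producer::{FutureProducer, Producer};
    use rdkafka::util::Timeout;

    // Illustrative only: one transactional producer per (subtask, checkpoint epoch).
    // All records written between two checkpoint barriers go into this producer's
    // open transaction; the commit happens later, when the commit message arrives.
    fn make_checkpoint_producer(
        bootstrap_servers: &str,
        transactional_id: &str,
    ) -> KafkaResult<FutureProducer> {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("enable.idempotence", "true")
            .set("transactional.id", transactional_id)
            .create()?;
        // Register the transactional.id with the broker, then open the transaction
        // that will hold this checkpoint's records.
        producer.init_transactions(Timeout::After(Duration::from_secs(30)))?;
        producer.begin_transaction()?;
        Ok(producer)
    }

On a barrier, the current producer is set aside for the eventual commit_transaction call and a fresh one like this is created for the next epoch.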

Currently we don't support recovering if the pipeline is interrupted after a checkpoint has been created but before it has been committed. The semantics of this are pretty tricky, and there are a number of approaches with various safety and reliability tradeoffs. Flink, for instance, interrogates the internal transaction state from __transaction_state and then uses reflection to rebuild internal state. I don't think this is an option given the libraries we're using. Another approach that should work is to reliably determine whether the previous transaction was successfully committed and, if not, replay the data by reading the uncommitted records off of Kafka into a new transaction. This will require careful management of active transactions, as Kafka doesn't directly expose the commit state of a prior transaction.

I've also added the ability to have our Kafka sources read only committed data, via read_mode values of read_uncommitted and read_committed.
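
For reference, read_committed corresponds to the consumer's standard isolation.level property; a minimal rdkafka sketch (bootstrap servers, group id, and topic are placeholders) looks like:

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::error::KafkaResult;

    // Illustrative only: with isolation.level=read_committed the consumer never
    // surfaces records from aborted or still-open transactions.
    fn committed_reads_consumer() -> KafkaResult<BaseConsumer> {
        let consumer: BaseConsumer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder
            .set("group.id", "arroyo-example")          // placeholder
            .set("isolation.level", "read_committed")   // or "read_uncommitted"
            .create()?;
        consumer.subscribe(&["events"])?;               // placeholder topic
        Ok(consumer)
    }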

I'd appreciate feedback on the naming of the parameters. For things like Kafka configs, should we be using the same names as the underlying service, maybe prefixed by the service? In our case, that would mean using kafka.isolation.level instead of read_mode.

  }
},
"required": [
-  "offset"
+  "offset",
+  "read_mode"
Member

I believe this is a breaking change that will make existing kafka sources unusable. Can we make it not required and use read_uncommitted as the default value when we use it?

If that's not possible, perhaps it would make sense to include a DB migration that adds this field to existing kafka connections.
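
A minimal sketch of that defaulting, assuming the parsing follows the same opts.remove pattern as source.offset (the ReadMode variants here are illustrative):

    // Sketch only: treat a missing read_mode as read_uncommitted so existing
    // Kafka sources keep validating without a migration.
    let read_mode = match opts.remove("read_mode").as_deref() {
        None | Some("read_uncommitted") => ReadMode::ReadUncommitted,
        Some("read_committed") => ReadMode::ReadCommitted,
        Some(other) => bail!("invalid value for read_mode '{}'", other),
    };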

],
"additionalProperties": false
},
{
"type": "object",
"title": "Sink",
"properties": {
"commit_mode": {
"type": "string",
"description": "Type of committing behavior for Kafka Sink",
Member

I think this description could be expanded a bit to say that this enables transactional semantics and requires that transactional support be enabled in Kafka

@@ -157,9 +157,23 @@ impl Connector for KafkaConnector {
None | Some("latest") => SourceOffset::Latest,
Some(other) => bail!("invalid value for source.offset '{}'", other),
},
read_mode: match opts.remove("read_mode").as_ref().map(|f| f.as_str()) {
Member

I think this should be source.read_mode and sink.commit_mode to match the JSON and the existing source.offset config.

As far as the actual names, I'm a bit torn. I do think "read_mode" is a better name than "isolation.level", but there's also benefit in using the existing terminology.

Contributor Author

Going with source.read_mode and sink.commit_mode for now.

},
"required": [
"commit_mode"
Member

same here — is it possible to make it not required and use a default?

info!("Creating kafka producer for {}", self.bootstrap_servers);
let mut client_config = ClientConfig::new();
async fn on_start(&mut self, ctx: &mut Context<(), ()>) {
self.init_producer(&ctx.task_info)
Member

I believe you mentioned that transactional support needs to be enabled in the broker for this to work. If it's not, how does the error manifest? Ideally we'd get that back to the user using the error reporting system: https://github.com/ArroyoSystems/arroyo/blob/master/arroyo-worker/src/engine.rs#L471

Contributor Author

No, that conversation we had was just that if you're running Kafka locally you need some additional settings. In docker-compose.yaml it looks like:

      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

This converts to settings in /etc/kafka/kafka.properties like:

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
min.insync.replicas=1
transaction.state.log.min.isr=1

Transactions have been in Kafka since 0.11, released in 2017, so I don't think it is worth having a lot of checks for that.

..
} => {
client_config.set("enable.idempotence", "true");
let transactional_id = format!(
Member

Should this include the topic as well?

Contributor Author

Sure, seems reasonable.
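
A sketch of what including the topic could look like (the exact fields and ordering here are illustrative, not the final format):

    // Illustrative naming: include the topic so two Kafka sinks in the same job
    // writing to different topics never share a transactional.id.
    let transactional_id = format!(
        "arroyo-{}-{}-{}",
        task_info.job_id, self.topic, task_info.task_index
    );
    client_config.set("transactional.id", transactional_id);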

unimplemented!("received a commit message without a producer ready to commit. Restoring from commit phase not yet implemented");
};
committing_producer
.commit_transaction(Timeout::Never)
Member

What's the implication of setting this to Timeout::Never?

Contributor Author

This means it will block without a timeout. Happy to switch to a different timeout, or have it be configurable.
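
For reference, a bounded variant would swap in rdkafka's Timeout::After; the 30-second value below is just an example, not a recommendation:

    use std::time::Duration;
    use rdkafka::util::Timeout;

    // Example only: bound the commit call instead of blocking indefinitely.
    committing_producer
        .commit_transaction(Timeout::After(Duration::from_secs(30)))
        .expect("committing Kafka transaction");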

arroyo-worker/src/connectors/kafka/sink/mod.rs (outdated comment, resolved)
jacksonrnewhouse force-pushed the kafka_commits branch 2 times, most recently from 83e8362 to e454469 on July 27, 2023 at 00:59
error!("failed to commit {} times, retrying", commits_attempted);
commits_attempted += 1;
}
}
Member

Should this panic if we fail all 5 attempts?

Contributor Author

doh
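
A bounded retry that fails loudly once the attempts are exhausted could look roughly like this (the constant, attempt count, and messages are illustrative):

    // Sketch: retry the commit a fixed number of times, then panic so the failure
    // surfaces instead of looping forever.
    const MAX_COMMIT_ATTEMPTS: u32 = 5;
    for attempt in 1..=MAX_COMMIT_ATTEMPTS {
        match committing_producer.commit_transaction(Timeout::Never) {
            Ok(()) => break,
            Err(e) if attempt < MAX_COMMIT_ATTEMPTS => {
                error!("failed to commit transaction (attempt {}): {}, retrying", attempt, e);
            }
            Err(e) => panic!(
                "failed to commit transaction after {} attempts: {}",
                MAX_COMMIT_ATTEMPTS, e
            ),
        }
    }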
