[DO NOT MERGE] Zombie fencing and committer interface #2 #240

Closed
wants to merge 4 commits into from

@bryanck bryanck commented Apr 17, 2024

(ignore naming, nits, etc)

bryanck commented Apr 17, 2024

@fqaiser94 FYI, this is an approach with reusable write components.

Comment on lines +42 to +58
public static class Result {
  private final List<WriterResult> writerResults;
  private final Map<TopicPartition, Offset> sourceOffsets;

  Result(List<WriterResult> writerResults, Map<TopicPartition, Offset> sourceOffsets) {
    this.writerResults = writerResults;
    this.sourceOffsets = sourceOffsets;
  }

  public List<WriterResult> writerResults() {
    return writerResults;
  }

  public Map<TopicPartition, Offset> sourceOffsets() {
    return sourceOffsets;
  }
}

We are thinking along very similar lines; for example, this Result class is almost exactly the same as Committable in my PR.

  producer.sendOffsetsToTransaction(
-     offsetsToCommit, new ConsumerGroupMetadata(controlGroupId));
+     offsetsToCommit, KafkaUtils.getConsumerGroupMetadata(context, connectGroupId));

I know this PR is just meant to share your idea, but just in case: we need to be careful (in this repo) about changes like this, as it is a breaking change (and could cause duplicates).

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface Committer {

@fqtab fqtab Apr 19, 2024

We have extremely similar ideas so I'm pretty sure we can reach consensus easily.

The only real difference I can see is where we're drawing our "interface boundaries":

  • Your PR exposes just one interface: Task
    • I know you're calling it a Committer, but I don't really think of it that way. From my perspective, a Task does everything from writing SinkRecords into files to committing those files to Iceberg, whereas a Committer only commits files to Iceberg (i.e. it does not write SinkRecords to files). The interface you've defined is clearly the former, despite its name.
    • CMIIW, but I'm assuming your idea here is that we could make this Task interface pluggable in the future.
  • My PR splits the Task functionality into basically 2 interfaces: Writer and Committer.
    • The idea being that only the Committer portion would eventually become public and pluggable.

There are definite pros and cons both ways. Here's my perspective.

Pros of having a single, pluggable Task interface:

  • This option gives us (maintainers) a lot of flexibility to change things in the future, as the interface is extremely general.
  • This option imposes no restrictions.
    • In contrast, splitting Task into Writer and Committer interfaces makes it much harder for the Writer and Committer to share resources like the Catalog. Developing Committers that depend on writer state also becomes difficult (e.g. a Committer that commits every, say, 1000 records).
  • This option also gives users a ton of flexibility:
    • Users have the option to override how SinkRecords are written to files.
    • Users have the option to override how files are committed to Iceberg (which is what I was mostly interested in).
  • Minor: we can evolve the existing codebase to conform to this interface very easily.
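
To make the "commits every N records" point concrete, here is a minimal sketch of why it is easy behind a single interface. Everything in it is hypothetical: this Task signature is not the one from either PR, a String stands in for a SinkRecord, and an in-memory list stands in for commits to the table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single interface; a String stands in for a SinkRecord.
interface Task {
    void put(List<String> records);
}

// One object both writes and commits, so the commit policy can read
// writer-side state (the running record count) directly.
class CountingTask implements Task {
    private final int commitEvery;
    private final List<String> buffered = new ArrayList<>();
    // Stands in for commits to the table: one entry per commit.
    final List<List<String>> commits = new ArrayList<>();

    CountingTask(int commitEvery) {
        this.commitEvery = commitEvery;
    }

    @Override
    public void put(List<String> records) {
        for (String record : records) {
            buffered.add(record);
            // Commit as soon as the threshold is reached, then start a new batch.
            if (buffered.size() >= commitEvery) {
                commits.add(new ArrayList<>(buffered));
                buffered.clear();
            }
        }
    }

    // Number of records written but not yet committed.
    int pending() {
        return buffered.size();
    }
}
```

With a separate Writer and Committer, the record count lives in the Writer, so the Committer would need some extra channel (a shared counter, or metadata attached to the files it receives) to apply the same policy.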

Cons of having a single, pluggable Task interface:

  • I'm questioning how valuable it is to make the writing-sinkrecords-to-files logic pluggable.
    • I can come up with maybe one reasonable use case that could only be accomplished by making this logic pluggable.
  • If we make this Task interface pluggable, we will likely also have to make a lot of the writing-sinkrecords-to-files logic public, so that users (like me) who are only interested in overriding how files are committed to Iceberg don't have to copy the writing-sinkrecords-to-files logic.
    • I see you accomplish that in this PR by exposing a public WorkerHelper class, but this looks awkward from a maintenance perspective for us (maintainers).
    • I worry about semver issues if we have to make this logic public. This is where I expect the most changes to happen going forward in terms of new features, and I don't want us to be restricted in what changes we can make here.
  • More philosophical, but if users plug in custom Task implementations, it's almost weird: at that point you're practically developing your own connector. We don't do this for other connectors (Spark, Flink). IDK, rambling thoughts.

I'm not opposed to this approach, just worried about where this might end up from a maintenance perspective. I could perhaps be persuaded if you share the use cases you have in mind for making the writing-sinkrecords-to-files logic public and pluggable. I can't think of enough of them to justify the costs/cons, but maybe I'm lacking imagination here :)

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface Committer {

@fqtab fqtab Apr 19, 2024

After looking at your PR, I'm reminded of an earlier design I had discarded that is a mix of the ideas in both of our PRs.

  1. Have a public Task interface (basically the idea in your PR)
  2. Have private Writer and Committer interfaces
  3. Write a TaskImpl that implements the Task interface in terms of private Writer and Committer implementations (basically the idea in my PR)
    • Since the Writer and Committer interfaces are private, we have flexibility in evolving them and getting them right before making them eventually public
  4. In the future, make the Committer interface public and pluggable
  5. In the future, make the Writer interface public and pluggable (if we really feel this is necessary/valuable)
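
A minimal sketch of steps 1 to 3, under stated assumptions: the method signatures are hypothetical rather than taken from either PR, a String stands in for both a SinkRecord and a data file, and "private" here means package-private, since top-level Java interfaces cannot be declared private. BufferingWriter and RecordingCommitter are toy implementations added only so the wiring can be exercised.

```java
import java.util.ArrayList;
import java.util.List;

// Step 1: the only contract meant to be public (hypothetical signature).
interface Task {
    void put(List<String> records); // write records into files
    void commit();                  // commit finished files to the table
}

// Step 2: internal interfaces, free to change while they stay non-public.
interface Writer {
    void write(List<String> records);
    List<String> complete();        // return finished files and reset
}

interface Committer {
    void commit(List<String> files);
}

// Step 3: Task implemented purely by delegating to a Writer and a Committer.
class TaskImpl implements Task {
    private final Writer writer;
    private final Committer committer;

    TaskImpl(Writer writer, Committer committer) {
        this.writer = writer;
        this.committer = committer;
    }

    @Override
    public void put(List<String> records) {
        writer.write(records);
    }

    @Override
    public void commit() {
        committer.commit(writer.complete());
    }
}

// Toy Writer: produces one fake file name per non-empty batch.
class BufferingWriter implements Writer {
    private final List<String> files = new ArrayList<>();
    private int nextFileId = 0;

    @Override
    public void write(List<String> records) {
        if (!records.isEmpty()) {
            files.add("data-file-" + nextFileId++);
        }
    }

    @Override
    public List<String> complete() {
        List<String> finished = new ArrayList<>(files);
        files.clear();
        return finished;
    }
}

// Toy Committer: remembers every file it was asked to commit.
class RecordingCommitter implements Committer {
    final List<String> committed = new ArrayList<>();

    @Override
    public void commit(List<String> files) {
        committed.addAll(files);
    }
}
```

Steps 4 and 5 would then amount to widening the visibility of Committer (and later Writer) without changing Task or its callers.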

This option should allow us to:

  • Avoid awkward code management problems
  • Retain flexibility (as maintainers) to keep evolving the codebase (i.e. avoid semver issues), as most of the logic remains private until we decide to make things public and pluggable

What do you reckon?
(Implemented this in a new commit on my PR)

Contributor Author

It seems reasonable to me. I'll prepare the Iceberg PR soon and include your updates. I may tweak a couple of things as I'm preparing that.
