
Conversation

@C0urante C0urante (Contributor) commented Feb 17, 2022

Adds the new exactly-once-related source connector APIs described in KIP-618.

Note that these APIs are not used by the framework in this PR, just defined. They will be used in downstream PRs.

@C0urante C0urante changed the title KAFKA-10000: Add new source connector APIs related to exactly-once support KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) Feb 17, 2022
@tombentley tombentley (Member) left a comment

A few nits, but otherwise LGTM.

Member

Should we be clearer as to which will be called first?

Contributor Author

All else equal, I think we may want to err on the side of being a little too broad here rather than being too granular and painting ourselves into a corner in case we need to change the order of operations later.

That said, do you have a case in mind where this might matter to a connector author? If there's one ordering that might be more intuitive or easier to implement then I wouldn't be opposed to sticking to it, as long as the rationale applies broadly enough that we don't think we'll need to change things later.

Member

It seems relevant to the implementer whether the configs have been validated or not. If they're not guaranteed by the contract to have been validated then the implementer might need to duplicate some of the validation logic in this method.

@C0urante C0urante (Contributor Author) Mar 3, 2022

Hmmmm... I think this is valid and could be useful but there are a few complications.

If we want to guarantee that the config has been validated first (via Connector::validate and possibly with other checks like making sure that the value for transaction.boundary is valid), should we then only invoke this method if there are no errors in the connector config?

This might make life easier for connector authors who want to be able to instantiate an AbstractConfig subclass for their connector and then make decisions about its exactly-once viability based on the already-parsed values from that config class instead of, like you say, having to duplicate that validation logic in this method.

On the other hand, it'll prevent issues with exactly-once support from surfacing the first time around and will require a follow-up validation attempt to discover them, even if the problems with the connector config don't impact its ability to provide exactly-once guarantees.

Thinking about just this specific use case (instantiating an AbstractConfig subclass inside this method), I think it'd be best to do two things:

  1. Add a catch clause specifically for ConfigException instances around the calls to Connector::exactlyOnceSupport and Connector::canDefineTransactionBoundaries, and translate those exceptions into a specific error message stating that the connector config appears to be invalid and exactly-once support for the connector cannot be determined (instead of the existing "An unexpected error occurred..." message); a rough framework-side sketch follows this list. This should allow connector authors to instantiate their specific AbstractConfig subclass within this method and not have to worry about running into something like KAFKA-13327, which causes 500 errors to be thrown during some connector config validations when it's possible (and a much better UX) to add an error message to the validated config.
  2. Add a try/catch block around the parsing logic for TransactionBoundaries that translates an IllegalArgumentException into a ConfigException, so that connector authors can freely invoke TransactionBoundaries::fromProperty on their connector config inside SourceConnector::exactlyOnceSupport and, if there's an issue, have it translated into a helpful error message for the user. And if the connector's transaction boundary style isn't relevant at all to its exactly-once support, we won't double-ding the connector config with two error messages related to the transaction.boundary property.
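To make item 1 concrete, here is a rough framework-side sketch; the helper method, its name, and the error text are illustrative rather than the actual AbstractHerder implementation:

import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical framework-side helper: catch a ConfigException thrown by the connector's
// exactly-once hook and turn it into a validation error instead of a generic failure.
static void validateExactlyOnceSupport(SourceConnector connector,
                                       Map<String, String> connectorProps,
                                       ConfigValue exactlyOnceSupportValue) {
    try {
        ExactlyOnceSupport support = connector.exactlyOnceSupport(connectorProps);
        // ... compare 'support' against the user's requested exactly.once.support value ...
    } catch (ConfigException e) {
        exactlyOnceSupportValue.addErrorMessage(
                "The connector configuration appears to be invalid, so exactly-once support "
                        + "for the connector cannot be determined: " + e.getMessage());
    }
}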

Do you think that this is a reasonable way to handle that specific connector development style? And are there other development styles that we should aim to accommodate?

Member

Ultimately there's a cyclic dependency:

  1. We (or at least I) would like for Connector::exactlyOnceSupport and Connector::canDefineTransactionBoundaries to be called only with valid config parameters (implying that Connector::validate has been called first).
  2. But we'd like to call Connector::validate having first validated transaction.boundary and exactly.once.support (which implies calling Connector::exactlyOnceSupport and Connector::canDefineTransactionBoundaries).

But in 2, do we really need to call Connector::exactlyOnceSupport and Connector::canDefineTransactionBoundaries, or is it enough to validate that transaction.boundary and exactly.once.support are legal enum values (terminating early if they're not), and then call Connector::exactlyOnceSupport and Connector::canDefineTransactionBoundaries after Connector::validate? We could then add any second-phase exactlyOnceSupport and canDefineTransactionBoundaries errors to the ConfigValues from validate. That would establish the pattern of per-config syntax validation happening before any inter-config validation (since typically a connector overriding validate would call super's impl first, thus achieving ConfigDef validation).

I realise the KIP approved these signatures (so it's getting a bit late to change them), but I think that would allow Connector::exactlyOnceSupport(Config) and Connector::canDefineTransactionBoundaries(Config), which makes the ordering much more explicit purely from the types involved. Or am I asking the impossible (AbstractHerder#validateConnectorConfig is not the easiest piece of code to reason about)?
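Purely for illustration, the alternative signatures being floated here (not the ones the KIP approved) would look something like the following, with the Config parameter carrying the output of the earlier validation phase; these are hypothetical defaults on SourceConnector, not the merged API:

import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;

// Hypothetical variants of the new SourceConnector hooks; the Config argument would make it
// explicit, from the types alone, that Connector::validate has already been run.
public ExactlyOnceSupport exactlyOnceSupport(Config validatedConfig) {
    return null; // same default behavior as the approved Map-based signature
}

public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Config validatedConfig) {
    return ConnectorTransactionBoundaries.UNSUPPORTED;
}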

@C0urante C0urante (Contributor Author) Mar 7, 2022

@tombentley

Definitely in favor of doing single-property validations before multi-property validations. As far as single-property validations and short-circuiting go, the downstream preflight validation PR already short-circuits if the exactly.once.support property is invalid by skipping the call to SourceConnector::exactlyOnceSupport, and does the same for the transaction.boundary property and the SourceConnector::canDefineTransactionBoundaries method.

Can you clarify why you'd prefer to also short-circuit if any errors are reported by single-property or multi-property validation beyond the ones reported for the exactly.once.support and transaction.boundary properties? It comes with the downside that we'd force two validation passes on users in order to discover any issues with the exactly.once.support and transaction.boundary properties, even if those issues could be surfaced by the connector despite the other problems with its config.

Not sure passing in a Config object would be the best way to address this, but it is better than nothing. It doesn't seem like a very ergonomic class to work with for follow-up validation attempts and is better suited for rendering final results to users (which, IMO, is part of the reason that the preflight validation logic in AbstractHerder is difficult to follow). Something like a Map<String, ConfigValue> might be more useful, since I imagine most connector authors would end up either deriving one of their own from the Config object or using Java 8 streams logic to find the ConfigValue for a single property or subset of properties.

But I'm also hesitant about this approach since I'm not sure how we'd want to handle properties that are present in the raw connector config, but which the connector doesn't define a ConfigValue for in the Config returned from Connector::validate. Maybe I'm overthinking this, but I can't shake the feeling that either way we go on that front (either discard those properties and don't include them in SourceConnector::exactlyOnceSupport and SourceConnector::canDefineTransactionBoundaries, or include them with ConfigValue objects constructed by the framework and inserted into the resulting Config or Map<String, ConfigValue> object), we're going to end up introducing some footguns into the logic here that are just going to frustrate developers who want to implement these methods without having to read paragraphs of documentation or go through framework source code. One practical example of this is the transaction.boundary property: most connectors won't (and shouldn't) define this property themselves, so we'd have to decide whether we'd want to provide that property to connectors in SourceConnector::exactlyOnceSupport, and if so, how.
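For reference, the adapter most implementers would probably end up writing if these methods took a Config is small but repetitive; a sketch (the helper name is illustrative):

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;

// Derive a per-property lookup from the Config returned by Connector::validate.
static Map<String, ConfigValue> valuesByName(Config validationResult) {
    return validationResult.configValues().stream()
            .collect(Collectors.toMap(ConfigValue::name, Function.identity()));
}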

I think an ideal solution here might be opinionated but flexible: we can provide special accommodations for idiomatic usage patterns with the Connect API (like attaching special meaning to thrown ConfigException instances, as is already done during ConfigDef config validation, or how we handle RetriableException instances specially for source tasks), but allow connector authors enough breathing room to surface as much information as possible in a single validation pass if they want to put in the extra work to make the UX for their connector as smooth as possible.

From an API design perspective, I think adding a Config argument to the new methods here certainly accomplishes both of these; I'm just stuck on a few implementation details that I'm not sure we can overcome.

On this front:

Or am I asking the impossible (AbstractHerder#validateConnectorConfig is not the easiest piece of code to reason about)?

One thing that may be useful to point out (forgive me if you're already aware) is that it's expected that Connector::validate will report errors without throwing exceptions, which means that there's no implicit contract that the validation control flow gets short-circuited right now if there are any errors in the connector config. This means that we can also validate, for example, overridden Kafka client properties and whether they are permitted by the ConnectorClientConfigOverridePolicy set up on the worker, even if there are other errors in the connector config.
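As a small illustration of that contract, here's a sketch of a validate override (inside a hypothetical connector class) that reports a problem on the returned Config instead of throwing; the property name is made up:

import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;

// Flag a problem without throwing, so the framework can keep collecting errors
// for the rest of the config in the same validation pass.
@Override
public Config validate(Map<String, String> connectorConfigs) {
    Config result = super.validate(connectorConfigs); // per-property ConfigDef validation
    for (ConfigValue value : result.configValues()) {
        if ("my.example.property".equals(value.name())
                && connectorConfigs.getOrDefault("my.example.property", "").isEmpty()) {
            value.addErrorMessage("my.example.property must be set to a non-empty value");
        }
    }
    return result;
}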

@mimaison

Fair enough! There's definitely some potential for out-of-scope work here (like following up on KIP-419 or developing alternatives to it), but at the very least we can count the newly-introduced APIs as in-scope and should provide these types of guarantees for them.

@ both

It may be easier to go over the details of the precise behavior we want here on the preflight validation PR. If we prefer a high-level discussion it's probably best to keep things here, but since we're getting fairly granular now, it might help to see what's been drafted so far implementation-wise.

Member

Potentially controversial idea: have you considered always calling start() first? That way the connector should have computed its configuration and should be able to easily handle exactlyOnceSupport() and canDefineTransactionBoundaries(). Obviously start() may cause the connector to do some work before the runtime stops it in case of an invalid configuration. But since the proposed Javadoc hinted it could be called first anyway, it's not making things any worse.

The configuration validation logic is getting more and more complex and showing the limits of the current APIs. I wonder if having a configure(Map) method would help us.

@C0urante C0urante (Contributor Author) Mar 10, 2022

Oooh, that's an interesting idea.

I think your point about the connector doing extra work is my biggest concern on that front, and it seems to be validated (heh) by how validate is designed (we never actually invoke validate on the same Connector instances that we invoke start on at the moment). It may not be very frequent, but there certainly are some connectors out there that do some heavy lifting in start that we wouldn't want to do every time during preflight validation. For a practical example, see MirrorMaker2:

scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
scheduler.execute(this::computeAndCreateTopicPartitions, "creating downstream topic-partitions");
scheduler.execute(this::refreshKnownTargetTopics, "refreshing known target topics");
scheduler.scheduleRepeating(this::syncTopicAcls, config.syncTopicAclsInterval(), "syncing topic ACLs");
scheduler.scheduleRepeating(this::syncTopicConfigs, config.syncTopicConfigsInterval(),
        "syncing topic configs");
scheduler.scheduleRepeatingDelayed(this::refreshTopicPartitions, config.refreshTopicsInterval(),
        "refreshing topics");

Adding a configure method would help with this issue by allowing us to give a long-lived config to a Connector that the connector can hold onto across successive invocations of exactlyOnceSupport and canDefineTransactionBoundaries (and possibly even validate). But it would also complicate some of the existing preflight validation logic in the framework. Right now, we only create a single Connector instance per connector type per worker for preflight validation, and all invocations of validate and config are performed with that instance (see AbstractHerder here, here, and here).

Since we use these validation-only Connector instances pretty loosely and without any guarantees about order of invocations for config and validate, it's unlikely that there are connectors out there that store state in either of these methods, so there could be relatively low risk for creating a new validation-only Connector instance every time a config has to be validated.
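Purely to illustrate the idea (configure is not part of the Connector API today, and the class and accessor names below are made up), the long-lived-config pattern might look like:

import java.util.Map;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical: if the framework guaranteed a configure(Map) call before the new hooks,
// a connector could parse its config once and reuse it across invocations.
public abstract class ExampleSourceConnector extends SourceConnector {

    private ExampleConnectorConfig parsedConfig; // made-up AbstractConfig subclass

    public void configure(Map<String, String> props) { // hypothetical lifecycle method
        this.parsedConfig = new ExampleConnectorConfig(props);
    }

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
        // No need to re-parse props here; configure has already done the work
        return parsedConfig.supportsExactlyOnce() // made-up accessor
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    }

    // taskClass(), start(), stop(), taskConfigs(), config(), version() omitted
}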

But I have to wonder if we're overthinking things. It seems like we're trying to optimize away from methods that accept a raw config map and instead only provide that config once per round of validation. Is the objective there to create a smoother, better-documented, more strictly-defined API? Or to improve resource utilization by obviating the need for connector developers to construct AbstractConfig subclasses (or equivalents) that parse and validate individual properties at instantiation time?

As an aside, I think the language in the Javadoc may need to be revisited since, as you note, it implies that these new methods will be invoked before start, which is not necessarily the case (for example, in the current preflight validation PR, use of these methods is mutually exclusive with the start method for a given Connector instance). TODO: rework the Javadoc once the exact behavior is agreed upon.

Member

My questions are mostly a way to refresh my memory and recap how the connector API works exactly. I'm not asking to introduce new methods or change things significantly. Since we missed the 3.2 window, I thought it was worth having a quick think.

I'm mostly focused on the Connector API and how consistent and easy it is to write/maintain connectors.

@C0urante C0urante (Contributor Author) Apr 13, 2022

Gotcha! Hope I've been able to shed some light on things.

I think keeping things simple with the validation APIs and sticking to a raw Map<String, String> parameter is going to maximize consistency and ease of maintenance for developers.

In addition to that, only invoking these new methods if the connector config is valid (according to Connector::validate) will probably also make life easier for connector developers, since they'll have to program less defensively in these methods. I think this is why @tombentley proposed this behavior.

However, it will provide a slightly worse user experience, since users might jump through some hoops to fix their connector config and only then find out that the connector doesn't even support exactly-once or custom transaction boundaries. If the implementation of this method is trivial, users could be spared that headache in some situations. It's also not terribly difficult to shoulder most, if not all, of the responsibility for defensive programming in the framework so that, should an invalid config cause one of these methods to fail, that case is handled gracefully.
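For comparison, the defensive style described above is fairly cheap for a connector to adopt; a sketch, with ExampleConnectorConfig standing in for a connector's own AbstractConfig subclass and imports as in the earlier sketches:

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    try {
        ExampleConnectorConfig config = new ExampleConnectorConfig(props); // made-up config class
        return config.readsFromIdempotentSource() // made-up accessor
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    } catch (ConfigException e) {
        // Config is invalid; fall back to the default (null), which the framework
        // treats as "exactly-once support cannot be assumed"
        return null;
    }
}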

Ultimately, if you both still believe we should skip these method calls on invalid connector configs I can just go ahead and apply that change; this is a relatively minor detail and it's not worth blocking the rest of this new feature on it.

@C0urante C0urante (Contributor Author) commented Mar 2, 2022

Thanks @tombentley!

@C0urante C0urante force-pushed the kafka-10000-api-changes branch from 1554e66 to 759051b on March 3, 2022 16:44
/**
 * The configuration key that determines how source tasks will define transaction boundaries
 * when exactly-once support is enabled.
 */
public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
Member

Is there a reason this config is defined here? Is it just temporary? (I've not looked at the remaining PRs yet)

Contributor Author

I wanted to define the TransactionBoundary enum somewhere public (i.e., accessible by connector authors) so that they could use it for things like the SourceConnector::exactlyOnceSupport method in cases where connectors need to be able to define their own transaction boundaries in order to provide exactly-once support. Seems more ergonomic to be able to do something like TransactionBoundary.fromProperty(props.get(TRANSACTION_BOUNDARY_CONFIG)) in that case.

Given that, it seemed like a toss-up between SourceConnector and SourceTask. The former felt a little more intuitive (tasks probably have less reason to need to know about their transaction boundary style beyond the return value of SourceTaskContext::transactionContext), but the latter lines up better with the precedent set by the topics-related properties for sink connectors, which are defined in the SinkTask class.
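For example, assuming the constant and the TransactionBoundary enum end up on SourceTask per that precedent, a connector that only supports exactly-once when it defines its own transaction boundaries might do something like this sketch:

import java.util.Map;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only; exact enum/constant locations per the discussion above.
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    String boundaryProp = props.get(SourceTask.TRANSACTION_BOUNDARY_CONFIG);
    SourceTask.TransactionBoundary boundary = boundaryProp == null
            ? SourceTask.TransactionBoundary.POLL // assumed framework default when unset
            : SourceTask.TransactionBoundary.fromProperty(boundaryProp);
    return boundary == SourceTask.TransactionBoundary.CONNECTOR
            ? ExactlyOnceSupport.SUPPORTED
            : ExactlyOnceSupport.UNSUPPORTED;
}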

Member

Considering the precedent with SinkTask, it makes sense. Thanks

void commitTransaction(SourceRecord record);

/**
* Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
Member

Should it be Requests a transaction abort after the next batch of records?

Contributor Author

Ah yep, good catch!

Member

To be honest, I'd be in favor of having stronger guarantees on the order of method calls and on the overall lifecycle of plugins. At the moment, connectors can't assume very much and we get weird behaviors like the ones described in KIP-419. At least here, I like that you were explicit in the Javadoc!

* configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
* connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
* exactly-once guarantees.
* @since 3.2
Member

Can you update the @since annotations to 3.3?

Contributor Author

Ack, done.

@C0urante C0urante (Contributor Author)

@tombentley @mimaison gentle reminder, can you revisit this when you have a chance?

@C0urante C0urante (Contributor Author)

@tombentley @mimaison I'd really appreciate another pass if you can find the time this week. Thanks!

@C0urante C0urante (Contributor Author) commented May 2, 2022

@tombentley @mimaison I'd really like it if we could confirm the intended direction for this API. I'm willing to go whichever direction you believe is best, but (as Tom has noted) given that this is fairly green-field for Connect, I want to make sure that we consider our options carefully and set a good precedent for future APIs like this. If you believe we've done our due diligence, I'm happy to implement whatever approach you believe is best; please just confirm what you would like to see here if there are still any reservations about the current state of the PR.

@mimaison mimaison (Member) commented May 2, 2022

Sorry @C0urante for the delays, we were at Kafka Summit last week and I'm still trying to catch up on stuff. I'm hoping to take another look this week.

@mimaison mimaison (Member) left a comment

Thanks @C0urante !

@mimaison mimaison merged commit a586c94 into apache:trunk May 6, 2022
@mimaison mimaison (Member) commented May 6, 2022

Is #11775 the next one to review?

@C0urante C0urante deleted the kafka-10000-api-changes branch May 6, 2022 13:47
@C0urante C0urante (Contributor Author) commented May 6, 2022

Yep! Got to fix some merge conflicts on that one though; should be ready by Monday. Thanks Mickael!
