KAFKA-14143: Exactly-once source connector system tests (KIP-618) #11783

Merged
C0urante merged 1 commit into apache:trunk from C0urante:kafka-10000-system-tests on Sep 8, 2022

Conversation

@C0urante
Contributor

Implements system tests for KIP-618.

Relies on changes from:

@C0urante
Contributor Author

Converting to draft until upstream PRs are reviewed.

@C0urante C0urante force-pushed the kafka-10000-system-tests branch 2 times, most recently from c962806 to 1369ad7 on March 3, 2022 at 17:54
@C0urante C0urante force-pushed the kafka-10000-system-tests branch 5 times, most recently from 503f389 to b1383e1 on June 18, 2022 at 19:25
@C0urante C0urante force-pushed the kafka-10000-system-tests branch 2 times, most recently from 775bb0d to dd8135c on June 21, 2022 at 22:49
@C0urante C0urante force-pushed the kafka-10000-system-tests branch 2 times, most recently from ed2cb48 to 9b2e4fe on June 29, 2022 at 22:00
Member

@tombentley left a comment


Thanks @C0urante! I left a few comments.

Member

If src_seqno_max==0 then we've not really proven that EOS is actually able to make progress in the presence of worker restarts, right?

Contributor Author

Yeah, that's correct. Added a check for that case.
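
For illustration, a minimal sketch of the kind of guard being discussed, assuming the test collects the source task's sequence numbers in a list; the variable names are illustrative, not the exact code added in this PR:

# Fail the test outright if no records made it through, since a maximum
# sequence number of 0 would make the exactly-once comparison vacuous.
src_seqno_max = max(src_seqnos) if src_seqnos else 0
assert src_seqno_max > 0, \
    "No records were produced by the source connector; cannot verify that " \
    "exactly-once delivery makes progress in the presence of worker restarts."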

Member

I wonder if we really should restart the nodes in the same order each time.

Contributor Author

For context, this follows the same logic as the test_bounce case. I guess we could do something a little less repetitive; I've offset the order by 1 with each successive restart.
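
A small sketch of the rotation idea, with hypothetical helper names (bounce() stands in for whatever stop/start sequence the test actually performs):

# Rotate the restart order by one position on each successive round so the
# workers are not always bounced in the same order.
worker_nodes = list(connect_workers)  # illustrative; however the test tracks its nodes
for round_num in range(num_rounds):
    offset = round_num % len(worker_nodes)
    order = worker_nodes[offset:] + worker_nodes[:offset]
    for node in order:
        bounce(node)  # hypothetical helper: clean or hard restart of one worker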

Member

Agreed that no data making it through the test is something to avoid, but I think always giving the consumer groups time to recover is a lenient way of achieving that. Perhaps the 2nd restart should not have the timeout, precisely to ensure that the messy case still doesn't cause problems.

Contributor Author

This is also copied from the test_bounce case. It should probably be updated to refer not to consumer groups but to worker rebalances and task startups.

I guess if we want to be rigorous here, we could do some rolling bounces with this cushioning in place, and some without. Perhaps two rounds of each?
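
A sketch of how the "two rounds of each" idea could look, with hypothetical helpers (wait_for_rebalance() stands in for whatever readiness check the test performs after a bounce):

# First two rounds give the cluster time to rebalance after each bounce;
# the last two do not, to exercise the messier overlapping-restart case.
for round_num in range(4):
    allow_recovery = round_num < 2
    for node in worker_nodes:
        bounce(node)  # hypothetical helper, as in the sketch above
        if allow_recovery:
            wait_for_rebalance(timeout_sec=60)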

@C0urante C0urante marked this pull request as ready for review on July 1, 2022 at 20:55
@C0urante C0urante force-pushed the kafka-10000-system-tests branch 2 times, most recently from 862206e to 3b41de7 on July 6, 2022 at 02:42
@showuon
Member

showuon commented Jul 11, 2022

@C0urante, I'll take a look this week. Before that, I'd like to know: have you run these system tests locally?

@C0urante
Contributor Author

C0urante commented Jul 11, 2022

I ran the tests locally when I first wrote them, repeatedly (I believe I ran everything in a loop overnight and made sure it all came out green). In the year since then, enough has changed (including getting a new laptop) that I'm no longer able to run them locally. Attempts to do so using Docker have led to some hung JVMs, which appear to be due to environmental issues.

If there's dedicated hardware out there to run these on, it'd be nice if we could leverage that for these tests. Otherwise, I can try to diagnose my local Docker issues and/or experiment with an alternative testing setup.

Member

@showuon left a comment


@C0urante , thanks for adding system tests. Left some comments.

Member

  1. Did we validate sink tasks in the test?
  2. Should we mention "exactly once" here?

Contributor Author

  1. Good point, no, we do not.
  2. I believe it is already mentioned? I've cleaned up the description a bit but left in the "deliver messages exactly once" part.

Comment on lines +578 to +583
Member

Nice test!

Member

Could you also help update the description in test_bounce? It tests not only clean bounces, right?

Contributor Author

Yep, done.

@C0urante C0urante force-pushed the kafka-10000-system-tests branch 2 times, most recently from 5f72c1a to 259f747 on July 15, 2022 at 02:06
@C0urante
Contributor Author

I've been able to get one green run of the test locally, but all other attempts have failed with timeouts, even when bumping the permitted duration for worker startup from one minute to five.

I also fixed a typo that would have broken the test_bounce case.

@showuon
Member

showuon commented Jul 15, 2022

I've been able to get one green run of the test locally, but all other attempts have failed with timeouts, even when bumping the permitted duration for worker startup from one minute to five.

I also fixed a typo that would have broken the test_bounce case.

Yes, that's what I saw when running in my local env. I think we need to make sure it works well before we can merge it.

@jsancio, this is the last PR for KIP-618. We'd like to put this into v3.3, but we need to make sure the newly added/updated system tests don't break any existing tests. Could you help run them and confirm? Thanks.

@C0urante
Contributor Author

I've started seeing this worker startup error in the logs for my local runs:

[2022-07-15 22:29:48,073] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server
        at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:203)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:101)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083
        at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:349)
        at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:310)
        at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
        at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:234)
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
        at org.eclipse.jetty.server.Server.doStart(Server.java:401)
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
        at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:201)
        ... 2 more
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:461)
        at sun.nio.ch.Net.bind(Net.java:453)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:222)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:85)
        at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:344)
        ... 9 more

I've seen it both with the new test_exactly_once_source test, and the test_bounce test case (which I've been using as a control group).

@C0urante C0urante force-pushed the kafka-10000-system-tests branch from 259f747 to cdebeac on July 19, 2022 at 18:52
@C0urante
Contributor Author

Ah, thanks for the update Luke! In the meantime, I've discovered and addressed a few more issues that surfaced during my local runs yesterday:

Since follower workers don't retry when zombie fencing requests to the leader fail (which is intentional, as we want to be able to surface failures caused by things like insufficient ACLs to perform a round of fencing), it's possible that a task hosted on a follower may fail during startup if the leader has just been bounced and its worker process hasn't finished starting yet. I've added a small step to restart any failed tasks after all the bounces have completed and before we check to make sure that the connector and its tasks are healthy.
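
For illustration, that cleanup step could be done with the standard Connect REST API (GET /connectors/{name}/status and POST /connectors/{name}/tasks/{id}/restart); the rest_url and connector_name variables are illustrative, and this is not the exact code added in the PR:

import requests

# After all bounces have completed, restart any task that failed because its
# zombie-fencing request hit a leader that was still starting up.
status = requests.get(f"{rest_url}/connectors/{connector_name}/status").json()
for task in status["tasks"]:
    if task["state"] == "FAILED":
        requests.post(f"{rest_url}/connectors/{connector_name}/tasks/{task['id']}/restart")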

Since the REST API is available before workers have actually completed startup, it's also possible that requests to fence zombies (and submit task configs) can be made to the leader before it has been able to read a session key from the config topic. I've tweaked the herder logic to catch this case and throw a 503 error with a user-friendly error message. I experimented with some other approaches to automatically refresh the leader's view of the config topic in this case, and/or handle request signature validation on the herder's tick thread (which would ensure that the worker had been able to complete startup and read to the current end of the config topic), but the additional complexity incurred by these options didn't seem worth the benefits since they would still be incomplete for cases like the one described above.

It's also possible that, when hard-bouncing a worker, a transaction opened by one of its tasks gets left hanging. If the task has begun to write offsets, then startup for subsequent workers will be blocked on the expiration of that transaction, which by default takes 60 seconds. This can cause test failures because we usually wait for 60 seconds for workers to complete startup. To address this, I've lowered the transaction timeout to 10 seconds. Ideally, we could proactively abort any open transactions left behind by prior task generations during zombie fencing, but it's probably too late to add this kind of logic in time for the 3.3.0 release. I've filed https://issues.apache.org/jira/browse/KAFKA-14091 to track this.
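
A sketch of one way the timeout could be lowered, using a connector-level producer override; the connector class and property values here are illustrative, and the exact mechanism used in the PR may differ (e.g. a worker-level producer config):

# Connector config with the transactional producer's timeout reduced from the
# 60-second default to 10 seconds, so a transaction left hanging by a
# hard-bounced task expires quickly instead of blocking worker startups.
# Note: producer.override.* requires the worker's connector client config
# override policy to permit it.
connector_config = {
    "name": "verifiable-source",
    "connector.class": "org.apache.kafka.connect.tools.VerifiableSourceConnector",
    "tasks.max": "3",
    "topic": "test-topic",
    "producer.override.transaction.timeout.ms": "10000",
}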

There's also a possible NPE in KafkaBasedLog caused by yet another unsafe use of Utils::closeQuietly. It's not a major issue since it only occurs when the log is shut down before it has had a chance to start, but it's still worth patching.

I've kicked off another local run of test_exactly_once_source with unclean shutdown and the sessioned protocol after applying these changes. I've only completed five tests so far, but they've all succeeded. Will report the results after the other ninety-five runs have completed.

@C0urante C0urante force-pushed the kafka-10000-system-tests branch from cdebeac to 25d1679 on July 20, 2022 at 22:17
@C0urante
Contributor Author

Out of the 100 runs, 94 passed. The 6 failures all appear to be environmental, as they were encountered while trying to allocate a console consumer at the end of the test, with this stack trace:

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 187, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 265, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 637, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0

@C0urante
Contributor Author

@showuon any news? Hoping we can get this merged and backported soon so that the stability of 3.3 isn't impacted; let me know if there's anything I can do to help.

@C0urante C0urante force-pushed the kafka-10000-system-tests branch from 25d1679 to 2f62b74 on August 4, 2022 at 15:53
@C0urante C0urante changed the title from "KAFKA-10000: System tests (KIP-618)" to "KAFKA-14143: Exactly-once source connector system tests (KIP-618)" on Aug 4, 2022
@C0urante
Contributor Author

@showuon @mimaison @tombentley any chance we could revisit this soon? I think we've probably missed the 3.3.0 boat but it'd be nice to get this in just to have the testing framework available in case we need to do any debugging for KIP-618 after the release.

@showuon
Member

showuon commented Aug 31, 2022

Still working on it. Hope we can get the results this week.

@jsancio
Member

jsancio commented Aug 31, 2022

@showuon and @C0urante, here is the system test job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5109/. Not sure if you are authorized to see the job.

@yashmayya also submitted this fix: #12575. Should we merge that and cherry-pick it to 3.3? At a glance it seems like a lower-risk change, and it fixes the failing system tests.

@mimaison
Member

@jsancio We can't access the Jenkins URL you shared ("This page is only accessible by Confluent employees on corporate VPN."). It would be great to get this into 3.3 if you can; having system tests to assert this new feature works as designed is important.

@jsancio jsancio added the 3.3 label Aug 31, 2022
@jsancio
Member

jsancio commented Aug 31, 2022

@jsancio We can't access the Jenkins URL you shared ("This page is only accessible by Confluent employees on corporate VPN."). It would be great to get this into 3.3 if you can; having system tests to assert this new feature works as designed is important.

@mimaison Sounds good to me. I marked it as a 3.3.0 blocker, but I need help reviewing the PR. I'll post the job result here when it finishes; it usually takes a few hours to complete.

@showuon
Member

showuon commented Sep 1, 2022

Thank you @jsancio ! As long as the system test passed, I'm good to merge it.

@C0urante
Contributor Author

C0urante commented Sep 1, 2022

@showuon @mimaison @jsancio are we okay with adding this to the 3.3 branch? There's very little additional risk; the only changes to non-testing code are an additional branch here and adding a default value for an Optional field here.

@mimaison
Member

mimaison commented Sep 1, 2022

Yes if the system tests pass, I'm in favor of merging this in 3.3.

@showuon
Member

showuon commented Sep 6, 2022

@jsancio, do you have the test results for the system test?
We ran the tests in our system for these 2 modified test suites:

tests/kafkatest/tests/connect/connect_distributed_test.py

SESSION REPORT (ALL TESTS)
ducktape version: 0.11.1
session_id:       2022-09-05--001
run time:         150 minutes 28.079 seconds
tests run:        101
passed:           101
flaky:            0
failed:           0
ignored:          0

tests/kafkatest/tests/connect/connect_rest_test.py

SESSION REPORT (ALL TESTS)
ducktape version: 0.11.1
session_id:       2022-09-05--001
run time:         1 minute 11.905 seconds
tests run:        2
passed:           2
flaky:            0
failed:           0
ignored:          0

And they both passed.

@jsancio
Member

jsancio commented Sep 7, 2022

@showuon I tried running the system tests twice in Confluent's infrastructure and we got some infrastructure issues in both cases. Feel free to merge this change and cherry pick it to 3.3. Thanks!

I started another build but I would wait for the result if you want to merge this PR.

@showuon
Member

showuon commented Sep 8, 2022

Let's wait for the result. Otherwise, this merge could also block the 3.3 release.

@C0urante
Contributor Author

C0urante commented Sep 8, 2022

@jsancio looks like all the Connect tests were green, and the failures were unrelated. LGTY?

@jsancio
Member

jsancio commented Sep 8, 2022

@C0urante sounds good. Do you want to merge it and cherry-pick it to 3.3? I can also merge it if you update the description with what you want me to write in the commit message.

@C0urante
Contributor Author

C0urante commented Sep 8, 2022

I can handle the merge. Thanks @jsancio, @showuon, @mimaison, and @tombentley for the review!

@C0urante C0urante merged commit 897bf47 into apache:trunk Sep 8, 2022
@C0urante C0urante deleted the kafka-10000-system-tests branch September 8, 2022 19:13
C0urante added a commit that referenced this pull request Sep 8, 2022
Also includes a minor quality-of-life improvement to clarify why some internal REST requests to workers may fail while that worker is still starting up.

Reviewers: Tom Bentley <tbentley@redhat.com>, Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, Mickael Maison <mickael.maison@gmail.com>