Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/repairkafka3.7 #707

Merged
merged 23 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3feef5a
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
2971c2b
Fixed an issue where intercept methods for kafka3.7 were invalid
darkness-2nd Jul 21, 2024
7db4f51
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
1cbc3db
Revert "Fixed an issue where intercept methods for kafka3.7 were inva…
darkness-2nd Jul 22, 2024
6220bd4
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 22, 2024
1e2b82a
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
d875bd3
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
ef2193b
add support for kafka3.7.x, because kafka3.7 changed the pull message…
darkness-2nd Jul 23, 2024
e20ef76
repair ci error
darkness-2nd Jul 23, 2024
3543204
add case of 3.6.0
darkness-2nd Jul 23, 2024
526abbb
fix upe
darkness-2nd Jul 23, 2024
abd3848
repair spring-kafka 2.2.x ci
darkness-2nd Jul 25, 2024
0cb524c
remove dead line in Bootstrap-plugins.md
darkness-2nd Jul 25, 2024
f31fb6f
repair the uncompatible code in kafka Case
darkness-2nd Jul 25, 2024
8297b83
test ci for 3.7 jdk ci
darkness-2nd Jul 25, 2024
73c85a0
add support version in Supported-list.md
darkness-2nd Jul 25, 2024
1cc9438
remove error class
darkness-2nd Jul 25, 2024
960be6c
remove error class
darkness-2nd Jul 25, 2024
14ebf37
Add comments for resolve the problem that Kafka3.7.x can not be inter…
darkness-2nd Jul 25, 2024
39353e8
revert the format codes
darkness-2nd Jul 25, 2024
3f51753
ignore 403 when checking markdown link
kezhenxu94 Jul 25, 2024
1a5f39c
Update CHANGES.md: Support kafka-clients-3.7.x intercept
darkness-2nd Jul 25, 2024
b876765
Update docs/en/setup/service-agent/java-agent/Supported-list.md
wu-sheng Jul 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dlc.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
200,
301,
302,
401
401,
403
]
}
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Release Notes.
* Improve 4x performance of ContextManagerExtendService.createTraceContext()
* Add a plugin that supports the Solon framework.
* Fixed issues in the MySQL component where the executeBatch method could result in empty SQL statements .

* Support kafka-clients-3.7.x intercept

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/213?closed=1)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka.define;

import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the comment, I think we can focus on the difference instead of repeating what is already written in the parent class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the comment, I think we can focus on the difference instead of repeating what is already written in the parent class.

Yes, the difference is the method named pollForFetchs was removed from KafkaConsumer to another two classes, so the original interceptor can not intercept it. Because of the enhance class is changed, so I create two new classes to repair the uncompatible problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the difference is the method named pollForFetchs was removed from KafkaConsumer to another two classes, so the original interceptor can not intercept it. Because of the enhance class is changed, so I create two new classes to repair the uncompatible problem.

I meant you can modify the comment for this class......

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I forget to change the comment

* For Kafka 3.7.x change
*
* <pre>
* 1. The method named pollForFetchs was removed from KafkaConsumer to <code>AsyncKafkaConsumer</code> and <code>LegacyKafkaConsumer</code>
* 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method
* </pre>
*/
public class Kafka37AsyncConsumerInstrumentation extends KafkaConsumerInstrumentation {

private static final String ENHANCE_CLASS_37_ASYNC = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer";

@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS_37_ASYNC);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka.define;

import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
* For Kafka 3.7.x change
*
* <pre>
* 1. The method named pollForFetchs was removed from KafkaConsumer to <code>AsyncKafkaConsumer</code> and <code>LegacyKafkaConsumer</code>
* 2. Because of the enhance class was changed, so we should create new Instrumentation to intercept the method
* </pre>
*/
public class Kafka37LegacyConsumerInstrumentation extends KafkaConsumerInstrumentation {

private static final String ENHANCE_CLASS_37_LEGACY = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer";

@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS_37_LEGACY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstr
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37AsyncConsumerInstrumentation
kafka-3.7.x=org.apache.skywalking.apm.plugin.kafka.define.Kafka37LegacyConsumerInstrumentation
1 change: 1 addition & 0 deletions docs/en/setup/service-agent/java-agent/Plugin-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- jetty-client-9.x
- jetty-server-9.x
- kafka-0.11.x/1.x/2.x
- kafka-3.7.x
- kotlin-coroutine
- lettuce-5.x
- light4j
Expand Down
2 changes: 1 addition & 1 deletion docs/en/setup/service-agent/java-agent/Supported-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ metrics based on the tracing data.
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 3.x-> 5.x
* [RocketMQ-gRPC](http://github.com/apache/rocketmq-clients) 5.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.7.1
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.Collection;
import java.util.regex.Pattern;
import java.util.List;
import java.util.ArrayList;
Expand All @@ -32,10 +33,10 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -270,7 +271,17 @@ public void run() {
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
consumer.subscribe(topicPattern, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {

}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {

}
});
while (true) {
if (pollAndInvoke(consumer)) break;
}
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/scenarios/kafka-scenario/support-version.list
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@
3.0.2
3.1.2
3.2.3
3.6.0
3.7.0
3.7.1
Loading