Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 046b9f9

Browse files
eolivelliBewareMyPower
authored andcommitted
Invalidate KopBrokerLookupManager caches separately from GroupManagement listener (#743)
With the new Tenant Aware Metadata Topics feature (d4f9960) now we initialise the OffsetAndTopicListener listener lazily. Before this patch `OffsetAndTopicListener` was responsible for invalidating the `KopBrokerLookupManager` caches. But as we initialise it lazily it would be possible to miss some cache invalidations. This patch introduces a dedicated listener `CacheInvalidator` that invalidates the caches for every change that is detected. Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
1 parent ced7bf4 commit 046b9f9

File tree

2 files changed

+167
-21
lines changed

2 files changed

+167
-21
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+58-21
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.ConcurrentHashMap;
4444
import java.util.concurrent.TimeUnit;
4545
import java.util.stream.Collectors;
46+
import lombok.AllArgsConstructor;
4647
import lombok.Getter;
4748
import lombok.NonNull;
4849
import lombok.extern.slf4j.Slf4j;
@@ -105,6 +106,58 @@ public GroupCoordinator getGroupCoordinator(String tenant) {
105106
public TransactionCoordinator getTransactionCoordinator(String tenant) {
106107
return transactionCoordinatorByTenant.computeIfAbsent(tenant, this::createAndBootTransactionCoordinator);
107108
}
109+
110+
/**
111+
* Listener for invalidating the global Broker ownership cache.
112+
*/
113+
@AllArgsConstructor
114+
public static class CacheInvalidator implements NamespaceBundleOwnershipListener {
115+
final BrokerService service;
116+
117+
@Override
118+
public boolean test(NamespaceBundle namespaceBundle) {
119+
// we are interested in every topic,
120+
// because we do not know which topics are served by KOP
121+
return true;
122+
}
123+
124+
private void invalidateBundleCache(NamespaceBundle bundle) {
125+
log.info("invalidateBundleCache for {}", bundle);
126+
service.pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
127+
.whenComplete((topics, ex) -> {
128+
if (ex == null) {
129+
for (String topic : topics) {
130+
TopicName name = TopicName.get(topic);
131+
132+
log.info("invalidateBundleCache for topic {}", topic);
133+
KopBrokerLookupManager.removeTopicManagerCache(topic);
134+
KafkaTopicManager.deReference(topic);
135+
136+
// For non-partitioned topic.
137+
if (!name.isPartitioned()) {
138+
String partitionedZeroTopicName = name.getPartition(0).toString();
139+
KafkaTopicManager.deReference(partitionedZeroTopicName);
140+
KopBrokerLookupManager.removeTopicManagerCache(partitionedZeroTopicName);
141+
}
142+
}
143+
} else {
144+
log.error("Failed to get owned topic list for "
145+
+ "CacheInvalidator when triggering bundle ownership change {}.",
146+
bundle, ex);
147+
}
148+
}
149+
);
150+
}
151+
@Override
152+
public void onLoad(NamespaceBundle bundle) {
153+
invalidateBundleCache(bundle);
154+
}
155+
@Override
156+
public void unLoad(NamespaceBundle bundle) {
157+
invalidateBundleCache(bundle);
158+
}
159+
}
160+
108161
/**
109162
* Listener for the changing of topic that stores offsets of consumer group.
110163
*/
@@ -153,16 +206,6 @@ public void onLoad(NamespaceBundle bundle) {
153206
}
154207
groupCoordinator.handleGroupImmigration(name.getPartitionIndex());
155208
}
156-
// deReference topic when unload
157-
KopBrokerLookupManager.removeTopicManagerCache(topic);
158-
KafkaTopicManager.deReference(topic);
159-
160-
// For non-partitioned topic.
161-
if (!name.isPartitioned()) {
162-
String partitionedZeroTopicName = name.getPartition(0).toString();
163-
KafkaTopicManager.deReference(partitionedZeroTopicName);
164-
KopBrokerLookupManager.removeTopicManagerCache(partitionedZeroTopicName);
165-
}
166209
}
167210
} else {
168211
log.error("Failed to get owned topic list for "
@@ -197,17 +240,6 @@ public void unLoad(NamespaceBundle bundle) {
197240
}
198241
groupCoordinator.handleGroupEmigration(name.getPartitionIndex());
199242
}
200-
// deReference topic when unload
201-
KopBrokerLookupManager.removeTopicManagerCache(topic);
202-
KafkaTopicManager.deReference(topic);
203-
204-
// For non-partitioned topic.
205-
if (!name.isPartitioned()) {
206-
String partitionedZeroTopicName = name.getPartition(0).toString();
207-
KafkaTopicManager.deReference(partitionedZeroTopicName);
208-
KopBrokerLookupManager.removeTopicManagerCache(partitionedZeroTopicName);
209-
}
210-
211243
}
212244
} else {
213245
log.error("Failed to get owned topic list for "
@@ -315,6 +347,11 @@ public void start(BrokerService service) {
315347
// After it's created successfully, this method won't throw any exception.
316348
LOOKUP_CLIENT_MAP.put(brokerService.pulsar(), new LookupClient(brokerService.pulsar(), kafkaConfig));
317349

350+
brokerService.pulsar()
351+
.getNamespaceService()
352+
.addNamespaceBundleOwnershipListener(
353+
new CacheInvalidator(brokerService));
354+
318355
// initialize default Group Coordinator
319356
getGroupCoordinator(kafkaConfig.getKafkaMetadataTenant());
320357

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.kop;
15+
16+
17+
import static org.testng.Assert.assertEquals;
18+
import static org.testng.Assert.assertFalse;
19+
import static org.testng.Assert.assertNotNull;
20+
import static org.testng.Assert.assertTrue;
21+
22+
import java.time.Duration;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Properties;
26+
import lombok.extern.slf4j.Slf4j;
27+
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.clients.consumer.ConsumerRecords;
29+
import org.apache.kafka.clients.producer.KafkaProducer;
30+
import org.apache.kafka.clients.producer.ProducerConfig;
31+
import org.apache.kafka.clients.producer.ProducerRecord;
32+
import org.apache.kafka.common.serialization.IntegerSerializer;
33+
import org.apache.kafka.common.serialization.StringSerializer;
34+
import org.apache.pulsar.common.policies.data.BundlesData;
35+
import org.awaitility.Awaitility;
36+
import org.testng.annotations.AfterClass;
37+
import org.testng.annotations.BeforeClass;
38+
import org.testng.annotations.Test;
39+
40+
/**
41+
* Validate CacheInvalidator.
42+
*/
43+
@Slf4j
44+
public class CacheInvalidatorTest extends KopProtocolHandlerTestBase {
45+
46+
47+
@Test
48+
public void testCacheInvalidatorIsTriggered() throws Exception {
49+
String topicName = "testCacheInvalidatorIsTriggered";
50+
String kafkaServer = "localhost:" + getKafkaBrokerPort();
51+
String transactionalId = "xxxx";
52+
53+
Properties producerProps = new Properties();
54+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
55+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
56+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
57+
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10);
58+
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
59+
60+
try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
61+
producer.initTransactions();
62+
producer.beginTransaction();
63+
producer.send(new ProducerRecord<>(topicName, 1, "value")).get();
64+
producer.commitTransaction();
65+
}
66+
67+
try (KConsumer kConsumer = new KConsumer(topicName, getKafkaBrokerPort(), true)) {
68+
kConsumer.getConsumer().subscribe(Collections.singleton(topicName));
69+
ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(5));
70+
assertNotNull(records);
71+
assertEquals(1, records.count());
72+
ConsumerRecord<Integer, String> record = records.iterator().next();
73+
assertEquals(1, record.key().intValue());
74+
assertEquals("value", record.value());
75+
}
76+
77+
assertFalse(KopBrokerLookupManager.KOP_ADDRESS_CACHE.isEmpty());
78+
assertFalse(KopBrokerLookupManager.LOOKUP_CACHE.isEmpty());
79+
80+
BundlesData bundles = pulsar.getAdminClient().namespaces().getBundles(
81+
conf.getKafkaTenant() + "/" + conf.getKafkaNamespace());
82+
List<String> boundaries = bundles.getBoundaries();
83+
for (int i = 0; i < boundaries.size() - 1; i++) {
84+
String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
85+
pulsar.getAdminClient().namespaces()
86+
.unloadNamespaceBundle(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace(), bundle);
87+
}
88+
89+
Awaitility.await().untilAsserted(() -> {
90+
log.info("LOOKUP_CACHE {}", KopBrokerLookupManager.LOOKUP_CACHE);
91+
log.info("KOP_ADDRESS_CACHE {}", KopBrokerLookupManager.KOP_ADDRESS_CACHE);
92+
assertTrue(KopBrokerLookupManager.KOP_ADDRESS_CACHE.isEmpty());
93+
assertTrue(KopBrokerLookupManager.LOOKUP_CACHE.isEmpty());
94+
});
95+
96+
}
97+
98+
@BeforeClass
99+
@Override
100+
protected void setup() throws Exception {
101+
super.internalSetup();
102+
}
103+
104+
@AfterClass(alwaysRun = true)
105+
@Override
106+
protected void cleanup() throws Exception {
107+
super.internalCleanup();
108+
}
109+
}

0 commit comments

Comments
 (0)