Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 24d805d

Browse files
Demogorgon314 authored and BewareMyPower committed
[branch-2.11][transaction] Implement producer state manager recovery (#1923)
### Motivation

Cherry-pick the s4k transaction producer state manager snapshot recovery feature.

### Modifications

* Support producer state manager recovery

Co-authored-by: Enrico Olivelli <enrico.olivelli@datastax.com>
Co-authored-by: Michael Marshall <mmarshall@apache.org>

(cherry picked from commit fcf5234)

Fix conflicts caused by #1759
1 parent 3ba7e21 commit 24d805d

35 files changed (+1905 additions, −420 deletions)

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java

+5-3
Original file line number | Diff line number | Diff line change
@@ -107,10 +107,12 @@ public boolean tryComplete() {
107107
return true;
108108
}
109109
for (Map.Entry<TopicPartition, PartitionLog.ReadRecordsResult> entry : readRecordsResult.entrySet()) {
110-
TopicPartition tp = entry.getKey();
111110
PartitionLog.ReadRecordsResult result = entry.getValue();
112-
PartitionLog partitionLog = replicaManager.getPartitionLog(tp, context.getNamespacePrefix());
113-
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition(context.getTopicManager());
111+
PartitionLog partitionLog = result.partitionLog();
112+
if (partitionLog == null) {
113+
return true;
114+
}
115+
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition();
114116
if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
115117
HAS_ERROR_UPDATER.set(this, true);
116118
return forceComplete();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+49-1
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@
3030
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
3131
import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
3232
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
33+
import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer;
34+
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
3335
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
3436
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
3537
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -47,8 +49,10 @@
4749
import java.util.concurrent.ExecutionException;
4850
import java.util.concurrent.TimeUnit;
4951
import java.util.concurrent.TimeoutException;
52+
import java.util.function.Function;
5053
import lombok.Getter;
5154
import lombok.extern.slf4j.Slf4j;
55+
import org.apache.bookkeeper.common.util.OrderedExecutor;
5256
import org.apache.bookkeeper.common.util.OrderedScheduler;
5357
import org.apache.commons.configuration.Configuration;
5458
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -87,6 +91,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8791
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
8892
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
8993
private LookupClient lookupClient;
94+
95+
private KafkaTopicLookupService kafkaTopicLookupService;
9096
@VisibleForTesting
9197
@Getter
9298
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -113,6 +119,9 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
113119
private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
114120
private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();
115121

122+
123+
private OrderedExecutor recoveryExecutor;
124+
116125
@Override
117126
public GroupCoordinator getGroupCoordinator(String tenant) {
118127
return groupCoordinatorsByTenant.computeIfAbsent(tenant, this::createAndBootGroupCoordinator);
@@ -256,6 +265,12 @@ private void invalidatePartitionLog(TopicName topicName) {
256265
});
257266
namespaceService.addNamespaceBundleOwnershipListener(bundleListener);
258267

268+
recoveryExecutor = OrderedExecutor
269+
.newBuilder()
270+
.name("kafka-tx-recovery")
271+
.numThreads(kafkaConfig.getKafkaTransactionRecoveryNumThreads())
272+
.build();
273+
259274
if (kafkaConfig.isKafkaManageSystemNamespaces()) {
260275
// initialize default Group Coordinator
261276
getGroupCoordinator(kafkaConfig.getKafkaMetadataTenant());
@@ -411,6 +426,20 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
411426
lookupClient);
412427
}
413428

429+
class ProducerStateManagerSnapshotProvider implements Function<String, ProducerStateManagerSnapshotBuffer> {
430+
@Override
431+
public ProducerStateManagerSnapshotBuffer apply(String tenant) {
432+
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
433+
return new MemoryProducerStateManagerSnapshotBuffer();
434+
}
435+
return getTransactionCoordinator(tenant)
436+
.getProducerStateManagerSnapshotBuffer();
437+
}
438+
}
439+
440+
private Function<String, ProducerStateManagerSnapshotBuffer> getProducerStateManagerSnapshotBufferByTenant =
441+
new ProducerStateManagerSnapshotProvider();
442+
414443
// this is called after initialize, and with kafkaConfig, brokerService all set.
415444
@Override
416445
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
@@ -426,13 +455,19 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
426455
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
427456
.build();
428457

458+
kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);
459+
429460
replicaManager = new ReplicaManager(
430461
kafkaConfig,
431462
requestStats,
432463
Time.SYSTEM,
433464
brokerService.getEntryFilters(),
434465
producePurgatory,
435-
fetchPurgatory);
466+
fetchPurgatory,
467+
kafkaTopicLookupService,
468+
getProducerStateManagerSnapshotBufferByTenant,
469+
recoveryExecutor
470+
);
436471

437472
try {
438473
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
@@ -480,6 +515,17 @@ public void close() {
480515
statsProvider.stop();
481516
sendResponseScheduler.shutdown();
482517

518+
if (offsetTopicClient != null) {
519+
offsetTopicClient.close();
520+
}
521+
if (txnTopicClient != null) {
522+
txnTopicClient.close();
523+
}
524+
if (adminManager != null) {
525+
adminManager.shutdown();
526+
}
527+
recoveryExecutor.shutdown();
528+
483529
List<CompletableFuture<?>> closeHandles = new ArrayList<>();
484530
if (offsetTopicClient != null) {
485531
closeHandles.add(offsetTopicClient.closeAsync());
@@ -571,6 +617,8 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd
571617
.transactionLogNumPartitions(kafkaConfig.getKafkaTxnLogTopicNumPartitions())
572618
.transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(tenant, kafkaConfig))
573619
.transactionProducerIdTopicName(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, kafkaConfig))
620+
.transactionProducerStateSnapshotTopicName(MetadataUtils.constructTxProducerStateTopicBaseName(tenant,
621+
kafkaConfig))
574622
.abortTimedOutTransactionsIntervalMs(kafkaConfig.getKafkaTxnAbortTimedOutTransactionCleanupIntervalMs())
575623
.transactionalIdExpirationMs(kafkaConfig.getKafkaTransactionalIdExpirationMs())
576624
.removeExpiredTransactionalIdsIntervalMs(

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+4-1
Original file line number | Diff line number | Diff line change
@@ -1754,7 +1754,6 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup,
17541754
joinGroupResult.getLeaderId(),
17551755
members
17561756
);
1757-
17581757
if (log.isTraceEnabled()) {
17591758
log.trace("Sending join group response {} for correlation id {} to client {}.",
17601759
response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId());
@@ -2151,6 +2150,10 @@ protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
21512150
.setErrorCode(resp.getError().code())
21522151
.setProducerId(resp.getProducerId())
21532152
.setProducerEpoch(resp.getProducerEpoch());
2153+
if (resp.getError() == Errors.COORDINATOR_LOAD_IN_PROGRESS
2154+
|| resp.getError() == Errors.CONCURRENT_TRANSACTIONS) {
2155+
responseData.setThrottleTimeMs(1000);
2156+
}
21542157
response.complete(new InitProducerIdResponse(responseData));
21552158
});
21562159
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java

+19
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
5555
private static final int OffsetsMessageTTL = 3 * 24 * 3600;
5656
// txn configuration
5757
public static final int DefaultTxnLogTopicNumPartitions = 50;
58+
public static final int DefaultTxnProducerStateLogTopicNumPartitions = 8;
5859
public static final int DefaultTxnCoordinatorSchedulerNum = 1;
5960
public static final int DefaultTxnStateManagerSchedulerNum = 1;
6061
public static final long DefaultAbortTimedOutTransactionsIntervalMs = TimeUnit.SECONDS.toMillis(10);
@@ -425,6 +426,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
425426
)
426427
private int kafkaTxnLogTopicNumPartitions = DefaultTxnLogTopicNumPartitions;
427428

429+
@FieldContext(
430+
category = CATEGORY_KOP_TRANSACTION,
431+
doc = "Number of partitions for the transaction producer state topic"
432+
)
433+
private int kafkaTxnProducerStateTopicNumPartitions = DefaultTxnProducerStateLogTopicNumPartitions;
434+
435+
@FieldContext(
436+
category = CATEGORY_KOP_TRANSACTION,
437+
doc = "Interval for taking snapshots of the status of pending transactions"
438+
)
439+
private int kafkaTxnProducerStateTopicSnapshotIntervalSeconds = 300;
440+
441+
@FieldContext(
442+
category = CATEGORY_KOP_TRANSACTION,
443+
doc = "Number of threads dedicated to transaction recovery"
444+
)
445+
private int kafkaTransactionRecoveryNumThreads = 8;
446+
428447
@FieldContext(
429448
category = CATEGORY_KOP_TRANSACTION,
430449
doc = "The interval in milliseconds at which to rollback transactions that have timed out."

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java

+25-13
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,6 @@
4848
public class KafkaTopicConsumerManager implements Closeable {
4949

5050
private final PersistentTopic topic;
51-
private final KafkaRequestHandler requestHandler;
5251

5352
private final AtomicBoolean closed = new AtomicBoolean(false);
5453

@@ -67,13 +66,21 @@ public class KafkaTopicConsumerManager implements Closeable {
6766

6867
private final boolean skipMessagesWithoutIndex;
6968

69+
private final String description;
70+
7071
KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
72+
this(requestHandler.ctx.channel() + "",
73+
requestHandler.isSkipMessagesWithoutIndex(),
74+
topic);
75+
}
76+
77+
public KafkaTopicConsumerManager(String description, boolean skipMessagesWithoutIndex, PersistentTopic topic) {
7178
this.topic = topic;
7279
this.cursors = new ConcurrentHashMap<>();
7380
this.createdCursors = new ConcurrentHashMap<>();
7481
this.lastAccessTimes = new ConcurrentHashMap<>();
75-
this.requestHandler = requestHandler;
76-
this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex();
82+
this.description = description;
83+
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
7784
}
7885

7986
// delete expired cursors, so backlog can be cleared.
@@ -96,7 +103,7 @@ void deleteOneExpiredCursor(long offset) {
96103
if (cursorFuture != null) {
97104
if (log.isDebugEnabled()) {
98105
log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
99-
requestHandler.ctx.channel(), offset, cursors.size());
106+
description, offset, cursors.size());
100107
}
101108

102109
// TODO: Should we just cancel this future?
@@ -118,14 +125,19 @@ public void deleteOneCursorAsync(ManagedCursor cursor, String reason) {
118125
public void deleteCursorComplete(Object ctx) {
119126
if (log.isDebugEnabled()) {
120127
log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.",
121-
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason);
128+
description, cursor.getName(), topic.getName(), reason);
122129
}
123130
}
124131

125132
@Override
126133
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
127-
log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.",
128-
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason, exception);
134+
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
135+
log.debug("[{}] Cursor already deleted {} for topic {} for reason: {} - {}.",
136+
description, cursor.getName(), topic.getName(), reason, exception.toString());
137+
} else {
138+
log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.",
139+
description, cursor.getName(), topic.getName(), reason, exception);
140+
}
129141
}
130142
}, null);
131143
createdCursors.remove(cursor.getName());
@@ -148,7 +160,7 @@ public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offs
148160

149161
if (log.isDebugEnabled()) {
150162
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
151-
requestHandler.ctx.channel(), offset, cursors.size());
163+
description, offset, cursors.size());
152164
}
153165
return cursorFuture;
154166
}
@@ -182,7 +194,7 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {
182194

183195
if (log.isDebugEnabled()) {
184196
log.debug("[{}] Add cursor back {} for offset: {}",
185-
requestHandler.ctx.channel(), pair.getLeft().getName(), offset);
197+
description, pair.getLeft().getName(), offset);
186198
}
187199
}
188200

@@ -194,7 +206,7 @@ public void close() {
194206
}
195207
if (log.isDebugEnabled()) {
196208
log.debug("[{}] Close TCM for topic {}.",
197-
requestHandler.ctx.channel(), topic.getName());
209+
description, topic.getName());
198210
}
199211
final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
200212
cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
@@ -224,7 +236,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
224236
if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
225237
log.error("[{}] Async get cursor for offset {} for topic {} failed, "
226238
+ "because current managedLedger has been closed",
227-
requestHandler.ctx.channel(), offset, topic.getName());
239+
description, offset, topic.getName());
228240
CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
229241
future.completeExceptionally(new Exception("Current managedLedger for "
230242
+ topic.getName() + " has been closed."));
@@ -243,7 +255,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
243255
final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position);
244256
if (log.isDebugEnabled()) {
245257
log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}",
246-
requestHandler.ctx.channel(), cursorName, offset, position, previous);
258+
description, cursorName, offset, position, previous);
247259
}
248260
try {
249261
final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName);
@@ -252,7 +264,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
252264
return Pair.of(newCursor, offset);
253265
} catch (ManagedLedgerException e) {
254266
log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.",
255-
requestHandler.ctx.channel(), topic.getName(), offset, previous, e);
267+
description, topic.getName(), offset, previous, e);
256268
return null;
257269
}
258270
});

0 commit comments

Comments (0)