Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit fcf5234

Browse files
Demogorgon314, eolivelli, and michaeljmarshall
authored
[transaction] Implement producer state manager recovery (#1923)
### Motivation

Cherry-pick the s4k transaction producer state manager snapshot recovery feature.

### Modifications

* Support producer state manager recovery

Co-authored-by: Enrico Olivelli <enrico.olivelli@datastax.com>
Co-authored-by: Michael Marshall <mmarshall@apache.org>
1 parent 45bd29c commit fcf5234

35 files changed

+1905
-420
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,12 @@ public boolean tryComplete() {
107107
return true;
108108
}
109109
for (Map.Entry<TopicPartition, PartitionLog.ReadRecordsResult> entry : readRecordsResult.entrySet()) {
110-
TopicPartition tp = entry.getKey();
111110
PartitionLog.ReadRecordsResult result = entry.getValue();
112-
PartitionLog partitionLog = replicaManager.getPartitionLog(tp, context.getNamespacePrefix());
113-
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition(context.getTopicManager());
111+
PartitionLog partitionLog = result.partitionLog();
112+
if (partitionLog == null) {
113+
return true;
114+
}
115+
PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition();
114116
if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
115117
HAS_ERROR_UPDATER.set(this, true);
116118
return forceComplete();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+49-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
3131
import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
3232
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
33+
import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer;
34+
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
3335
import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
3436
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
3537
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -47,8 +49,10 @@
4749
import java.util.concurrent.ExecutionException;
4850
import java.util.concurrent.TimeUnit;
4951
import java.util.concurrent.TimeoutException;
52+
import java.util.function.Function;
5053
import lombok.Getter;
5154
import lombok.extern.slf4j.Slf4j;
55+
import org.apache.bookkeeper.common.util.OrderedExecutor;
5256
import org.apache.bookkeeper.common.util.OrderedScheduler;
5357
import org.apache.commons.configuration.Configuration;
5458
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -86,6 +90,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8690
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
8791
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
8892
private LookupClient lookupClient;
93+
94+
private KafkaTopicLookupService kafkaTopicLookupService;
8995
@VisibleForTesting
9096
@Getter
9197
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -114,6 +120,9 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
114120
private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
115121
private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();
116122

123+
124+
private OrderedExecutor recoveryExecutor;
125+
117126
@Override
118127
public GroupCoordinator getGroupCoordinator(String tenant) {
119128
return groupCoordinatorsByTenant.computeIfAbsent(tenant, this::createAndBootGroupCoordinator);
@@ -268,6 +277,12 @@ private void invalidatePartitionLog(TopicName topicName) {
268277
});
269278
bundleListener.register();
270279

280+
recoveryExecutor = OrderedExecutor
281+
.newBuilder()
282+
.name("kafka-tx-recovery")
283+
.numThreads(kafkaConfig.getKafkaTransactionRecoveryNumThreads())
284+
.build();
285+
271286
if (kafkaConfig.isKafkaManageSystemNamespaces()) {
272287
// initialize default Group Coordinator
273288
getGroupCoordinator(kafkaConfig.getKafkaMetadataTenant());
@@ -436,6 +451,20 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
436451
lookupClient);
437452
}
438453

454+
class ProducerStateManagerSnapshotProvider implements Function<String, ProducerStateManagerSnapshotBuffer> {
455+
@Override
456+
public ProducerStateManagerSnapshotBuffer apply(String tenant) {
457+
if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
458+
return new MemoryProducerStateManagerSnapshotBuffer();
459+
}
460+
return getTransactionCoordinator(tenant)
461+
.getProducerStateManagerSnapshotBuffer();
462+
}
463+
}
464+
465+
private Function<String, ProducerStateManagerSnapshotBuffer> getProducerStateManagerSnapshotBufferByTenant =
466+
new ProducerStateManagerSnapshotProvider();
467+
439468
// this is called after initialize, and with kafkaConfig, brokerService all set.
440469
@Override
441470
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
@@ -451,13 +480,19 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
451480
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
452481
.build();
453482

483+
kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);
484+
454485
replicaManager = new ReplicaManager(
455486
kafkaConfig,
456487
requestStats,
457488
Time.SYSTEM,
458489
brokerService.getEntryFilterProvider().getBrokerEntryFilters(),
459490
producePurgatory,
460-
fetchPurgatory);
491+
fetchPurgatory,
492+
kafkaTopicLookupService,
493+
getProducerStateManagerSnapshotBufferByTenant,
494+
recoveryExecutor
495+
);
461496

462497
try {
463498
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
@@ -505,6 +540,17 @@ public void close() {
505540
statsProvider.stop();
506541
sendResponseScheduler.shutdown();
507542

543+
if (offsetTopicClient != null) {
544+
offsetTopicClient.close();
545+
}
546+
if (txnTopicClient != null) {
547+
txnTopicClient.close();
548+
}
549+
if (adminManager != null) {
550+
adminManager.shutdown();
551+
}
552+
recoveryExecutor.shutdown();
553+
508554
List<CompletableFuture<?>> closeHandles = new ArrayList<>();
509555
if (offsetTopicClient != null) {
510556
closeHandles.add(offsetTopicClient.closeAsync());
@@ -596,6 +642,8 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd
596642
.transactionLogNumPartitions(kafkaConfig.getKafkaTxnLogTopicNumPartitions())
597643
.transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(tenant, kafkaConfig))
598644
.transactionProducerIdTopicName(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, kafkaConfig))
645+
.transactionProducerStateSnapshotTopicName(MetadataUtils.constructTxProducerStateTopicBaseName(tenant,
646+
kafkaConfig))
599647
.abortTimedOutTransactionsIntervalMs(kafkaConfig.getKafkaTxnAbortTimedOutTransactionCleanupIntervalMs())
600648
.transactionalIdExpirationMs(kafkaConfig.getKafkaTransactionalIdExpirationMs())
601649
.removeExpiredTransactionalIdsIntervalMs(

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1754,7 +1754,6 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup,
17541754
joinGroupResult.getLeaderId(),
17551755
members
17561756
);
1757-
17581757
if (log.isTraceEnabled()) {
17591758
log.trace("Sending join group response {} for correlation id {} to client {}.",
17601759
response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId());
@@ -2151,6 +2150,10 @@ protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
21512150
.setErrorCode(resp.getError().code())
21522151
.setProducerId(resp.getProducerId())
21532152
.setProducerEpoch(resp.getProducerEpoch());
2153+
if (resp.getError() == Errors.COORDINATOR_LOAD_IN_PROGRESS
2154+
|| resp.getError() == Errors.CONCURRENT_TRANSACTIONS) {
2155+
responseData.setThrottleTimeMs(1000);
2156+
}
21542157
response.complete(new InitProducerIdResponse(responseData));
21552158
});
21562159
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java

+19
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
5555
private static final int OffsetsMessageTTL = 3 * 24 * 3600;
5656
// txn configuration
5757
public static final int DefaultTxnLogTopicNumPartitions = 50;
58+
public static final int DefaultTxnProducerStateLogTopicNumPartitions = 8;
5859
public static final int DefaultTxnCoordinatorSchedulerNum = 1;
5960
public static final int DefaultTxnStateManagerSchedulerNum = 1;
6061
public static final long DefaultAbortTimedOutTransactionsIntervalMs = TimeUnit.SECONDS.toMillis(10);
@@ -425,6 +426,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
425426
)
426427
private int kafkaTxnLogTopicNumPartitions = DefaultTxnLogTopicNumPartitions;
427428

429+
@FieldContext(
430+
category = CATEGORY_KOP_TRANSACTION,
431+
doc = "Number of partitions for the transaction producer state topic"
432+
)
433+
private int kafkaTxnProducerStateTopicNumPartitions = DefaultTxnProducerStateLogTopicNumPartitions;
434+
435+
@FieldContext(
436+
category = CATEGORY_KOP_TRANSACTION,
437+
doc = "Interval for taking snapshots of the status of pending transactions"
438+
)
439+
private int kafkaTxnProducerStateTopicSnapshotIntervalSeconds = 300;
440+
441+
@FieldContext(
442+
category = CATEGORY_KOP_TRANSACTION,
443+
doc = "Number of threads dedicated to transaction recovery"
444+
)
445+
private int kafkaTransactionRecoveryNumThreads = 8;
446+
428447
@FieldContext(
429448
category = CATEGORY_KOP_TRANSACTION,
430449
doc = "The interval in milliseconds at which to rollback transactions that have timed out."

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java

+25-13
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
public class KafkaTopicConsumerManager implements Closeable {
4949

5050
private final PersistentTopic topic;
51-
private final KafkaRequestHandler requestHandler;
5251

5352
private final AtomicBoolean closed = new AtomicBoolean(false);
5453

@@ -67,13 +66,21 @@ public class KafkaTopicConsumerManager implements Closeable {
6766

6867
private final boolean skipMessagesWithoutIndex;
6968

69+
private final String description;
70+
7071
KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
72+
this(requestHandler.ctx.channel() + "",
73+
requestHandler.isSkipMessagesWithoutIndex(),
74+
topic);
75+
}
76+
77+
public KafkaTopicConsumerManager(String description, boolean skipMessagesWithoutIndex, PersistentTopic topic) {
7178
this.topic = topic;
7279
this.cursors = new ConcurrentHashMap<>();
7380
this.createdCursors = new ConcurrentHashMap<>();
7481
this.lastAccessTimes = new ConcurrentHashMap<>();
75-
this.requestHandler = requestHandler;
76-
this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex();
82+
this.description = description;
83+
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
7784
}
7885

7986
// delete expired cursors, so backlog can be cleared.
@@ -96,7 +103,7 @@ void deleteOneExpiredCursor(long offset) {
96103
if (cursorFuture != null) {
97104
if (log.isDebugEnabled()) {
98105
log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
99-
requestHandler.ctx.channel(), offset, cursors.size());
106+
description, offset, cursors.size());
100107
}
101108

102109
// TODO: Should we just cancel this future?
@@ -118,14 +125,19 @@ public void deleteOneCursorAsync(ManagedCursor cursor, String reason) {
118125
public void deleteCursorComplete(Object ctx) {
119126
if (log.isDebugEnabled()) {
120127
log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.",
121-
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason);
128+
description, cursor.getName(), topic.getName(), reason);
122129
}
123130
}
124131

125132
@Override
126133
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
127-
log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.",
128-
requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason, exception);
134+
if (exception instanceof ManagedLedgerException.CursorNotFoundException) {
135+
log.debug("[{}] Cursor already deleted {} for topic {} for reason: {} - {}.",
136+
description, cursor.getName(), topic.getName(), reason, exception.toString());
137+
} else {
138+
log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.",
139+
description, cursor.getName(), topic.getName(), reason, exception);
140+
}
129141
}
130142
}, null);
131143
createdCursors.remove(cursor.getName());
@@ -149,7 +161,7 @@ public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offs
149161

150162
if (log.isDebugEnabled()) {
151163
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
152-
requestHandler.ctx.channel(), offset, cursors.size());
164+
description, offset, cursors.size());
153165
}
154166
return cursorFuture;
155167
}
@@ -183,7 +195,7 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {
183195

184196
if (log.isDebugEnabled()) {
185197
log.debug("[{}] Add cursor back {} for offset: {}",
186-
requestHandler.ctx.channel(), pair.getLeft().getName(), offset);
198+
description, pair.getLeft().getName(), offset);
187199
}
188200
}
189201

@@ -195,7 +207,7 @@ public void close() {
195207
}
196208
if (log.isDebugEnabled()) {
197209
log.debug("[{}] Close TCM for topic {}.",
198-
requestHandler.ctx.channel(), topic.getName());
210+
description, topic.getName());
199211
}
200212
final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
201213
cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
@@ -225,7 +237,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
225237
if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) {
226238
log.error("[{}] Async get cursor for offset {} for topic {} failed, "
227239
+ "because current managedLedger has been closed",
228-
requestHandler.ctx.channel(), offset, topic.getName());
240+
description, offset, topic.getName());
229241
CompletableFuture<Pair<ManagedCursor, Long>> future = new CompletableFuture<>();
230242
future.completeExceptionally(new Exception("Current managedLedger for "
231243
+ topic.getName() + " has been closed."));
@@ -244,7 +256,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
244256
final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position);
245257
if (log.isDebugEnabled()) {
246258
log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}",
247-
requestHandler.ctx.channel(), cursorName, offset, position, previous);
259+
description, cursorName, offset, position, previous);
248260
}
249261
try {
250262
final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName);
@@ -253,7 +265,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
253265
return Pair.of(newCursor, offset);
254266
} catch (ManagedLedgerException e) {
255267
log.error("[{}] Error new cursor for topic {} at offset {} - {}. will cause fetch data error.",
256-
requestHandler.ctx.channel(), topic.getName(), offset, previous, e);
268+
description, topic.getName(), offset, previous, e);
257269
return null;
258270
}
259271
});

0 commit comments

Comments
 (0)