KAFKA-8710: Update InitProducerId to allow transactional producers to… #7115

Merged: 16 commits, Nov 14, 2019
Changes from all commits
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
ProducerIdAndEpoch.java (moved from org.apache.kafka.clients.producer.internals to org.apache.kafka.common.utils):
@@ -14,24 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.clients.producer.internals;
+package org.apache.kafka.common.utils;

-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+import org.apache.kafka.common.record.RecordBatch;

-class ProducerIdAndEpoch {
-static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+public class ProducerIdAndEpoch {
+public static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH);

public final long producerId;
public final short epoch;

-ProducerIdAndEpoch(long producerId, short epoch) {
+public ProducerIdAndEpoch(long producerId, short epoch) {
this.producerId = producerId;
this.epoch = epoch;
}

public boolean isValid() {
-return NO_PRODUCER_ID < producerId;
+return RecordBatch.NO_PRODUCER_ID < producerId;
}

@Override
InitProducerIdRequest.json:
@@ -20,12 +20,18 @@
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",
"about": "The transactional id, or null if the producer is not transactional." },
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
"about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." }
"about": "The time in ms to wait before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." },
{ "name": "ProducerId", "type": "int64", "versions": "2+", "default": "-1",
"about": "The producer id. This is used to disambiguate requests if a transactional id is reused following its expiration." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "2+", "default": "-1",
"about": "The producer's current epoch. This will be checked against the producer epoch on the broker, and the request will return an error if they do not match." }
]
}
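To make the new version-3 fields concrete, here is a minimal sketch of how a client could fill them in from its current ProducerIdAndEpoch. The InitProducerIdRequestData accessors are generated from the schema above; the wrapper class and method below are hypothetical.

import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;

public final class InitProducerIdV3Sketch {
    // Hypothetical helper: builds the version-3 request payload.
    // On a fresh start the caller passes ProducerIdAndEpoch.NONE, so the new
    // fields keep their -1 defaults and the coordinator generates a new ID.
    // On a retry after INVALID_PRODUCER_EPOCH the caller passes its last known
    // values so the coordinator can validate them instead of fencing it.
    static InitProducerIdRequestData buildRequestData(String transactionalId,
                                                      int transactionTimeoutMs,
                                                      ProducerIdAndEpoch current) {
        return new InitProducerIdRequestData()
            .setTransactionalId(transactionalId)
            .setTransactionTimeoutMs(transactionTimeoutMs)
            .setProducerId(current.producerId)
            .setProducerEpoch(current.epoch);
    }
}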
InitProducerIdResponse.json:
@@ -20,7 +20,9 @@
// Starting in version 1, on quota violation, brokers send out responses before throttling.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 is the same as version 2.
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
TransactionCoordinator.scala:
@@ -28,7 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.TransactionResult
-import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}

object TransactionCoordinator {

@@ -55,7 +55,8 @@ object TransactionCoordinator {
// we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
reaperEnabled = false, timerEnabled = false)
-val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time, metrics)
+val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig,
+time, metrics, config.interBrokerProtocolVersion)

val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
@@ -104,6 +105,7 @@ class TransactionCoordinator(brokerId: Int,

def handleInitProducerId(transactionalId: String,
transactionTimeoutMs: Int,
+expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
responseCallback: InitProducerIdCallback): Unit = {

if (transactionalId == null) {
@@ -124,7 +126,9 @@
val producerId = producerIdManager.generateProducerId()
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerId,
+lastProducerId = RecordBatch.NO_PRODUCER_ID,
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
Expand All @@ -140,7 +144,8 @@ class TransactionCoordinator(brokerId: Int,
val txnMetadata = existingEpochAndMetadata.transactionMetadata

txnMetadata.inLock {
-prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+prepareInitProducerIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata,
+expectedProducerIdAndEpoch)
}
}

@@ -183,13 +188,32 @@
}
}

-private def prepareInitProduceIdTransit(transactionalId: String,
+private def prepareInitProducerIdTransit(transactionalId: String,
transactionTimeoutMs: Int,
coordinatorEpoch: Int,
-txnMetadata: TransactionMetadata): ApiResult[(Int, TxnTransitMetadata)] = {
+txnMetadata: TransactionMetadata,
+expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch]): ApiResult[(Int, TxnTransitMetadata)] = {

+def isValidProducerId(producerIdAndEpoch: ProducerIdAndEpoch): Boolean = {
+// If a producer ID and epoch are provided by the request, fence the producer unless one of the following is true:
+// 1. The producer epoch is equal to -1, which implies that the metadata was just created. This is the case of a
+//    producer recovering from an UNKNOWN_PRODUCER_ID error, and it is safe to return the newly-generated
+//    producer ID.
+// 2. The expected producer ID matches the ID in current metadata (the epoch will be checked when we try to
+//    increment it)
+// 3. The expected producer ID matches the previous one and the expected epoch is exhausted, in which case this
+//    could be a retry after a valid epoch bump that the producer never received the response for
+txnMetadata.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH ||
+producerIdAndEpoch.producerId == txnMetadata.producerId ||
+(producerIdAndEpoch.producerId == txnMetadata.lastProducerId && TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))
+}

if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(Errors.CONCURRENT_TRANSACTIONS)
}
+else if (!expectedProducerIdAndEpoch.forall(isValidProducerId)) {
+Left(Errors.INVALID_PRODUCER_EPOCH)
} else {
// caller should have synchronized on txnMetadata already
txnMetadata.state match {
@@ -198,14 +222,22 @@
Left(Errors.CONCURRENT_TRANSACTIONS)

case CompleteAbort | CompleteCommit | Empty =>
-val transitMetadata = if (txnMetadata.isProducerEpochExhausted) {
-val newProducerId = producerIdManager.generateProducerId()
-txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds())
-} else {
-txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds())
-}
+val transitMetadataResult =
+// If the epoch is exhausted and the expected epoch (if provided) matches it, generate a new producer ID
+if (txnMetadata.isProducerEpochExhausted &&
+expectedProducerIdAndEpoch.forall(_.epoch == txnMetadata.producerEpoch)) {
+val newProducerId = producerIdManager.generateProducerId()
+Right(txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds(),
+expectedProducerIdAndEpoch.isDefined))
+} else {
+txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, expectedProducerIdAndEpoch.map(_.epoch),
+time.milliseconds())
+}

-Right(coordinatorEpoch, transitMetadata)
+transitMetadataResult match {
+case Right(transitMetadata) => Right((coordinatorEpoch, transitMetadata))
+case Left(err) => Left(err)
+}

case Ongoing =>
// indicate to abort the current ongoing txn first. Note that this epoch is never returned to the
@@ -325,6 +357,7 @@ class TransactionCoordinator(brokerId: Int,
// We should clear the pending state to make way for the transition to PrepareAbort and also bump
// the epoch in the transaction metadata we are about to append.
txnMetadata.pendingState = None
+txnMetadata.lastProducerEpoch = txnMetadata.producerEpoch
txnMetadata.producerEpoch = producerEpoch
}
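
To make the fencing rules concrete, the following Java paraphrase restates the validation and rotation checks introduced above. It is an illustrative sketch, not the broker code; the epoch-exhaustion threshold is assumed from TransactionMetadata.isEpochExhausted, which reserves Short.MAX_VALUE.

import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;

public final class InitProducerIdFencingSketch {
    // Mirrors isValidProducerId: accept the caller's (id, epoch) when the
    // metadata was just created, when the ID matches the current producer ID,
    // or when it matches the previous ID and the sent epoch is exhausted
    // (a retry of an epoch bump whose response was lost).
    static boolean isValidProducerId(long currentProducerId, short currentEpoch,
                                     long lastProducerId, ProducerIdAndEpoch expected) {
        return currentEpoch == RecordBatch.NO_PRODUCER_EPOCH
            || expected.producerId == currentProducerId
            || (expected.producerId == lastProducerId && isEpochExhausted(expected.epoch));
    }

    // Mirrors the rotation condition: only generate a new producer ID when the
    // current epoch is exhausted and the caller's epoch (when provided; null
    // here stands in for Scala's Option) matches it, so a fenced producer
    // cannot trigger a rotation.
    static boolean shouldRotateProducerId(short currentEpoch, ProducerIdAndEpoch expected) {
        return isEpochExhausted(currentEpoch)
            && (expected == null || expected.epoch == currentEpoch);
    }

    // Assumed semantics of TransactionMetadata.isEpochExhausted: the epoch is
    // a short and Short.MAX_VALUE is reserved, so an epoch at MAX_VALUE - 1
    // can no longer be bumped.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }
}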

TransactionLog.scala:
@@ -25,7 +25,7 @@ import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

-import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.{CompressionType, RecordBatch}

import scala.collection.mutable

@@ -223,8 +223,8 @@ object TransactionLog {
val entryTimestamp = value.getLong(TxnEntryTimestampField)
val startTimestamp = value.getLong(TxnStartTimestampField)

-val transactionMetadata = new TransactionMetadata(transactionalId, producerId, epoch, timeout, state,
-mutable.Set.empty[TopicPartition], startTimestamp, entryTimestamp)
+val transactionMetadata = new TransactionMetadata(transactionalId, producerId, RecordBatch.NO_PRODUCER_ID,
+epoch, RecordBatch.NO_PRODUCER_EPOCH, timeout, state, mutable.Set.empty[TopicPartition], startTimestamp, entryTimestamp)

if (!state.equals(Empty)) {
val topicPartitionArray = value.getArray(TxnPartitionsField)