Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public ProducerAppendInfo(TopicPartition topicPartition,
this.currentEntry = currentEntry;
this.origin = origin;

updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), currentEntry.coordinatorEpoch, currentEntry.lastTimestamp, currentEntry.currentTxnFirstOffset, Optional.empty());
updatedEntry = currentEntry.withProducerIdAndBatchMetadata(producerId, Optional.empty());
}

public long producerId() {
Expand Down Expand Up @@ -166,27 +166,27 @@ public void appendDataBatch(short epoch,
maybeValidateDataBatch(epoch, firstSeq, firstOffset);
updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp);

OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset;
OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset();
if (currentTxnFirstOffset.isPresent() && !isTransactional) {
// Received a non-transactional message while a transaction is active
throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " +
"offset " + firstOffsetMetadata + " in partition " + topicPartition);
} else if (!currentTxnFirstOffset.isPresent() && isTransactional) {
// Began a new transaction
updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset);
updatedEntry.setCurrentTxnFirstOffset(firstOffset);
transactions.add(new TxnMetadata(producerId, firstOffsetMetadata));
}
}

private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) {
if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) {
if (updatedEntry.coordinatorEpoch() > endTxnMarker.coordinatorEpoch()) {
if (origin == AppendOrigin.REPLICATION) {
log.info("Detected invalid coordinator epoch for producerId {} at offset {} in partition {}: {} is older than previously known coordinator epoch {}",
producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch);
producerId, offset, topicPartition, endTxnMarker.coordinatorEpoch(), updatedEntry.coordinatorEpoch());
} else {
throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() +
" (zombie), " + updatedEntry.coordinatorEpoch + " (current)");
" (zombie), " + updatedEntry.coordinatorEpoch() + " (current)");
}
}
}
Expand All @@ -202,16 +202,11 @@ public Optional<CompletedTxn> appendEndTxnMarker(
// Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
// without any associated data will not have any impact on the last stable offset
// and would not need to be reflected in the transaction index.
Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ?
Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset,
Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset().isPresent() ?
Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset().getAsLong(), offset,
endTxnMarker.controlType() == ControlRecordType.ABORT))
: Optional.empty();

updatedEntry.maybeUpdateProducerEpoch(producerEpoch);
updatedEntry.currentTxnFirstOffset = OptionalLong.empty();
updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch();
updatedEntry.lastTimestamp = timestamp;

updatedEntry.update(producerEpoch, endTxnMarker.coordinatorEpoch(), timestamp);
return completedTxn;
}

Expand All @@ -230,9 +225,9 @@ public String toString() {
", producerEpoch=" + updatedEntry.producerEpoch() +
", firstSequence=" + updatedEntry.firstSeq() +
", lastSequence=" + updatedEntry.lastSeq() +
", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset +
", coordinatorEpoch=" + updatedEntry.coordinatorEpoch +
", lastTimestamp=" + updatedEntry.lastTimestamp +
", currentTxnFirstOffset=" + updatedEntry.currentTxnFirstOffset() +
", coordinatorEpoch=" + updatedEntry.coordinatorEpoch() +
", lastTimestamp=" + updatedEntry.lastTimestamp() +
", startedTransactions=" + transactions +
')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,18 @@
*/
public class ProducerStateEntry {
public static final int NUM_BATCHES_TO_RETAIN = 5;

public int coordinatorEpoch;
public long lastTimestamp;
public OptionalLong currentTxnFirstOffset;

private final long producerId;
private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();

private short producerEpoch;
private int coordinatorEpoch;
private long lastTimestamp;
private OptionalLong currentTxnFirstOffset;

public static ProducerStateEntry empty(long producerId) {
return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
}

public ProducerStateEntry(long producerId) {
this(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
}

public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset, Optional<BatchMetadata> firstBatchMetadata) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
Expand Down Expand Up @@ -84,6 +79,15 @@ public boolean isEmpty() {
return batchMetadata.isEmpty();
}

/**
* Returns a new instance with the provided parameters (when present) and the values from the current instance
* otherwise.
*/
public ProducerStateEntry withProducerIdAndBatchMetadata(long producerId, Optional<BatchMetadata> batchMetadata) {
return new ProducerStateEntry(producerId, this.producerEpoch(), this.coordinatorEpoch, this.lastTimestamp,
this.currentTxnFirstOffset, batchMetadata);
}

public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) {
maybeUpdateProducerEpoch(producerEpoch);
addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp));
Expand All @@ -106,11 +110,25 @@ private void addBatchMetadata(BatchMetadata batch) {
}

public void update(ProducerStateEntry nextEntry) {
maybeUpdateProducerEpoch(nextEntry.producerEpoch);
while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.removeFirst());
this.coordinatorEpoch = nextEntry.coordinatorEpoch;
this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset;
this.lastTimestamp = nextEntry.lastTimestamp;
update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset);
}

public void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp) {
update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty());
}

private void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp, Deque<BatchMetadata> batchMetadata,
OptionalLong currentTxnFirstOffset) {
maybeUpdateProducerEpoch(producerEpoch);
while (!batchMetadata.isEmpty())
addBatchMetadata(batchMetadata.removeFirst());
this.coordinatorEpoch = coordinatorEpoch;
this.currentTxnFirstOffset = currentTxnFirstOffset;
this.lastTimestamp = lastTimestamp;
}

public void setCurrentTxnFirstOffset(long firstOffset) {
this.currentTxnFirstOffset = OptionalLong.of(firstOffset);
}

public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
Expand All @@ -136,6 +154,18 @@ public long producerId() {
return producerId;
}

public int coordinatorEpoch() {
return coordinatorEpoch;
}

public long lastTimestamp() {
return lastTimestamp;
}

public OptionalLong currentTxnFirstOffset() {
return currentTxnFirstOffset;
}

@Override
public String toString() {
return "ProducerStateEntry(" +
Expand All @@ -147,4 +177,4 @@ public String toString() {
", batchMetadata=" + batchMetadata +
')';
}
}
}