Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ subprojects {
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerConfigTransformerTest.*", "**/WorkerGroupMemberTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*", "**/WorkerSourceTaskTest.*",
"**/WorkerTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
"**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", "**/ExactlyOnceWorkerSourceTaskTest.*",
"**/WorkerTaskTest.*",
// streams tests
"**/KafkaStreamsTest.*"
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
* returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
* {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
* either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
* either {@link SubmittedRecord#ack() acknowledged} or {@link SubmittedRecord#drop dropped}.
* Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
* {@link SubmittedRecord#ack() acknowledged} from a different thread.
*/
Expand All @@ -54,13 +54,13 @@ public SubmittedRecords() {
/**
* Enqueue a new source record before dispatching it to a producer.
* The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the
* producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully
* producer callback, or {@link SubmittedRecord#drop() dropped} if the record could not be successfully
* sent to the producer.
*
*
* @param record the record about to be dispatched; may not be null but may have a null
* {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()}
* @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by
* the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer
* the producer, or {@link SubmittedRecord#drop() dropped} if synchronously rejected by the producer
*/
@SuppressWarnings("unchecked")
public SubmittedRecord submit(SourceRecord record) {
Expand All @@ -78,32 +78,6 @@ SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset
return result;
}

/**
* Remove a source record and do not take it into account any longer when tracking offsets.
* Useful if the record has been synchronously rejected by the producer.
* If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found
* (traversing from the end of the deque backward) will be removed.
* @param record the {@link #submit previously-submitted} record to stop tracking; may not be null
* @return whether an instance of the record was removed
*/
public boolean removeLastOccurrence(SubmittedRecord record) {
Deque<SubmittedRecord> deque = records.get(record.partition());
if (deque == null) {
log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
return false;
}
boolean result = deque.removeLastOccurrence(record);
if (deque.isEmpty()) {
records.remove(record.partition());
}
if (result) {
messageAcked();
} else {
log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
}
return result;
}

/**
* Clear out any acknowledged records at the head of the deques and return a {@link CommittableOffsets snapshot} of the offsets and offset metadata
* accrued between the last time this method was invoked and now. This snapshot can be {@link CommittableOffsets#updatedWith(CommittableOffsets) combined}
Expand Down Expand Up @@ -187,7 +161,7 @@ private synchronized void messageAcked() {
}
}

class SubmittedRecord {
public class SubmittedRecord {
private final Map<String, Object> partition;
private final Map<String, Object> offset;
private final AtomicBoolean acked;
Expand All @@ -208,6 +182,34 @@ public void ack() {
}
}

/**
* Remove this record and do not take it into account any longer when tracking offsets.
* Useful if the record has been synchronously rejected by the producer.
* If multiple instances of this record have been submitted already, only the first one found
* (traversing from the end of the deque backward) will be removed.
* <p>
* This is <strong>not safe</strong> to be called from a different thread
* than what called {@link SubmittedRecords#submit(SourceRecord)}.
* @return whether this instance was dropped
*/
public boolean drop() {
Deque<SubmittedRecord> deque = records.get(partition);
if (deque == null) {
log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", partition);
return false;
}
boolean result = deque.removeLastOccurrence(this);
if (deque.isEmpty()) {
records.remove(partition);
}
if (result) {
messageAcked();
} else {
log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", partition);
}
return result;
}

private boolean acked() {
return acked.get();
}
Expand Down
Loading