Skip to content

Commit

Permalink
fix tests and revert refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
stillya committed Dec 12, 2023
1 parent defa439 commit 0969e05
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,9 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
producerProperties.putAll(overrideProperties);
producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties);
DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(producerProperties,
getKeySerializerSupplier(),
getValueSerializerSupplier(),
isConfigureSerializers());
getKeySerializerSupplier(),
getValueSerializerSupplier(),
isConfigureSerializers());
newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
newFactory.setProducerPerThread(isProducerPerThread());
for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) {
Expand Down Expand Up @@ -867,7 +867,7 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
private boolean expire(CloseSafeProducer<K, V> producer) {
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
if (expired) {
closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners);
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
}
return expired;
}
Expand All @@ -880,14 +880,10 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
else {
this.globalLock.lock();
try {
if (producerToRemove.epoch != this.epoch.get()) {
producerToRemove.closeDelegate(timeout, this.listeners);
return true;
}

BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
if (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove)) {
if (producerToRemove.epoch != this.epoch.get()
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
&& !txIdCache.offer(producerToRemove))) {
closeTransactionProducer(producerToRemove, timeout, this.listeners);
return true;
}
Expand All @@ -900,7 +896,7 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
}

private void closeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
List<Listener<K, V>> listeners) {
List<Listener<K, V>> listeners) {
try {
producer.closeDelegate(timeout, listeners);
}
Expand Down Expand Up @@ -1084,12 +1080,16 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
LOGGER.trace(() -> toString() + " send(" + record + ")");
return this.delegate.send(record, (metadata, exception) -> {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
return this.delegate.send(record, new Callback() {

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception instanceof OutOfOrderSequenceException) {
CloseSafeProducer.this.producerFailed = exception;
close(CloseSafeProducer.this.closeTimeout);
}
callback.onCompletion(metadata, exception);
}
callback.onCompletion(metadata, exception);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DefaultTransactionIdSuffixStrategy implements TransactionIdSuffixSt
*/
@Override
public String acquireSuffix(String txIdPrefix) {
Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null");
Assert.notNull(txIdPrefix, () -> "'txIdPrefix' must not be null");
BlockingQueue<String> cache = getSuffixCache(txIdPrefix);
if (cache == null) {
return String.valueOf(this.transactionIdSuffix.getAndIncrement());
Expand All @@ -71,8 +71,8 @@ public String acquireSuffix(String txIdPrefix) {

@Override
public void releaseSuffix(String txIdPrefix, String suffix) {
Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null");
Assert.notNull(suffix, "'suffix' must not be null");
Assert.notNull(txIdPrefix, () -> "'txIdPrefix' must not be null");
Assert.notNull(suffix, () -> "'suffix' must not be null");
if (this.maxCache <= 0) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public Producer<String, String> createProducer(@Nullable String txIdPrefixArg) {
}
KafkaTestUtils.getPropertyValue(this, "cache", Map.class).put("foo", cache);
CloseSafeProducer<String, String> closeSafeProducer = new CloseSafeProducer<>(producer,
this::cacheReturner, "foo", Duration.ofSeconds(1), "factory", 0);
this::cacheReturner, "foo", "1", Duration.ofSeconds(1), "factory", 0);
return closeSafeProducer;
}

Expand Down

0 comments on commit 0969e05

Please sign in to comment.