|
23 | 23 | import java.util.Iterator; |
24 | 24 | import java.util.List; |
25 | 25 | import java.util.Map; |
| 26 | +import java.util.Objects; |
26 | 27 | import java.util.concurrent.BlockingQueue; |
27 | 28 | import java.util.concurrent.ConcurrentHashMap; |
28 | 29 | import java.util.concurrent.Future; |
|
110 | 111 | * @author Artem Bilan |
111 | 112 | * @author Chris Gilbert |
112 | 113 | * @author Thomas Strauß |
| 114 | + * @author Nathan Xu |
113 | 115 | */ |
114 | 116 | public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory |
115 | 117 | implements ProducerFactory<K, V>, ApplicationContextAware, |
@@ -660,23 +662,31 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) { |
660 | 662 |
|
661 | 663 | @Override |
662 | 664 | public void updateConfigs(Map<String, Object> updates) { |
663 | | - updates.entrySet().forEach(entry -> { |
664 | | - if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) { |
665 | | - Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG |
666 | | - + "' must be a String, not a " + entry.getClass().getName()); |
667 | | - Assert.isTrue(this.transactionIdPrefix != null |
668 | | - ? entry.getValue() != null |
669 | | - : entry.getValue() == null, |
670 | | - "Cannot change transactional capability"); |
671 | | - this.transactionIdPrefix = (String) entry.getValue(); |
| 665 | + updates.forEach((key, value) -> { |
| 666 | + if (key == null) { // ConcurrentHashMap doesn't accept null key |
| 667 | + return; |
672 | 668 | } |
673 | | - else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) { |
674 | | - Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG |
675 | | - + "' must be a String, not a " + entry.getClass().getName()); |
676 | | - this.clientIdPrefix = (String) entry.getValue(); |
| 669 | + if (key.equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) { |
| 670 | + Assert.isTrue( |
| 671 | + value == null || value instanceof String, |
| 672 | + () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG |
| 673 | + + "' must be null or a String, not a " + Objects.requireNonNull(value) |
| 674 | + .getClass() |
| 675 | + .getName() |
| 676 | + ); |
| 677 | + Assert.isTrue( |
| 678 | + (this.transactionIdPrefix != null) == (value != null), |
| 679 | + "Cannot change transactional capability" |
| 680 | + ); |
| 681 | + this.transactionIdPrefix = (String) value; |
677 | 682 | } |
678 | | - else { |
679 | | - this.configs.put(entry.getKey(), entry.getValue()); |
| 683 | + else if (key.equals(ProducerConfig.CLIENT_ID_CONFIG)) { |
| 684 | + Assert.isTrue(value == null || value instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG |
| 685 | + + "' must be null or a String, not a " + Objects.requireNonNull(value).getClass().getName()); |
| 686 | + this.clientIdPrefix = (String) value; |
| 687 | + } |
| 688 | + else if (value != null) { // ConcurrentHashMap doesn't accept null value |
| 689 | + this.configs.put(key, value); |
680 | 690 | } |
681 | 691 | }); |
682 | 692 | } |
|
0 commit comments