1616
1717package org .springframework .kafka .retrytopic ;
1818
19- import java .util .Arrays ;
2019import java .util .List ;
2120import java .util .function .BiPredicate ;
22- import java .util .stream .Collectors ;
2321import java .util .stream .IntStream ;
2422
2523import org .springframework .classify .BinaryExceptionClassifier ;
3634 * @author Tomaz Fernandes
3735 * @author Gary Russell
3836 * @author João Lima
37+ * @author Wang Zhiyang
3938 * @since 2.7
4039 *
4140 */
@@ -47,19 +46,21 @@ public class DestinationTopicPropertiesFactory {
4746
4847 private final List <Long > backOffValues ;
4948
50- private final BinaryExceptionClassifier exceptionClassifier ;
51-
5249 private final int numPartitions ;
5350
5451 private final int maxAttempts ;
5552
56- private final KafkaOperations <?, ?> kafkaOperations ;
53+ private final boolean isSameIntervalReuse ;
5754
58- private final DltStrategy dltStrategy ;
55+ private final boolean isFixedDelay ;
56+
57+ private final int retryTopicsAmount ;
58+
59+ private final BiPredicate <Integer , Throwable > shouldRetryOn ;
5960
60- private final TopicSuffixingStrategy topicSuffixingStrategy ;
61+ private final KafkaOperations <?, ?> kafkaOperations ;
6162
62- private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy ;
63+ private final DltStrategy dltStrategy ;
6364
6465 private final long timeout ;
6566
@@ -90,15 +91,19 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
9091
9192 this .dltStrategy = dltStrategy ;
9293 this .kafkaOperations = kafkaOperations ;
93- this .exceptionClassifier = exceptionClassifier ;
9494 this .numPartitions = numPartitions ;
95- this .topicSuffixingStrategy = topicSuffixingStrategy ;
96- this .sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy ;
9795 this .timeout = timeout ;
9896 this .destinationTopicSuffixes = new DestinationTopicSuffixes (retryTopicSuffix , dltSuffix );
9997 this .backOffValues = backOffValues ;
100- // Max Attempts include the initial try.
101- this .maxAttempts = this .backOffValues .size () + 1 ;
98+ // Max Attempts to include the initial try.
99+ int backOffValuesSize = this .backOffValues .size ();
100+ this .maxAttempts = backOffValuesSize + 1 ;
101+ this .shouldRetryOn = (attempt , throwable ) -> attempt < this .maxAttempts
102+ && exceptionClassifier .classify (throwable );
103+ this .isSameIntervalReuse = SameIntervalTopicReuseStrategy .SINGLE_TOPIC .equals (sameIntervalTopicReuseStrategy );
104+ this .retryTopicsAmount = backOffValuesSize - reusableTopicAttempts ();
105+ this .isFixedDelay = TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE .equals (topicSuffixingStrategy )
106+ || backOffValuesSize > 1 && backOffValues .stream ().distinct ().count () == 1 ;
102107 }
103108
104109 /**
@@ -113,71 +118,26 @@ public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean a
113118 }
114119
115120 public List <DestinationTopic .Properties > createProperties () {
116- return isSingleTopicFixedDelay ()
117- ? createPropertiesForFixedDelaySingleTopic ()
118- : createPropertiesForDefaultTopicStrategy ();
119- }
120-
121- private List <DestinationTopic .Properties > createPropertiesForFixedDelaySingleTopic () {
122- return isNoDltStrategy ()
123- ? Arrays .asList (createMainTopicProperties (),
124- createRetryProperties (1 , getShouldRetryOn ()))
125- : Arrays .asList (createMainTopicProperties (),
126- createRetryProperties (1 , getShouldRetryOn ()),
127- createDltProperties ());
128- }
129-
130- private boolean isSingleTopicFixedDelay () {
131- return (this .backOffValues .size () == 1 || isFixedDelay ()) && isSingleTopicSameIntervalTopicReuseStrategy ();
132- }
133-
134- private boolean isSingleTopicSameIntervalTopicReuseStrategy () {
135- return SameIntervalTopicReuseStrategy .SINGLE_TOPIC .equals (this .sameIntervalTopicReuseStrategy );
136- }
137-
138- private List <DestinationTopic .Properties > createPropertiesForDefaultTopicStrategy () {
139-
140- int retryTopicsAmount = retryTopicsAmount ();
141-
142- return IntStream .rangeClosed (0 , isNoDltStrategy ()
143- ? retryTopicsAmount
144- : retryTopicsAmount + 1 )
145- .mapToObj (this ::createTopicProperties )
146- .collect (Collectors .toList ());
147- }
148-
149- int retryTopicsAmount () {
150- return this .backOffValues .size () - reusableTopicAttempts ();
151- }
152-
153- private int reusableTopicAttempts () {
154- return this .backOffValues .size () > 0
155- ? !isFixedDelay ()
156- ? isSingleTopicSameIntervalTopicReuseStrategy ()
157- // Assuming that duplicates are always in
158- // the end of the list.
159- ? amountOfDuplicates (this .backOffValues .get (this .backOffValues .size () - 1 )) - 1
160- : 0
161- : 0
162- : 0 ;
163- }
164-
165- private boolean isNoDltStrategy () {
166- return DltStrategy .NO_DLT .equals (this .dltStrategy );
121+ int topicAmount = DltStrategy .NO_DLT .equals (this .dltStrategy )
122+ ? this .retryTopicsAmount
123+ : this .retryTopicsAmount + 1 ;
124+ return IntStream
125+ .rangeClosed (0 , topicAmount )
126+ .mapToObj (this ::createTopicProperties )
127+ .toList ();
167128 }
168129
169130 private DestinationTopic .Properties createTopicProperties (int index ) {
170- BiPredicate <Integer , Throwable > shouldRetryOn = getShouldRetryOn ();
171131 return index == 0
172132 ? createMainTopicProperties ()
173- : ( index <= this .retryTopicsAmount ())
174- ? createRetryProperties (index , shouldRetryOn )
133+ : index <= this .retryTopicsAmount
134+ ? createRetryProperties (index )
175135 : createDltProperties ();
176136 }
177137
178138 private DestinationTopic .Properties createMainTopicProperties () {
179139 return new DestinationTopic .Properties (0 , MAIN_TOPIC_SUFFIX , DestinationTopic .Type .MAIN , this .maxAttempts ,
180- this .numPartitions , this .dltStrategy , this .kafkaOperations , getShouldRetryOn () , this .timeout );
140+ this .numPartitions , this .dltStrategy , this .kafkaOperations , this . shouldRetryOn , this .timeout );
181141 }
182142
183143 private DestinationTopic .Properties createDltProperties () {
@@ -186,49 +146,42 @@ private DestinationTopic.Properties createDltProperties() {
186146 this .kafkaOperations , (a , e ) -> false , this .timeout , this .autoStartDltHandler );
187147 }
188148
189- private BiPredicate <Integer , Throwable > getShouldRetryOn () {
190- return (attempt , throwable ) -> attempt < this .maxAttempts && this .exceptionClassifier .classify (throwable );
191- }
192-
193- private DestinationTopic .Properties createRetryProperties (int index ,
194- BiPredicate <Integer , Throwable > shouldRetryOn ) {
195-
149+ private DestinationTopic .Properties createRetryProperties (int index ) {
196150 int indexInBackoffValues = index - 1 ;
197- Long thisBackOffValue = this .backOffValues .get (indexInBackoffValues );
198- DestinationTopic .Type topicTypeToUse = isDelayWithReusedTopic (thisBackOffValue )
199- ? Type .REUSABLE_RETRY_TOPIC
200- : Type .RETRY ;
201- return createProperties (topicTypeToUse , shouldRetryOn , indexInBackoffValues ,
202- getTopicSuffix (indexInBackoffValues , thisBackOffValue ));
203- }
204-
205- private String getTopicSuffix (int indexInBackoffValues , Long thisBackOffValue ) {
206- return isSingleTopicFixedDelay ()
207- ? this .destinationTopicSuffixes .getRetrySuffix ()
208- : isSuffixWithIndexStrategy () || isFixedDelay ()
209- ? joinWithRetrySuffix (indexInBackoffValues )
210- : hasDuplicates (thisBackOffValue )
211- ? joinWithRetrySuffix (thisBackOffValue )
212- .concat (suffixForRepeatedInterval (indexInBackoffValues , thisBackOffValue ))
213- : joinWithRetrySuffix (thisBackOffValue );
214- }
215-
216- private String suffixForRepeatedInterval (int indexInBackoffValues , Long thisBackOffValue ) {
217- return isSingleTopicSameIntervalTopicReuseStrategy ()
218- ? ""
219- : "-" + getIndexInBackoffValues (indexInBackoffValues , thisBackOffValue );
151+ long thisBackOffValue = this .backOffValues .get (indexInBackoffValues );
152+ return createProperties (thisBackOffValue , getTopicSuffix (indexInBackoffValues , thisBackOffValue ));
220153 }
221154
222- private boolean isDelayWithReusedTopic (Long backoffValue ) {
223- return hasDuplicates (backoffValue ) && isSingleTopicSameIntervalTopicReuseStrategy ();
155+ private String getTopicSuffix (int indexInBackoffValues , long thisBackOffValue ) {
156+ if (this .isSameIntervalReuse && this .retryTopicsAmount == 1 ) {
157+ return this .destinationTopicSuffixes .getRetrySuffix ();
158+ }
159+ else if (this .isFixedDelay ) {
160+ return joinWithRetrySuffix (indexInBackoffValues );
161+ }
162+ else {
163+ String retrySuffix = joinWithRetrySuffix (thisBackOffValue );
164+ if (!this .isSameIntervalReuse && hasDuplicates (thisBackOffValue )) {
165+ return retrySuffix .concat ("-" + getIndexInBackoffValues (indexInBackoffValues , thisBackOffValue ));
166+ }
167+ return retrySuffix ;
168+ }
224169 }
225170
226171 private int getIndexInBackoffValues (int indexInBackoffValues , Long thisBackOffValue ) {
227172 return indexInBackoffValues - this .backOffValues .indexOf (thisBackOffValue );
228173 }
229174
230- private boolean isSuffixWithIndexStrategy () {
231- return TopicSuffixingStrategy .SUFFIX_WITH_INDEX_VALUE .equals (this .topicSuffixingStrategy );
175+ private DestinationTopic .Type getDestinationTopicType (Long backOffValue ) {
176+ return this .isSameIntervalReuse && hasDuplicates (backOffValue ) ? Type .REUSABLE_RETRY_TOPIC : Type .RETRY ;
177+ }
178+
179+ private int reusableTopicAttempts () {
180+ if (this .isSameIntervalReuse && this .backOffValues .size () > 1 ) {
181+ // Assuming that duplicates are always at the end of the list.
182+ return amountOfDuplicates (this .backOffValues .get (this .backOffValues .size () - 1 )) - 1 ;
183+ }
184+ return 0 ;
232185 }
233186
234187 private boolean hasDuplicates (Long thisBackOffValue ) {
@@ -238,22 +191,15 @@ private boolean hasDuplicates(Long thisBackOffValue) {
238191 private int amountOfDuplicates (Long thisBackOffValue ) {
239192 return Long .valueOf (this .backOffValues
240193 .stream ()
241- .filter (value -> value .equals (thisBackOffValue ))
242- .count ()).intValue ();
243- }
244-
245- private DestinationTopic .Properties createProperties (DestinationTopic .Type topicType ,
246- BiPredicate <Integer , Throwable > shouldRetryOn ,
247- int indexInBackoffValues ,
248- String suffix ) {
249- return new DestinationTopic .Properties (this .backOffValues .get (indexInBackoffValues ), suffix ,
250- topicType , this .maxAttempts , this .numPartitions , this .dltStrategy ,
251- this .kafkaOperations , shouldRetryOn , this .timeout );
194+ .filter (thisBackOffValue ::equals )
195+ .count ())
196+ .intValue ();
252197 }
253198
254- private boolean isFixedDelay () {
255- // If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy
256- return this .backOffValues .size () > 1 && this .backOffValues .stream ().distinct ().count () == 1 ;
199+ private DestinationTopic .Properties createProperties (long delayMs , String suffix ) {
200+ return new DestinationTopic .Properties (delayMs , suffix , getDestinationTopicType (delayMs ),
201+ this .maxAttempts , this .numPartitions , this .dltStrategy , this .kafkaOperations , this .shouldRetryOn ,
202+ this .timeout );
257203 }
258204
259205 private String joinWithRetrySuffix (long parameter ) {
0 commit comments