@@ -229,49 +229,53 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
 				remaining.add(datum);
 			}
 		}
-		if (offsets.size() > 0) {
-			commit(consumer, container, offsets);
-		}
-		if (isSeekAfterError()) {
-			if (remaining.size() > 0) {
-				SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
-						getFailureTracker(), this.logger, getLogLevel());
-				ConsumerRecord<?, ?> recovered = remaining.get(0);
-				commit(consumer, container,
-						Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
-								ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
-				if (remaining.size() > 1) {
-					throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
-				}
+		try {
+			if (offsets.size() > 0) {
+				commit(consumer, container, offsets);
 			}
-			return ConsumerRecords.empty();
 		}
-		else {
-			if (remaining.size() > 0) {
-				try {
-					if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
-							consumer)) {
-						remaining.remove(0);
+		finally {
+			if (isSeekAfterError()) {
+				if (remaining.size() > 0) {
+					SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
+							getFailureTracker(), this.logger, getLogLevel());
+					ConsumerRecord<?, ?> recovered = remaining.get(0);
+					commit(consumer, container,
+							Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
+									ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
+					if (remaining.size() > 1) {
+						throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
 					}
 				}
-				catch (Exception e) {
-					if (SeekUtils.isBackoffException(thrownException)) {
-						this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
-								+ " included in remaining due to retry back off " + thrownException);
+				return ConsumerRecords.empty();
+			}
+			else {
+				if (remaining.size() > 0) {
+					try {
+						if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
+								consumer)) {
+							remaining.remove(0);
+						}
 					}
-					else {
-						this.logger.error(e, KafkaUtils.format(remaining.get(0))
-								+ " included in remaining due to " + thrownException);
+					catch (Exception e) {
+						if (SeekUtils.isBackoffException(thrownException)) {
+							this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+									+ " included in remaining due to retry back off " + thrownException);
+						}
+						else {
+							this.logger.error(e, KafkaUtils.format(remaining.get(0))
+									+ " included in remaining due to " + thrownException);
+						}
 					}
 				}
+				if (remaining.isEmpty()) {
+					return ConsumerRecords.empty();
+				}
+				Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
+				remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
+						tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
+				return new ConsumerRecords<>(remains);
 			}
-		if (remaining.isEmpty()) {
-			return ConsumerRecords.empty();
-		}
-		Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
-		remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
-				tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
-		return new ConsumerRecords<>(remains);
 		}
 	}
 
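The change wraps the initial offset commit in a try block and moves the seek-or-recover logic into its finally block, so the consumer is still repositioned (or the recoverer still runs) even when the commit itself throws; before this change, a commit failure propagated immediately and the seeks were skipped entirely. A minimal sketch of that commit-then-seek pattern, using only the plain kafka-clients API; the class and method names here are hypothetical, and only the try/finally shape mirrors the diff:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical illustration of the pattern introduced above: the commit can
// fail, but the consumer must still be seeked back to the unprocessed records.
final class SeekAfterCommitFailureSketch {

	static <K, V> void commitThenSeekRemaining(Consumer<K, V> consumer,
			Map<TopicPartition, OffsetAndMetadata> offsets,
			List<ConsumerRecord<K, V>> remaining) {

		try {
			if (!offsets.isEmpty()) {
				// May throw, e.g. CommitFailedException during a rebalance.
				consumer.commitSync(offsets);
			}
		}
		finally {
			// Runs whether or not the commit succeeded: seek each partition back
			// to its earliest unprocessed offset so the next poll() redelivers
			// the remaining records instead of skipping past them.
			Map<TopicPartition, Long> seekTo = new HashMap<>();
			for (ConsumerRecord<K, V> rec : remaining) {
				seekTo.merge(new TopicPartition(rec.topic(), rec.partition()), rec.offset(), Math::min);
			}
			seekTo.forEach(consumer::seek);
		}
	}

}

The per-partition minimum matters when several records from the same partition remain: seeking to the earliest one makes the next poll() redeliver all of them, which is broadly the repositioning that SeekUtils.seekOrRecover arranges in the finally block of the diff.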