@@ -190,25 +190,27 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
190190
191191 private final AtomicBoolean enhancerIsBuilt = new AtomicBoolean ();
192192
193+ @ SuppressWarnings ("NullAway.Init" )
193194 private KafkaListenerEndpointRegistry endpointRegistry ;
194195
195196 private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME ;
196197
197- @ Nullable
198- private ApplicationContext applicationContext ;
198+ private @ Nullable ApplicationContext applicationContext ;
199199
200+ @ SuppressWarnings ("NullAway.Init" )
200201 private BeanFactory beanFactory ;
201202
202203 private BeanExpressionResolver resolver = new StandardBeanExpressionResolver ();
203204
205+ @ SuppressWarnings ("NullAway.Init" )
204206 private BeanExpressionContext expressionContext ;
205207
206208 private Charset charset = StandardCharsets .UTF_8 ;
207209
210+ @ SuppressWarnings ("NullAway.Init" )
208211 private AnnotationEnhancer enhancer ;
209212
210- @ Nullable
211- private RetryTopicConfigurer retryTopicConfigurer ;
213+ private @ Nullable RetryTopicConfigurer retryTopicConfigurer ;
212214
213215 private final Lock globalLock = new ReentrantLock ();
214216
@@ -473,9 +475,6 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
473475 Method checked = checkProxy (method , bean );
474476 KafkaHandler annotation = AnnotationUtils .findAnnotation (method , KafkaHandler .class );
475477 if (annotation != null && annotation .isDefault ()) {
476- Method toAssert = defaultMethod ;
477- Assert .state (toAssert == null , () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
478- + toAssert .toString () + " and " + method );
479478 defaultMethod = checked ;
480479 }
481480 checkedMethods .add (checked );
@@ -737,9 +736,8 @@ private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaList
737736 }
738737 }
739738
740- @ Nullable
741- private KafkaListenerContainerFactory <?> resolveContainerFactory (KafkaListener kafkaListener ,
742- Object factoryTarget , String beanName ) {
739+ private @ Nullable KafkaListenerContainerFactory <?> resolveContainerFactory (KafkaListener kafkaListener ,
740+ @ Nullable Object factoryTarget , String beanName ) {
743741
744742 String containerFactory = kafkaListener .containerFactory ();
745743 if (!StringUtils .hasText (containerFactory )) {
@@ -786,7 +784,7 @@ protected void assertBeanFactory() {
786784 Assert .state (this .beanFactory != null , "BeanFactory must be set to obtain container factory by bean name" );
787785 }
788786
789- protected String noBeanFoundMessage (Object target , String listenerBeanName , String requestedBeanName ,
787+ protected String noBeanFoundMessage (@ Nullable Object target , String listenerBeanName , String requestedBeanName ,
790788 Class <?> expectedClass ) {
791789
792790 return "Could not register Kafka listener endpoint on ["
@@ -833,7 +831,7 @@ private void loadProperty(Properties properties, String property, Object value)
833831 }
834832 }
835833
836- private String getEndpointId (KafkaListener kafkaListener ) {
834+ private @ Nullable String getEndpointId (KafkaListener kafkaListener ) {
837835 if (StringUtils .hasText (kafkaListener .id ())) {
838836 return resolveExpressionAsString (kafkaListener .id (), "id" );
839837 }
@@ -875,7 +873,7 @@ private String[] resolveTopics(KafkaListener kafkaListener) {
875873 return result .toArray (new String [0 ]);
876874 }
877875
878- private Pattern resolvePattern (KafkaListener kafkaListener ) {
876+ private @ Nullable Pattern resolvePattern (KafkaListener kafkaListener ) {
879877 Pattern pattern = null ;
880878 String text = kafkaListener .topicPattern ();
881879 if (StringUtils .hasText (text )) {
@@ -896,6 +894,7 @@ else if (resolved != null) {
896894
897895 private List <TopicPartitionOffset > resolveTopicPartitionsList (TopicPartition topicPartition ) {
898896 Object topic = resolveExpression (topicPartition .topic ());
897+ Assert .state (topic != null , "Topic must not be null" );
899898 Assert .state (topic instanceof String ,
900899 () -> "topic in @TopicPartition must resolve to a String, not " + topic .getClass ());
901900 Assert .state (StringUtils .hasText ((String ) topic ), "topic in @TopicPartition must not be empty" );
@@ -907,19 +906,22 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
907906 for (String partition : partitions ) {
908907 resolvePartitionAsInteger ((String ) topic , resolveExpression (partition ), result );
909908 }
910- if (partitionOffsets .length == 1 && resolveExpression (partitionOffsets [0 ].partition ()).equals ("*" )) {
911- result .forEach (tpo -> {
912- tpo .setOffset (resolveInitialOffset (tpo .getTopic (), partitionOffsets [0 ]));
913- tpo .setRelativeToCurrent (isRelative (tpo .getTopic (), partitionOffsets [0 ]));
914- });
915- }
916- else {
917- for (PartitionOffset partitionOffset : partitionOffsets ) {
918- Assert .isTrue (!partitionOffset .partition ().equals ("*" ), () ->
919- "Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result );
920- resolvePartitionAsInteger ((String ) topic , resolveExpression (partitionOffset .partition ()), result ,
921- resolveInitialOffset (topic , partitionOffset ), isRelative (topic , partitionOffset ), true ,
922- resolveExpression (partitionOffset .seekPosition ()));
909+ if (partitionOffsets .length > 0 ) {
910+ Object resolvedExpression = resolveExpression (partitionOffsets [0 ].partition ());
911+ if (partitionOffsets .length == 1 && resolvedExpression != null && resolvedExpression .equals ("*" )) {
912+ result .forEach (tpo -> {
913+ tpo .setOffset (resolveInitialOffset (tpo .getTopic (), partitionOffsets [0 ]));
914+ tpo .setRelativeToCurrent (isRelative (tpo .getTopic (), partitionOffsets [0 ]));
915+ });
916+ }
917+ else {
918+ for (PartitionOffset partitionOffset : partitionOffsets ) {
919+ Assert .isTrue (!partitionOffset .partition ().equals ("*" ), () ->
920+ "Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result );
921+ resolvePartitionAsInteger ((String ) topic , resolveExpression (partitionOffset .partition ()), result ,
922+ resolveInitialOffset (topic , partitionOffset ), isRelative (topic , partitionOffset ), true ,
923+ resolveExpression (partitionOffset .seekPosition ()));
924+ }
923925 }
924926 }
925927 Assert .isTrue (!result .isEmpty (), () -> "At least one partition required for " + topic );
@@ -938,9 +940,14 @@ else if (initialOffsetValue instanceof Long lng) {
938940 initialOffset = lng ;
939941 }
940942 else {
941- throw new IllegalArgumentException (String .format (
942- "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'" ,
943- topic , partitionOffset .initialOffset (), initialOffsetValue .getClass ()));
943+ if (initialOffsetValue != null ) {
944+ throw new IllegalArgumentException (String .format (
945+ "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'" ,
946+ topic , partitionOffset .initialOffset (), initialOffsetValue .getClass ()));
947+ }
948+ else {
949+ throw new IllegalArgumentException ("@PartitionOffset for topic '" + topic + "' cannot be empty. Initial offset is null" );
950+ }
944951 }
945952 return initialOffset ;
946953 }
@@ -955,15 +962,20 @@ else if (relativeToCurrentValue instanceof Boolean bool) {
955962 relativeToCurrent = bool ;
956963 }
957964 else {
958- throw new IllegalArgumentException (String .format (
959- "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'" ,
960- topic , partitionOffset .relativeToCurrent (), relativeToCurrentValue .getClass ()));
965+ if (relativeToCurrentValue != null ) {
966+ throw new IllegalArgumentException (String .format (
967+ "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'" ,
968+ topic , partitionOffset .relativeToCurrent (), relativeToCurrentValue .getClass ()));
969+ }
970+ else {
971+ throw new IllegalArgumentException ("@PartitionOffset for topic '" + topic + "' cannot be empty. Relative to current value is null" );
972+ }
961973 }
962974 return relativeToCurrent ;
963975 }
964976
965977 @ SuppressWarnings (UNCHECKED )
966- private void resolveAsString (Object resolvedValue , List <String > result ) {
978+ private void resolveAsString (@ Nullable Object resolvedValue , List <String > result ) {
967979 if (resolvedValue instanceof String [] strArr ) {
968980 for (Object object : strArr ) {
969981 resolveAsString (object , result );
@@ -983,12 +995,12 @@ else if (resolvedValue instanceof Iterable) {
983995 }
984996 }
985997
986- private void resolvePartitionAsInteger (String topic , Object resolvedValue , List <TopicPartitionOffset > result ) {
998+ private void resolvePartitionAsInteger (String topic , @ Nullable Object resolvedValue , List <TopicPartitionOffset > result ) {
987999 resolvePartitionAsInteger (topic , resolvedValue , result , null , false , false , null );
9881000 }
9891001
9901002 @ SuppressWarnings (UNCHECKED )
991- private void resolvePartitionAsInteger (String topic , Object resolvedValue , List <TopicPartitionOffset > result ,
1003+ private void resolvePartitionAsInteger (String topic , @ Nullable Object resolvedValue , List <TopicPartitionOffset > result ,
9921004 @ Nullable Long offset , boolean isRelative , boolean checkDups , @ Nullable Object seekPosition ) {
9931005
9941006 if (resolvedValue instanceof String [] strArr ) {
@@ -1034,6 +1046,7 @@ else if (resolvedValue instanceof Integer intgr) {
10341046 }
10351047 }
10361048
1049+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
10371050 private TopicPartitionOffset .SeekPosition resloveTopicPartitionOffsetSeekPosition (@ Nullable Object seekPosition ) {
10381051 TopicPartitionOffset .SeekPosition resloveTpoSp = null ;
10391052 if (seekPosition instanceof String seekPositionName ) {
@@ -1062,7 +1075,7 @@ private TopicPartitionOffset createTopicPartitionOffset(String topic, int partit
10621075 }
10631076 }
10641077
1065- private String resolveExpressionAsString (String value , String attribute ) {
1078+ private @ Nullable String resolveExpressionAsString (String value , String attribute ) {
10661079 Object resolved = resolveExpression (value );
10671080 if (resolved instanceof String str ) {
10681081 return str ;
@@ -1074,7 +1087,7 @@ else if (resolved != null) {
10741087 return null ;
10751088 }
10761089
1077- @ Nullable
1090+ @ SuppressWarnings ( "NullAway" ) // Dataflow analysis limitation
10781091 private byte [] resolveExpressionAsBytes (String value , String attribute ) {
10791092 Object resolved = resolveExpression (value );
10801093 if (resolved instanceof String str ) {
@@ -1092,7 +1105,7 @@ else if (resolved != null) {
10921105 return null ;
10931106 }
10941107
1095- private Integer resolveExpressionAsInteger (String value , String attribute ) {
1108+ private @ Nullable Integer resolveExpressionAsInteger (String value , String attribute ) {
10961109 Object resolved = resolveExpression (value );
10971110 Integer result = null ;
10981111 if (resolved instanceof String str ) {
@@ -1109,7 +1122,7 @@ else if (resolved != null) {
11091122 return result ;
11101123 }
11111124
1112- private Boolean resolveExpressionAsBoolean (String value , String attribute ) {
1125+ private @ Nullable Boolean resolveExpressionAsBoolean (String value , String attribute ) {
11131126 Object resolved = resolveExpression (value );
11141127 Boolean result = null ;
11151128 if (resolved instanceof Boolean bool ) {
@@ -1126,7 +1139,7 @@ else if (resolved != null) {
11261139 return result ;
11271140 }
11281141
1129- private Object resolveExpression (String value ) {
1142+ private @ Nullable Object resolveExpression (String value ) {
11301143 return this .resolver .evaluate (resolve (value ), this .expressionContext );
11311144 }
11321145
@@ -1136,7 +1149,7 @@ private Object resolveExpression(String value) {
11361149 * @return the resolved value
11371150 * @see ConfigurableBeanFactory#resolveEmbeddedValue
11381151 */
1139- private String resolve (String value ) {
1152+ private @ Nullable String resolve (String value ) {
11401153 if (this .beanFactory instanceof ConfigurableBeanFactory cbf ) {
11411154 return cbf .resolveEmbeddedValue (value );
11421155 }
@@ -1210,7 +1223,7 @@ private final class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMe
12101223 private final DefaultFormattingConversionService defaultFormattingConversionService =
12111224 new DefaultFormattingConversionService ();
12121225
1213- private MessageHandlerMethodFactory handlerMethodFactory ;
1226+ private @ Nullable MessageHandlerMethodFactory handlerMethodFactory ;
12141227
12151228 public void setHandlerMethodFactory (MessageHandlerMethodFactory kafkaHandlerMethodFactory1 ) {
12161229 this .handlerMethodFactory = kafkaHandlerMethodFactory1 ;
@@ -1264,7 +1277,7 @@ public String convert(byte[] source) {
12641277
12651278 static class ListenerScope implements Scope {
12661279
1267- private final Map <String , Object > listeners = new HashMap <>();
1280+ private final Map <String , @ Nullable Object > listeners = new HashMap <>();
12681281
12691282 ListenerScope () {
12701283 }
@@ -1277,11 +1290,13 @@ public void removeListener(String key) {
12771290 this .listeners .remove (key );
12781291 }
12791292
1293+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
12801294 @ Override
1281- public Object get (String name , ObjectFactory <?> objectFactory ) {
1295+ public @ Nullable Object get (String name , ObjectFactory <?> objectFactory ) {
12821296 return this .listeners .get (name );
12831297 }
12841298
1299+ @ SuppressWarnings ("NullAway" ) // Overridden method does not define nullness
12851300 @ Override
12861301 public Object remove (String name ) {
12871302 return null ;
@@ -1292,12 +1307,12 @@ public void registerDestructionCallback(String name, Runnable callback) {
12921307 }
12931308
12941309 @ Override
1295- public Object resolveContextualObject (String key ) {
1310+ public @ Nullable Object resolveContextualObject (String key ) {
12961311 return this .listeners .get (key );
12971312 }
12981313
12991314 @ Override
1300- public String getConversationId () {
1315+ public @ Nullable String getConversationId () {
13011316 return null ;
13021317 }
13031318
@@ -1319,7 +1334,6 @@ private static final class BytesToNumberConverter implements ConditionalGenericC
13191334 }
13201335
13211336 @ Override
1322- @ Nullable
13231337 public Set <ConvertiblePair > getConvertibleTypes () {
13241338 HashSet <ConvertiblePair > pairs = new HashSet <>();
13251339 pairs .add (new ConvertiblePair (byte [].class , long .class ));
@@ -1334,8 +1348,7 @@ public Set<ConvertiblePair> getConvertibleTypes() {
13341348 }
13351349
13361350 @ Override
1337- @ Nullable
1338- public Object convert (@ Nullable Object source , TypeDescriptor sourceType , TypeDescriptor targetType ) {
1351+ public @ Nullable Object convert (@ Nullable Object source , TypeDescriptor sourceType , TypeDescriptor targetType ) {
13391352 byte [] bytes = (byte []) source ;
13401353 if (bytes == null ) {
13411354 return null ;
0 commit comments