11/*
2- * Copyright 2019-2022 the original author or authors.
2+ * Copyright 2019-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2020import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
2121
2222import java .time .Duration ;
23+ import java .util .List ;
2324import java .util .Map ;
2425
2526import org .apache .kafka .clients .admin .AdminClient ;
2627import org .apache .kafka .clients .consumer .ConsumerConfig ;
2728import org .apache .kafka .clients .consumer .ConsumerRecord ;
2829import org .apache .kafka .clients .consumer .ConsumerRecords ;
2930import org .apache .kafka .clients .consumer .KafkaConsumer ;
31+ import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
3032import org .apache .kafka .clients .producer .KafkaProducer ;
3133import org .apache .kafka .clients .producer .ProducerRecord ;
3234import org .apache .kafka .common .TopicPartition ;
35+ import org .apache .kafka .common .TopicPartitionInfo ;
3336import org .junit .jupiter .api .Test ;
3437
3538import org .springframework .kafka .test .EmbeddedKafkaBroker ;
3639import org .springframework .kafka .test .context .EmbeddedKafka ;
3740
3841/**
3942 * @author Gary Russell
43+ * @author Artem Bilan
4044 * @since 2.2.7
4145 *
4246 */
43- @ EmbeddedKafka (topics = { "singleTopic1" , "singleTopic2" , "singleTopic3" , "singleTopic4" , "singleTopic5" ,
44- "multiTopic1" })
47+ @ EmbeddedKafka (topics = {"singleTopic1" , "singleTopic2" , "singleTopic3" , "singleTopic4" , "singleTopic5" ,
48+ "multiTopic1" })
4549public class KafkaTestUtilsTests {
4650
4751 @ Test
48- void testGetSingleWithMoreThatOneTopic (EmbeddedKafkaBroker broker ) {
52+ void testGetSingleWithMoreThanOneTopic (EmbeddedKafkaBroker broker ) {
4953 Map <String , Object > producerProps = KafkaTestUtils .producerProps (broker );
5054 KafkaProducer <Integer , String > producer = new KafkaProducer <>(producerProps );
5155 producer .send (new ProducerRecord <>("singleTopic1" , 0 , 1 , "foo" ));
@@ -64,7 +68,7 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
6468 }
6569
6670 @ Test
67- void testGetSingleWithMoreThatOneTopicRecordNotThereYet (EmbeddedKafkaBroker broker ) {
71+ void testGetSingleWithMoreThanOneTopicRecordNotThereYet (EmbeddedKafkaBroker broker ) {
6872 Map <String , Object > producerProps = KafkaTestUtils .producerProps (broker );
6973 KafkaProducer <Integer , String > producer = new KafkaProducer <>(producerProps );
7074 producer .send (new ProducerRecord <>("singleTopic4" , 0 , 1 , "foo" ));
@@ -73,7 +77,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7377 broker .consumeFromEmbeddedTopics (consumer , "singleTopic4" , "singleTopic5" );
7478 long t1 = System .currentTimeMillis ();
7579 assertThatExceptionOfType (IllegalStateException .class ).isThrownBy (() ->
76- KafkaTestUtils .getSingleRecord (consumer , "singleTopic5" , Duration .ofSeconds (2 )));
80+ KafkaTestUtils .getSingleRecord (consumer , "singleTopic5" , Duration .ofSeconds (2 )));
7781 assertThat (System .currentTimeMillis () - t1 ).isGreaterThanOrEqualTo (2000L );
7882 producer .send (new ProducerRecord <>("singleTopic5" , 1 , "foo" ));
7983 producer .close ();
@@ -97,19 +101,19 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
97101 assertThat (oneRecord .value ()).isEqualTo ("foo" );
98102 assertThat (KafkaTestUtils .getCurrentOffset (broker .getBrokersAsString (), "getOne" , "singleTopic3" , 0 ))
99103 .isNotNull ()
100- .extracting (omd -> omd . offset () )
104+ .extracting (OffsetAndMetadata :: offset )
101105 .isEqualTo (1L );
102106 oneRecord = KafkaTestUtils .getOneRecord (broker .getBrokersAsString (), "getOne" ,
103107 "singleTopic3" , 0 , true , true , Duration .ofSeconds (10 ));
104108 assertThat (oneRecord .value ()).isEqualTo ("foo" );
105109 assertThat (KafkaTestUtils .getCurrentOffset (broker .getBrokersAsString (), "getOne" , "singleTopic3" , 0 ))
106110 .isNotNull ()
107- .extracting (omd -> omd . offset () )
111+ .extracting (OffsetAndMetadata :: offset )
108112 .isEqualTo (1L );
109113 }
110114
111115 @ Test
112- public void testMultiMinRecords (EmbeddedKafkaBroker broker ) throws Exception {
116+ public void testMultiMinRecords (EmbeddedKafkaBroker broker ) {
113117 Map <String , Object > producerProps = KafkaTestUtils .producerProps (broker );
114118 KafkaProducer <Integer , String > producer = new KafkaProducer <>(producerProps );
115119 producer .send (new ProducerRecord <>("multiTopic1" , 0 , 1 , "foo" ));
@@ -135,16 +139,36 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
135139 public void testGetCurrentOffsetWithAdminClient (EmbeddedKafkaBroker broker ) throws Exception {
136140 Map <String , Object > adminClientProps = Map .of (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , broker .getBrokersAsString ());
137141 Map <String , Object > producerProps = KafkaTestUtils .producerProps (broker );
138- try (AdminClient adminClient = AdminClient .create (adminClientProps ); KafkaProducer < Integer , String > producer = new KafkaProducer <>(producerProps )) {
142+ try (var adminClient = AdminClient .create (adminClientProps ); var producer = new KafkaProducer <>(producerProps )) {
139143 producer .send (new ProducerRecord <>("singleTopic3" , 0 , 1 , "foo" ));
140144
141145 KafkaTestUtils .getOneRecord (broker .getBrokersAsString (), "testGetCurrentOffsetWithAdminClient" ,
142146 "singleTopic3" , 0 , false , true , Duration .ofSeconds (10 ));
143147 assertThat (KafkaTestUtils .getCurrentOffset (adminClient , "testGetCurrentOffsetWithAdminClient" , "singleTopic3" , 0 ))
144148 .isNotNull ()
145- .extracting (omd -> omd . offset () )
149+ .extracting (OffsetAndMetadata :: offset )
146150 .isEqualTo (1L );
147151 }
152+ }
153+
154+ @ Test
155+ public void topicAutomaticallyCreatedWithProperNumberOfPartitions (EmbeddedKafkaBroker broker ) throws Exception {
156+ Map <String , Object > producerProps = KafkaTestUtils .producerProps (broker );
157+
158+ Map <String , Object > adminClientProps =
159+ Map .of (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , broker .getBrokersAsString ());
160+ try (var adminClient = AdminClient .create (adminClientProps ); var producer = new KafkaProducer <>(producerProps )) {
161+ producer .send (new ProducerRecord <>("auto-topic" , "test data" )).get ();
162+
163+ List <TopicPartitionInfo > partitions =
164+ adminClient .describeTopics (List .of ("auto-topic" ))
165+ .allTopicNames ()
166+ .get ()
167+ .get ("auto-topic" )
168+ .partitions ();
169+
170+ assertThat (partitions ).hasSize (2 );
171+ }
148172
149173 }
150174
0 commit comments