11/*
2- * Copyright 2019-2021 the original author or authors.
2+ * Copyright 2019-2022 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
3535import org .apache .kafka .common .serialization .BytesSerializer ;
3636import org .apache .kafka .common .serialization .IntegerDeserializer ;
3737import org .apache .kafka .common .serialization .IntegerSerializer ;
38+ import org .apache .kafka .common .serialization .Serializer ;
3839import org .apache .kafka .common .serialization .StringDeserializer ;
3940import org .apache .kafka .common .serialization .StringSerializer ;
4041import org .apache .kafka .common .utils .Bytes ;
4546
4647/**
4748 * @author Gary Russell
49+ * @author Artem Bilan
50+ *
4851 * @since 2.3
4952 *
5053 */
@@ -121,16 +124,16 @@ void testWithPropertyConfigKeys() {
121124 private void doTest (DelegatingSerializer serializer , DelegatingDeserializer deserializer ) {
122125 Headers headers = new RecordHeaders ();
123126 headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "bytes" .getBytes ()));
124- byte [] bytes = new byte [] { 1 , 2 , 3 , 4 };
127+ byte [] bytes = new byte []{ 1 , 2 , 3 , 4 };
125128 byte [] serialized = serializer .serialize ("foo" , headers , new Bytes (bytes ));
126129 assertThat (serialized ).isSameAs (bytes );
127130 headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "int" .getBytes ()));
128131 serialized = serializer .serialize ("foo" , headers , 42 );
129- assertThat (serialized ).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
132+ assertThat (serialized ).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
130133 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo (42 );
131134 headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "string" .getBytes ()));
132135 serialized = serializer .serialize ("foo" , headers , "bar" );
133- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
136+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
134137 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
135138
136139 // implicit Serdes
@@ -151,25 +154,25 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
151154 Collections .singletonMap (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "string" ));
152155 new DefaultKafkaHeaderMapper ().fromHeaders (messageHeaders , headers );
153156 assertThat (headers .lastHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR ).value ())
154- .isEqualTo (new byte [] { 's' , 't' , 'r' , 'i' , 'n' , 'g' });
157+ .isEqualTo (new byte []{ 's' , 't' , 'r' , 'i' , 'n' , 'g' });
155158 serialized = serializer .serialize ("foo" , headers , "bar" );
156- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
159+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
157160 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
158161 }
159162
160163 private void doTestKeys (DelegatingSerializer serializer , DelegatingDeserializer deserializer ) {
161164 Headers headers = new RecordHeaders ();
162165 headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "bytes" .getBytes ()));
163- byte [] bytes = new byte [] { 1 , 2 , 3 , 4 };
166+ byte [] bytes = new byte []{ 1 , 2 , 3 , 4 };
164167 byte [] serialized = serializer .serialize ("foo" , headers , new Bytes (bytes ));
165168 assertThat (serialized ).isSameAs (bytes );
166169 headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "int" .getBytes ()));
167170 serialized = serializer .serialize ("foo" , headers , 42 );
168- assertThat (serialized ).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
171+ assertThat (serialized ).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
169172 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo (42 );
170173 headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "string" .getBytes ()));
171174 serialized = serializer .serialize ("foo" , headers , "bar" );
172- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
175+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
173176 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
174177
175178 // implicit Serdes
@@ -190,9 +193,9 @@ private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer
190193 Collections .singletonMap (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "string" ));
191194 new DefaultKafkaHeaderMapper ().fromHeaders (messageHeaders , headers );
192195 assertThat (headers .lastHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR ).value ())
193- .isEqualTo (new byte [] { 's' , 't' , 'r' , 'i' , 'n' , 'g' });
196+ .isEqualTo (new byte []{ 's' , 't' , 'r' , 'i' , 'n' , 'g' });
194197 serialized = serializer .serialize ("foo" , headers , "bar" );
195- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
198+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
196199 assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
197200 }
198201
@@ -218,21 +221,24 @@ void byTypeBadType() {
218221 assertThatExceptionOfType (SerializationException .class ).isThrownBy (
219222 () -> serializer .serialize ("foo" , new Bytes (foo )))
220223 .withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
221- + "; supported types: \\ [(java.lang.String, \\ [B|\\ [B, java.lang.String)\\ ]" );
224+ + "; supported types: \\ [(java.lang.String, \\ [B|\\ [B, java.lang.String)]" );
222225 }
223226
224227 @ Test
225228 void assignable () {
226- DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer (Map .of (Number .class ,
227- new IntegerSerializer (), byte [].class , new ByteArraySerializer ()), true );
229+ var delegates = new HashMap <Class <?>, Serializer <?>>();
230+ delegates .put (Number .class , new IntegerSerializer ());
231+ delegates .put (byte [].class , new ByteArraySerializer ());
232+ DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer (delegates , true );
233+
228234 Integer i = 42 ;
229- assertThat (serializer .serialize ("foo" , i )).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
235+ assertThat (serializer .serialize ("foo" , i )).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
230236 byte [] foo = "foo" .getBytes ();
231237 assertThat (serializer .serialize ("foo" , foo )).isSameAs (foo );
232238 assertThatExceptionOfType (SerializationException .class ).isThrownBy (
233- () -> serializer .serialize ("foo" , new Bytes (foo )))
234- .withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
235- + "; supported types: \\ [(java.lang.Number, \\ [B|\\ [B, java.lang.Number)\\ ]" );
239+ () -> serializer .serialize ("foo" , new Bytes (foo )))
240+ .withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
241+ + "; supported types: \\ [(java.lang.Number, \\ [B|\\ [B, java.lang.Number)]" );
236242 }
237243
238244}
0 commit comments