diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java index 5abff1b2bb..c6ab40831f 100644 --- a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java +++ b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java @@ -1,3 +1,20 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package feast.ingestion.deserializer; import com.google.protobuf.InvalidProtocolBufferException; diff --git a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java index 01ca9c7068..57574d6982 100644 --- a/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java +++ b/ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java @@ -1,3 +1,20 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package feast.ingestion.deserializer; import com.google.protobuf.InvalidProtocolBufferException; diff --git a/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java b/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java index 1435778446..4b9ece362d 100644 --- a/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java +++ b/ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java @@ -1,9 +1,25 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package feast.ingestion.deserializer; import com.google.protobuf.MessageLite; import feast.types.FeatureRowProto.FeatureRow; import java.util.Map; -import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; @@ -13,9 +29,11 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.junit.Assert; +import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; @@ -28,31 +46,29 @@ import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.ListenableFuture; @RunWith(SpringRunner.class) -@EmbeddedKafka(controlledShutdown = true) +@SpringBootTest +@DirtiesContext public class KafkaFeatureRowDeserializerTest { - @Autowired private EmbeddedKafkaBroker embeddedKafka; + private static final String topic = "TEST_TOPIC"; + + @ClassRule public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic); @Autowired private KafkaTemplate template; private void deserialize(MessageType input) { - // generate a random UUID to create a unique topic and consumer group id for each test - String uuid = UUID.randomUUID().toString(); - String topic = "topic-" + uuid; - - embeddedKafka.addTopics(topic); Deserializer deserializer = new FeatureRowDeserializer(); Map consumerProps = - KafkaTestUtils.consumerProps(uuid, Boolean.FALSE.toString(), embeddedKafka); + KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka.getEmbeddedKafka()); ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps, deserializer, deserializer); @@ -63,7 +79,8 @@ private void deserialize(MessageType input) { MessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProps); container.start(); - ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); + ContainerTestUtils.waitForAssignment( + container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic()); byte[] data = input.toByteArray(); ProducerRecord producerRecord = new ProducerRecord<>(topic, data, data); @@ -99,12 +116,10 @@ public void deserializeFeatureRowProto() { @Configuration static class ContextConfiguration { - - @Autowired private EmbeddedKafkaBroker embeddedKafka; - @Bean ProducerFactory producerFactory() { - Map producerProps = KafkaTestUtils.producerProps(embeddedKafka); + Map producerProps = + KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); return new DefaultKafkaProducerFactory<>( producerProps, new ByteArraySerializer(), new ByteArraySerializer());