From 05c5854d1ff8510c867066b65e28ad9414d6c266 Mon Sep 17 00:00:00 2001 From: Joan Goyeau Date: Tue, 10 Jul 2018 01:48:34 +0200 Subject: [PATCH] MINOR: Add Scalafmt to Streams Scala API (#4965) Reviewers: Guozhang Wang --- build.gradle | 9 + checkstyle/.scalafmt.conf | 20 ++ .../streams/scala/FunctionConversions.scala | 8 +- .../streams/scala/ImplicitConversions.scala | 5 +- .../apache/kafka/streams/scala/Serdes.scala | 41 ++-- .../kafka/streams/scala/StreamsBuilder.scala | 32 ++-- .../scala/kstream/KGroupedStream.scala | 7 +- .../streams/scala/kstream/KGroupedTable.scala | 2 +- .../kafka/streams/scala/kstream/KStream.scala | 179 +++++++++--------- .../kafka/streams/scala/kstream/KTable.scala | 134 ++++++------- .../kstream/SessionWindowedKStream.scala | 9 +- .../scala/kstream/TimeWindowedKStream.scala | 6 +- ...inScalaIntegrationTestImplicitSerdes.scala | 58 +++--- .../scala/StreamToTableJoinTestData.scala | 1 - .../kafka/streams/scala/TopologyTest.scala | 70 +++---- .../kafka/streams/scala/WordCountTest.scala | 19 +- 16 files changed, 318 insertions(+), 282 deletions(-) create mode 100644 checkstyle/.scalafmt.conf diff --git a/build.gradle b/build.gradle index a0897e1aa72b9..3b04e4b8ccb16 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,15 @@ buildscript { classpath 'org.scoverage:gradle-scoverage:2.3.0' classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4' classpath 'org.owasp:dependency-check-gradle:3.2.1' + classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0" + } +} + +apply plugin: "com.diffplug.gradle.spotless" +spotless { + scala { + target 'streams/**/*.scala' + scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf') } } diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf new file mode 100644 index 0000000000000..057e3b930962e --- /dev/null +++ b/checkstyle/.scalafmt.conf @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +docstrings = JavaDoc +maxColumn = 120 +continuationIndent.defnSite = 2 +assumeStandardLibraryStripMargin = true +danglingParentheses = true +rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers] \ No newline at end of file diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala index 9ce9838c7c9a0..4a4c3b0ee44c9 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala @@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable} * more expressive, with less boilerplate and more succinct. *

* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not - * have full support for SAM types. + * have full support for SAM types. */ object FunctionConversions { @@ -40,7 +40,7 @@ object FunctionConversions { } } - implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal { + implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal { def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] { override def apply(key: T, value: U): VR = f(key, value) } @@ -49,7 +49,7 @@ object FunctionConversions { } } - implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal { + implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal { def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] { override def apply(key: K, value: V): KeyValue[KR, VR] = { val (kr, vr) = f(key, value) @@ -88,7 +88,7 @@ object FunctionConversions { } } - implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal { + implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal { def asMerger: Merger[K, VR] = new Merger[K, VR] { override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala index 0c384a1bad01c..d1ff6747d87e9 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala @@ -77,7 +77,8 @@ object ImplicitConversions { valueSerde: Serde[V]): Materialized[K, V, S] = Materialized.`with`[K, V, S](keySerde, valueSerde) - implicit def joinedFromKeyValueOtherSerde[K, V, VO] - (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] = + implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO]): Joined[K, V, VO] = Joined.`with`(keySerde, valueSerde, otherValueSerde) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala index a0ffffaf66063..8bfb083909b69 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala @@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S import org.apache.kafka.streams.kstream.WindowedSerdes object Serdes { - implicit val String: Serde[String] = JSerdes.String() - implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]] - implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long() - implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray() + implicit val String: Serde[String] = JSerdes.String() + implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]] + implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long() + implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray() implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes() - implicit val Float: Serde[Float] = 
JSerdes.Float().asInstanceOf[Serde[Float]] - implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float() - implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]] - implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double() - implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] - implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() + implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]] + implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float() + implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]] + implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double() + implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] + implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]() - implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]() + implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = + new WindowedSerdes.SessionWindowedSerde[T]() def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] = JSerdes.serdeFrom( new Serializer[T] { - override def serialize(topic: String, data: T): Array[Byte] = serializer(data) + override def serialize(topic: String, data: T): Array[Byte] = serializer(data) override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () - override def close(): Unit = () + override def close(): Unit = () }, new Deserializer[T] { - override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull + override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () - override def close(): Unit = () + override def close(): Unit = () } ) def fromFn[T >: Null](serializer: (String, T) => Array[Byte], - deserializer: (String, Array[Byte]) => Option[T]): Serde[T] = + deserializer: (String, Array[Byte]) => Option[T]): Serde[T] = JSerdes.serdeFrom( new Serializer[T] { - override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data) + override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data) override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () - override def close(): Unit = () + override def close(): Unit = () }, new Deserializer[T] { - override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull + override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () - override def close(): Unit = () + override def close(): Unit = () } ) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index af342acce3af3..fcec778348007 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -31,18 +31,18 @@ import ImplicitConversions._ import scala.collection.JavaConverters._ /** - * Wraps the Java class StreamsBuilder and 
delegates method calls to the underlying Java object. - */ + * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object. + */ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { /** * Create a [[kstream.KStream]] from the specified topic. *

- * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
 * key and value deserializers etc. If the implicit is not found in scope, a compiler error will result.
 *

* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly - * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. + * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. * {{{ * // Brings all implicit conversions in scope * import ImplicitConversions._ @@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { /** * Create a [[kstream.KTable]] from the specified topic. *

- * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
 * key and value deserializers etc. If the implicit is not found in scope, a compiler error will result.
 *

* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly - * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. + * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. * {{{ * // Brings all implicit conversions in scope * import ImplicitConversions._ @@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * @see #table(String) * @see `org.apache.kafka.streams.StreamsBuilder#table` */ - def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore]) - (implicit consumed: Consumed[K, V]): KTable[K, V] = + def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])( + implicit consumed: Consumed[K, V] + ): KTable[K, V] = inner.table[K, V](topic, consumed, materialized) /** @@ -139,8 +140,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { inner.globalTable(topic, consumed) /** - * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized - * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers + * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized + * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers * from the implicit `Consumed` instance will be used. * * @param topic the topic name @@ -148,12 +149,13 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * @return a `GlobalKTable` for the specified topic * @see `org.apache.kafka.streams.StreamsBuilder#globalTable` */ - def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore]) - (implicit consumed: Consumed[K, V]): GlobalKTable[K, V] = + def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])( + implicit consumed: Consumed[K, V] + ): GlobalKTable[K, V] = inner.globalTable(topic, consumed, materialized) /** - * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`, + * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`, * `Transformer`, or `ValueTransformer` before it can be used. * * @param builder the builder used to obtain this state store `StateStore` instance @@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder) /** - * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`, + * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`, * or `ValueTransformer` (in contrast to regular stores). 
* * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore` - */ + */ def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, _], diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index 0e5abfdd11b8d..f6a22d91812a0 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ - /** * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object. * @@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view) * provided by the given `materialized`. * - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#count` @@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { /** * Combine the values of records in this stream by the grouped key. * - * @param reducer a function `(V, V) => V` that computes a new aggregate result. - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param reducer a function `(V, V) => V` that computes a new aggregate result. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce` diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala index 99bc83e1ba0e5..76ea9ed75558a 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala @@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to * the same key into a new instance of [[KTable]]. * - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. 
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#count` diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 0f1fc82a48984..8f6aab86e2e13 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -46,7 +46,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param predicate a filter that is applied to each record * @return a [[KStream]] that contains only those records that satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KStream#filter` - */ + */ def filter(predicate: (K, V) => Boolean): KStream[K, V] = inner.filter(predicate.asPredicate) @@ -57,7 +57,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param predicate a filter that is applied to each record * @return a [[KStream]] that contains only those records that do not satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KStream#filterNot` - */ + */ def filterNot(predicate: (K, V) => Boolean): KStream[K, V] = inner.filterNot(predicate.asPredicate) @@ -70,7 +70,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper a function `(K, V) => KR` that computes a new key for each record * @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value * @see `org.apache.kafka.streams.kstream.KStream#selectKey` - */ + */ def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] = inner.selectKey[KR](mapper.asKeyValueMapper) @@ -83,7 +83,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper a function `(K, V) => (KR, VR)` that computes a new output record * @return a [[KStream]] that contains records with new key and value (possibly both of different type) * @see `org.apache.kafka.streams.kstream.KStream#map` - */ + */ def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = { val kvMapper = mapper.tupled andThen tuple2ToKeyValue inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper) @@ -97,7 +97,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper, a function `V => VR` that computes a new output value * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#mapValues` - */ + */ def mapValues[VR](mapper: V => VR): KStream[K, VR] = inner.mapValues[VR](mapper.asValueMapper) @@ -109,7 +109,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper, a function `(K, V) => VR` that computes a new output value * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#mapValues` - */ + */ def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] = inner.mapValues[VR](mapper.asValueMapperWithKey) @@ -122,10 +122,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper function `(K, V) => Iterable[(KR, VR)]` that computes the new output records * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#flatMap` - */ 
+ */ def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = { val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava) - inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k , v)).asKeyValueMapper) + inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper) } /** @@ -139,7 +139,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper a function `V => Iterable[VR]` that computes the new output values * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues` - */ + */ def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] = inner.flatMapValues[VR](mapper.asValueMapper) @@ -154,7 +154,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param mapper a function `(K, V) => Iterable[VR]` that computes the new output values * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues` - */ + */ def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] = inner.flatMapValues[VR](mapper.asValueMapperWithKey) @@ -187,7 +187,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream)) /** - * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for + * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner` *

* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit @@ -219,7 +219,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.through(topic, produced) /** - * Materialize this stream to a topic using the `Produced` instance for + * Materialize this stream to a topic using the `Produced` instance for * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner` *

* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit @@ -250,34 +250,34 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.to(topic, produced) /** - * Dynamically materialize this stream to topics using the `Produced` instance for - * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`. - * The topic names for each record to send to is dynamically determined based on the given mapper. - *

- * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit - * key and value serdes that will be converted to a `Produced` instance implicitly. - *

- * {{{
- * Example:
- *
- * // brings implicit serdes in scope
- * import Serdes._
- *
- * //..
- * val clicksPerRegion: KTable[String, Long] = //..
- *
- * // Implicit serdes in scope will generate an implicit Produced instance, which
- * // will be passed automatically to the call of through below
- * clicksPerRegion.to(topicChooser)
- *
- * // Similarly you can create an implicit Produced and it will be passed implicitly
- * // to the through call
- * }}}
- *
- * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
- * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
- * @see `org.apache.kafka.streams.kstream.KStream#to`
- */ + * Dynamically materialize this stream to topics using the `Produced` instance for
+ * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
+ * The topic name for each record to send to is determined dynamically based on the given mapper.
+ *

+ * The user can either supply the `Produced` instance as an implicit in scope, or provide implicit
+ * key and value serdes that will be implicitly converted to a `Produced` instance.
+ *

+ * {{{ + * Example: + * + * // brings implicit serdes in scope + * import Serdes._ + * + * //.. + * val clicksPerRegion: KTable[String, Long] = //.. + * + * // Implicit serdes in scope will generate an implicit Produced instance, which + * // will be passed automatically to the call of through below + * clicksPerRegion.to(topicChooser) + * + * // Similarly you can create an implicit Produced and it will be passed implicitly + * // to the through call + * }}} + * + * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record + * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner` + * @see `org.apache.kafka.streams.kstream.KStream#to` + */ def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit = inner.to(extractor, produced) @@ -292,25 +292,23 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param stateStoreNames the names of the state stores used by the processor * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` - */ - def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], - stateStoreNames: String*): KStream[K1, V1] = { - val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K, V, KeyValue[K1, V1]] { - override def get(): Transformer[K, V, KeyValue[K1, V1]] = { - new Transformer[K, V, KeyValue[K1, V1]] { - override def transform(key: K, value: V): KeyValue[K1, V1] = { - transformer.transform(key, value) match { - case (k1, v1) => KeyValue.pair(k1, v1) - case _ => null - } + */ + def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = { + val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = + new TransformerSupplier[K, V, KeyValue[K1, V1]] { + override def get(): Transformer[K, V, KeyValue[K1, V1]] = + new Transformer[K, V, KeyValue[K1, V1]] { + override def transform(key: K, value: V): KeyValue[K1, V1] = + transformer.transform(key, value) match { + case (k1, v1) => KeyValue.pair(k1, v1) + case _ => null + } + + override def init(context: ProcessorContext): Unit = transformer.init(context) + + override def close(): Unit = transformer.close() } - - override def init(context: ProcessorContext): Unit = transformer.init(context) - - override def close(): Unit = transformer.close() - } } - } inner.transform(transformerSupplierJ, stateStoreNames: _*) } @@ -318,14 +316,14 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Transform the value of each input record into a new value (with possible new type) of the output record. * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input * record value and computes a new value for it. 
- * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` * * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer` * @param stateStoreNames the names of the state stores used by the processor * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ + */ def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR], stateStoreNames: String*): KStream[K, VR] = inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*) @@ -334,29 +332,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Transform the value of each input record into a new value (with possible new type) of the output record. * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input * record value and computes a new value for it. - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` * * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` * @param stateStoreNames the names of the state stores used by the processor * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ + */ def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], - stateStoreNames: String*): KStream[K, VR] = { + stateStoreNames: String*): KStream[K, VR] = inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*) - } /** * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given * `processorSupplier`). - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` * * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]] * @param stateStoreNames the names of the state store used by the processor * @see `org.apache.kafka.streams.kstream.KStream#process` - */ + */ def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = { val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] { override def get(): Processor[K, V] = processorSupplier() @@ -365,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { } /** - * Group the records by their current key into a [[KGroupedStream]] + * Group the records by their current key into a [[KGroupedStream]] *

* The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit * serdes that will be converted to a `Serialized` instance implicitly. @@ -390,10 +387,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * // to the groupByKey call * }}} * - * @param (implicit) serialized the instance of Serialized that gives the serdes + * @param (implicit) serialized the instance of Serialized that gives the serdes * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#groupByKey` - */ + */ def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] = inner.groupByKey(serialized) @@ -427,18 +424,18 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @param selector a function that computes a new key for grouping * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]] * @see `org.apache.kafka.streams.kstream.KStream#groupBy` - */ + */ def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] = inner.groupBy(selector.asKeyValueMapper, serialized) /** - * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with + * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with * serializers and deserializers supplied by the implicit `Joined` instance. * * @param otherStream the [[KStream]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records * @param windows the specification of the `JoinWindows` - * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize + * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize * inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply * key serde, value serde and other value serde in implicit scope and they will be * converted to the instance of `Joined` through implicit conversion @@ -453,17 +450,17 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined) /** - * Join records of this stream with another [[KTable]]'s records using inner equi join with + * Join records of this stream with another [[KTable]]'s records using inner equi join with * serializers and deserializers supplied by the implicit `Joined` instance. * * @param table the [[KTable]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records - * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize + * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize * inputs and outputs of the joined streams. 
Instead of `Joined`, the user can also supply * key serde, value serde and other value serde in implicit scope and they will be * converted to the instance of `Joined` through implicit conversion * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, - * one for each matched record-pair with the same key + * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KStream#join` */ def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = @@ -479,7 +476,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one output for each input [[KStream]] record * @see `org.apache.kafka.streams.kstream.KStream#join` - */ + */ def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])( keyValueMapper: (K, V) => GK, joiner: (V, GV) => RV @@ -491,20 +488,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { ) /** - * Join records of this stream with another [[KStream]]'s records using windowed left equi join with + * Join records of this stream with another [[KStream]]'s records using windowed left equi join with * serializers and deserializers supplied by the implicit `Joined` instance. * * @param otherStream the [[KStream]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records * @param windows the specification of the `JoinWindows` - * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize + * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize * inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply * key serde, value serde and other value serde in implicit scope and they will be * converted to the instance of `Joined` through implicit conversion * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one for each matched record-pair with the same key and within the joining window intervals * @see `org.apache.kafka.streams.kstream.KStream#leftJoin` - */ + */ def leftJoin[VO, VR](otherStream: KStream[K, VO])( joiner: (V, VO) => VR, windows: JoinWindows @@ -512,19 +509,19 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined) /** - * Join records of this stream with another [[KTable]]'s records using left equi join with + * Join records of this stream with another [[KTable]]'s records using left equi join with * serializers and deserializers supplied by the implicit `Joined` instance. * * @param table the [[KTable]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records - * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize + * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize * inputs and outputs of the joined streams. 
Instead of `Joined`, the user can also supply * key serde, value serde and other value serde in implicit scope and they will be * converted to the instance of `Joined` through implicit conversion * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, - * one for each matched record-pair with the same key + * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KStream#leftJoin` - */ + */ def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined) @@ -538,7 +535,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one output for each input [[KStream]] record * @see `org.apache.kafka.streams.kstream.KStream#leftJoin` - */ + */ def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])( keyValueMapper: (K, V) => GK, joiner: (V, GV) => RV @@ -546,20 +543,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner) /** - * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with + * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with * serializers and deserializers supplied by the implicit `Joined` instance. * * @param otherStream the [[KStream]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records * @param windows the specification of the `JoinWindows` - * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize + * @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize * inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply * key serde, value serde and other value serde in implicit scope and they will be * converted to the instance of `Joined` through implicit conversion * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one for each matched record-pair with the same key and within the joining window intervals * @see `org.apache.kafka.streams.kstream.KStream#outerJoin` - */ + */ def outerJoin[VO, VR](otherStream: KStream[K, VO])( joiner: (V, VO) => VR, windows: JoinWindows @@ -569,8 +566,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Merge this stream and the given stream into one larger stream. *

- * There is no ordering guarantee between records from this `KStream` and records from the provided `KStream` - * in the merged stream. Relative order is preserved within each input stream though (ie, records within + * There is no ordering guarantee between records from this `KStream` and records from the provided `KStream` + * in the merged stream. Relative order is preserved within each input stream though (ie, records within * one input stream are processed in order). * * @param stream a stream which is to be merged into this stream diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index cff1844121dd3..b66977193e11d 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -44,10 +44,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param predicate a filter that is applied to each record * @return a [[KTable]] that contains only those records that satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filter` - */ - def filter(predicate: (K, V) => Boolean): KTable[K, V] = { + */ + def filter(predicate: (K, V) => Boolean): KTable[K, V] = inner.filter(predicate(_, _)) - } /** * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given @@ -55,12 +54,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * * @param predicate a filter that is applied to each record * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. * @return a [[KTable]] that contains only those records that satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filter` - */ - def filter(predicate: (K, V) => Boolean, - materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + */ + def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = inner.filter(predicate.asPredicate, materialized) /** @@ -70,7 +68,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param predicate a filter that is applied to each record * @return a [[KTable]] that contains only those records that do not satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filterNot` - */ + */ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = inner.filterNot(predicate(_, _)) @@ -80,12 +78,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * * @param predicate a filter that is applied to each record * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. 
* @return a [[KTable]] that contains only those records that do not satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filterNot` - */ - def filterNot(predicate: (K, V) => Boolean, - materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + */ + def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = inner.filterNot(predicate.asPredicate, materialized) /** @@ -97,7 +94,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param mapper, a function `V => VR` that computes a new output value * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` - */ + */ def mapValues[VR](mapper: V => VR): KTable[K, VR] = inner.mapValues[VR](mapper.asValueMapper) @@ -109,12 +106,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * * @param mapper, a function `V => VR` that computes a new output value * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` - */ - def mapValues[VR](mapper: V => VR, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + */ + def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = inner.mapValues[VR](mapper.asValueMapper, materialized) /** @@ -126,7 +122,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param mapper, a function `(K, V) => VR` that computes a new output value * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` - */ + */ def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] = inner.mapValues[VR](mapper.asValueMapperWithKey) @@ -138,12 +134,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * * @param mapper, a function `(K, V) => VR` that computes a new output value * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` - */ - def mapValues[VR](mapper: (K, V) => VR, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + */ + def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = inner.mapValues[VR](mapper.asValueMapperWithKey) /** @@ -165,57 +160,55 @@ class KTable[K, V](val inner: KTableJ[K, V]) { inner.toStream[KR](mapper.asKeyValueMapper) /** - * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type). - * Transform the value of each input record into a new value (with possible new type) of the output record. - * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input - * record value and computes a new value for it. - * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores, - * and to the `ProcessorContext`. 
- * If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc), - * care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct - * aggregate results. - * In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`), - * such concerns are handled for you. - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` - * - * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`. - * At least one transformer instance will be created per streaming task. - * Transformer implementations doe not need to be thread-safe. - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ + * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type). + * Transform the value of each input record into a new value (with possible new type) of the output record. + * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input + * record value and computes a new value for it. + * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores, + * and to the `ProcessorContext`. + * If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc), + * care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct + * aggregate results. + * In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`), + * such concerns are handled for you. + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * + * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`. + * At least one transformer instance will be created per streaming task. + * Transformer implementations doe not need to be thread-safe. + * @param stateStoreNames the names of the state stores used by the processor + * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) + * @see `org.apache.kafka.streams.kstream.KStream#transformValues` + */ def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], - stateStoreNames: String*): KTable[K, VR] = { + stateStoreNames: String*): KTable[K, VR] = inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*) - } /** - * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type). - * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input - * record value and computes a new value for it. 
- * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless, - * record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`. - * In order to assign a state, the state must be created and registered - * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` - * The resulting `KTable` is materialized into another state store (additional to the provided state store names) - * as specified by the user via `Materialized` parameter, and is queryable through its given name. - * - * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` - * At least one transformer instance will be created per streaming task. - * Transformer implementations doe not need to be thread-safe. - * @param materialized an instance of `Materialized` used to describe how the state store of the - * resulting table should be materialized. - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ + * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type). + * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input + * record value and computes a new value for it. + * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless, + * record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`. + * In order to assign a state, the state must be created and registered + * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` + * The resulting `KTable` is materialized into another state store (additional to the provided state store names) + * as specified by the user via `Materialized` parameter, and is queryable through its given name. + * + * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` + * At least one transformer instance will be created per streaming task. + * Transformer implementations doe not need to be thread-safe. + * @param materialized an instance of `Materialized` used to describe how the state store of the + * resulting table should be materialized. 
+ * @param stateStoreNames the names of the state stores used by the processor + * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) + * @see `org.apache.kafka.streams.kstream.KStream#transformValues` + */ def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], - stateStoreNames: String*): KTable[K, VR] = { + stateStoreNames: String*): KTable[K, VR] = inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*) - } /** * Re-groups the records of this [[KTable]] using the provided key/value mapper @@ -247,7 +240,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param other the other [[KTable]] to be joined with this [[KTable]] * @param joiner a function that computes the join result for a pair of matching records * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KTable#join` @@ -276,7 +269,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param other the other [[KTable]] to be joined with this [[KTable]] * @param joiner a function that computes the join result for a pair of matching records * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KTable#leftJoin` @@ -305,11 +298,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @param other the other [[KTable]] to be joined with this [[KTable]] * @param joiner a function that computes the join result for a pair of matching records * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] - * should be materialized. + * should be materialized. 
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KTable#leftJoin` - */ + */ def outerJoin[VO, VR](other: KTable[K, VO])( joiner: (V, VO) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore] @@ -323,4 +316,3 @@ class KTable[K, V](val inner: KTableJ[K, V]) { */ def queryableStoreName: String = inner.queryableStoreName } - diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala index ed41973c09080..a6027677edb90 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala @@ -46,8 +46,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, - merger: (K, VR, VR) => VR)( + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)( implicit materialized: Materialized[K, VR, ByteArraySessionStore] ): KTable[Windowed[K], VR] = inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) @@ -55,7 +54,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { /** * Count the number of records in this stream by the grouped key into `SessionWindows`. * - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values * that represent the latest (rolling) count (i.e., number of records) for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count` @@ -69,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { /** * Combine values of this stream by the grouped key into {@link SessionWindows}. * - * @param reducer a reducer function that computes a new aggregate result. - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param reducer a reducer function that computes a new aggregate result. + * @param materialized an instance of `Materialized` used to materialize a state store. 
* @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce` diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index 9e31ab9a80a4f..9be57949589f2 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -53,11 +53,11 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { /** * Count the number of records in this stream by the grouped key and the defined windows. * - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that * represent the latest (rolling) count (i.e., number of records) for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count` - */ + */ def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = { val c: KTable[Windowed[K], java.lang.Long] = inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]]) @@ -68,7 +68,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * Combine the values of records in this stream by the grouped key. * * @param reducer a function that computes a new aggregate result - * @param materialized an instance of `Materialized` used to materialize a state store. + * @param materialized an instance of `Materialized` used to materialize a state store. * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce` diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 02d1dabb9cf19..7891131aa9ea0 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -40,9 +40,8 @@ import org.scalatest.junit.JUnitSuite *

 * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
 * Hence the native Java API based version is more verbose.
- */
-class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
- with StreamToTableJoinTestData {
+ */
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite with StreamToTableJoinTestData {

 private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)

@@ -67,7 +66,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 @Test def testShouldCountClicksPerRegion(): Unit = {

- // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
+ // Serdes._ brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
 // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
 // get these instances automatically
 import Serdes._
@@ -84,7 +83,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite

 val clicksPerRegion: KTable[String, Long] =
 userClicksStream
- // Join the stream against the table.
+ // Join the stream against the table.
 .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))

 // Change the stream from <user> -> <clicks> to <region> -> <clicks>
@@ -100,8 +99,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite

 val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
 streams.start()
-
- val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
 produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)

 streams.close()
@@ -126,29 +124,32 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite

 val builder: StreamsBuilderJ = new StreamsBuilderJ()

- val userClicksStream: KStreamJ[String, JLong] =
+ val userClicksStream: KStreamJ[String, JLong] =
 builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))

- val userRegionsTable: KTableJ[String, String] =
+ val userRegionsTable: KTableJ[String, String] =
 builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))

 // Join the stream against the table.
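 // For comparison: the Scala API exercised earlier in this file expresses the same
 // join as a one-liner (a sketch, assuming the module's implicit Joined instances
 // are in scope):
 //   userClicksStream.leftJoin(userRegionsTable)((clicks, region) =>
 //     (if (region == null) "UNKNOWN" else region, clicks))
 // The Java API below spells out the ValueJoiner and Joined explicitly.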
 val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
- .leftJoin(userRegionsTable,
+ .leftJoin(
+ userRegionsTable,
 new ValueJoiner[JLong, String, (String, JLong)] {
- def apply(clicks: JLong, region: String): (String, JLong) =
+ def apply(clicks: JLong, region: String): (String, JLong) =
 (if (region == null) "UNKNOWN" else region, clicks)
- },
- Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
+ },
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
+ )

 // Change the stream from <user> -> <clicks> to <region> -> <clicks>
- val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
- .map {
+ val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+ .map {
 new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
- def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+ def apply(k: String, regionWithClicks: (String, JLong)) =
+ new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
 }
 }
-
+
 // Compute the total per region by summing the individual click counts per region.
 val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
 .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
@@ -157,7 +158,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 def apply(v1: JLong, v2: JLong) = v1 + v2
 }
 }
-
+
 // Write the (continuously updating) results to the output topic.
 clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))

@@ -165,7 +166,7 @@
 streams.start()

- val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
 produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)

 streams.close()
@@ -214,17 +215,27 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 p
 }

- private def produceNConsume(userClicksTopic: String, userRegionsTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
+ private def produceNConsume(userClicksTopic: String,
+ userRegionsTopic: String,
+ outputTopic: String): java.util.List[KeyValue[String, Long]] = {
 import collection.JavaConverters._
-
+
 // Publish user-region information.
 val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
- IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions.asJava, userRegionsProducerConfig, mockTime, false)
+ IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+ userRegions.asJava,
+ userRegionsProducerConfig,
+ mockTime,
+ false)

 // Publish user-click information.
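 // (The region records above are produced before the click records below, so the
 //  KTable side of the leftJoin is populated by the time the clicks arrive; a
 //  click whose user has no region yet would join against null and land in the
 //  "UNKNOWN" bucket.)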
val userClicksProducerConfig: Properties = getUserClicksProducerConfig() - IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks.asJava, userClicksProducerConfig, mockTime, false) + IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, + userClicks.asJava, + userClicksProducerConfig, + mockTime, + false) // consume and verify result val consumerConfig = getConsumerConfig() @@ -232,4 +243,3 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size) } } - diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala index 45715a7abe6a5..e9040eee5d456 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala @@ -58,4 +58,3 @@ trait StreamToTableJoinTestData { new KeyValue("asia", 124L) ) } - diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index ffae666fcf127..f04ec5dcb04cf 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -35,7 +35,7 @@ import collection.JavaConverters._ /** * Test suite that verifies that the topology built by the Java and Scala APIs match. - */ + */ class TopologyTest extends JUnitSuite { val inputTopic = "input-topic" @@ -50,22 +50,22 @@ class TopologyTest extends JUnitSuite { def getTopologyScala(): TopologyDescription = { import Serdes._ - + val streamBuilder = new StreamsBuilder val textLines = streamBuilder.stream[String, String](inputTopic) - + val _: KStream[String, String] = textLines.flatMapValues(v => pattern.split(v.toLowerCase)) - + streamBuilder.build().describe() } - + // build the Java topology def getTopologyJava(): TopologyDescription = { val streamBuilder = new StreamsBuilderJ val textLines = streamBuilder.stream[String, String](inputTopic) - + val _: KStreamJ[String, String] = textLines.flatMapValues { new ValueMapper[String, java.lang.Iterable[String]] { def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava @@ -84,15 +84,16 @@ class TopologyTest extends JUnitSuite { def getTopologyScala(): TopologyDescription = { import Serdes._ - + val streamBuilder = new StreamsBuilder val textLines = streamBuilder.stream[String, String](inputTopic) - + val _: KTable[String, Long] = - textLines.flatMapValues(v => pattern.split(v.toLowerCase)) + textLines + .flatMapValues(v => pattern.split(v.toLowerCase)) .groupBy((k, v) => v) .count() - + streamBuilder.build().describe() } @@ -101,21 +102,21 @@ class TopologyTest extends JUnitSuite { val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic) - + val splits: KStreamJ[String, String] = textLines.flatMapValues { new ValueMapper[String, java.lang.Iterable[String]] { def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava } } - + val grouped: KGroupedStreamJ[String, String] = splits.groupBy { new 
KeyValueMapper[String, String, String] {
 def apply(k: String, v: String): String = v
 }
 }
-
+
 val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
-
+
 streamBuilder.build().describe()
 }

@@ -128,13 +129,13 @@ class TopologyTest extends JUnitSuite {
 // build the Scala topology
 def getTopologyScala(): TopologyDescription = {
 import Serdes._
-
+
 val builder = new StreamsBuilder()
-
+
 val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
-
+
 val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
-
+
 val clicksPerRegion: KTable[String, Long] =
 userClicksStream
 .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
@@ -149,32 +150,35 @@ class TopologyTest extends JUnitSuite {

 def getTopologyJava(): TopologyDescription = {
 import java.lang.{Long => JLong}
-
+
 val builder: StreamsBuilderJ = new StreamsBuilderJ()
-
- val userClicksStream: KStreamJ[String, JLong] =
+
+ val userClicksStream: KStreamJ[String, JLong] =
 builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
-
- val userRegionsTable: KTableJ[String, String] =
+
+ val userRegionsTable: KTableJ[String, String] =
 builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
-
+
 // Join the stream against the table.
 val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
- .leftJoin(userRegionsTable,
+ .leftJoin(
+ userRegionsTable,
 new ValueJoiner[JLong, String, (String, JLong)] {
- def apply(clicks: JLong, region: String): (String, JLong) =
+ def apply(clicks: JLong, region: String): (String, JLong) =
 (if (region == null) "UNKNOWN" else region, clicks)
- },
- Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
-
+ },
+ Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
+ )
+
 // Change the stream from <user> -> <clicks> to <region> -> <clicks>
- val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
- .map {
+ val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+ .map {
 new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
- def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+ def apply(k: String, regionWithClicks: (String, JLong)) =
+ new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
 }
 }
-
+
 // Compute the total per region by summing the individual click counts per region.
 val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
 .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 5abc1bcf6ffc9..5d858d8781ffb 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -50,7 +50,7 @@ import ImplicitConversions._
 *

* Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11. * Hence the native Java API based version is more verbose. - */ + */ class WordCountTest extends JUnitSuite with WordCountTestData { private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1) @@ -61,11 +61,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData { val mockTime: MockTime = cluster.time mockTime.setCurrentTimeMs(alignedTime) - val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory()) @Rule def testFolder: TemporaryFolder = tFolder - - @Before def startKafkaCluster(): Unit = { cluster.createTopic(inputTopic) @@ -86,7 +83,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData { // generate word counts val wordCounts: KTable[String, Long] = - textLines.flatMapValues(v => pattern.split(v.toLowerCase)) + textLines + .flatMapValues(v => pattern.split(v.toLowerCase)) .groupBy((_, v) => v) .count() @@ -117,7 +115,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData { // generate word counts val wordCounts: KTable[String, Long] = - textLines.flatMapValues(v => pattern.split(v.toLowerCase)) + textLines + .flatMapValues(v => pattern.split(v.toLowerCase)) .groupBy((k, v) => v) .count()(Materialized.as("word-count")) @@ -139,7 +138,12 @@ class WordCountTest extends JUnitSuite with WordCountTestData { @Test def testShouldCountWordsJava(): Unit = { import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ} - import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _} + import org.apache.kafka.streams.kstream.{ + KTable => KTableJ, + KStream => KStreamJ, + KGroupedStream => KGroupedStreamJ, + _ + } import collection.JavaConverters._ val streamsConfiguration = getStreamsConfiguration() @@ -250,4 +254,3 @@ trait WordCountTestData { new KeyValue("слова", 1L) ) } -
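
For reference, the word-count topology exercised by the tests above reduces to roughly the following sketch of the Scala API that this patch reformats. The topic names are placeholders, the inline regex stands in for the tests' precompiled pattern, and the implicit Consumed, Serialized, Materialized, and Produced instances are supplied by the two imports, exactly as in the tests:

    import org.apache.kafka.streams.scala._
    import org.apache.kafka.streams.scala.kstream._
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._

    object WordCountSketch {
      val builder = new StreamsBuilder

      // Consumed[String, String] is derived from the implicit String serde.
      val textLines: KStream[String, String] = builder.stream[String, String]("input-topic")

      // Serialized and Materialized are likewise derived from the implicit serdes.
      val wordCounts: KTable[String, Long] =
        textLines
          .flatMapValues(v => v.toLowerCase.split("\\W+"))
          .groupBy((_, word) => word)
          .count()

      // Produced[String, Long] comes from the implicit String and Long serdes.
      wordCounts.toStream.to("output-topic")
    }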