Skip to content

Commit

Permalink
MINOR: Add Scalafmt to Streams Scala API (#4965)
Browse files Browse the repository at this point in the history
Reviewers: Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
joan38 authored and guozhangwang committed Jul 9, 2018
1 parent 9a18f92 commit 05c5854
Show file tree
Hide file tree
Showing 16 changed files with 318 additions and 282 deletions.
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ buildscript {
classpath 'org.scoverage:gradle-scoverage:2.3.0'
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
classpath 'org.owasp:dependency-check-gradle:3.2.1'
classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
}
}

apply plugin: "com.diffplug.gradle.spotless"
spotless {
scala {
target 'streams/**/*.scala'
scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
}
}

Expand Down
20 changes: 20 additions & 0 deletions checkstyle/.scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
docstrings = JavaDoc
maxColumn = 120
continuationIndent.defnSite = 2
assumeStandardLibraryStripMargin = true
danglingParentheses = true
rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
* more expressive, with less boilerplate and more succinct.
* <p>
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
* have full support for SAM types.
* have full support for SAM types.
*/
object FunctionConversions {

Expand All @@ -40,7 +40,7 @@ object FunctionConversions {
}
}

implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
override def apply(key: T, value: U): VR = f(key, value)
}
Expand All @@ -49,7 +49,7 @@ object FunctionConversions {
}
}

implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
override def apply(key: K, value: V): KeyValue[KR, VR] = {
val (kr, vr) = f(key, value)
Expand Down Expand Up @@ -88,7 +88,7 @@ object FunctionConversions {
}
}

implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
def asMerger: Merger[K, VR] = new Merger[K, VR] {
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ object ImplicitConversions {
valueSerde: Serde[V]): Materialized[K, V, S] =
Materialized.`with`[K, V, S](keySerde, valueSerde)

implicit def joinedFromKeyValueOtherSerde[K, V, VO]
(implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
Joined.`with`(keySerde, valueSerde, otherValueSerde)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
import org.apache.kafka.streams.kstream.WindowedSerdes

object Serdes {
implicit val String: Serde[String] = JSerdes.String()
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
implicit val String: Serde[String] = JSerdes.String()
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()

implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =
new WindowedSerdes.SessionWindowedSerde[T]()

def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def close(): Unit = ()
},
new Deserializer[T] {
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def close(): Unit = ()
}
)

def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def close(): Unit = ()
},
new Deserializer[T] {
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def close(): Unit = ()
}
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ import ImplicitConversions._
import scala.collection.JavaConverters._

/**
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
*/
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
*/
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {

/**
* Create a [[kstream.KStream]] from the specified topic.
* <p>
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
Expand Down Expand Up @@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
* Create a [[kstream.KTable]] from the specified topic.
* <p>
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
Expand Down Expand Up @@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @see #table(String)
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
(implicit consumed: Consumed[K, V]): KTable[K, V] =
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
implicit consumed: Consumed[K, V]
): KTable[K, V] =
inner.table[K, V](topic, consumed, materialized)

/**
Expand All @@ -139,21 +140,22 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
inner.globalTable(topic, consumed)

/**
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
* from the implicit `Consumed` instance will be used.
*
* @param topic the topic name
* @param materialized the instance of `Materialized` used to materialize a state store
* @return a `GlobalKTable` for the specified topic
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
*/
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
implicit consumed: Consumed[K, V]
): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)

/**
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
* `Transformer`, or `ValueTransformer` before it can be used.
*
* @param builder the builder used to obtain this state store `StateStore` instance
Expand All @@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)

/**
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
*/
*/
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._


/**
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
*
Expand All @@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
* provided by the given `materialized`.
*
* @param materialized an instance of `Materialized` used to materialize a state store.
* @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
Expand All @@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
/**
* Combine the values of records in this stream by the grouped key.
*
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
* @param materialized an instance of `Materialized` used to materialize a state store.
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
* @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
* the same key into a new instance of [[KTable]].
*
* @param materialized an instance of `Materialized` used to materialize a state store.
* @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
Expand Down
Loading

0 comments on commit 05c5854

Please sign in to comment.