Proposal to use Scalafmt with the Scala files #4965

Merged · 1 commit · Jul 9, 2018
build.gradle (9 additions, 0 deletions)
@@ -29,6 +29,15 @@ buildscript {
classpath 'org.scoverage:gradle-scoverage:2.3.0'
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
classpath 'org.owasp:dependency-check-gradle:3.2.1'
+ classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
}
}

+ apply plugin: "com.diffplug.gradle.spotless"
+ spotless {
+   scala {
+     target 'streams/**/*.scala'
+     scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
+   }
+ }
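With this configuration, `./gradlew spotlessCheck` should fail the build when the targeted Scala sources are not formatted, and `./gradlew spotlessApply` should rewrite them in place (the standard check/apply task names the Spotless Gradle plugin generates).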

checkstyle/.scalafmt.conf (20 additions, 0 deletions)
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+docstrings = JavaDoc
+maxColumn = 120
+continuationIndent.defnSite = 2
+assumeStandardLibraryStripMargin = true
+danglingParentheses = true
Review comment (Contributor):
danglingParentheses default seems to be true already?

Reply (PR author):
It has been since 1.6.0, but here we are stuck on 1.5.1, so I specified it explicitly.
diffplug/spotless#260 is the fix that will let us upgrade to Scalafmt 1.6.x.

+rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
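For a rough idea of what the rewrite rules do, a hypothetical before/after sketch (illustrative input only; exact output depends on the Scalafmt version, 1.5.1 here):

// Hypothetical input, before the rewrite rules run:
import scala.collection.{Set, mutable} // SortImports => {mutable, Set}: lowercase selectors sort first

object Example {
  final implicit val answer: Int = 42     // SortModifiers => implicit final val (canonical modifier order)
  def parse(s: String): Int = { s.toInt } // RedundantBraces => def parse(s: String): Int = s.toInt
  def pick(n: Int): Int = n match {
    case x if (x > 2) => x                // RedundantParens => case x if x > 2 => x
    case _ => 0
  }
}

// danglingParentheses = true additionally keeps the closing parenthesis of a
// multi-line argument list on its own line, as in the reformatted signatures below.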
FunctionConversions.scala
@@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
* more expressive, with less boilerplate and more succinct.
* <p>
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
- * have full support for SAM types.
+ * have full support for SAM types.
*/
object FunctionConversions {

@@ -40,7 +40,7 @@ object FunctionConversions {
}
}

- implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
+ implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
override def apply(key: T, value: U): VR = f(key, value)
}
@@ -49,7 +49,7 @@
}
}

- implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
+ implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
override def apply(key: K, value: V): KeyValue[KR, VR] = {
val (kr, vr) = f(key, value)
@@ -88,7 +88,7 @@
}
}

- implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
+ implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
def asMerger: Merger[K, VR] = new Merger[K, VR] {
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
}
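As a usage sketch (the function and types here are hypothetical), these implicit classes let a plain Scala function stand in wherever the Java API expects a SAM interface:

import org.apache.kafka.streams.kstream.KeyValueMapper
import org.apache.kafka.streams.scala.FunctionConversions._

// A plain Scala function of two arguments...
val tag: (String, Int) => String = (key, value) => s"$key:$value"

// ...adapted to the Java KeyValueMapper SAM interface, no anonymous class needed.
val mapper: KeyValueMapper[String, Int, String] = tag.asKeyValueMapper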
ImplicitConversions.scala
@@ -77,7 +77,8 @@ object ImplicitConversions {
valueSerde: Serde[V]): Materialized[K, V, S] =
Materialized.`with`[K, V, S](keySerde, valueSerde)

- implicit def joinedFromKeyValueOtherSerde[K, V, VO]
-   (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+ implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
+                                                     valueSerde: Serde[V],
+                                                     otherValueSerde: Serde[VO]): Joined[K, V, VO] =
Joined.`with`(keySerde, valueSerde, otherValueSerde)
}
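A sketch of how this derivation is used (type choices hypothetical): with serdes for the key, value, and other-value types in implicit scope, the compiler assembles the Joined on demand:

import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// Serde[String] and Serde[Long] come from Serdes._; joinedFromKeyValueOtherSerde
// combines them into the Joined[String, Long, String] requested here.
val joined: Joined[String, Long, String] = implicitly[Joined[String, Long, String]]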
Serdes.scala
@@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
import org.apache.kafka.streams.kstream.WindowedSerdes

object Serdes {
- implicit val String: Serde[String] = JSerdes.String()
- implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
- implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
- implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+ implicit val String: Serde[String] = JSerdes.String()
+ implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+ implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+ implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
- implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
- implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
- implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
- implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
- implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
- implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+ implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+ implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+ implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+ implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+ implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+ implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()

implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
- implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+ implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =
+   new WindowedSerdes.SessionWindowedSerde[T]()

def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
- override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
+ override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
- override def close(): Unit = ()
+ override def close(): Unit = ()
},
new Deserializer[T] {
- override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
+ override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
- override def close(): Unit = ()
+ override def close(): Unit = ()
}
)

def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
- deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+ deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
- override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
+ override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
- override def close(): Unit = ()
+ override def close(): Unit = ()
},
new Deserializer[T] {
- override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
+ override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
- override def close(): Unit = ()
+ override def close(): Unit = ()
}
)
}
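As a usage sketch (the UTF-8 codec is illustrative), fromFn builds a Serde from two plain functions, with null round-tripped through Option:

import java.nio.charset.StandardCharsets.UTF_8
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

val utf8: Serde[String] = Serdes.fromFn[String](
  (data: String) => data.getBytes(UTF_8),                         // serializer
  (bytes: Array[Byte]) => Option(bytes).map(new String(_, UTF_8)) // deserializer; None surfaces as null
)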
StreamsBuilder.scala
@@ -31,18 +31,18 @@ import ImplicitConversions._
import scala.collection.JavaConverters._

/**
- * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
- */
+ * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
+ */
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {

/**
* Create a [[kstream.KStream]] from the specified topic.
* <p>
- * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
- * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
+ * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
@@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
* Create a [[kstream.KTable]] from the specified topic.
* <p>
- * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
+ * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
* <p>
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
- * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
+ * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
* {{{
* // Brings all implicit conversions in scope
* import ImplicitConversions._
@@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @see #table(String)
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
- def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
- (implicit consumed: Consumed[K, V]): KTable[K, V] =
+ def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
+   implicit consumed: Consumed[K, V]
+ ): KTable[K, V] =
inner.table[K, V](topic, consumed, materialized)
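A usage sketch of the implicit-serdes route the scaladoc describes (topic name hypothetical): with the default serdes imported, the Consumed is derived instead of passed explicitly.

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KTable

val builder = new StreamsBuilder()
// The implicit Serde[String] values are lifted into the Consumed[String, String] the builder needs.
val users: KTable[String, String] = builder.table[String, String]("users")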

/**
@@ -139,21 +140,22 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
inner.globalTable(topic, consumed)

/**
- * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
- * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
+ * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
+ * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
* from the implicit `Consumed` instance will be used.
*
* @param topic the topic name
* @param materialized the instance of `Materialized` used to materialize a state store
* @return a `GlobalKTable` for the specified topic
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
*/
- def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
- (implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
+ def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
+   implicit consumed: Consumed[K, V]
+ ): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)

/**
- * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
+ * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
* `Transformer`, or `ValueTransformer` before it can be used.
*
* @param builder the builder used to obtain this state store `StateStore` instance
@@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)

/**
- * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
+ * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
- */
+ */
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
KGroupedStream.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._


/**
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
*
@@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
* provided by the given `materialized`.
*
- * @param materialized an instance of `Materialized` used to materialize a state store.
+ * @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
@@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
/**
* Combine the values of records in this stream by the grouped key.
*
- * @param reducer a function `(V, V) => V` that computes a new aggregate result.
- * @param materialized an instance of `Materialized` used to materialize a state store.
+ * @param reducer a function `(V, V) => V` that computes a new aggregate result.
+ * @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
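A sketch of the two aggregations described above (the grouped stream is hypothetical; the implicit Materialized instances are derived from the serdes in scope via ImplicitConversions):

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KTable}

// Rolling per-key record count, backed by an implicitly materialized store.
def counts(grouped: KGroupedStream[String, Long]): KTable[String, Long] =
  grouped.count()

// Rolling per-key sum via a plain Scala reducer function.
def totals(grouped: KGroupedStream[String, Long]): KTable[String, Long] =
  grouped.reduce(_ + _)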
KGroupedTable.scala
@@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
* the same key into a new instance of [[KTable]].
*
- * @param materialized an instance of `Materialized` used to materialize a state store.
+ * @param materialized an instance of `Materialized` used to materialize a state store.
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
* represent the latest (rolling) count (i.e., number of records) for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`