diff --git a/src/main/scala/fs2/kafka/AdminClientFactory.scala b/src/main/scala/fs2/kafka/AdminClientFactory.scala deleted file mode 100644 index c48ef91b2..000000000 --- a/src/main/scala/fs2/kafka/AdminClientFactory.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2018-2019 OVO Energy Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fs2.kafka - -import cats.effect.Sync -import org.apache.kafka.clients.admin.AdminClient - -import scala.collection.JavaConverters._ - -/** - * [[AdminClientFactory]] represents the ability to create a - * new Kafka `AdminClient` given [[AdminClientSettings]]. We - * normally do not need a custom [[AdminClientFactory]], but - * it can be useful for testing purposes. If you can instead - * have a custom trait or class with only the required parts - * from [[KafkaAdminClient]] for testing, then prefer that.
- *
- * To create a new [[AdminClientFactory]], simply create a - * new instance and implement the [[create]] function with - * the desired behaviour. To use a custom instance, set it - * with [[AdminClientSettings#withAdminClientFactory]].
- *
- * [[AdminClientFactory#Default]] is the default instance, - * and it creates a default `AdminClient` instance from - * the provided [[AdminClientSettings]]. - */ -abstract class AdminClientFactory { - def create[F[_]]( - settings: AdminClientSettings - )(implicit F: Sync[F]): F[AdminClient] -} - -object AdminClientFactory { - - /** - * The default [[AdminClientFactory]] used in [[AdminClientSettings]] - * unless a different instance has been specified. Default instance - * creates `AdminClient` instances from provided settings. - */ - val Default: AdminClientFactory = - new AdminClientFactory { - override def create[F[_]]( - settings: AdminClientSettings - )(implicit F: Sync[F]): F[AdminClient] = - F.delay { - AdminClient.create { - (settings.properties: Map[String, AnyRef]).asJava - } - } - - override def toString: String = - "Default" - } -} diff --git a/src/main/scala/fs2/kafka/AdminClientSettings.scala b/src/main/scala/fs2/kafka/AdminClientSettings.scala index 026137784..54d4f488d 100644 --- a/src/main/scala/fs2/kafka/AdminClientSettings.scala +++ b/src/main/scala/fs2/kafka/AdminClientSettings.scala @@ -16,9 +16,10 @@ package fs2.kafka +import cats.effect.Sync import cats.Show -import org.apache.kafka.clients.admin.AdminClientConfig - +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import scala.collection.JavaConverters._ import scala.concurrent.duration._ /** @@ -31,10 +32,10 @@ import scala.concurrent.duration._ * [[AdminClientSettings]] instances are immutable and all modification * functions return a new [[AdminClientSettings]] instance.
*
- * Use [[AdminClientSettings#Default]] for the default settings, and + * Use [[AdminClientSettings#apply]] for the default settings, and * then apply any desired modifications on top of that instance. */ -sealed abstract class AdminClientSettings { +sealed abstract class AdminClientSettings[F[_]] { /** * Properties which can be provided when creating a Java `KafkaAdminClient` @@ -52,7 +53,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG * }}} */ - def withBootstrapServers(bootstrapServers: String): AdminClientSettings + def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -63,7 +64,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.CLIENT_ID_CONFIG * }}} */ - def withClientId(clientId: String): AdminClientSettings + def withClientId(clientId: String): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -75,7 +76,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG * }}} */ - def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings + def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -87,7 +88,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG * }}} */ - def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings + def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -99,7 +100,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.RETRY_BACKOFF_MS_CONFIG * }}} */ - def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings + def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -111,7 +112,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG * }}} */ - def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings + def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -123,7 +124,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG * }}} */ - def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings + def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -135,7 +136,7 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.METADATA_MAX_AGE_CONFIG * }}} */ - def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings + def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings[F] /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -147,28 +148,28 @@ sealed abstract class AdminClientSettings { * AdminClientConfig.RETRIES_CONFIG * }}} */ - def withRetries(retries: Int): AdminClientSettings + def withRetries(retries: Int): AdminClientSettings[F] /** * Includes a property with the specified `key` and `value`. * The key should be one of the keys in `AdminClientConfig`, * and the value should be a valid choice for the key. */ - def withProperty(key: String, value: String): AdminClientSettings + def withProperty(key: String, value: String): AdminClientSettings[F] /** * Includes the specified keys and values as properties. The * keys should be part of the `AdminClientConfig` keys, and * the values should be valid choices for the keys. */ - def withProperties(properties: (String, String)*): AdminClientSettings + def withProperties(properties: (String, String)*): AdminClientSettings[F] /** * Includes the specified keys and values as properties. The * keys should be part of the `AdminClientConfig` keys, and * the values should be valid choices for the keys. */ - def withProperties(properties: Map[String, String]): AdminClientSettings + def withProperties(properties: Map[String, String]): AdminClientSettings[F] /** * The time to wait for the Java `KafkaAdminClient` to shutdown.
@@ -180,106 +181,107 @@ sealed abstract class AdminClientSettings { /** * Creates a new [[AdminClientSettings]] with the specified [[closeTimeout]]. */ - def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings + def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings[F] /** - * The [[AdminClientFactory]] for creating the Java `AdminClient`.
- *
- * The default is [[AdminClientFactory#Default]].
- *
- * Note that under normal usage you don't need to have a custom - * [[AdminClientFactory]] instance. For testing, you should prefer - * to use a custom trait or class similar to [[KafkaAdminClient]]. + * Creates a new `KafkaAdminClient` using the [[properties]]. Note that + * this operation should be bracketed, using e.g. `Resource`, to ensure + * the `close` function on the admin client is called. */ - def adminClientFactory: AdminClientFactory + def createAdminClient: F[AdminClient] /** - * Creates a new [[AdminClientSettings]] with the specified - * [[AdminClientFactory]] as the [[adminClientFactory]].
- *
- * Note that under normal usage you don't need to have a custom - * [[AdminClientFactory]] instance. For testing, you should prefer - * to use a custom trait or class similar to [[KafkaAdminClient]]. + * Creates a new [[AdminClientSettings]] with the specified function for + * creating `AdminClient` instances in [[createAdminClient]]. The argument + * is the [[properties]] of the settings instance. */ - def withAdminClientFactory(adminClientFactory: AdminClientFactory): AdminClientSettings + def withCreateAdminClient( + createAdminClient: Map[String, String] => F[AdminClient] + ): AdminClientSettings[F] } object AdminClientSettings { - private[this] final case class AdminClientSettingsImpl( + private[this] final case class AdminClientSettingsImpl[F[_]]( override val properties: Map[String, String], override val closeTimeout: FiniteDuration, - override val adminClientFactory: AdminClientFactory - ) extends AdminClientSettings { - override def withBootstrapServers(bootstrapServers: String): AdminClientSettings = + val createAdminClientWith: Map[String, String] => F[AdminClient] + ) extends AdminClientSettings[F] { + override def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F] = withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) - override def withClientId(clientId: String): AdminClientSettings = + override def withClientId(clientId: String): AdminClientSettings[F] = withProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) - override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings = + override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings[F] = withProperty( AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoff.toMillis.toString ) - override def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings = + override def withReconnectBackoffMax( + reconnectBackoffMax: FiniteDuration + ): AdminClientSettings[F] = withProperty( AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMax.toMillis.toString ) - override def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings = + override def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings[F] = withProperty(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff.toMillis.toString) - override def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings = + override def withConnectionsMaxIdle( + connectionsMaxIdle: FiniteDuration + ): AdminClientSettings[F] = withProperty( AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionsMaxIdle.toMillis.toString ) - override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings = + override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings[F] = withProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString) - override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings = + override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings[F] = withProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAge.toMillis.toString) - override def withRetries(retries: Int): AdminClientSettings = + override def withRetries(retries: Int): AdminClientSettings[F] = withProperty(AdminClientConfig.RETRIES_CONFIG, retries.toString) - override def withProperty(key: String, value: String): AdminClientSettings = + override def withProperty(key: String, value: String): AdminClientSettings[F] = copy(properties = properties.updated(key, value)) - override def withProperties(properties: (String, String)*): AdminClientSettings = + override def withProperties(properties: (String, String)*): AdminClientSettings[F] = copy(properties = this.properties ++ properties.toMap) - override def withProperties(properties: Map[String, String]): AdminClientSettings = + override def withProperties(properties: Map[String, String]): AdminClientSettings[F] = copy(properties = this.properties ++ properties) - override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings = + override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings[F] = copy(closeTimeout = closeTimeout) - override def withAdminClientFactory( - adminClientFactory: AdminClientFactory - ): AdminClientSettings = - copy(adminClientFactory = adminClientFactory) + override def createAdminClient: F[AdminClient] = + createAdminClientWith(properties) + + override def withCreateAdminClient( + createAdminClientWith: Map[String, String] => F[AdminClient] + ): AdminClientSettings[F] = + copy(createAdminClientWith = createAdminClientWith) override def toString: String = - Show[AdminClientSettings].show(this) + s"AdminClientSettings(closeTimeout = $closeTimeout)" } - /** - * The default [[AdminClientSettings]] instance. You can use this - * instance as a base for creating custom [[AdminClientSettings]]. - */ - val Default: AdminClientSettings = + def apply[F[_]](implicit F: Sync[F]): AdminClientSettings[F] = AdminClientSettingsImpl( properties = Map.empty, closeTimeout = 20.seconds, - adminClientFactory = AdminClientFactory.Default + createAdminClientWith = properties => + F.delay { + AdminClient.create { + (properties: Map[String, AnyRef]).asJava + } + } ) - implicit val adminClientSettingsShow: Show[AdminClientSettings] = - Show.show { s => - s"AdminClientSettings(closeTimeout = ${s.closeTimeout}, adminClientFactory = ${s.adminClientFactory})" - } + implicit def adminClientSettingsShow[F[_]]: Show[AdminClientSettings[F]] = + Show.fromToString } diff --git a/src/main/scala/fs2/kafka/KafkaAdminClient.scala b/src/main/scala/fs2/kafka/KafkaAdminClient.scala index 0a953e733..43d923d6d 100644 --- a/src/main/scala/fs2/kafka/KafkaAdminClient.scala +++ b/src/main/scala/fs2/kafka/KafkaAdminClient.scala @@ -350,12 +350,11 @@ object KafkaAdminClient { } private[this] def createAdminClient[F[_]]( - settings: AdminClientSettings + settings: AdminClientSettings[F] )(implicit F: Concurrent[F]): Resource[F, Client[F]] = Resource .make[F, AdminClient] { - settings.adminClientFactory - .create(settings) + settings.createAdminClient } { adminClient => F.delay { adminClient.close(settings.closeTimeout.asJava) @@ -373,7 +372,7 @@ object KafkaAdminClient { } private[kafka] def adminClientResource[F[_]]( - settings: AdminClientSettings + settings: AdminClientSettings[F] )(implicit F: Concurrent[F]): Resource[F, KafkaAdminClient[F]] = createAdminClient(settings).map { client => new KafkaAdminClient[F] { diff --git a/src/main/scala/fs2/kafka/package.scala b/src/main/scala/fs2/kafka/package.scala index f63bd164b..50408953f 100644 --- a/src/main/scala/fs2/kafka/package.scala +++ b/src/main/scala/fs2/kafka/package.scala @@ -337,7 +337,7 @@ package object kafka { * using the specified [[AdminClientSettings]]. If working in a * `Stream` context, you might prefer [[adminClientStream]]. */ - def adminClientResource[F[_]](settings: AdminClientSettings)( + def adminClientResource[F[_]](settings: AdminClientSettings[F])( implicit F: Concurrent[F] ): Resource[F, KafkaAdminClient[F]] = KafkaAdminClient.adminClientResource(settings) @@ -348,7 +348,7 @@ package object kafka { * working in a `Stream` context, you might instead prefer to * use the [[adminClientResource]] function. */ - def adminClientStream[F[_]](settings: AdminClientSettings)( + def adminClientStream[F[_]](settings: AdminClientSettings[F])( implicit F: Concurrent[F] ): Stream[F, KafkaAdminClient[F]] = Stream.resource(adminClientResource(settings)) diff --git a/src/test/scala/fs2/kafka/AdminClientSettingsSpec.scala b/src/test/scala/fs2/kafka/AdminClientSettingsSpec.scala index 88c0bf7bc..e5dd15c75 100644 --- a/src/test/scala/fs2/kafka/AdminClientSettingsSpec.scala +++ b/src/test/scala/fs2/kafka/AdminClientSettingsSpec.scala @@ -1,8 +1,8 @@ package fs2.kafka +import cats.effect.IO import cats.implicits._ import org.apache.kafka.clients.admin.AdminClientConfig - import scala.concurrent.duration._ final class AdminClientSettingsSpec extends BaseSpec { @@ -118,22 +118,25 @@ final class AdminClientSettingsSpec extends BaseSpec { } } - it("should provide withAdminClientFactory") { + it("should provide withCreateAdminClient") { assert { settings - .withAdminClientFactory(AdminClientFactory.Default) - .adminClientFactory == AdminClientFactory.Default + .withCreateAdminClient(_ => IO.raiseError(new RuntimeException)) + .createAdminClient + .attempt + .unsafeRunSync() + .isLeft } } it("should have a Show instance and matching toString") { assert { - settings.toString == "AdminClientSettings(closeTimeout = 20 seconds, adminClientFactory = Default)" && + settings.toString == "AdminClientSettings(closeTimeout = 20 seconds)" && settings.show == settings.toString } } } val settings = - AdminClientSettings.Default + AdminClientSettings[IO] } diff --git a/src/test/scala/fs2/kafka/BaseKafkaSpec.scala b/src/test/scala/fs2/kafka/BaseKafkaSpec.scala index d07f7c75d..2b5d99d25 100644 --- a/src/test/scala/fs2/kafka/BaseKafkaSpec.scala +++ b/src/test/scala/fs2/kafka/BaseKafkaSpec.scala @@ -19,8 +19,8 @@ abstract class BaseKafkaSpec extends BaseAsyncSpec with EmbeddedKafka { final def adminClientSettings( config: EmbeddedKafkaConfig - ): AdminClientSettings = - AdminClientSettings.Default + ): AdminClientSettings[IO] = + AdminClientSettings[IO] .withProperties(adminClientProperties(config)) final def consumerSettings[F[_]]( diff --git a/src/test/scala/fs2/kafka/PackageSpec.scala b/src/test/scala/fs2/kafka/PackageSpec.scala index 959c16ca4..1be2a74aa 100644 --- a/src/test/scala/fs2/kafka/PackageSpec.scala +++ b/src/test/scala/fs2/kafka/PackageSpec.scala @@ -6,7 +6,7 @@ final class PackageSpec extends BaseKafkaSpec { describe("creating admin clients") { it("should support defined syntax") { val settings = - AdminClientSettings.Default + AdminClientSettings[IO] adminClientResource[IO](settings) adminClientStream[IO](settings)