Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change to parameterize AdminClientSettings on effect type #125

Merged
merged 1 commit into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 0 additions & 68 deletions src/main/scala/fs2/kafka/AdminClientFactory.scala

This file was deleted.

128 changes: 65 additions & 63 deletions src/main/scala/fs2/kafka/AdminClientSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package fs2.kafka

import cats.effect.Sync
import cats.Show
import org.apache.kafka.clients.admin.AdminClientConfig

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._
import scala.concurrent.duration._

/**
Expand All @@ -31,10 +32,10 @@ import scala.concurrent.duration._
* [[AdminClientSettings]] instances are immutable and all modification
* functions return a new [[AdminClientSettings]] instance.<br>
* <br>
* Use [[AdminClientSettings#Default]] for the default settings, and
* Use [[AdminClientSettings#apply]] for the default settings, and
* then apply any desired modifications on top of that instance.
*/
sealed abstract class AdminClientSettings {
sealed abstract class AdminClientSettings[F[_]] {

/**
* Properties which can be provided when creating a Java `KafkaAdminClient`
Expand All @@ -52,7 +53,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
* }}}
*/
def withBootstrapServers(bootstrapServers: String): AdminClientSettings
def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -63,7 +64,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.CLIENT_ID_CONFIG
* }}}
*/
def withClientId(clientId: String): AdminClientSettings
def withClientId(clientId: String): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -75,7 +76,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG
* }}}
*/
def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings
def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -87,7 +88,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG
* }}}
*/
def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings
def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -99,7 +100,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.RETRY_BACKOFF_MS_CONFIG
* }}}
*/
def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings
def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -111,7 +112,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG
* }}}
*/
def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings
def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -123,7 +124,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG
* }}}
*/
def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings
def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -135,7 +136,7 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.METADATA_MAX_AGE_CONFIG
* }}}
*/
def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings
def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings[F]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand All @@ -147,28 +148,28 @@ sealed abstract class AdminClientSettings {
* AdminClientConfig.RETRIES_CONFIG
* }}}
*/
def withRetries(retries: Int): AdminClientSettings
def withRetries(retries: Int): AdminClientSettings[F]

/**
* Includes a property with the specified `key` and `value`.
* The key should be one of the keys in `AdminClientConfig`,
* and the value should be a valid choice for the key.
*/
def withProperty(key: String, value: String): AdminClientSettings
def withProperty(key: String, value: String): AdminClientSettings[F]

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `AdminClientConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: (String, String)*): AdminClientSettings
def withProperties(properties: (String, String)*): AdminClientSettings[F]

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `AdminClientConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: Map[String, String]): AdminClientSettings
def withProperties(properties: Map[String, String]): AdminClientSettings[F]

/**
* The time to wait for the Java `KafkaAdminClient` to shutdown.<br>
Expand All @@ -180,106 +181,107 @@ sealed abstract class AdminClientSettings {
/**
* Creates a new [[AdminClientSettings]] with the specified [[closeTimeout]].
*/
def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings
def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings[F]

/**
* The [[AdminClientFactory]] for creating the Java `AdminClient`.<br>
* <br>
* The default is [[AdminClientFactory#Default]].<br>
* <br>
* Note that under normal usage you don't need to have a custom
* [[AdminClientFactory]] instance. For testing, you should prefer
* to use a custom trait or class similar to [[KafkaAdminClient]].
* Creates a new `KafkaAdminClient` using the [[properties]]. Note that
* this operation should be bracketed, using e.g. `Resource`, to ensure
* the `close` function on the admin client is called.
*/
def adminClientFactory: AdminClientFactory
def createAdminClient: F[AdminClient]

/**
* Creates a new [[AdminClientSettings]] with the specified
* [[AdminClientFactory]] as the [[adminClientFactory]].<br>
* <br>
* Note that under normal usage you don't need to have a custom
* [[AdminClientFactory]] instance. For testing, you should prefer
* to use a custom trait or class similar to [[KafkaAdminClient]].
* Creates a new [[AdminClientSettings]] with the specified function for
* creating `AdminClient` instances in [[createAdminClient]]. The argument
* is the [[properties]] of the settings instance.
*/
def withAdminClientFactory(adminClientFactory: AdminClientFactory): AdminClientSettings
def withCreateAdminClient(
createAdminClient: Map[String, String] => F[AdminClient]
): AdminClientSettings[F]
}

object AdminClientSettings {
private[this] final case class AdminClientSettingsImpl(
private[this] final case class AdminClientSettingsImpl[F[_]](
override val properties: Map[String, String],
override val closeTimeout: FiniteDuration,
override val adminClientFactory: AdminClientFactory
) extends AdminClientSettings {
override def withBootstrapServers(bootstrapServers: String): AdminClientSettings =
val createAdminClientWith: Map[String, String] => F[AdminClient]
) extends AdminClientSettings[F] {
override def withBootstrapServers(bootstrapServers: String): AdminClientSettings[F] =
withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

override def withClientId(clientId: String): AdminClientSettings =
override def withClientId(clientId: String): AdminClientSettings[F] =
withProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)

override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings =
override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings[F] =
withProperty(
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG,
reconnectBackoff.toMillis.toString
)

override def withReconnectBackoffMax(reconnectBackoffMax: FiniteDuration): AdminClientSettings =
override def withReconnectBackoffMax(
reconnectBackoffMax: FiniteDuration
): AdminClientSettings[F] =
withProperty(
AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG,
reconnectBackoffMax.toMillis.toString
)

override def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings =
override def withRetryBackoff(retryBackoff: FiniteDuration): AdminClientSettings[F] =
withProperty(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff.toMillis.toString)

override def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings =
override def withConnectionsMaxIdle(
connectionsMaxIdle: FiniteDuration
): AdminClientSettings[F] =
withProperty(
AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,
connectionsMaxIdle.toMillis.toString
)

override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings =
override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings[F] =
withProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString)

override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings =
override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings[F] =
withProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAge.toMillis.toString)

override def withRetries(retries: Int): AdminClientSettings =
override def withRetries(retries: Int): AdminClientSettings[F] =
withProperty(AdminClientConfig.RETRIES_CONFIG, retries.toString)

override def withProperty(key: String, value: String): AdminClientSettings =
override def withProperty(key: String, value: String): AdminClientSettings[F] =
copy(properties = properties.updated(key, value))

override def withProperties(properties: (String, String)*): AdminClientSettings =
override def withProperties(properties: (String, String)*): AdminClientSettings[F] =
copy(properties = this.properties ++ properties.toMap)

override def withProperties(properties: Map[String, String]): AdminClientSettings =
override def withProperties(properties: Map[String, String]): AdminClientSettings[F] =
copy(properties = this.properties ++ properties)

override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings =
override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings[F] =
copy(closeTimeout = closeTimeout)

override def withAdminClientFactory(
adminClientFactory: AdminClientFactory
): AdminClientSettings =
copy(adminClientFactory = adminClientFactory)
override def createAdminClient: F[AdminClient] =
createAdminClientWith(properties)

override def withCreateAdminClient(
createAdminClientWith: Map[String, String] => F[AdminClient]
): AdminClientSettings[F] =
copy(createAdminClientWith = createAdminClientWith)

override def toString: String =
Show[AdminClientSettings].show(this)
s"AdminClientSettings(closeTimeout = $closeTimeout)"
}

/**
* The default [[AdminClientSettings]] instance. You can use this
* instance as a base for creating custom [[AdminClientSettings]].
*/
val Default: AdminClientSettings =
def apply[F[_]](implicit F: Sync[F]): AdminClientSettings[F] =
AdminClientSettingsImpl(
properties = Map.empty,
closeTimeout = 20.seconds,
adminClientFactory = AdminClientFactory.Default
createAdminClientWith = properties =>
F.delay {
AdminClient.create {
(properties: Map[String, AnyRef]).asJava
}
}
)

implicit val adminClientSettingsShow: Show[AdminClientSettings] =
Show.show { s =>
s"AdminClientSettings(closeTimeout = ${s.closeTimeout}, adminClientFactory = ${s.adminClientFactory})"
}
implicit def adminClientSettingsShow[F[_]]: Show[AdminClientSettings[F]] =
Show.fromToString
}
7 changes: 3 additions & 4 deletions src/main/scala/fs2/kafka/KafkaAdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,11 @@ object KafkaAdminClient {
}

private[this] def createAdminClient[F[_]](
settings: AdminClientSettings
settings: AdminClientSettings[F]
)(implicit F: Concurrent[F]): Resource[F, Client[F]] =
Resource
.make[F, AdminClient] {
settings.adminClientFactory
.create(settings)
settings.createAdminClient
} { adminClient =>
F.delay {
adminClient.close(settings.closeTimeout.asJava)
Expand All @@ -373,7 +372,7 @@ object KafkaAdminClient {
}

private[kafka] def adminClientResource[F[_]](
settings: AdminClientSettings
settings: AdminClientSettings[F]
)(implicit F: Concurrent[F]): Resource[F, KafkaAdminClient[F]] =
createAdminClient(settings).map { client =>
new KafkaAdminClient[F] {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/fs2/kafka/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ package object kafka {
* using the specified [[AdminClientSettings]]. If working in a
* `Stream` context, you might prefer [[adminClientStream]].
*/
def adminClientResource[F[_]](settings: AdminClientSettings)(
def adminClientResource[F[_]](settings: AdminClientSettings[F])(
implicit F: Concurrent[F]
): Resource[F, KafkaAdminClient[F]] =
KafkaAdminClient.adminClientResource(settings)
Expand All @@ -348,7 +348,7 @@ package object kafka {
* working in a `Stream` context, you might instead prefer to
* use the [[adminClientResource]] function.
*/
def adminClientStream[F[_]](settings: AdminClientSettings)(
def adminClientStream[F[_]](settings: AdminClientSettings[F])(
implicit F: Concurrent[F]
): Stream[F, KafkaAdminClient[F]] =
Stream.resource(adminClientResource(settings))
Expand Down
Loading