Add scaladoc for HeadCache (#494)
rtar authored Jul 14, 2023
1 parent 4e4c35a commit c7d0420
Showing 2 changed files with 143 additions and 8 deletions.
@@ -17,22 +17,54 @@ import com.evolutiongaming.smetrics._

import scala.concurrent.duration._

/**
* TODO headcache:
* 1. Keep 1000 last seen entries, even if replicated.
* 2. Fail headcache when background tasks failed
*/
/** Metainfo of events written to Kafka, but not yet replicated to Cassandra.
*
* The implementation subscribes to all events in Kafka and periodically polls
* Cassandra to remove information about the events that have already been
* replicated.
*
* The returned entries do not contain the events themselves, but only an
* offset of the first non-replicated event, the sequence number of the last
* event, the range of events to be deleted, etc.
*
* TODO headcache:
* 1. Keep 1000 last seen entries, even if replicated.
* 2. Fail headcache when background tasks failed
*
* @see [[HeadInfo]] for more details on the purpose of the stored data.
*/
trait HeadCache[F[_]] {

/** Get the information about the state of a journal stored in the cache.
*
* @param key
* Journal key, including the Kafka topic where the journal is stored, and
* a journal identifier.
* @param partition
* Partition where the journal is stored. The usual way to get the partition
* is to write a "marker" record to the Kafka topic and use the partition of
* the marker as the current one.
* @param offset
* Current [[Offset]], i.e. the maximum offset where Kafka records related
* to the journal are located. The usual way to get such an offset is to
* write a "marker" record to the Kafka partition and use the offset of the
* marker as the current one.
*
* @return
* [[HeadInfo]] with the current metainformation about non-replicated
* events, or `None` if it was not present in [[HeadCache]] and could not
* be loaded either.
*/
def get(key: Key, partition: Partition, offset: Offset): F[Option[HeadInfo]]
}
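For illustration, a minimal lookup sketch (an editorial addition, not part of the commit): it assumes an already allocated `headCache: HeadCache[IO]` and a `partition`/`offset` pair obtained from a freshly written "marker" record, as described above. `Key`, `HeadInfo` (kafka-journal) and `Partition`, `Offset` (skafka) are assumed to be in scope.

{{{
import cats.effect.IO

// hypothetical helper around the single method of the trait
def headInfoOf(
  headCache: HeadCache[IO],
  key: Key,
  partition: Partition,
  offset: Offset
): IO[Option[HeadInfo]] =
  headCache
    .get(key, partition, offset)
    .flatTap {
      case Some(info) => IO.println(s"non-replicated state: $info") // cache hit or loaded
      case None       => IO.println("not cached and could not be loaded")
    }
}}}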


object HeadCache {

/** Disable cache and always return `None` */
def empty[F[_]: Applicative]: HeadCache[F] = const(none[HeadInfo].pure[F])


/** Disable cache and always return a predefined value */
def const[F[_]](value: F[Option[HeadInfo]]): HeadCache[F] = {
class Const
new Const with HeadCache[F] {
Expand All @@ -41,6 +73,30 @@ object HeadCache {
}
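A quick test-double sketch for the two constructors above; `HeadInfo.empty` is an assumption about the [[HeadInfo]] companion object rather than something shown in this diff:

{{{
import cats.effect.IO
import cats.syntax.all._

val disabled: HeadCache[IO] = HeadCache.empty[IO]                          // always returns None
val pinned: HeadCache[IO]   = HeadCache.const(HeadInfo.empty.some.pure[IO]) // always the same value
}}}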


/** Creates a new cache using a Kafka configuration and a Cassandra reader.
*
* The created instances will report metrics to `metrics` and also do debug
* logging. There is no need to call [[HeadCache#withLog]] on them.
*
* @param consumerConfig
* Kafka consumer configuration used to find new non-replicated journal
* events. Some of the parameters will be ignored. See
* [[TopicCache.Consumer#of]] for more details.
* @param eventualJournal
* Cassandra (or other long-term storage) data source used to remove
* replicated events from the cache. Usually created by calling
* [[EventualCassandra#of]].
* @param metrics
* Interface to report the metrics to. The intended way to configure it is
* to override [[KafkaJournal#metrics]] in a custom implementation of
* [[KafkaJournal]].
* @return
* Resource which will configure a [[HeadCache]] with the passed
* parameters. Instances of `Resource[HeadCache]` are reusable, so there is
* no need to call [[HeadCache#of]] each time if the parameters did not
* change.
*/
def of[F[_]: Async: Parallel: Runtime: LogOf: KafkaConsumerOf: MeasureDuration: FromTry: FromJsResult: JsonCodec.Decode](
consumerConfig: ConsumerConfig,
eventualJournal: EventualJournal[F],
@@ -59,7 +115,34 @@ object HeadCache {
}
}
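To illustrate the note about reusability, a sketch of allocating the cache once and serving many lookups from the same `Resource`; the helper name and parameter list here are hypothetical:

{{{
import cats.effect.{IO, Resource}
import cats.syntax.all._

def serveLookups(
  headCacheR: Resource[IO, HeadCache[IO]], // e.g. obtained from HeadCache.of
  lookups: List[(Key, Partition, Offset)]
): IO[List[Option[HeadInfo]]] =
  headCacheR.use { headCache =>
    // one allocation serves all lookups; release happens after `use` completes
    lookups.traverse { case (key, partition, offset) =>
      headCache.get(key, partition, offset)
    }
  }
}}}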


/** Creates a new cache using Kafka and Cassandra data sources.
*
* The method also allows changing the default configuration in the form of
* [[HeadCacheConfig]], e.g. to make the polling faster for testing purposes.
*
* @param eventual
* Cassandra data source.
* @param log
* Logger to use for [[TopicCache#withLog]]. Note that only [[TopicCache]]
* debug logging will be affected by this. One needs to call
* [[HeadCache#withLog]] if debug logging for [[HeadCache]] is required.
* @param consumer
* Kafka data source factory. The reason it is a factory (i.e. a
* `Resource`) is that [[HeadCache]] will try to recreate the consumer in
* case of a failure.
* @param metrics
* Interface to report the metrics to. The intended way to configure it is
* overriding [[KafkaJournal#metrics]] in a custom implementation of
* [[KafkaJournal]].
* @param config
* Cache configuration. It is recommended to keep the default value, and
* only change it for unit testing purposes.
* @return
* Resource which will configure a [[HeadCache]] with the passed
* parameters. Instances of `Resource[HeadCache]` are reusable, so there is
* no need to call [[HeadCache#of]] each time if the parameters did not
* change.
*/
def of[F[_]: Async: Parallel: Runtime: FromJsResult: MeasureDuration: JsonCodec.Decode](
eventual: Eventual[F],
log: Log[F],
@@ -131,15 +214,24 @@

def apply[F[_]](implicit F: Eventual[F]): Eventual[F] = F

/** Wrap [[EventualJournal]] into [[Eventual]] */
def apply[F[_]](eventualJournal: EventualJournal[F]): Eventual[F] = {
class Main
new Main with HeadCache.Eventual[F] {
def pointer(topic: Topic, partition: Partition): F[Option[Offset]] = eventualJournal.offset(topic, partition)
}
}
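A one-liner sketch of the adapter above, turning a full `EventualJournal` into the narrower `Eventual` view that [[HeadCache]] needs for learning replicated offsets:

{{{
import cats.effect.IO

// assuming an already constructed eventualJournal: EventualJournal[IO]
def eventualOf(eventualJournal: EventualJournal[IO]): Eventual[IO] =
  Eventual(eventualJournal)
}}}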

/** Always return `None` as an offset, i.e. pretend nothing ever replicates.
*
* Only useful for testing purposes.
*/
def empty[F[_]: Applicative]: Eventual[F] = const(TopicPointers.empty.pure[F])

/** Ignore the topic and specify the offsets to return per partition.
*
* Only useful for testing purposes.
*/
def const[F[_]: Applicative](value: F[TopicPointers]): Eventual[F] = {
class Const
new Const with Eventual[F] {
@@ -161,6 +253,11 @@
}
}

/** Log debug messages on every call to the class methods.
*
* The messages will go to DEBUG level, so it is also necessary to enable
* this level in the logger configuration.
*/
def withLog(log: Log[F])(implicit F: FlatMap[F], measureDuration: MeasureDuration[F]): HeadCache[F] = {
new WithLog with HeadCache[F] {

@@ -175,6 +272,14 @@
}
}
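A sketch of the decorator above. It assumes `withLog` is available as a method on the cache instance (the enclosing implicit class is elided in this diff), with `Log` coming from `catshelper` and `MeasureDuration` from `smetrics`, as the file's imports suggest:

{{{
import cats.effect.IO
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.smetrics.MeasureDuration

def logged(
  headCache: HeadCache[IO],
  log: Log[IO]
)(implicit measureDuration: MeasureDuration[IO]): HeadCache[IO] =
  headCache.withLog(log) // every `get` call is now logged at DEBUG level
}}}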

/** Prevents cache methods from being used after the `Resource` is released.
*
* [[ReleasedError]] will be raised if [[HeadCache#get]] is called after the
* resource is released.
*
* It may prevent a certain kind of bug, where several caches are
* accidentally alive and working.
*/
def withFence(implicit F: Sync[F]): Resource[F, HeadCache[F]] = {
Resource
.make { Ref[F].of(().pure[F]) } { _.set(ReleasedError.raiseError[F, Unit]) }
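Before the rest of the implementation (elided in this diff), a usage sketch of the fence: releasing the outer resource makes further `get` calls fail with [[ReleasedError]] instead of silently hitting a dead cache. As with `withLog`, the method is assumed to be available on the cache instance:

{{{
import cats.effect.{IO, Resource}

def fenced(headCache: HeadCache[IO]): Resource[IO, HeadCache[IO]] =
  headCache.withFence // after release, get(...) raises ReleasedError
}}}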
@@ -252,9 +357,14 @@

object Metrics {

/** Does not do anything, ignores metric reports */
def empty[F[_]: Applicative]: Metrics[F] = const(().pure[F])


/** Calls the passed effect when metrics are reported.
*
* May only be useful for tests, as the reported parameters are ignored.
*/
def const[F[_]](unit: F[Unit]): Metrics[F] = {
class Const
new Const with Metrics[F] {
@@ -277,6 +387,29 @@
}


/** Registers a default set of metrics in a passed collector registry.
*
* Note that creating these metrics several times with the same collector
* registry may cause errors if a previous [[Metrics]] instance has not
* been released yet.
*
* The following metrics will be registered by default, but it is possible
* to change the default `headcache` prefix to something else.
*
* {{{
* headcache_get_latency HeadCache get latency in seconds
* headcache_get_result HeadCache `get` call result counter
* headcache_entries HeadCache entries
* headcache_listeners HeadCache listeners
* headcache_records_age HeadCache time difference between record timestamp and now in seconds
* headcache_diff HeadCache offset difference between state and source
* }}}
*
* @param registry
* smetrics collector registry.
* @param prefix
* Prefix to use for the registered metrics.
*/
def of[F[_]: Monad](
registry: CollectorRegistry[F],
prefix: Prefix = Prefix.default
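A registration sketch for the constructor above; that `of` returns a `Resource` which unregisters the collectors on release is an assumption implied by the note about repeated registration, not something visible in this diff:

{{{
import cats.effect.{IO, Resource}
import com.evolutiongaming.smetrics.CollectorRegistry

def metricsOf(registry: CollectorRegistry[IO]): Resource[IO, HeadCache.Metrics[IO]] =
  HeadCache.Metrics.of(registry) // uses the default `headcache` prefix
}}}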
@@ -61,7 +61,9 @@ object TopicCache {
* @param log
* Logger used to write debug logs to.
* @param consumer
* Kafka data source.
Kafka data source factory. The reason it is a factory (i.e. a
`Resource`) is that [[HeadCache]] will try to recreate the consumer in
case of a failure.
* @param config
* [[HeadCache]] configuration.
* @param consRecordToActionHeader
@@ -573,7 +575,7 @@
* because it is unlawful.
*
* It is possible to achieve the same using `alleycats-core`
* library like this, so the method might not be removed in future:
* library like this, so the method might be removed in the future:
* {{{
* scala> import cats.syntax.all._
* scala> import alleycats.std.all._