Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scaladoc for TopicCache #493

Merged
merged 13 commits into from
Jul 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,18 @@ object HeadCache {
}


/** Lighweight wrapper over [[EventualJournal]].
*
* Allows easier stubbing in unit tests.
*/
trait Eventual[F[_]] {

/** Gets the last replicated offset for a partition topic.
*
* @see [[EventualJournal#offset]] for more details.
*/
def pointer(topic: Topic, partition: Partition): F[Option[Offset]]

}

object Eventual {
Expand Down Expand Up @@ -183,14 +192,61 @@ object HeadCache {
}


/** Provides methods to update the metrics for [[HeadCache]] internals */
trait Metrics[F[_]] {

/** Report duration and result of cache hits, i.e. [[TopicCache#get]].
*
* @param topic
* Topic journal is being stored in.
* @param latency
* Duration of [[TopicCache#get]] call.
* @param result
* Result of the call, i.e. "ahead", "limited", "timeout" or "failure".
* @param now
* If result was [[PartitionCache.Result.Now]], i.e. entry was already in
* cache.
*/
def get(topic: Topic, latency: FiniteDuration, result: String, now: Boolean): F[Unit]

/** Report health of all [[PartitionCache]] instances related to a topic.
*
* @param topic
* Topic which these [[PartitionCache]] instances are related to.
* @param entries
* Number of distinct journals stored in a topic cache. If it is too
* close to [[HeadCacheConfig.Partition#maxSize]] multiplied by number of
* partitions, the cache might not work efficiently.
* @param listeners
* Number of listeners waiting after [[PartitionCache#get]] call. Too
* many of them might mean that cache is not being loaded fast enough.
*/
def meters(topic: Topic, entries: Int, listeners: Int): F[Unit]

/** Report the latency and number of records coming from Kafka.
*
* I.e. how long it took for a next element in a stream returned by
* [[HeadCacheConsumption#apply]] to get from a journal writer to this
* cache.
*
* @param topic
* Topic being read by [[HeadCacheConsumption]].
* @param age
* Time it took for an element to reach [[HeadCache]].
* @param diff
* The number of elements added to cache by this batch, i.e. returned by
* [[PartitionCache#add]].
*/
def consumer(topic: Topic, age: FiniteDuration, diff: Long): F[Unit]

/** Report the number of records coming from Cassandra.
*
* @param topic
* Topic being read by [[Eventual]].
* @param diff
* The number of elements remove from cache by this batch, i.e. returned
* by [[PartitionCache#remove]].
*/
def storage(topic: Topic, diff: Long): F[Unit]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.evolution.scache.Cache
import scala.concurrent.duration.FiniteDuration


/** Maintains an information about all non-replicated Kafka records.
/** Maintains an information about non-replicated Kafka records in a partition.
*
* The class itself does not read Kafka or poll Cassandra (or other long term
* storage), it relies on the information incoming through
Expand Down Expand Up @@ -636,7 +636,7 @@ object PartitionCache {
* Number of listeners waiting after [[PartitionCache#get]] call. Too many
* of them might mean that cache is not being loaded fast enough.
* @param entries
* Number of distinct journal store in a cache. If it is too close to
* Number of distinct journals stored in a cache. If it is too close to
* maximum configured number, the cache might not work efficiently.
*/
final case class Meters(listeners: Int, entries: Int)
Expand Down Expand Up @@ -953,6 +953,29 @@ object PartitionCache {
}

private implicit class CacheOps[F[_], K, V](val self: Cache[F, K, V]) extends AnyVal {

/** Aggregate all succesfully loaded or loading values to something else.
*
* If an error happens when value is loaded, then it is ignored.
*
* Example: calculate sum of all even loaded or loading `Int` values:
* {{{
* cache.foldMap { value =>
* if (value % 2 == 0) value else 0
* }
* }}}
*
* @tparam A
* Type to map the key/values to, and aggregate with. It requires
* [[CommutativeMonoid]] to be present to be able to sum up the values,
* without having a guarantee about the order of the values being
* aggregates as the order may be random depending on a cache
* implementation.
*
* @return
* Result of the aggregation, i.e. all mapped values combined using
* passed [[CommutativeMonoid]].
*/
def foldMap1[A](f: V => F[A])(implicit F: Sync[F], commutativeMonoid: CommutativeMonoid[A]): F[A] = {
self.foldMap {
case (_, Right(a)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,62 @@ import com.evolution.scache.Cache

import scala.concurrent.duration._


/** Maintains an information about non-replicated Kafka records in a topic.
*
* The implementation reads both Kafka and Cassandra by itself, continously
* refreshing the information.
*/
trait TopicCache[F[_]] {

/** Get the information about a state of a journal stored in the topic.
*
* @param id
* Journal id
* @param partition
* Partition where journal is stored to. The usual way to get the partition
* is to write a "marker" record to Kafka topic and use the partition of
* the marker as a current one.
* @param offset
* Current [[Offset]], i.e. maximum offset where Kafka records related to a
* journal are located. The usual way to get such an offset is to write a
* "marker" record to Kafka patition and use the offset of the marker as a
* current one.
*
* @return
* [[PartitionCache.Result]] with either the current state or indication of
* a reason why such state is not present in a cache.
*
* @see
* [[PartitionCache.Result]] for more details on possible results.
*/
def get(id: String, partition: Partition, offset: Offset): F[PartitionCache.Result[F]]
}

object TopicCache {

/** Creates [[TopicCache]] using configured parameters and data sources.
*
* @param eventual
* Cassandra data source.
* @param topic
* Topic stored in this cache.
* @param log
* Logger used to write debug logs to.
* @param consumer
* Kafka data source.
* @param config
* [[HeadCache]] configuration.
* @param consRecordToActionHeader
* Function used to parse records coming from `consumer`. Only headers will
* be parsed, and the payload will be ignored.
* @param metrics
* Interface to report the metrics to.
* @return
* Resource which will configure a [[TopicCache]] with the passed
* parameters. Instance of `Resource[TopicCache]` are, obviously, reusable
* and there is no need to call [[TopicCache#of]] each time if parameters
* did not change.
*/
def of[F[_]: Async: Parallel: Runtime](
eventual: Eventual[F],
topic: Topic,
Expand Down Expand Up @@ -204,19 +252,50 @@ object TopicCache {
}
}

/** Lighweight wrapper over [[KafkaConsumer]].
*
* Allows easier stubbing in unit tests and provides a little bit more
* convenient [[TopicCache]]-specific API.
*/
trait Consumer[F[_]] {

/** Assigns specific topic partitions to a consumer.
*
* I.e. consumer groups will not be used.
*
* @see
* [[KafkaConsumer#assign]] for more details.
*/
def assign(topic: Topic, partitions: Nes[Partition]): F[Unit]

/** Moves fetching position to a different offset(s).
*
* The read will start from the new offsets the next time [[#poll]] is
* called.
*
* @see
* [[KafkaConsumer#seek]] for more details.
*/
def seek(topic: Topic, offsets: Nem[Partition, Offset]): F[Unit]

/** Fetch data from the previously assigned partitions.
*
* @see
* [[KafkaConsumer#poll]] for more details.
*/
def poll: F[ConsumerRecords[String, Unit]]

/** Get the set of partitions for a given topic.
*
* @see
* [[KafkaConsumer#partitions]] for more details.
*/
def partitions(topic: Topic): F[Set[Partition]]
}

object Consumer {

/** Stub implemenation of [[Consumer]], which never returns any records. */
def empty[F[_]: Applicative]: Consumer[F] = {
class Empty
new Empty with Consumer[F] {
Expand All @@ -234,6 +313,11 @@ object TopicCache {

def apply[F[_]](implicit F: Consumer[F]): Consumer[F] = F

/** Wraps existing [[KafkaConsumer]] into [[Consumer]] API.
*
* @param consumer Previously created [[KafkaConsumer]].
* @param pollTimeout The timeout to use for [[KafkaConsumer#poll]].
*/
def apply[F[_]: Monad](
consumer: KafkaConsumer[F, String, Unit],
pollTimeout: FiniteDuration
Expand Down Expand Up @@ -262,6 +346,16 @@ object TopicCache {
}
}

/** Creates a new [[KafkaConsumer]] and wraps it into [[Consumer]] API.
*
* @param config
* Kafka configuration in form of [[ConsumerConfig]]. It is used to get
* Kafka address, mostly, and some important parameters will be ignored,
* as these need to be set to specific values for the cache to work. I.e.
* `autoOffsetReset`, `groupId` and `autoCommit` will not be used.
* @param pollTimeout
* The timeout to use for [[KafkaConsumer#poll]].
*/
def of[F[_]: Monad: KafkaConsumerOf: FromTry](
config: ConsumerConfig,
pollTimeout: FiniteDuration = 10.millis
Expand All @@ -281,6 +375,11 @@ object TopicCache {

implicit class ConsumerOps[F[_]](val self: Consumer[F]) extends AnyVal {

/** Log debug messages on every call to the class methods.
*
* The messages will go to DEBUG level, so it is also necessary to enable
* it in logger configuration.
*/
def withLog(log: Log[F])(implicit F: Monad[F]): Consumer[F] = {
new WithLog with Consumer[F] {

Expand Down Expand Up @@ -326,12 +425,35 @@ object TopicCache {
}


/** Cumulative average of some data stream.
*
* If one has to calcuate an average for a large list of numbers, one does
* not have to keep all these numbers in a memory. It is enough to keep sum
* of them and the count.
*
* @param sum
* Sum of all numbers seen.
* @param count
* Number of all numbers seen.
*
* Example:
* {{{
* scala> import cats.syntax.all._
* scala> (1L to 100L).toList.map(Sample(_)).combineAll.avg
* val res0: Option[Long] = Some(50)
* }}}
*
* @see
* https://en.wikipedia.org/wiki/Moving_average#Cumulative_average
*/
private final case class Sample(sum: Long, count: Int)

private object Sample {

/** Single number in a stream we are calculating average for */
def apply(value: Long): Sample = Sample(sum = value, count = 1)

/** Initial state of cumulative average, i.e. no numbers registered */
val Empty: Sample = Sample(0L, 0)

implicit val monoidSample: Monoid[Sample] = new Monoid[Sample] {
Expand All @@ -346,6 +468,11 @@ object TopicCache {
}

implicit class SampleOps(val self: Sample) extends AnyVal {

/** Average of the all numbers seen, or `None` if no numbers were added.
*
* @return Average of all numbers seen, rounded down.
*/
def avg: Option[Long] = {
if (self.count > 0) (self.sum / self.count).some else none
}
Expand All @@ -358,6 +485,7 @@ object TopicCache {

implicit class TopicCacheOps[F[_]](val self: TopicCache[F]) extends AnyVal {

/** Wrap instance in a class, which logs metrics to [[HeadCache.Metrics]] */
def withMetrics(
topic: Topic,
metrics: HeadCache.Metrics[F])(implicit
Expand Down Expand Up @@ -415,6 +543,11 @@ object TopicCache {
}
}

/** Log debug messages on every call to a cache.
*
* The messages will go to DEBUG level, so it is also necessary to enable
* it in logger configuration.
*/
def withLog(log: Log[F])(implicit F: FlatMap[F], measureDuration: MeasureDuration[F]): TopicCache[F] = {
new WithLog with TopicCache[F] {

Expand All @@ -432,6 +565,22 @@ object TopicCache {

private implicit class SetOps[A](val self: Set[A]) extends AnyVal {

/** Aggregate all values in a set to something else using [[Monoid]].
*
* In other words, provides `foldMapM` method to `Set`.
*
* The method is not provided directly by `cats-core`,
* because it is unlawful.
*
* It is possible to achieve the same using `alleycats-core`
* library like this, so the method might not be removed in future:
* {{{
* scala> import cats.syntax.all._
* scala> import alleycats.std.all._
* scala> Set(1, 2, 3).foldMapM(_.some)
* val res0: Option[Int] = Some(6)
* }}}
*/
def foldMapM[F[_]: Monad, B: Monoid](f: A => F[B]): F[B] = {
self.foldLeft(Monoid[B].empty.pure[F]) { case (b0, a) =>
for {
Expand Down
Loading