diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index bf1f22ce..577ab95a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -55,7 +55,9 @@ object SafeUpdateCache { Try(BinaryPickle(bytes).unpickle[AnyRef]) } - def fromBytes(config: Config, bytes: Array[Byte])(implicit ec: ExecutionContext): SafeUpdateCache = { + def fromBytes(config: Config, bytes: Array[Byte])( + implicit ec: ExecutionContext + ): SafeUpdateCache = { import org.apache.hadoop.io.WritableUtils val cache: SafeUpdateCache = new SafeUpdateCache(config) @@ -80,23 +82,30 @@ object SafeUpdateCache { } } -class SafeUpdateCache(val config: Config) - (implicit executionContext: ExecutionContext) { +class SafeUpdateCache(val config: Config)( + implicit executionContext: ExecutionContext +) { import java.lang.{Long => JLong} import SafeUpdateCache._ - val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey) val systemTtl = config.getInt(SafeUpdateCache.TtlKey) def this(maxSize: Int, systemTtl: Int)(implicit ec: ExecutionContext) { - this(ConfigFactory.parseMap( - Map(SafeUpdateCache.MaxSizeKey -> maxSize, SafeUpdateCache.TtlKey -> systemTtl)) + this( + ConfigFactory.parseMap( + Map( + SafeUpdateCache.MaxSizeKey -> maxSize, + SafeUpdateCache.TtlKey -> systemTtl + ) + ) ) } - private val cache = CacheBuilder.newBuilder().maximumSize(maxSize) + private val cache = CacheBuilder + .newBuilder() + .maximumSize(maxSize) .build[JLong, (AnyRef, Int, AtomicBoolean)]() private def toCacheKey(key: String): Long = { @@ -126,14 +135,19 @@ class SafeUpdateCache(val config: Config) def defaultOnEvict(oldValue: AnyRef): Unit = { oldValue match { case None => - case _ => logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted.") + case _ => + logger.info( + s"[SafeUpdateCache]: ${oldValue.getClass.getName} $oldValue is evicted." + ) } } - def withCache[T <: AnyRef](key: String, - broadcast: Boolean, - cacheTTLInSecs: Option[Int] = None, - onEvict: AnyRef => Unit = defaultOnEvict)(op: => T): T = { + def withCache[T <: AnyRef]( + key: String, + broadcast: Boolean, + cacheTTLInSecs: Option[Int] = None, + onEvict: AnyRef => Unit = defaultOnEvict + )(op: => T): T = { val cacheKey = toCacheKey(key) val cachedValWithTs = cache.getIfPresent(cacheKey) @@ -155,19 +169,21 @@ class SafeUpdateCache(val config: Config) if (running) cachedVal else { - val value = op - Future(value)(executionContext) onComplete { + Future(op)(executionContext) onComplete { case Failure(ex) => put(key, cachedVal, false) logger.error(s"withCache update failed: $cacheKey", ex) case Success(newValue) => - - put(key, newValue, broadcast = (broadcast && newValue != cachedVal)) + put( + key, + newValue, + broadcast = (broadcast && newValue != cachedVal) + ) onEvict(cachedVal) cachedVal match { case None => - case _ => logger.info(s"withCache update success: $cacheKey") + case _ => logger.info(s"withCache update success: $cacheKey") } } @@ -210,4 +226,3 @@ class SafeUpdateCache(val config: Config) def shutdown() = {} } -