diff --git a/app/collins/models/AssetLog.scala b/app/collins/models/AssetLog.scala index 2e94fe310..03e78482a 100644 --- a/app/collins/models/AssetLog.scala +++ b/app/collins/models/AssetLog.scala @@ -87,6 +87,11 @@ case class AssetLog( @Transient lazy val assetTag: String = asset.tag + // TODO(gabe): if this log is indexed (serialized first) before solr is + // updated with the asset document, this will throw! This can happen when + // creating a new asset then immediately performing some attribute sets + // which create logs. + // See https://github.com/tumblr/collins/issues/528 @Transient lazy val asset: Asset = Asset.findById(assetId).get diff --git a/app/collins/solr/SolrConfig.scala b/app/collins/solr/SolrConfig.scala index 4fd483178..108db0034 100644 --- a/app/collins/solr/SolrConfig.scala +++ b/app/collins/solr/SolrConfig.scala @@ -32,8 +32,9 @@ object SolrConfig extends Configurable { def socketTimeout = getInt("socketTimeout", 1000) def connectionTimeout = getInt("connectionTimeout", 5000) def maxTotalConnections = getInt("maxTotalConnections", 100) + def commitWithin = getInt("commitWithinMs", 50) def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100) - def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 10) milliseconds + def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 30) milliseconds override protected def validateConfig() { if (enabled) { diff --git a/app/collins/solr/SolrHelper.scala b/app/collins/solr/SolrHelper.scala index 344e0bb83..848cb1c77 100644 --- a/app/collins/solr/SolrHelper.scala +++ b/app/collins/solr/SolrHelper.scala @@ -102,7 +102,7 @@ object SolrHelper { logger.debug("Populating Asset Logs") val num = assets.map { asset => val logs = AssetLog.findByAsset(asset) - updateAssetLogs(logs, indexTime, false) + updateAssetLogs(logs, indexTime) logs.size }.sum _server.foreach { _.commit() } @@ -111,21 +111,20 @@ object SolrHelper { }.getOrElse(logger.warn("attempted to populate solr when no 
server was initialized")) } - def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date, commit: Boolean = true) { + def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date) = { _server.map { server => val docs = items.map { item => prepForInsertion(serializer.serialize(item, indexTime)) } if (docs.size > 0) { val fuckingJava = new java.util.ArrayList[SolrInputDocument] docs.foreach { doc => fuckingJava.add(doc) } - server.add(fuckingJava) - if (commit) { - server.commit() - if (items.size == 1) { - logger.debug(("Indexed %s: %s".format(serializer.docType.name.toLowerCase, items.head.toString))) - } else { - logger.info("Indexed %d %ss".format(docs.size, serializer.docType.name.toLowerCase)) - } - } + server.add(fuckingJava, SolrConfig.commitWithin) + logger.debug("Added %d %s documents to be indexed within %d ms".format( + fuckingJava.size, + serializer.docType.name.toLowerCase, + SolrConfig.commitWithin)) + // don't explicitly hard commit, let solr figure it out and make docs available + // to be searched ASAP. 
commit(boolean waitFlush, boolean waitSearcher, boolean softCommit) + server.commit(false, false, true) } else { logger.warn("No items to index!") } @@ -133,13 +132,9 @@ object SolrHelper { } - def updateAssets(assets: Seq[Asset], indexTime: Date, commit: Boolean = true) { - updateItems[Asset](assets, AssetSerializer, indexTime, commit) - } + def updateAssets(assets: Seq[Asset], indexTime: Date) = updateItems[Asset](assets, AssetSerializer, indexTime) - def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date, commit: Boolean = true) { - updateItems[AssetLog](logs, AssetLogSerializer, indexTime, commit) - } + def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date) = updateItems[AssetLog](logs, AssetLogSerializer, indexTime) def terminateSolr() { _server.foreach { diff --git a/app/collins/solr/SolrUpdater.scala b/app/collins/solr/SolrUpdater.scala index 6392e181b..6de6610c5 100644 --- a/app/collins/solr/SolrUpdater.scala +++ b/app/collins/solr/SolrUpdater.scala @@ -29,7 +29,7 @@ class AssetSolrUpdater extends Actor { new ConcurrentHashMap[String, java.lang.Boolean]()) private[this] val assetTagsRef = new AtomicReference(newAssetTagSet) - private[this] val logger = Logger("SolrUpdater") + private[this] val logger = Logger("AssetSolrUpdater") //mutex to prevent multiple concurrent scheduler calls val scheduled = new AtomicBoolean(false) @@ -66,8 +66,29 @@ class AssetSolrUpdater extends Actor { class AssetLogSolrUpdater extends Actor { + private[this] def newLogSet = Collections.newSetFromMap[AssetLog]( + new ConcurrentHashMap[AssetLog, java.lang.Boolean]()) + private[this] val logsRef = new AtomicReference(newLogSet) + private[this] val logger = Logger("AssetLogSolrUpdater") + val scheduled = new AtomicBoolean(false) + case object Reindex + def receive = { - case log: AssetLog => SolrHelper.updateAssetLogs(List(log), new Date) + case log: AssetLog => + logsRef.get.add(log) + if (scheduled.compareAndSet(false, true)) { + logger.debug("Scheduling reindex of log %d 
within %s".format(log.id, SolrConfig.assetBatchUpdateWindow)) + context.system.scheduler.scheduleOnce(SolrConfig.assetBatchUpdateWindow, self, Reindex) + } else { + logger.trace("Ignoring already scheduled reindex of log %d".format(log.id)) + } + case Reindex => + if (scheduled.get == true) { + val logs = logsRef.getAndSet(newLogSet).asScala.toSeq + logger.debug("Got Reindex task, working on %d logs".format(logs.size)) + SolrHelper.updateAssetLogs(logs, new Date) + scheduled.set(false) + } } } diff --git a/conf/solr/cores/collins/conf/solrconfig.xml b/conf/solr/cores/collins/conf/solrconfig.xml index 573ca8a6c..46d1463b7 100644 --- a/conf/solr/cores/collins/conf/solrconfig.xml +++ b/conf/solr/cores/collins/conf/solrconfig.xml @@ -181,22 +181,32 @@ If the updateLog is enabled, then it's highly recommended to have some sort of hard autoCommit to limit the log size. --> - - ${solr.autoCommit.maxTime:15000} - false + + ${solr.autoCommit.maxTime:15000} + ${solr.autoCommit.maxDocs:100} + false - - - ${solr.autoSoftCommit.maxTime:-1} + + + + ${solr.autoSoftCommit.maxTime:1000} - +