
Tuning for Solr to improve indexing latency #529

Merged 9 commits on May 23, 2017
5 changes: 5 additions & 0 deletions app/collins/models/AssetLog.scala
@@ -87,6 +87,11 @@ case class AssetLog(

@Transient
lazy val assetTag: String = asset.tag
// TODO(gabe): if this log is indexed (serialized first) before solr is
// updated with the asset document, this will throw! This can happen when
// creating a new asset then immediately performing some attribute sets
// which create logs.
Contributor: Is there an issue/ticket to fix this?

Contributor: So this could break things if we create a config asset and then immediately perform sets on it? I think jetpants may do that in some cases but can't recall offhand. What exactly would break / what are the implications / is this a new problem?

Contributor Author: @evanelias this isn't new; it's been lurking since inception. The implication is that the tag set will fail (Solr can't find the asset to modify in its index) until Solr populates. Not cataclysmic, but definitely not a great consistency model to expose.
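To make the failure mode concrete, here is a minimal sketch; the Option-based variant is one possible mitigation and an assumption on my part, not code from this PR:

// From the diff: serialization dereferences the parent asset eagerly, so a
// not-yet-visible asset surfaces as NoSuchElementException from .get:
//   lazy val asset: Asset = Asset.findById(assetId).get
//
// Defensive sketch (assumption): keep the lookup as an Option so the race
// fails with a descriptive error instead of a bare .get throw.
lazy val assetOption: Option[Asset] = Asset.findById(assetId)
lazy val assetTag: String = assetOption
  .map(_.tag)
  .getOrElse(sys.error(s"asset $assetId not yet indexed; see issue #528"))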

// See https://github.com/tumblr/collins/issues/528
@Transient
lazy val asset: Asset = Asset.findById(assetId).get

3 changes: 2 additions & 1 deletion app/collins/solr/SolrConfig.scala
@@ -32,8 +32,9 @@ object SolrConfig extends Configurable {
def socketTimeout = getInt("socketTimeout", 1000)
def connectionTimeout = getInt("connectionTimeout", 5000)
def maxTotalConnections = getInt("maxTotalConnections", 100)
def commitWithin = getInt("commitWithinMs", 50)
Contributor: Are there any docs explaining what commitWithinMs is about?

Contributor Author (@byxorna, Apr 21, 2017): Yep, it's discussed in bullet 3 of the PR description, but also here.
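For context, commitWithin is standard Solr/SolrJ behavior: each add carries a deadline by which Solr must commit the document, letting the server coalesce many updates into a single commit instead of the client forcing one per write. A minimal SolrJ sketch (the core URL and field name are assumptions):

import org.apache.solr.common.SolrInputDocument
import org.apache.solr.client.solrj.impl.HttpSolrServer

val server = new HttpSolrServer("http://localhost:8983/solr/collins") // assumed URL
val doc = new SolrInputDocument()
doc.addField("TAG", "asset-001") // assumed field name

// No explicit commit(): Solr promises to commit this document within 50 ms
// and may batch it with neighboring adds carrying similar deadlines.
server.add(doc, 50)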

def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100)
def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 10) milliseconds
def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 30) milliseconds
Contributor: Where did these magic numbers come from?

Contributor Author: Same place the first numbers came from (the ether). It's discussed in the PR description why these numbers were chosen as defaults.
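As a rough sanity check on these defaults, worst-case time from "update enqueued" to "searchable" is roughly the batch window plus the commitWithin deadline (illustrative arithmetic only; actor scheduling and network overhead ignored):

val assetBatchUpdateWindowMs = 30 // updater actor buffers for up to this long
val commitWithinMs = 50           // Solr's per-add commit deadline
val worstCaseVisibilityMs = assetBatchUpdateWindowMs + commitWithinMs // = 80 ms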


override protected def validateConfig() {
if (enabled) {
29 changes: 12 additions & 17 deletions app/collins/solr/SolrHelper.scala
@@ -102,7 +102,7 @@ object SolrHelper {
logger.debug("Populating Asset Logs")
val num = assets.map { asset =>
val logs = AssetLog.findByAsset(asset)
updateAssetLogs(logs, indexTime, false)
updateAssetLogs(logs, indexTime)
logs.size
}.sum
_server.foreach { _.commit() }
@@ -111,35 +111,30 @@
}.getOrElse(logger.warn("attempted to populate solr when no server was initialized"))
}

def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date, commit: Boolean = true) {
def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date) = {
_server.map { server =>
val docs = items.map { item => prepForInsertion(serializer.serialize(item, indexTime)) }
if (docs.size > 0) {
val fuckingJava = new java.util.ArrayList[SolrInputDocument]
docs.foreach { doc => fuckingJava.add(doc) }
server.add(fuckingJava)
if (commit) {
server.commit()
if (items.size == 1) {
logger.debug(("Indexed %s: %s".format(serializer.docType.name.toLowerCase, items.head.toString)))
} else {
logger.info("Indexed %d %ss".format(docs.size, serializer.docType.name.toLowerCase))
}
}
server.add(fuckingJava, SolrConfig.commitWithin)
logger.debug("Added %d %s documents to be indexed within %d ms".format(
fuckingJava.size,
serializer.docType.name.toLowerCase,
SolrConfig.commitWithin))
// dont explicitly hard commit, let solr figure it out and make docs available
Contributor: Are there any downsides to doing this?

Contributor Author: No, because a hard commit isn't actually synchronous here anyway; this is happening via an actor on a background thread.
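For reference, the three booleans in the commit call below are waitFlush, waitSearcher, and softCommit; annotated per the SolrJ API:

server.commit(
  false, // waitFlush: don't block until index changes are flushed to disk
  false, // waitSearcher: don't block until a new searcher is registered
  true)  // softCommit: make documents searchable without forcing an fsync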

// to be searched ASAP. commit(boolean waitFlush, boolean waitSearcher, boolean softCommit)
server.commit(false, false, true)
} else {
logger.warn("No items to index!")
}
}

}

def updateAssets(assets: Seq[Asset], indexTime: Date, commit: Boolean = true) {
updateItems[Asset](assets, AssetSerializer, indexTime, commit)
}
def updateAssets(assets: Seq[Asset], indexTime: Date) = updateItems[Asset](assets, AssetSerializer, indexTime)

def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date, commit: Boolean = true) {
updateItems[AssetLog](logs, AssetLogSerializer, indexTime, commit)
}
def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date) = updateItems[AssetLog](logs, AssetLogSerializer, indexTime)

def terminateSolr() {
_server.foreach {
25 changes: 23 additions & 2 deletions app/collins/solr/SolrUpdater.scala
@@ -29,7 +29,7 @@ class AssetSolrUpdater extends Actor {
new ConcurrentHashMap[String, java.lang.Boolean]())

private[this] val assetTagsRef = new AtomicReference(newAssetTagSet)
private[this] val logger = Logger("SolrUpdater")
private[this] val logger = Logger("AssetSolrUpdater")

//mutex to prevent multiple concurrent scheduler calls
val scheduled = new AtomicBoolean(false)
@@ -66,8 +66,29 @@

class AssetLogSolrUpdater extends Actor {

private[this] def newLogSet = Collections.newSetFromMap[AssetLog](
new ConcurrentHashMap[AssetLog, java.lang.Boolean]())
private[this] val logsRef = new AtomicReference(newLogSet)
private[this] val logger = Logger("AssetLogSolrUpdater")
val scheduled = new AtomicBoolean(false)
case object Reindex

def receive = {
case log: AssetLog => SolrHelper.updateAssetLogs(List(log), new Date)
case log: AssetLog =>
logsRef.get.add(log)
if (scheduled.compareAndSet(false, true)) {
logger.debug("Scheduling reindex of log %d within %s".format(log.id, SolrConfig.assetBatchUpdateWindow))
context.system.scheduler.scheduleOnce(SolrConfig.assetBatchUpdateWindow, self, Reindex)
} else {
logger.trace("Ignoring already scheduled reindex of log %d".format(log.id))
}
case Reindex =>
if (scheduled.get == true) {
val logs = logsRef.getAndSet(newLogSet).asScala.toSeq
logger.debug("Got Reindex task, working on %d logs".format(logs.size))
SolrHelper.updateAssetLogs(logs, new Date)
scheduled.set(false)
}
}

}
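As a usage sketch, producers fire-and-forget logs at the actor, which debounces them into one batched reindex per window (the wiring below is assumed for illustration, not code from this diff):

import akka.actor.{ActorRef, ActorSystem, Props}

val system = ActorSystem("collins")                                   // assumed
val logUpdater: ActorRef = system.actorOf(Props[AssetLogSolrUpdater]) // assumed

def onLogCreated(log: AssetLog): Unit =
  logUpdater ! log // non-blocking; first message in a window schedules Reindex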
30 changes: 20 additions & 10 deletions conf/solr/cores/collins/conf/solrconfig.xml
@@ -181,22 +181,32 @@
If the updateLog is enabled, then it's highly recommended to
have some sort of hard autoCommit to limit the log size.
-->
<autoCommit>
<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
<openSearcher>false</openSearcher>
<autoCommit>
<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
<maxDocs>${solr.autoCommit.maxDocs:100}</maxDocs>
<openSearcher>false</openSearcher>
</autoCommit>

<!-- softAutoCommit is like autoCommit except it causes a
'soft' commit which only ensures that changes are visible
but does not ensure that data is synced to disk. This is
faster and more near-realtime friendly than a hard commit.
<!-- SoftAutoCommit

Perform a 'soft' commit automatically under certain conditions.
This commit avoids ensuring that data is synched to disk.

maxDocs - Maximum number of documents to add since the last
soft commit before automaticly triggering a new soft commit.

maxTime - Maximum amount of time in ms that is allowed to pass
since a document was added before automaticly
triggering a new soft commit.
-->
<autoSoftCommit>
<maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>

<autoSoftCommit>
<!--<maxTime>1000</maxTime>-->
<maxTime>${solr.autoSoftCommit.maxTime:1000}</maxTime>
</autoSoftCommit>

</updateHandler>

<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Query section - these settings control query time things like caches
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
Expand Down