Skip to content

Commit

Permalink
Merge pull request #555 from NDLANO/grep-indexing
Browse files Browse the repository at this point in the history
search-api: Add grep code indexing and search
  • Loading branch information
jnatten authored Dec 9, 2024
2 parents 7cb493f + 35448aa commit fcab9ea
Show file tree
Hide file tree
Showing 63 changed files with 1,168 additions and 821 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import com.sksamuel.elastic4s.requests.indexes.IndexRequest
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicTemplateRequest
import com.typesafe.scalalogging.StrictLogging
import no.ndla.articleapi.Props
import no.ndla.articleapi.model.domain.ReindexResult
import no.ndla.articleapi.repository.ArticleRepository
import no.ndla.common.model.domain.article.Article
import no.ndla.search.SearchLanguage.languageAnalyzers
import no.ndla.search.model.domain.{BulkIndexResult, ReindexResult}
import no.ndla.search.{BaseIndexService, Elastic4sClient, SearchLanguage}

import scala.collection.mutable.ListBuffer
Expand All @@ -42,33 +42,20 @@ trait IndexService {
}

def indexDocuments(numShards: Option[Int]): Try[ReindexResult] = synchronized {
val start = System.currentTimeMillis()
createIndexWithGeneratedName(numShards).flatMap(indexName => {
val operations = for {
numIndexed <- sendToElastic(indexName)
aliasTarget <- getAliasTarget
_ <- updateAliasTarget(aliasTarget, indexName)
} yield numIndexed

operations match {
case Failure(f) =>
deleteIndexWithName(Some(indexName)): Unit
Failure(f)
case Success(totalIndexed) =>
Success(ReindexResult(totalIndexed, System.currentTimeMillis() - start))
}
})
indexDocumentsInBulk(numShards) {
sendToElastic
}
}

def sendToElastic(indexName: String): Try[Int] = {
def sendToElastic(indexName: String): Try[BulkIndexResult] = {
getRanges
.flatMap(ranges => {
ranges.traverse { case (start, end) =>
val toIndex = articleRepository.documentsWithIdBetween(start, end)
indexDocuments(toIndex, indexName)
}
})
.map(_.sum)
.map(countBulkIndexed)
}

def getRanges: Try[List[(Long, Long)]] = {
Expand All @@ -82,9 +69,9 @@ trait IndexService {
}
}

def indexDocuments(contents: Seq[Article], indexName: String): Try[Int] = {
def indexDocuments(contents: Seq[Article], indexName: String): Try[BulkIndexResult] = {
if (contents.isEmpty) {
Success(0)
Success(BulkIndexResult.empty)
} else {
val response = e4sClient.execute {
bulk(contents.map(content => {
Expand All @@ -95,25 +82,12 @@ trait IndexService {
response match {
case Success(r) =>
logger.info(s"Indexed ${contents.size} documents. No of failed items: ${r.result.failures.size}")
Success(contents.size)
Success(BulkIndexResult(r.result.successes.size, contents.size))
case Failure(ex) => Failure(ex)
}
}
}

def findAllIndexes(indexName: String): Try[Seq[String]] = {
val response = e4sClient.execute {
getAliases()
}

response match {
case Success(results) =>
Success(results.result.mappings.toList.map { case (index, _) => index.name }.filter(_.startsWith(indexName)))
case Failure(ex) =>
Failure(ex)
}
}

/** Returns Sequence of FieldDefinitions for a given field.
*
* @param fieldName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,3 @@ case class CouldNotFindLanguageException(message: String) extends Run
class AudioStorageException(message: String) extends RuntimeException(message)
class LanguageMappingException(message: String) extends RuntimeException(message)
class ImportException(message: String) extends RuntimeException(message)
case class ElasticIndexingException(message: String) extends RuntimeException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,3 @@ object AudioMetaInformation extends SQLSyntaxSupport[AudioMetaInformation] {
rs.longOpt(au.c("id")).map(_ => fromResultSet(au)(rs))
}
}

case class ReindexResult(totalIndexed: Int, millisUsed: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@

package no.ndla.audioapi.service.search

import cats.implicits._
import com.sksamuel.elastic4s.ElasticDsl._
import cats.implicits.*
import com.sksamuel.elastic4s.ElasticDsl.*
import com.sksamuel.elastic4s.fields.ElasticField
import com.sksamuel.elastic4s.requests.indexes.IndexRequest
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicTemplateRequest
import com.typesafe.scalalogging.StrictLogging
import no.ndla.audioapi.Props
import no.ndla.audioapi.model.domain.ReindexResult
import no.ndla.audioapi.repository.{AudioRepository, Repository}
import no.ndla.search.SearchLanguage.languageAnalyzers
import no.ndla.search.model.domain.{BulkIndexResult, ReindexResult}
import no.ndla.search.{BaseIndexService, Elastic4sClient, SearchLanguage}

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

trait IndexService {
this: Elastic4sClient with BaseIndexService with SearchConverterService with AudioRepository with Props =>
this: Elastic4sClient & BaseIndexService & SearchConverterService & AudioRepository & Props =>

trait IndexService[D, T] extends BaseIndexService with StrictLogging {
override val MaxResultWindowOption: Int = props.ElasticSearchIndexMaxResultWindow
Expand All @@ -45,33 +45,21 @@ trait IndexService {
} yield imported
}

def indexDocuments(numShards: Option[Int]): Try[ReindexResult] = {
synchronized {
val start = System.currentTimeMillis()
createIndexWithGeneratedName(numShards).flatMap(indexName => {
val operations = for {
numIndexed <- sendToElastic(indexName)
aliasTarget <- getAliasTarget
_ <- updateAliasTarget(aliasTarget, indexName)
} yield numIndexed

operations match {
case Failure(f) => deleteIndexWithName(Some(indexName)).flatMap(_ => Failure(f))
case Success(totalIndexed) => Success(ReindexResult(totalIndexed, System.currentTimeMillis() - start))
}
})
}
def indexDocuments(numShards: Option[Int]): Try[ReindexResult] = synchronized {
indexDocumentsInBulk(numShards)(sendToElastic)
}

def sendToElastic(indexName: String): Try[Int] = {
def sendToElastic(indexName: String): Try[BulkIndexResult] = {
getRanges
.flatMap(ranges => {
ranges.traverse { case (start, end) =>
val documentsToIndex = repository.documentsWithIdBetween(start, end)
documentsToIndex.flatMap(indexDocuments(_, indexName))
}
ranges
.traverse { case (start, end) =>
repository
.documentsWithIdBetween(start, end)
.flatMap(toIndex => indexDocuments(toIndex, indexName))
}
.map(countBulkIndexed)
})
.map(_.sum)
}

def getRanges: Try[List[(Long, Long)]] = {
Expand All @@ -88,53 +76,25 @@ trait IndexService {

}

def indexDocuments(contents: Seq[D], indexName: String): Try[Int] = {
def indexDocuments(contents: Seq[D], indexName: String): Try[BulkIndexResult] = {
if (contents.isEmpty) {
Success(0)
Success(BulkIndexResult.empty)
} else {
val requests = contents.traverse(content => createIndexRequests(content, indexName))
requests.flatMap(rs => {
executeRequests(rs.flatten) match {
case Success((numSuccessful, numFailures)) =>
logger.info(s"Indexed $numSuccessful documents ($searchIndex). No of failed items: $numFailures")
Success(contents.size)
case Success(result) =>
logger.info(
s"Indexed ${result.successful} documents ($searchIndex). No of failed items: ${result.failed}"
)
Success(result)
case Failure(ex) => Failure(ex)
}
})

}
}

def findAllIndexes(indexName: String): Try[Seq[String]] = {
val response = e4sClient.execute {
getAliases()
}

response match {
case Success(results) =>
Success(results.result.mappings.toList.map { case (index, _) => index.name }.filter(_.startsWith(indexName)))
case Failure(ex) =>
Failure(ex)
}
}

/** Executes elasticsearch requests in bulk. Returns success (without executing anything) if supplied with an empty
* list.
*
* @param requests
* a list of elasticsearch [[IndexRequest]]'s
* @return
* A Try suggesting if the request was successful or not with a tuple containing number of successful requests
* and number of failed requests (in that order)
*/
private def executeRequests(requests: Seq[IndexRequest]): Try[(Int, Int)] = {
requests match {
case Nil => Success((0, 0))
case head :: Nil => e4sClient.execute(head).map(r => if (r.isSuccess) (1, 0) else (0, 1))
case reqs => e4sClient.execute(bulk(reqs)).map(r => (r.result.successes.size, r.result.failures.size))
}
}

/** @deprecated
* Returns Sequence of FieldDefinitions for a given field.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,4 @@ case class NotFoundException(message: String, supportedLanguages: Seq[String] =
extends RuntimeException(message)
case class ConceptMissingIdException(message: String) extends RuntimeException(message)
case class ConceptExistsAlreadyException(message: String) extends RuntimeException(message)
case class ElasticIndexingException(message: String) extends RuntimeException(message)
case class OperationNotAllowedException(message: String) extends RuntimeException(message)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import no.ndla.common.model.domain.concept.Concept
import no.ndla.conceptapi.Props
import no.ndla.conceptapi.integration.TaxonomyApiClient
import no.ndla.conceptapi.integration.model.TaxonomyData
import no.ndla.conceptapi.model.api.{ConceptMissingIdException, ElasticIndexingException}
import no.ndla.conceptapi.model.domain.ReindexResult
import no.ndla.conceptapi.model.api.ConceptMissingIdException
import no.ndla.conceptapi.repository.Repository
import no.ndla.search.SearchLanguage.{NynorskLanguageAnalyzer, languageAnalyzers}
import no.ndla.search.model.domain.{BulkIndexResult, ElasticIndexingException, ReindexResult}
import no.ndla.search.{BaseIndexService, Elastic4sClient, SearchLanguage}

import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -70,36 +70,19 @@ trait IndexService {
} yield imported
}

def indexDocuments(numShards: Option[Int]): Try[ReindexResult] = {
synchronized {
val start = System.currentTimeMillis()
createIndexWithGeneratedName(numShards).flatMap(indexName => {
val operations = for {
numIndexed <- sendToElastic(indexName)
aliasTarget <- getAliasTarget
_ <- updateAliasTarget(aliasTarget, indexName)
} yield numIndexed

operations match {
case Failure(f) =>
deleteIndexWithName(Some(indexName)): Unit
Failure(f)
case Success(totalIndexed) =>
Success(ReindexResult(totalIndexed, System.currentTimeMillis() - start))
}
})
}
def indexDocuments(numShards: Option[Int]): Try[ReindexResult] = synchronized {
indexDocumentsInBulk(numShards)(sendToElastic)
}

private def sendToElastic(indexName: String): Try[Int] = {
private def sendToElastic(indexName: String): Try[BulkIndexResult] = {
for {
taxonomyData <- taxonomyApiClient.getSubjects
ranges <- getRanges
indexed <- ranges.traverse { case (start, end) =>
val toIndex = repository.documentsWithIdBetween(start, end)
indexDocuments(toIndex, indexName, taxonomyData)
indexDocuments(toIndex, indexName, taxonomyData).map(numIndexed => (numIndexed, toIndex.size))
}
} yield indexed.sum
} yield countIndexed(indexed)
}

private def getRanges: Try[List[(Long, Long)]] = {
Expand Down Expand Up @@ -142,19 +125,6 @@ trait IndexService {

def findAllIndexes: Try[Seq[String]] = findAllIndexes(this.searchIndex)

private def findAllIndexes(indexName: String): Try[Seq[String]] = {
val response = e4sClient.execute {
getAliases()
}

response match {
case Success(results) =>
Success(results.result.mappings.toList.map { case (index, _) => index.name }.filter(_.startsWith(indexName)))
case Failure(ex) =>
Failure(ex)
}
}

/** Returns Sequence of DynamicTemplateRequest for a given field.
*
* @param fieldName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import no.ndla.common.model.domain.draft.{Draft, DraftStatus}
import no.ndla.draftapi.Props
import no.ndla.draftapi.integration.ArticleApiClient
import no.ndla.draftapi.model.api.{ArticleDomainDump, ArticleDump, ContentId, NotFoundException}
import no.ndla.draftapi.model.domain.{ArticleIds, ImportId, ReindexResult}
import no.ndla.draftapi.model.domain.{ArticleIds, ImportId}
import no.ndla.draftapi.repository.DraftRepository
import no.ndla.draftapi.service.*
import no.ndla.draftapi.service.search.*
Expand All @@ -28,6 +28,7 @@ import scalikejdbc.ReadOnlyAutoSession
import sttp.model.StatusCode
import sttp.tapir.server.ServerEndpoint
import io.circe.generic.auto.*
import no.ndla.search.model.domain.ReindexResult
import sttp.tapir.generic.auto.*

import java.util.concurrent.{Executors, TimeUnit}
Expand Down

This file was deleted.

Loading

0 comments on commit fcab9ea

Please sign in to comment.