Skip to content

Commit

Permalink
Adjusting max field index setting dynamically for query index (#776)
Browse files Browse the repository at this point in the history
* added adjusting max field index setting dynamicly for query index

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit c9d84cb)
  • Loading branch information
petardz authored and github-actions[bot] committed Feb 3, 2023
1 parent 7df2550 commit 9f6f4ef
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 18 deletions.
2 changes: 1 addition & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ dependencies {
implementation "com.github.seancfoley:ipaddress:5.3.3"

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:4.7.0"
testImplementation "org.mockito:mockito-core:5.1.0"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SetOnce;
import org.opensearch.common.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
Expand All @@ -29,11 +32,13 @@ import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.rest.RestStatus

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
Expand All @@ -45,10 +50,17 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
const val NESTED = "nested"
const val TYPE = "type"
const val INDEX_PATTERN_SUFFIX = "-000001"
const val QUERY_INDEX_BASE_FIELDS_COUNT = 8 // 3 fields we defined and 5 builtin additional metadata fields
@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
}
fun docLevelQueriesSettings(): Settings {
return Settings.builder().loadFromSource(
DocLevelMonitorQueries::class.java.classLoader.getResource("settings/doc-level-queries.json").readText(),
XContentType.JSON
).build()
}
}

suspend fun initDocLevelQueryIndex(): Boolean {
Expand All @@ -70,10 +82,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
val indexRequest = CreateIndexRequest(indexPattern)
.mapping(docLevelQueriesMappings())
.alias(Alias(alias))
.settings(
Settings.builder().put("index.hidden", true)
.build()
)
.settings(docLevelQueriesSettings())
return try {
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
createIndexResponse.isAcknowledged
Expand Down Expand Up @@ -321,6 +330,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
updateMappingRequest.source(mapOf<String, Any>("properties" to updatedProperties))
var updateMappingResponse = AcknowledgedResponse(false)
try {
// Adjust max field limit in mappings for query index, if needed.
checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex)
updateMappingResponse = client.suspendUntil {
client.admin().indices().putMapping(updateMappingRequest, it)
}
Expand All @@ -331,7 +342,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
// If we reached limit for total number of fields in mappings, do a rollover here
if (unwrappedException.message?.contains("Limit of total fields") == true) {
try {
// Do queryIndex rollover
targetQueryIndex = rolloverQueryIndex(monitor)
// Adjust max field limit in mappings for new index.
checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex)
// PUT mappings to newly created index
val updateMappingRequest = PutMappingRequest(targetQueryIndex)
updateMappingRequest.source(mapOf<String, Any>("properties" to updatedProperties))
Expand Down Expand Up @@ -371,14 +385,41 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return Pair(updateMappingResponse, targetQueryIndex)
}

private suspend fun rolloverQueryIndex(monitor: Monitor): String? {
/**
* Adjusts max field limit index setting for query index if source index has higher limit.
* This will prevent max field limit exception, when applying mappings to query index
*/
private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
val getSettingsResponse: GetSettingsResponse = client.suspendUntil {
admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it)
}
val sourceIndexLimit =
getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L
val queryIndexLimit =
getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L
// Our query index initially has 3 fields we defined and 5 more builtin metadata fields in mappings so we have to account for that
if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) {
val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil {
admin().indices().updateSettings(
UpdateSettingsRequest(concreteQueryIndex).settings(
Settings.builder().put(
INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT
)
),
it
)
}
}
}

private suspend fun rolloverQueryIndex(monitor: Monitor): String {
val queryIndex = monitor.dataSources.queryIndex
val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFFIX

val request = RolloverRequest(queryIndex, null)
request.createIndexRequest.index(queryIndexPattern)
.mapping(docLevelQueriesMappings())
.settings(Settings.builder().put("index.hidden", true).build())
.settings(docLevelQueriesSettings())
val response: RolloverResponse = client.suspendUntil {
client.admin().indices().rolloverIndex(request, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
Expand All @@ -34,6 +35,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.mapper.MapperService
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
Expand Down Expand Up @@ -947,7 +949,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
fun `test queryIndex rollover and delete monitor success`() {

val testSourceIndex = "test_source_index"
createIndex(testSourceIndex, Settings.EMPTY)
createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery))
Expand All @@ -959,7 +961,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
// This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..330) {
for (i in 1..3300) {
docPayload.append(""" "id$i.somefield.somefield$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
Expand Down Expand Up @@ -1136,27 +1138,27 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

/**
* 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex
* 2. Update monitor and change input source_index to a new one with 900 fields in mappings
* 1. Create monitor with input source_index with 9000 fields in mappings - can fit 1 in queryIndex
* 2. Update monitor and change input source_index to a new one with 9000 fields in mappings
* 3. Expect queryIndex rollover resulting in 2 backing indices
* 4. Delete monitor and expect that all backing indices are deleted
* */
fun `test updating monitor no execution queryIndex rolling over`() {
val testSourceIndex1 = "test_source_index1"
val testSourceIndex2 = "test_source_index2"
createIndex(testSourceIndex1, Settings.EMPTY)
createIndex(testSourceIndex2, Settings.EMPTY)
createIndex(testSourceIndex1, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
createIndex(testSourceIndex2, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger)
)
// This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api
// This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..899) {
for (i in 1..9000) {
docPayload.append(""" "id$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
Expand Down Expand Up @@ -1191,6 +1193,48 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals(0, getIndexResponse.indices.size)
}

fun `test queryIndex gets increased max fields in mappings`() {
val testSourceIndex = "test_source_index"
createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger)
)
// This doc should create 12000 fields in index mapping. It's easier to add mappings like this then via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..9998) {
docPayload.append(""" "id$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
// Indexing docs here as an easier means to set index mappings
indexDoc(testSourceIndex, "1", docPayload.toString())
// Create monitor
var monitorResponse = createMonitor(monitor)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor

// Expect queryIndex to rollover after setting new source_index with close to limit amount of fields in mappings
var getIndexResponse: GetIndexResponse =
client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
assertEquals(1, getIndexResponse.indices.size)
val field_max_limit = getIndexResponse
.getSetting(DOC_LEVEL_QUERIES_INDEX + "-000001", MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key).toInt()

assertEquals(10000 + DocLevelMonitorQueries.QUERY_INDEX_BASE_FIELDS_COUNT, field_max_limit)

deleteMonitor(monitorResponse.id)
waitUntil {
getIndexResponse =
client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
return@waitUntil getIndexResponse.indices.isEmpty()
}
assertEquals(0, getIndexResponse.indices.size)
}

fun `test queryIndex bwc when index was not an alias`() {
createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build())
assertIndexExists(DOC_LEVEL_QUERIES_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertIndicesIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -235,7 +237,14 @@ class AlertIndicesIT : AlertingRestTestCase() {
client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s")

// Give some time for history to be rolled over and cleared
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alertIndices = getAlertIndices().size
val docCount = getAlertHistoryDocCount()
if (alertIndices > 2 || docCount > 0) {
return@waitUntil false
}
return@waitUntil true
}, 30, TimeUnit.SECONDS)

// Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted.
// This leaves two indices: alert index and an empty history write index
Expand Down Expand Up @@ -284,7 +293,14 @@ class AlertIndicesIT : AlertingRestTestCase() {
client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s")

// Give some time for history to be rolled over and cleared
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alertIndices = getAlertIndices().size
val docCount = getAlertHistoryDocCount()
if (alertIndices > 2 || docCount > 0) {
return@waitUntil false
}
return@waitUntil true
}, 30, TimeUnit.SECONDS)

// Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted.
// This leaves two indices: alert index and an empty history write index
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/resources/settings/doc-level-queries.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"index": {
"mapping": {
"total_fields": {
"limit": 10000
}
},
"hidden": true
}
}

0 comments on commit 9f6f4ef

Please sign in to comment.