Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjusting max field index setting dynamically for query index #776

Merged
merged 6 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ dependencies {
implementation "com.github.seancfoley:ipaddress:5.3.3"

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:4.7.0"
testImplementation "org.mockito:mockito-core:5.1.0"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SetOnce;
import org.opensearch.common.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
Expand All @@ -29,11 +32,13 @@ import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.rest.RestStatus

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
Expand All @@ -45,10 +50,17 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
const val NESTED = "nested"
const val TYPE = "type"
const val INDEX_PATTERN_SUFFIX = "-000001"
const val QUERY_INDEX_BASE_FIELDS_COUNT = 8 // 3 fields we defined and 5 builtin additional metadata fields
@JvmStatic
fun docLevelQueriesMappings(): String {
    // Mappings for the doc-level query index are bundled as a classpath resource.
    val mappingsResource = DocLevelMonitorQueries::class.java.classLoader
        .getResource("mappings/doc-level-queries.json")
    return mappingsResource.readText()
}
fun docLevelQueriesSettings(): Settings {
    // Index settings (hidden index + total-fields limit) are bundled as a classpath resource.
    val settingsJson = DocLevelMonitorQueries::class.java.classLoader
        .getResource("settings/doc-level-queries.json")
        .readText()
    return Settings.builder()
        .loadFromSource(settingsJson, XContentType.JSON)
        .build()
}
}

suspend fun initDocLevelQueryIndex(): Boolean {
Expand All @@ -70,10 +82,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
val indexRequest = CreateIndexRequest(indexPattern)
.mapping(docLevelQueriesMappings())
.alias(Alias(alias))
.settings(
Settings.builder().put("index.hidden", true)
.build()
)
.settings(docLevelQueriesSettings())
return try {
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
createIndexResponse.isAcknowledged
Expand Down Expand Up @@ -321,6 +330,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
updateMappingRequest.source(mapOf<String, Any>("properties" to updatedProperties))
var updateMappingResponse = AcknowledgedResponse(false)
try {
// Adjust max field limit in mappings for query index, if needed.
checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex)
updateMappingResponse = client.suspendUntil {
client.admin().indices().putMapping(updateMappingRequest, it)
}
Expand All @@ -331,7 +342,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
// If we reached limit for total number of fields in mappings, do a rollover here
if (unwrappedException.message?.contains("Limit of total fields") == true) {
try {
// Do queryIndex rollover
targetQueryIndex = rolloverQueryIndex(monitor)
// Adjust max field limit in mappings for new index.
checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex)
// PUT mappings to newly created index
val updateMappingRequest = PutMappingRequest(targetQueryIndex)
updateMappingRequest.source(mapOf<String, Any>("properties" to updatedProperties))
Expand Down Expand Up @@ -371,14 +385,41 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return Pair(updateMappingResponse, targetQueryIndex)
}

private suspend fun rolloverQueryIndex(monitor: Monitor): String? {
/**
 * Adjusts the `index.mapping.total_fields.limit` setting of the concrete query index so it can
 * hold at least as many fields as the source index allows, plus the query index's own base fields.
 * This prevents a max-field-limit exception when the (flattened) source-index mappings are applied
 * to the query index.
 *
 * @param sourceIndex name of the monitor's source index whose field limit is the reference
 * @param concreteQueryIndex concrete (non-alias) query index whose setting may be raised
 */
private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
    // OpenSearch's implicit default when the setting is absent from an index.
    val defaultFieldLimit = 1000L
    val getSettingsResponse: GetSettingsResponse = client.suspendUntil {
        admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it)
    }
    val sourceIndexLimit =
        getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: defaultFieldLimit
    val queryIndexLimit =
        getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: defaultFieldLimit
    // The query index carries QUERY_INDEX_BASE_FIELDS_COUNT fields of its own
    // (3 defined + 5 builtin metadata), so reserve that headroom on top of the source limit.
    if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) {
        val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil {
            admin().indices().updateSettings(
                UpdateSettingsRequest(concreteQueryIndex).settings(
                    Settings.builder().put(
                        INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT
                    )
                ),
                it
            )
        }
        // Previously the response was silently dropped; surface an unacknowledged update so a
        // subsequent mapping failure is diagnosable.
        if (!updateSettingsResponse.isAcknowledged) {
            log.warn(
                "Update of ${INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key} for index $concreteQueryIndex was not acknowledged"
            )
        }
    }
}

private suspend fun rolloverQueryIndex(monitor: Monitor): String {
val queryIndex = monitor.dataSources.queryIndex
val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFFIX

val request = RolloverRequest(queryIndex, null)
request.createIndexRequest.index(queryIndexPattern)
.mapping(docLevelQueriesMappings())
.settings(Settings.builder().put("index.hidden", true).build())
.settings(docLevelQueriesSettings())
val response: RolloverResponse = client.suspendUntil {
client.admin().indices().rolloverIndex(request, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
Expand All @@ -34,6 +35,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.mapper.MapperService
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
Expand Down Expand Up @@ -945,7 +947,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
fun `test queryIndex rollover and delete monitor success`() {

val testSourceIndex = "test_source_index"
createIndex(testSourceIndex, Settings.EMPTY)
createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery))
Expand All @@ -957,7 +959,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
// This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this than via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..330) {
for (i in 1..3300) {
docPayload.append(""" "id$i.somefield.somefield$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
Expand Down Expand Up @@ -1134,27 +1136,27 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

/**
* 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex
* 2. Update monitor and change input source_index to a new one with 900 fields in mappings
* 1. Create monitor with input source_index with 9000 fields in mappings - can fit 1 in queryIndex
* 2. Update monitor and change input source_index to a new one with 9000 fields in mappings
* 3. Expect queryIndex rollover resulting in 2 backing indices
* 4. Delete monitor and expect that all backing indices are deleted
* */
fun `test updating monitor no execution queryIndex rolling over`() {
val testSourceIndex1 = "test_source_index1"
val testSourceIndex2 = "test_source_index2"
createIndex(testSourceIndex1, Settings.EMPTY)
createIndex(testSourceIndex2, Settings.EMPTY)
createIndex(testSourceIndex1, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
createIndex(testSourceIndex2, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger)
)
// This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api
// This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this than via api
val docPayload: StringBuilder = StringBuilder(100000)
docPayload.append("{")
for (i in 1..899) {
for (i in 1..9000) {
docPayload.append(""" "id$i":$i,""")
}
docPayload.append("\"test_field\" : \"us-west-2\" }")
Expand Down Expand Up @@ -1189,6 +1191,48 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals(0, getIndexResponse.indices.size)
}

/**
 * Verifies that creating a doc-level monitor over a source index with a raised
 * `index.mapping.total_fields.limit` bumps the query index's limit to
 * source limit + QUERY_INDEX_BASE_FIELDS_COUNT instead of rolling over,
 * and that deleting the monitor removes all query-index backing indices.
 */
fun `test queryIndex gets increased max fields in mappings`() {
    val testSourceIndex = "test_source_index"
    createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build())
    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
    val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery))
    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    var monitor = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput),
        triggers = listOf(trigger)
    )
    // This doc creates close to the 10000-field limit in the source index mappings (9998 ids + test_field).
    // It's easier to add mappings like this than via api
    val docPayload: StringBuilder = StringBuilder(100000)
    docPayload.append("{")
    for (i in 1..9998) {
        docPayload.append(""" "id$i":$i,""")
    }
    docPayload.append("\"test_field\" : \"us-west-2\" }")
    // Indexing docs here as an easier means to set index mappings
    indexDoc(testSourceIndex, "1", docPayload.toString())
    // Create monitor
    var monitorResponse = createMonitor(monitor)
    assertFalse(monitorResponse?.id.isNullOrEmpty())
    monitor = monitorResponse!!.monitor

    // Expect a single query index (no rollover) whose total-fields limit setting was raised
    // enough to fit the source index mappings plus the query index's own base fields
    var getIndexResponse: GetIndexResponse =
        client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
    assertEquals(1, getIndexResponse.indices.size)
    val field_max_limit = getIndexResponse
        .getSetting(DOC_LEVEL_QUERIES_INDEX + "-000001", MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key).toInt()

    assertEquals(10000 + DocLevelMonitorQueries.QUERY_INDEX_BASE_FIELDS_COUNT, field_max_limit)

    deleteMonitor(monitorResponse.id)
    waitUntil {
        getIndexResponse =
            client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get()
        return@waitUntil getIndexResponse.indices.isEmpty()
    }
    assertEquals(0, getIndexResponse.indices.size)
}

fun `test queryIndex bwc when index was not an alias`() {
createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build())
assertIndexExists(DOC_LEVEL_QUERIES_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertIndicesIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -235,7 +237,14 @@ class AlertIndicesIT : AlertingRestTestCase() {
client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s")

// Give some time for history to be rolled over and cleared
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alertIndices = getAlertIndices().size
val docCount = getAlertHistoryDocCount()
if (alertIndices > 2 || docCount > 0) {
return@waitUntil false
}
return@waitUntil true
}, 30, TimeUnit.SECONDS)

// Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted.
// This leaves two indices: alert index and an empty history write index
Expand Down Expand Up @@ -284,7 +293,14 @@ class AlertIndicesIT : AlertingRestTestCase() {
client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s")

// Give some time for history to be rolled over and cleared
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alertIndices = getAlertIndices().size
val docCount = getAlertHistoryDocCount()
if (alertIndices > 2 || docCount > 0) {
return@waitUntil false
}
return@waitUntil true
}, 30, TimeUnit.SECONDS)

// Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted.
// This leaves two indices: alert index and an empty history write index
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/resources/settings/doc-level-queries.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"index": {
"mapping": {
"total_fields": {
"limit": 10000
}
},
"hidden": true
}
}