diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index b17c0ea47c..8ecfddbf8d 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -7,6 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeType; import io.micrometer.core.instrument.Measurement; @@ -497,12 +499,19 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy( assertThat(settingsIsmNode.get("rollover_alias").textValue(), equalTo(indexAlias)); final String expectedIndexPolicyName = indexAlias + "-policy"; + final String expectedPolicyIndexPattern = indexAlias + "*"; if (isOSBundle()) { // Check managed index await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); } ); + // Check policy index patterns are matching to indexAlias* + // Refer https://github.com/opensearch-project/data-prepper/pull/5118 + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(getPolicyIndexPatterns(getIndexPolicyId(index)), equalTo(expectedPolicyIndexPattern)); + } + ); } // roll over initial index @@ -1745,7 +1754,18 @@ private String getIndexPolicyId(final String index) throws IOException { responseBody).map().get(index)).get("index.opendistro.index_state_management.policy_id"); return policyId; } + + private String getPolicyIndexPatterns(final String policyId) throws IOException { + // TODO: replace with new _opensearch API + final Request request = new Request(HttpMethod.GET, "/_opendistro/_ism/policies/" + policyId); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(responseBody); + final JsonNode indexPatterns = (ObjectNode) jsonNode.get("policy").get("ism_template").get("index_patterns"); + return indexPatterns.get(0).toString(); + } @SuppressWarnings("unchecked") private void wipeAllOpenSearchIndices() throws IOException { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index bb1565dcd1..72a970335a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -228,7 +228,7 @@ public void setupIndex() throws IOException { private void checkAndCreateIndexTemplate() throws IOException { final boolean isISMEnabled = checkISMEnabled(); final Optional policyIdOptional = isISMEnabled ? - ismPolicyManagementStrategy.checkAndCreatePolicy() : + ismPolicyManagementStrategy.checkAndCreatePolicy(configuredIndexAlias) : Optional.empty(); if (!openSearchSinkConfiguration.getIndexConfiguration().getIndexTemplate().isEmpty()) { checkAndCreateIndexTemplate(isISMEnabled, policyIdOptional.orElse(null)); @@ -258,8 +258,8 @@ final void checkAndCreateIndexTemplate(final boolean isISMEnabled, final String templateStrategy.createTemplate(indexTemplate); } - final Optional checkAndCreatePolicy() throws IOException { - return ismPolicyManagementStrategy.checkAndCreatePolicy(); + final Optional checkAndCreatePolicy(final String indexAlias) throws IOException { + return ismPolicyManagementStrategy.checkAndCreatePolicy(indexAlias); } public void checkAndCreateIndex() throws IOException { @@ -322,6 +322,7 @@ private void attachPolicy( indexTemplate.putCustomSetting(IndexConstants.ISM_POLICY_ID_SETTING, ismPolicyId); } indexTemplate.putCustomSetting(IndexConstants.ISM_ROLLOVER_ALIAS_SETTING, rolloverAlias); + indexTemplate.putCustomSetting(IndexConstants.PLUGINS_ROLLOVER_ALIAS_SETTING, rolloverAlias); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java index d20003e222..ba7994ff10 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java @@ -21,6 +21,7 @@ public class IndexConstants { public static final String ISM_ENABLED_SETTING = "opendistro.index_state_management.enabled"; public static final String ISM_POLICY_ID_SETTING = "opendistro.index_state_management.policy_id"; public static final String ISM_ROLLOVER_ALIAS_SETTING = "opendistro.index_state_management.rollover_alias"; + public static final String PLUGINS_ROLLOVER_ALIAS_SETTING = "plugins.index_state_management.rollover_alias"; // TODO: extract out version number into version enum public static final String SERVICE_MAP_DEFAULT_TEMPLATE_FILE = "otel-v1-apm-service-map-index-template.json"; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagement.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagement.java index 988d00b2e5..b63fafe8f5 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagement.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagement.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import io.micrometer.core.instrument.util.StringUtils; import org.opensearch.client.Request; import org.opensearch.client.ResponseException; @@ -93,10 +94,19 @@ public IsmPolicyManagement(final OpenSearchClient openSearchClient, } @Override - public Optional checkAndCreatePolicy() throws IOException { + public Optional checkAndCreatePolicy(final String indexAlias) throws IOException { final String policyManagementEndpoint = POLICY_MANAGEMENT_ENDPOINT + policyName; String policyJsonString = retrievePolicyJsonString(policyFile); + if(!indexAlias.isEmpty()) { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(policyJsonString); + final ArrayNode iparray = mapper.createArrayNode(); + iparray.add(indexAlias + "*"); + ((ObjectNode) jsonNode.get("policy").get("ism_template")).put("index_patterns", iparray); + policyJsonString = jsonNode.toString(); + } + LOG.debug("Got the policystring as {} and indexAlias as {}", policyJsonString, indexAlias); Request request = createPolicyRequestFromFile(policyManagementEndpoint, policyJsonString); try { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementStrategy.java index 2973f885ed..10e84bf67a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementStrategy.java @@ -13,7 +13,7 @@ interface IsmPolicyManagementStrategy { - Optional checkAndCreatePolicy() throws IOException; + Optional checkAndCreatePolicy(final String indexAlias) throws IOException; List getIndexPatterns(final String indexAlias); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagement.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagement.java index 2aa3cbc501..c0df1ac5eb 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagement.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagement.java @@ -34,7 +34,7 @@ public NoIsmPolicyManagement(final OpenSearchClient openSearchClient, } @Override - public Optional checkAndCreatePolicy() throws IOException { + public Optional checkAndCreatePolicy(final String indexAlias) throws IOException { return Optional.empty(); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 82b8f1b757..3729c4e8eb 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -412,7 +412,7 @@ void checkAndCreatePolicy_Normal() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); - assertEquals(Optional.empty(), defaultIndexManager.checkAndCreatePolicy()); + assertEquals(Optional.empty(), defaultIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient).getLowLevelClient(); verify(restClient).performRequest(any()); verify(openSearchSinkConfiguration, times(4)).getIndexConfiguration(); @@ -428,7 +428,7 @@ void checkAndCreatePolicy_Exception() throws IOException { when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertThrows(ResponseException.class, () -> defaultIndexManager.checkAndCreatePolicy()); + assertThrows(ResponseException.class, () -> defaultIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); @@ -441,7 +441,7 @@ void checkAndCreatePolicy_Exception() throws IOException { void checkAndCreatePolicy() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); - assertEquals(Optional.empty(), defaultIndexManager.checkAndCreatePolicy()); + assertEquals(Optional.empty(), defaultIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(indexConfiguration).getIndexAlias(); verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); verify(indexConfiguration).getIsmPolicyFile(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementTests.java index fba0604b3a..b80e423212 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IsmPolicyManagementTests.java @@ -99,7 +99,7 @@ public void constructor_NullRestClient() { @Test public void checkAndCreatePolicy_Normal() throws IOException { when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); - assertEquals(Optional.empty(), ismPolicyManagementStrategy.checkAndCreatePolicy()); + assertEquals(Optional.empty(), ismPolicyManagementStrategy.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient).getLowLevelClient(); verify(restClient).performRequest(any()); } @@ -114,7 +114,7 @@ public void checkAndCreatePolicy_OnlyOnePolicyFile_TwoExceptions() throws IOExce when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertThrows(ResponseException.class, () -> ismPolicyManagementStrategy.checkAndCreatePolicy()); + assertThrows(ResponseException.class, () -> ismPolicyManagementStrategy.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); } @@ -129,7 +129,7 @@ public void checkAndCreatePolicy_OnlyOnePolicyFile_FirstExceptionThenSucceeds() when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException).thenReturn(null); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategy.checkAndCreatePolicy()); + assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategy.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); } @@ -156,7 +156,7 @@ public void checkAndCreatePolicy_with_custom_ism_policy_from_s3() throws IOExcep when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException).thenReturn(null); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategyWithTemplate.checkAndCreatePolicy()); + assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategyWithTemplate.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); } @@ -166,7 +166,7 @@ public void checkAndCreatePolicy_ExceptionFirstThenSucceed() throws IOException when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException).thenReturn(null); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategy.checkAndCreatePolicy()); + assertEquals(Optional.of(POLICY_NAME), ismPolicyManagementStrategy.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagementTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagementTests.java index eeb1d1c1fc..2c531ed321 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagementTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/NoIsmPolicyManagementTests.java @@ -55,7 +55,7 @@ public void constructor_NullRestClient() { @Test public void checkAndCreatePolicy() throws IOException { - assertEquals(Optional.empty(), ismPolicyManagementStrategy.checkAndCreatePolicy()); + assertEquals(Optional.empty(), ismPolicyManagementStrategy.checkAndCreatePolicy(INDEX_ALIAS)); } @ParameterizedTest diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsRawIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsRawIndexManagerTests.java index c711aa7d96..075d26656b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsRawIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsRawIndexManagerTests.java @@ -177,7 +177,7 @@ void checkISMEnabled_False() throws IOException { @Test void checkAndCreatePolicy_Normal() throws IOException { when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); - assertEquals(Optional.empty(), traceAnalyticsRawIndexManager.checkAndCreatePolicy()); + assertEquals(Optional.empty(), traceAnalyticsRawIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(openSearchSinkConfiguration).getIndexConfiguration(); verify(indexConfiguration).getIndexAlias(); verify(restHighLevelClient).getLowLevelClient(); @@ -189,7 +189,7 @@ void checkAndCreatePolicy_ExceptionFirstThenSucceeds() throws IOException { when(restHighLevelClient.getLowLevelClient()).thenReturn(restClient); when(restClient.performRequest(any())).thenThrow(responseException).thenReturn(null); when(responseException.getMessage()).thenReturn("Invalid field: [ism_template]"); - assertEquals(Optional.of("raw-span-policy"), traceAnalyticsRawIndexManager.checkAndCreatePolicy()); + assertEquals(Optional.of("raw-span-policy"), traceAnalyticsRawIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(restHighLevelClient, times(2)).getLowLevelClient(); verify(restClient, times(2)).performRequest(any()); verify(openSearchSinkConfiguration).getIndexConfiguration(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsServiceMapIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsServiceMapIndexManagerTests.java index 442218eec4..d928a67ac1 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsServiceMapIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TraceAnalyticsServiceMapIndexManagerTests.java @@ -169,7 +169,7 @@ void checkISMEnabledByDefault_False() throws IOException { @Test void checkAndCreatePolicy() throws IOException { - assertEquals(Optional.empty(), traceAnalyticsServiceMapIndexManager.checkAndCreatePolicy()); + assertEquals(Optional.empty(), traceAnalyticsServiceMapIndexManager.checkAndCreatePolicy(INDEX_ALIAS)); verify(openSearchSinkConfiguration).getIndexConfiguration(); verify(indexConfiguration).getIndexAlias(); } diff --git a/data-prepper-plugins/opensearch/src/test/resources/test-custom-index-policy-file.json b/data-prepper-plugins/opensearch/src/test/resources/test-custom-index-policy-file.json index bbc62b4ee5..16081c295e 100644 --- a/data-prepper-plugins/opensearch/src/test/resources/test-custom-index-policy-file.json +++ b/data-prepper-plugins/opensearch/src/test/resources/test-custom-index-policy-file.json @@ -16,7 +16,8 @@ } ], "ism_template": { - "index_patterns": ["sink-custom-index-ism-test-alias-*"] + "priority": 100, + "index_patterns": ["dummy-pattern-*"] } } -} \ No newline at end of file +}