Skip to content

Commit

Permalink
[Feature] Support Transform as an ISM action (opensearch-project#760)
Browse files Browse the repository at this point in the history
* Initial impl

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* fix style

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* end to end functional

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* ISM transform unit tests & integ tests

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Fix after core #8157 (opensearch-project#857)

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Upgrade the backport workflow (opensearch-project#862)

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Added 2.9 release notes. (opensearch-project#851)

* Added 2.9 release notes.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Added 2.9 release notes.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* Added 2.9 release notes.

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

---------

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Handle NPE in isRollupIndex (opensearch-project#855)

* Handle NPE in isRollupIndex

`metadata.index()` can return `null`, so handle that case by returning
`false`.

Signed-off-by: Bryce Lampe <brycelampe@gmail.com>

* unit test

Signed-off-by: Bryce Lampe <brycelampe@gmail.com>

---------

Signed-off-by: Bryce Lampe <brycelampe@gmail.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Fix core XcontentType refactor (opensearch-project#873)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* fix for max & min aggregations when no metric property exist (opensearch-project#870)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* core refactor change (opensearch-project#884)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* update backport branch name (opensearch-project#885)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* core refactor change (opensearch-project#887)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Fix breaking change by core refactor (opensearch-project#888)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* fix core breaking (opensearch-project#906)

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Support copy alias in rollover (opensearch-project#907)

* Support copy alias in rollover

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* 2.10

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Set preference to _primary when searching control-center index (opensearch-project#911)

* Set preference to _primary when searching control-center index

Signed-off-by: gaobinlong <gbinlong@amazon.com>

* Use _primary_first instead

Signed-off-by: gaobinlong <gbinlong@amazon.com>

---------

Signed-off-by: gaobinlong <gbinlong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Add primary first preference to all search requests (opensearch-project#912)

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* fix intelliJ IDEA gradle sync error (opensearch-project#916)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* make control center index as system index (opensearch-project#919)

Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Updates demo certs used in integ tests (opensearch-project#921)

Signed-off-by: Darshit Chanpura <dchanp@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Added 2.10 release notes (opensearch-project#925)

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Bump bwc version (opensearch-project#930)

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* fix integ tests; upgrade mappings versions

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Fix DCO

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Addressed pr comments; Add integ test case for re-execute the same transform action

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Addressed detekt error

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Added ISMTransform writeable test

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

* Addressed comments; Moved updateTransformStartTime to IndexManagementRestTestCase

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>

---------

Signed-off-by: Tanqiu Liu <liutanqiu@gmail.com>
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
Signed-off-by: Bryce Lampe <brycelampe@gmail.com>
Signed-off-by: Hailong Cui <ihailong@amazon.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: gaobinlong <gbinlong@amazon.com>
Signed-off-by: Darshit Chanpura <dchanp@amazon.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
Co-authored-by: Ashish Agrawal <ashisagr@amazon.com>
Co-authored-by: AWSHurneyt <hurneyt@amazon.com>
Co-authored-by: Bryce Lampe <brycelampe@gmail.com>
Co-authored-by: Hailong Cui <ihailong@amazon.com>
Co-authored-by: Subhobrata Dey <sbcd90@gmail.com>
Co-authored-by: gaobinlong <gbl_long@163.com>
Co-authored-by: Darshit Chanpura <35282393+DarshitChanpura@users.noreply.github.com>
Signed-off-by: Joshua Au <joshuahyau@gmail.com>
  • Loading branch information
9 people authored and Joshua152 committed Dec 12, 2023
1 parent d60bf2f commit 21fa899
Show file tree
Hide file tree
Showing 28 changed files with 1,626 additions and 58 deletions.
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea

### Building from the command line

1. `./gradlew build` builds and tests project.
1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ data class ActionProperties(
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
val shrinkActionProperties: ShrinkActionProperties? = null,
val transformActionProperties: TransformActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -32,6 +33,7 @@ data class ActionProperties(
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
out.writeOptionalWriteable(transformActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -40,6 +42,7 @@ data class ActionProperties(
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params)
return builder
}

Expand All @@ -52,7 +55,8 @@ data class ActionProperties(
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
Expand All @@ -61,6 +65,7 @@ data class ActionProperties(
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null
var transformActionProperties: TransformActionProperties? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -75,10 +80,13 @@ data class ActionProperties(
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> {
transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

data class TransformActionProperties(
val transformId: String?
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(transformId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId)
return builder
}

companion object {
const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties"

fun fromStreamInput(sin: StreamInput): TransformActionProperties {
val transformId: String? = sin.readOptionalString()
return TransformActionProperties(transformId)
}

fun parse(xcp: XContentParser): TransformActionProperties {
var transformId: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Properties.TRANSFORM_ID.key -> transformId = xcp.text()
}
}

return TransformActionProperties(transformId)
}
}

enum class Properties(val key: String) {
TRANSFORM_ID("transform_id")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand All @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() {
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
SnapshotActionParser(),
TransformActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
Expand Down Expand Up @@ -110,7 +111,8 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices,
private val indexMetadataProvider: IndexMetadataProvider
private val indexMetadataProvider: IndexMetadataProvider,
private val xContentRegistry: NamedXContentRegistry
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {
Expand Down Expand Up @@ -422,7 +424,7 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
parseFromSearchResponse(response = response, parse = Policy.Companion::parse)
parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (ex: IndexNotFoundException) {
emptyList()
} catch (ex: ClusterBlockException) {
Expand Down Expand Up @@ -603,7 +605,7 @@ class ManagedIndexCoordinator(
}
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep
import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformAction(
val ismTransform: ISMTransform,
index: Int
) : Action(name, index) {

companion object {
const val name = "transform"
const val ISM_TRANSFORM_FIELD = "ism_transform"
}

private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this)
private val waitForTransformCompletionStep = WaitForTransformCompletionStep()
private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep)

@Suppress("ReturnCount")
override fun getStepToExecute(context: StepContext): Step {
// if stepMetaData is null, return first step
val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep

// if the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep
else -> attemptCreateTransformJobStep
}
}

return when (stepMetaData.name) {
AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep
else -> waitForTransformCompletionStep
}
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ISM_TRANSFORM_FIELD, ismTransform)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
ismTransform.writeTo(out)
out.writeInt(actionIndex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.transform.model.ISMTransform

class TransformActionParser : ActionParser() {
override fun fromStreamInput(sin: StreamInput): Action {
val ismTransform = ISMTransform(sin)
val index = sin.readInt()
return TransformAction(ismTransform, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var ismTransform: ISMTransform? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.")
}
}

return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index)
}

override fun getActionType(): String {
return TransformAction.name
}
}
Loading

0 comments on commit 21fa899

Please sign in to comment.