-
Notifications
You must be signed in to change notification settings - Fork 113
/
Rollup.kt
364 lines (343 loc) · 16.2 KB
/
Rollup.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.indexmanagement.rollup.model
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Dimension
import org.opensearch.indexmanagement.common.model.dimension.Histogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_USER
import org.opensearch.indexmanagement.opensearchapi.instant
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util._ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
import java.io.IOException
import java.time.Instant
/**
 * Configuration of a rollup job, usable as a job scheduler parameter and serializable both to
 * XContent and to the transport stream format.
 *
 * Construction fails with [IllegalArgumentException] when any of the checks in the `init` block
 * is violated (enabled flag vs. enabled time, interval size, source/target index, dimensions,
 * page size, delay).
 *
 * [jobSchedule] is mutable only so that the `init` block can replace it with a copy carrying
 * [delay] when the job is [continuous].
 */
data class Rollup(
    val id: String = NO_ID,
    val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
    val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
    val enabled: Boolean,
    val schemaVersion: Long,
    var jobSchedule: Schedule,
    val jobLastUpdatedTime: Instant,
    val jobEnabledTime: Instant?,
    val description: String,
    val sourceIndex: String,
    val targetIndex: String,
    val metadataID: String?,
    @Deprecated("Will be ignored, to check the roles use user field") val roles: List<String> = listOf(),
    val pageSize: Int,
    val delay: Long?,
    val continuous: Boolean,
    val dimensions: List<Dimension>,
    val metrics: List<RollupMetrics>,
    val user: User? = null
) : ScheduledJobParameter, Writeable {

    init {
        // The enabled time has to agree with the enabled flag.
        when {
            enabled -> requireNotNull(jobEnabledTime) { "Job enabled time must be present if the job is enabled" }
            else -> require(jobEnabledTime == null) { "Job enabled time must not be present if the job is disabled" }
        }

        // Copy the delay parameter of the job into the job scheduler for continuous jobs only
        if (continuous && jobSchedule.delay != delay) {
            val originalSchedule = jobSchedule
            val scheduleDelay = delay ?: 0L
            jobSchedule = when (originalSchedule) {
                is CronSchedule ->
                    CronSchedule(originalSchedule.cronExpression, originalSchedule.timeZone, scheduleDelay)
                is IntervalSchedule ->
                    IntervalSchedule(
                        originalSchedule.startTime,
                        originalSchedule.interval,
                        originalSchedule.unit,
                        scheduleDelay
                    )
                else -> originalSchedule
            }
        }

        // Only interval schedules are validated here; job scheduler already correctly throws errors
        // for cron schedules.
        val effectiveSchedule = jobSchedule
        if (effectiveSchedule is IntervalSchedule) {
            require(effectiveSchedule.interval >= MINIMUM_JOB_INTERVAL) {
                "Rollup job schedule interval must be greater than 0"
            }
        }

        require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" }

        // Counting the date histograms also rejects an empty dimension list.
        val dateHistogramCount = dimensions.count { it.type == Dimension.Type.DATE_HISTOGRAM }
        require(dateHistogramCount == 1) { "Must specify precisely one date histogram dimension" }
        require(dimensions.first().type == Dimension.Type.DATE_HISTOGRAM) {
            "The first dimension must be a date histogram"
        }

        require(pageSize in MINIMUM_PAGE_SIZE..MAXIMUM_PAGE_SIZE) { "Page size must be between 1 and 10,000" }

        delay?.let { configuredDelay ->
            require(configuredDelay >= MINIMUM_DELAY) { "Delay must be non-negative if set" }
            require(configuredDelay <= Instant.now().toEpochMilli()) { "Delay must be less than the current unix time" }
        }
    }

    /** Returns the [enabled] flag. */
    override fun isEnabled(): Boolean = enabled

    /** Returns [id]; the id is user chosen and represents the rollup's name. */
    override fun getName(): String = id

    /** Returns [jobEnabledTime]. */
    override fun getEnabledTime(): Instant? = jobEnabledTime

    /** Returns [jobSchedule]. */
    override fun getSchedule(): Schedule = jobSchedule

    /** Returns [jobLastUpdatedTime]. */
    override fun getLastUpdateTime(): Instant = jobLastUpdatedTime

    /** Returns [ROLLUP_LOCK_DURATION_SECONDS]. */
    override fun getLockDurationSeconds(): Long = ROLLUP_LOCK_DURATION_SECONDS

    /**
     * Reads a rollup from [sin]. Fields are consumed in exactly the order [writeTo] emits them,
     * so the argument order below must not be changed.
     */
    @Throws(IOException::class)
    constructor(sin: StreamInput) : this(
        id = sin.readString(),
        seqNo = sin.readLong(),
        primaryTerm = sin.readLong(),
        enabled = sin.readBoolean(),
        schemaVersion = sin.readLong(),
        // A schedule type marker precedes the schedule payload.
        jobSchedule = when (requireNotNull(sin.readEnum(ScheduleType::class.java)) { "ScheduleType cannot be null" }) {
            ScheduleType.CRON -> CronSchedule(sin)
            ScheduleType.INTERVAL -> IntervalSchedule(sin)
        },
        jobLastUpdatedTime = sin.readInstant(),
        jobEnabledTime = sin.readOptionalInstant(),
        description = sin.readString(),
        sourceIndex = sin.readString(),
        targetIndex = sin.readString(),
        metadataID = sin.readOptionalString(),
        roles = sin.readStringArray().toList(),
        pageSize = sin.readInt(),
        delay = sin.readOptionalLong(),
        continuous = sin.readBoolean(),
        // A count followed by (type marker, dimension payload) pairs.
        dimensions = (0 until sin.readVInt()).map { _ ->
            val dimensionType = sin.readEnum(Dimension.Type::class.java)
            when (requireNotNull(dimensionType) { "Dimension type cannot be null" }) {
                Dimension.Type.DATE_HISTOGRAM -> DateHistogram(sin)
                Dimension.Type.TERMS -> Terms(sin)
                Dimension.Type.HISTOGRAM -> Histogram(sin)
            }
        },
        metrics = sin.readList(::RollupMetrics),
        // A boolean presence flag precedes the optional user.
        user = if (sin.readBoolean()) User(sin) else null
    )

    /**
     * Writes this rollup to [builder].
     *
     * The fields are nested under [ROLLUP_TYPE] unless the [WITH_TYPE] param is false, and the
     * user is written unless the [WITH_USER] param is false; both params default to true.
     * [seqNo], [primaryTerm] and [roles] are not written.
     *
     * @return the same [builder]
     */
    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
        builder.startObject()
        val wrapInType = params.paramAsBoolean(WITH_TYPE, true)
        if (wrapInType) builder.startObject(ROLLUP_TYPE)

        builder.field(ROLLUP_ID_FIELD, id)
        builder.field(ENABLED_FIELD, enabled)
        builder.field(SCHEDULE_FIELD, jobSchedule)
        builder.optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdatedTime)
        builder.optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime)
        builder.field(DESCRIPTION_FIELD, description)
        builder.field(SCHEMA_VERSION_FIELD, schemaVersion)
        builder.field(SOURCE_INDEX_FIELD, sourceIndex)
        builder.field(TARGET_INDEX_FIELD, targetIndex)
        builder.field(METADATA_ID_FIELD, metadataID)
        builder.field(PAGE_SIZE_FIELD, pageSize)
        builder.field(DELAY_FIELD, delay)
        builder.field(CONTINUOUS_FIELD, continuous)
        builder.field(DIMENSIONS_FIELD, dimensions.toTypedArray())
        builder.field(RollupMetrics.METRICS_FIELD, metrics.toTypedArray())

        if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user)
        if (wrapInType) builder.endObject()
        builder.endObject()
        return builder
    }

    /**
     * Writes this rollup to [out] in the layout the [StreamInput] constructor reads back.
     * The deprecated roles are always written as an empty array.
     */
    override fun writeTo(out: StreamOutput) {
        out.writeString(id)
        out.writeLong(seqNo)
        out.writeLong(primaryTerm)
        out.writeBoolean(enabled)
        out.writeLong(schemaVersion)

        // Anything that is not a cron schedule is tagged as an interval schedule.
        val currentSchedule = jobSchedule
        out.writeEnum(if (currentSchedule is CronSchedule) ScheduleType.CRON else ScheduleType.INTERVAL)
        currentSchedule.writeTo(out)

        out.writeInstant(jobLastUpdatedTime)
        out.writeOptionalInstant(jobEnabledTime)
        out.writeString(description)
        out.writeString(sourceIndex)
        out.writeString(targetIndex)
        out.writeOptionalString(metadataID)
        out.writeStringArray(emptyArray<String>())
        out.writeInt(pageSize)
        out.writeOptionalLong(delay)
        out.writeBoolean(continuous)

        out.writeVInt(dimensions.size)
        for (dim in dimensions) {
            out.writeEnum(dim.type)
            when (dim) {
                is DateHistogram -> dim.writeTo(out)
                is Terms -> dim.writeTo(out)
                is Histogram -> dim.writeTo(out)
            }
        }

        out.writeCollection(metrics)

        // Presence flag first, then the user payload if there is one.
        val jobUser = user
        if (jobUser == null) {
            out.writeBoolean(false)
        } else {
            out.writeBoolean(true)
            jobUser.writeTo(out)
        }
    }

    companion object {
        // TODO: Move this enum to Job Scheduler plugin
        enum class ScheduleType {
            CRON, INTERVAL;
        }

        const val ROLLUP_LOCK_DURATION_SECONDS = 1800L // 30 minutes
        const val ROLLUP_TYPE = "rollup"
        const val ROLLUP_ID_FIELD = "rollup_id"
        const val ENABLED_FIELD = "enabled"
        const val SCHEMA_VERSION_FIELD = "schema_version"
        const val SCHEDULE_FIELD = "schedule"
        const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
        const val ENABLED_TIME_FIELD = "enabled_time"
        const val DESCRIPTION_FIELD = "description"
        const val SOURCE_INDEX_FIELD = "source_index"
        const val TARGET_INDEX_FIELD = "target_index"
        const val METADATA_ID_FIELD = "metadata_id"
        const val ROLES_FIELD = "roles"
        const val PAGE_SIZE_FIELD = "page_size"
        const val DELAY_FIELD = "delay"
        const val CONTINUOUS_FIELD = "continuous"
        const val DIMENSIONS_FIELD = "dimensions"
        const val METRICS_FIELD = "metrics"
        const val MINIMUM_JOB_INTERVAL = 1
        const val MINIMUM_DELAY = 0
        const val MINIMUM_PAGE_SIZE = 1
        const val MAXIMUM_PAGE_SIZE = 10_000
        const val ROLLUP_DOC_ID_FIELD = "$ROLLUP_TYPE.$_ID"

        /*
         * _doc_count has to be in root of document so that core's aggregator would pick it up and use it
         * */
        const val ROLLUP_DOC_COUNT_FIELD = "_doc_count"
        const val ROLLUP_DOC_SCHEMA_VERSION_FIELD = "$ROLLUP_TYPE._$SCHEMA_VERSION_FIELD"
        const val USER_FIELD = "user"

        /**
         * Checks that [xcp] is positioned on the start of an array, then invokes [onElement]
         * once per token until the end of that array is reached. [onElement] is expected to
         * consume the element the parser is currently positioned on.
         */
        private fun forEachArrayElement(xcp: XContentParser, onElement: () -> Unit) {
            ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
            while (xcp.nextToken() != Token.END_ARRAY) {
                onElement()
            }
        }

        /**
         * Parses a rollup from [xcp], which must be positioned on the start of the rollup object.
         *
         * [id], [seqNo] and [primaryTerm] are not read from the content; they are passed through
         * to the result. When either [seqNo] or [primaryTerm] is unassigned, an interval schedule
         * gets its start time reset to the current time.
         *
         * @throws IllegalArgumentException on an unknown field, on a missing schedule,
         *   description, source index, target index or page size, or when the parsed values fail
         *   the [Rollup] constructor validation
         */
        @Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth")
        @JvmStatic
        @JvmOverloads
        @Throws(IOException::class)
        fun parse(
            xcp: XContentParser,
            id: String = NO_ID,
            seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
            primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
        ): Rollup {
            var parsedSchedule: Schedule? = null
            var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
            var lastUpdatedTime: Instant? = null
            var parsedEnabledTime: Instant? = null
            var enabled = true
            var description: String? = null
            var sourceIndex: String? = null
            var targetIndex: String? = null
            var metadataID: String? = null
            var pageSize: Int? = null
            var delay: Long? = null
            var continuous = false
            val parsedDimensions = mutableListOf<Dimension>()
            val parsedMetrics = mutableListOf<RollupMetrics>()
            var user: User? = null

            ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
            while (xcp.nextToken() != Token.END_OBJECT) {
                val fieldName = xcp.currentName()
                xcp.nextToken()

                when (fieldName) {
                    // Just used for searching, so the value is checked but not kept.
                    ROLLUP_ID_FIELD -> { requireNotNull(xcp.text()) { "The rollup_id field is null" } }
                    ENABLED_FIELD -> enabled = xcp.booleanValue()
                    SCHEDULE_FIELD -> parsedSchedule = ScheduleParser.parse(xcp)
                    SCHEMA_VERSION_FIELD -> schemaVersion = xcp.longValue()
                    ENABLED_TIME_FIELD -> parsedEnabledTime = xcp.instant()
                    LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant()
                    DESCRIPTION_FIELD -> description = xcp.text()
                    SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
                    TARGET_INDEX_FIELD -> targetIndex = xcp.text()
                    METADATA_ID_FIELD -> metadataID = xcp.textOrNull()
                    // Parsing but not storing the field, deprecated
                    ROLES_FIELD -> forEachArrayElement(xcp) { xcp.text() }
                    PAGE_SIZE_FIELD -> pageSize = xcp.intValue()
                    DELAY_FIELD -> delay = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.longValue()
                    CONTINUOUS_FIELD -> continuous = xcp.booleanValue()
                    DIMENSIONS_FIELD -> forEachArrayElement(xcp) { parsedDimensions.add(Dimension.parse(xcp)) }
                    METRICS_FIELD -> forEachArrayElement(xcp) { parsedMetrics.add(RollupMetrics.parse(xcp)) }
                    USER_FIELD -> user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp)
                    else -> throw IllegalArgumentException("Invalid field [$fieldName] found in Rollup.")
                }
            }

            // Enabled jobs default their enabled time to now; disabled jobs never carry one.
            val enabledTime: Instant? = if (enabled) parsedEnabledTime ?: Instant.now() else null

            // If the seqNo/primaryTerm are unassigned this job hasn't been created yet so we instantiate the startTime
            // TODO: Make startTime public in Job Scheduler so we can just directly check the value
            val notYetCreated =
                seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM
            val scheduleSnapshot = parsedSchedule
            val schedule: Schedule? = if (notYetCreated && scheduleSnapshot is IntervalSchedule) {
                IntervalSchedule(
                    Instant.now(),
                    scheduleSnapshot.interval,
                    scheduleSnapshot.unit,
                    scheduleSnapshot.delay ?: 0
                )
            } else {
                scheduleSnapshot
            }

            return Rollup(
                id = id,
                seqNo = seqNo,
                primaryTerm = primaryTerm,
                enabled = enabled,
                schemaVersion = schemaVersion,
                jobSchedule = requireNotNull(schedule) { "Rollup schedule is null" },
                jobLastUpdatedTime = lastUpdatedTime ?: Instant.now(),
                jobEnabledTime = enabledTime,
                description = requireNotNull(description) { "Rollup description is null" },
                sourceIndex = requireNotNull(sourceIndex) { "Rollup source index is null" },
                targetIndex = requireNotNull(targetIndex) { "Rollup target index is null" },
                metadataID = metadataID,
                pageSize = requireNotNull(pageSize) { "Rollup page size is null" },
                delay = delay,
                continuous = continuous,
                dimensions = parsedDimensions,
                metrics = parsedMetrics,
                user = user
            )
        }
    }
}
/**
 * Closed set of outcomes for validating a rollup job: [Valid], [Invalid] or [Failure].
 */
sealed class RollupJobValidationResult {

    /** Outcome that carries no further detail. */
    object Valid : RollupJobValidationResult()

    /** Outcome carrying a textual [reason]. */
    data class Invalid(
        val reason: String
    ) : RollupJobValidationResult()

    /** Outcome carrying a [message] and, optionally, the exception [e] (null by default). */
    data class Failure(
        val message: String,
        val e: Exception? = null
    ) : RollupJobValidationResult()
}