|
| 1 | +/* |
| 2 | + * SPDX-License-Identifier: Apache-2.0 |
| 3 | + * |
| 4 | + * The OpenSearch Contributors require contributions made to |
| 5 | + * this file be licensed under the Apache-2.0 license or a |
| 6 | + * compatible open source license. |
| 7 | + */ |
| 8 | + |
| 9 | +package org.opensearch.ingest.common; |
| 10 | + |
| 11 | +import org.opensearch.common.Nullable; |
| 12 | +import org.opensearch.common.hash.MurmurHash3; |
| 13 | +import org.opensearch.common.time.DateFormatter; |
| 14 | +import org.opensearch.common.time.DateFormatters; |
| 15 | +import org.opensearch.core.common.Strings; |
| 16 | +import org.opensearch.ingest.AbstractProcessor; |
| 17 | +import org.opensearch.ingest.ConfigurationUtils; |
| 18 | +import org.opensearch.ingest.IngestDocument; |
| 19 | +import org.opensearch.ingest.Processor; |
| 20 | + |
| 21 | +import java.nio.charset.StandardCharsets; |
| 22 | +import java.time.ZoneOffset; |
| 23 | +import java.time.ZonedDateTime; |
| 24 | +import java.time.temporal.ChronoUnit; |
| 25 | +import java.time.temporal.TemporalAccessor; |
| 26 | +import java.util.Locale; |
| 27 | +import java.util.Map; |
| 28 | + |
| 29 | +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; |
| 30 | + |
| 31 | +/** |
| 32 | + * Processor that sets document routing based on temporal structure. |
| 33 | + * |
| 34 | + * This processor extracts a timestamp from a specified field, truncates it |
| 35 | + * to a configurable granularity (hour/day/week/month), and uses the resulting |
| 36 | + * temporal bucket to compute a routing value for improved temporal locality. |
| 37 | + * |
| 38 | + * Introduced in OpenSearch 3.2.0 to enable intelligent document co-location |
| 39 | + * based on time-based patterns for log and metrics workloads. |
| 40 | + */ |
| 41 | +public final class TemporalRoutingProcessor extends AbstractProcessor { |
| 42 | + |
| 43 | + public static final String TYPE = "temporal_routing"; |
| 44 | + private static final String DEFAULT_FORMAT = "strict_date_optional_time"; |
| 45 | + |
| 46 | + private final String timestampField; |
| 47 | + private final Granularity granularity; |
| 48 | + private final DateFormatter dateFormatter; |
| 49 | + private final boolean ignoreMissing; |
| 50 | + private final boolean overrideExisting; |
| 51 | + private final boolean hashBucket; |
| 52 | + |
/**
 * Supported temporal granularities. Each constant carries the {@link ChronoUnit}
 * passed to its constructor.
 */
public enum Granularity {
    /** Hour granularity for hourly bucketing */
    HOUR(ChronoUnit.HOURS),
    /** Day granularity for daily bucketing */
    DAY(ChronoUnit.DAYS),
    /** Week granularity for weekly bucketing (ISO week) */
    WEEK(ChronoUnit.WEEKS),
    /** Month granularity for monthly bucketing */
    MONTH(ChronoUnit.MONTHS);

    private final ChronoUnit chronoUnit;

    Granularity(ChronoUnit chronoUnit) {
        this.chronoUnit = chronoUnit;
    }

    /**
     * Gets the ChronoUnit associated with this granularity
     * @return the ChronoUnit
     */
    public ChronoUnit getChronoUnit() {
        return chronoUnit;
    }

    /**
     * Parses a string value to a Granularity enum. Matching is case-insensitive:
     * the input is upper-cased with {@link Locale#ROOT} before the constant lookup.
     *
     * @param value the string representation of the granularity
     * @return the corresponding Granularity enum value
     * @throws IllegalArgumentException if the value is {@code null} or does not name a constant
     */
    public static Granularity fromString(String value) {
        // A null input used to escape as a NullPointerException from toUpperCase,
        // contradicting the documented contract; report it like any other bad value.
        if (value == null) {
            throw new IllegalArgumentException(invalidGranularityMessage(null));
        }
        try {
            return valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Keep the original lookup failure as the cause for easier debugging.
            throw new IllegalArgumentException(invalidGranularityMessage(value), e);
        }
    }

    /** Builds the error text shared by every rejected input. */
    private static String invalidGranularityMessage(String value) {
        return "Invalid granularity: " + value + ". Supported values are: hour, day, week, month";
    }
}
| 94 | + |
/**
 * Creates a processor instance; package-private, the nested {@code Factory} is the caller visible in this file.
 *
 * @param tag              processor tag, forwarded to the superclass
 * @param description      optional processor description, forwarded to the superclass
 * @param timestampField   path of the document field holding the timestamp
 * @param granularity      bucket size the timestamp is truncated to
 * @param format           date format name or pattern handed to {@code DateFormatter.forPattern}
 * @param ignoreMissing    whether a missing/null timestamp leaves the document untouched
 * @param overrideExisting whether an already present {@code _routing} value is replaced
 * @param hashBucket       whether the bucket key is hashed before being used as routing
 */
TemporalRoutingProcessor(
    String tag,
    @Nullable String description,
    String timestampField,
    Granularity granularity,
    String format,
    boolean ignoreMissing,
    boolean overrideExisting,
    boolean hashBucket
) {
    super(tag, description);

    // Behavioural switches
    this.hashBucket = hashBucket;
    this.overrideExisting = overrideExisting;
    this.ignoreMissing = ignoreMissing;

    // Timestamp extraction and bucketing settings
    this.granularity = granularity;
    this.timestampField = timestampField;
    // The formatter is built once here and reused for every document.
    this.dateFormatter = DateFormatter.forPattern(format);
}
| 113 | + |
| 114 | + @Override |
| 115 | + public IngestDocument execute(IngestDocument document) throws Exception { |
| 116 | + // Check if routing already exists and we shouldn't override |
| 117 | + if (!overrideExisting) { |
| 118 | + try { |
| 119 | + Object existingRouting = document.getFieldValue("_routing", Object.class, true); |
| 120 | + if (existingRouting != null) { |
| 121 | + return document; |
| 122 | + } |
| 123 | + } catch (Exception e) { |
| 124 | + // Field doesn't exist, continue with processing |
| 125 | + } |
| 126 | + } |
| 127 | + |
| 128 | + Object timestampValue = document.getFieldValue(timestampField, Object.class, ignoreMissing); |
| 129 | + |
| 130 | + if (timestampValue == null && ignoreMissing) { |
| 131 | + return document; |
| 132 | + } |
| 133 | + |
| 134 | + if (timestampValue == null) { |
| 135 | + throw new IllegalArgumentException("field [" + timestampField + "] not present as part of path [" + timestampField + "]"); |
| 136 | + } |
| 137 | + |
| 138 | + String routingValue = computeRoutingValue(timestampValue.toString()); |
| 139 | + document.setFieldValue("_routing", routingValue); |
| 140 | + |
| 141 | + return document; |
| 142 | + } |
| 143 | + |
| 144 | + /** |
| 145 | + * Computes routing value from timestamp by truncating to granularity |
| 146 | + * and optionally hashing for distribution |
| 147 | + */ |
| 148 | + private String computeRoutingValue(String timestamp) { |
| 149 | + // Parse timestamp using DateFormatter and convert to ZonedDateTime |
| 150 | + TemporalAccessor accessor = dateFormatter.parse(timestamp); |
| 151 | + ZonedDateTime dateTime = DateFormatters.from(accessor, Locale.ROOT, ZoneOffset.UTC); |
| 152 | + |
| 153 | + // Truncate to granularity |
| 154 | + ZonedDateTime truncated = truncateToGranularity(dateTime); |
| 155 | + |
| 156 | + // Create temporal bucket key |
| 157 | + String temporalBucket = createTemporalBucketKey(truncated); |
| 158 | + |
| 159 | + // Optionally hash for distribution |
| 160 | + if (hashBucket) { |
| 161 | + byte[] bucketBytes = temporalBucket.getBytes(StandardCharsets.UTF_8); |
| 162 | + long hash = MurmurHash3.hash128(bucketBytes, 0, bucketBytes.length, 0, new MurmurHash3.Hash128()).h1; |
| 163 | + return String.valueOf(hash == Long.MIN_VALUE ? 0L : (hash < 0 ? -hash : hash)); |
| 164 | + } |
| 165 | + |
| 166 | + return temporalBucket; |
| 167 | + } |
| 168 | + |
| 169 | + /** |
| 170 | + * Truncates datetime to the specified granularity |
| 171 | + * |
| 172 | + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.truncateToGranularity() |
| 173 | + * in the search-pipeline-common module to ensure consistent temporal bucketing. |
| 174 | + */ |
| 175 | + private ZonedDateTime truncateToGranularity(ZonedDateTime dateTime) { |
| 176 | + switch (granularity) { |
| 177 | + case HOUR: |
| 178 | + return dateTime.withMinute(0).withSecond(0).withNano(0); |
| 179 | + case DAY: |
| 180 | + return dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); |
| 181 | + case WEEK: |
| 182 | + // Truncate to start of week (Monday) |
| 183 | + ZonedDateTime dayTruncated = dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); |
| 184 | + return dayTruncated.with(java.time.temporal.TemporalAdjusters.previousOrSame(java.time.DayOfWeek.MONDAY)); |
| 185 | + case MONTH: |
| 186 | + return dateTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); |
| 187 | + default: |
| 188 | + throw new IllegalArgumentException("Unsupported granularity: " + granularity); |
| 189 | + } |
| 190 | + } |
| 191 | + |
| 192 | + /** |
| 193 | + * Creates a string key for the temporal bucket |
| 194 | + * |
| 195 | + * IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.createTemporalBucket() |
| 196 | + * in the search-pipeline-common module. Both processors must generate identical bucket keys for the |
| 197 | + * same input to ensure documents are routed to the same shards during ingest and search. |
| 198 | + * |
| 199 | + * TODO: Consider moving this shared logic to a common module when search and ingest pipelines |
| 200 | + * can share code more easily. |
| 201 | + */ |
| 202 | + private String createTemporalBucketKey(ZonedDateTime truncated) { |
| 203 | + switch (granularity) { |
| 204 | + case HOUR: |
| 205 | + return truncated.getYear() |
| 206 | + + "-" |
| 207 | + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) |
| 208 | + + "-" |
| 209 | + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()) |
| 210 | + + "T" |
| 211 | + + String.format(Locale.ROOT, "%02d", truncated.getHour()); |
| 212 | + case DAY: |
| 213 | + return truncated.getYear() |
| 214 | + + "-" |
| 215 | + + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()) |
| 216 | + + "-" |
| 217 | + + String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth()); |
| 218 | + case WEEK: |
| 219 | + // Use ISO week format: YYYY-WNN |
| 220 | + int weekOfYear = truncated.get(java.time.temporal.WeekFields.ISO.weekOfWeekBasedYear()); |
| 221 | + int weekYear = truncated.get(java.time.temporal.WeekFields.ISO.weekBasedYear()); |
| 222 | + return weekYear + "-W" + String.format(Locale.ROOT, "%02d", weekOfYear); |
| 223 | + case MONTH: |
| 224 | + return truncated.getYear() + "-" + String.format(Locale.ROOT, "%02d", truncated.getMonthValue()); |
| 225 | + default: |
| 226 | + throw new IllegalArgumentException("Unsupported granularity: " + granularity); |
| 227 | + } |
| 228 | + } |
| 229 | + |
| 230 | + @Override |
| 231 | + public String getType() { |
| 232 | + return TYPE; |
| 233 | + } |
| 234 | + |
| 235 | + String getTimestampField() { |
| 236 | + return timestampField; |
| 237 | + } |
| 238 | + |
| 239 | + Granularity getGranularity() { |
| 240 | + return granularity; |
| 241 | + } |
| 242 | + |
| 243 | + DateFormatter getDateFormatter() { |
| 244 | + return dateFormatter; |
| 245 | + } |
| 246 | + |
| 247 | + boolean isIgnoreMissing() { |
| 248 | + return ignoreMissing; |
| 249 | + } |
| 250 | + |
| 251 | + boolean isOverrideExisting() { |
| 252 | + return overrideExisting; |
| 253 | + } |
| 254 | + |
| 255 | + boolean isHashBucket() { |
| 256 | + return hashBucket; |
| 257 | + } |
| 258 | + |
| 259 | + /** |
| 260 | + * Factory for creating TemporalRoutingProcessor instances |
| 261 | + */ |
| 262 | + public static final class Factory implements Processor.Factory { |
| 263 | + |
| 264 | + @Override |
| 265 | + public TemporalRoutingProcessor create( |
| 266 | + Map<String, Processor.Factory> processorFactories, |
| 267 | + String tag, |
| 268 | + @Nullable String description, |
| 269 | + Map<String, Object> config |
| 270 | + ) throws Exception { |
| 271 | + |
| 272 | + String timestampField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "timestamp_field"); |
| 273 | + String granularityStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "granularity"); |
| 274 | + String format = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "format"); |
| 275 | + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); |
| 276 | + boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override_existing", true); |
| 277 | + boolean hashBucket = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "hash_bucket", false); |
| 278 | + |
| 279 | + // Set default format if not provided |
| 280 | + if (format == null) { |
| 281 | + format = DEFAULT_FORMAT; |
| 282 | + } |
| 283 | + |
| 284 | + // Validation |
| 285 | + if (Strings.isNullOrEmpty(timestampField)) { |
| 286 | + throw newConfigurationException(TYPE, tag, "timestamp_field", "cannot be null or empty"); |
| 287 | + } |
| 288 | + |
| 289 | + if (Strings.isNullOrEmpty(granularityStr)) { |
| 290 | + throw newConfigurationException(TYPE, tag, "granularity", "cannot be null or empty"); |
| 291 | + } |
| 292 | + |
| 293 | + Granularity granularity; |
| 294 | + try { |
| 295 | + granularity = Granularity.fromString(granularityStr); |
| 296 | + } catch (IllegalArgumentException e) { |
| 297 | + throw newConfigurationException(TYPE, tag, "granularity", e.getMessage()); |
| 298 | + } |
| 299 | + |
| 300 | + // Validate date format |
| 301 | + try { |
| 302 | + DateFormatter.forPattern(format); |
| 303 | + } catch (Exception e) { |
| 304 | + throw newConfigurationException(TYPE, tag, "format", "invalid date format: " + e.getMessage()); |
| 305 | + } |
| 306 | + |
| 307 | + return new TemporalRoutingProcessor( |
| 308 | + tag, |
| 309 | + description, |
| 310 | + timestampField, |
| 311 | + granularity, |
| 312 | + format, |
| 313 | + ignoreMissing, |
| 314 | + overrideExisting, |
| 315 | + hashBucket |
| 316 | + ); |
| 317 | + } |
| 318 | + } |
| 319 | +} |
0 commit comments